You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/05/18 16:38:18 UTC

[flink] 01/02: [hotfix][client] Make ConfigUtils.decodeListFromConfig return new array list and throw exception

This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dea6dace89da6b7363a997667553c1da625dddd9
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Mon May 18 20:15:56 2020 +0800

    [hotfix][client] Make ConfigUtils.decodeListFromConfig return new array list and throw exception
---
 .../flink/client/cli/ExecutionConfigAccessor.java     | 19 ++++---------------
 .../application/executors/EmbeddedExecutor.java       |  5 +++--
 .../client/deployment/executors/LocalExecutor.java    |  3 ++-
 .../deployment/executors/PipelineExecutorUtils.java   |  4 +++-
 .../ClassPathPackagedProgramRetrieverTest.java        |  3 ++-
 .../org/apache/flink/configuration/ConfigUtils.java   | 18 ++++++++++++------
 6 files changed, 26 insertions(+), 26 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index d1a93a8..3894a92 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -19,7 +19,6 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
@@ -72,22 +71,12 @@ public class ExecutionConfigAccessor {
 		return baseConfiguration;
 	}
 
-	public List<URL> getJars() {
-		return decodeUrlList(configuration, PipelineOptions.JARS);
+	public List<URL> getJars() throws MalformedURLException {
+		return ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URL::new);
 	}
 
-	public List<URL> getClasspaths() {
-		return decodeUrlList(configuration, PipelineOptions.CLASSPATHS);
-	}
-
-	private List<URL> decodeUrlList(final Configuration configuration, final ConfigOption<List<String>> configOption) {
-		return ConfigUtils.decodeListFromConfig(configuration, configOption, url -> {
-			try {
-				return new URL(url);
-			} catch (MalformedURLException e) {
-				throw new IllegalArgumentException("Invalid URL", e);
-			}
-		});
+	public List<URL> getClasspaths() throws MalformedURLException {
+		return ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.CLASSPATHS, URL::new);
 	}
 
 	public int getParallelism() {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
index 9c14e60..febf3ae 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
 import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
@@ -81,7 +82,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
 	}
 
 	@Override
-	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) {
+	public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
 		checkNotNull(pipeline);
 		checkNotNull(configuration);
 
@@ -101,7 +102,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
 		return CompletableFuture.completedFuture(jobClientCreator.getJobClient(jobId));
 	}
 
-	private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) {
+	private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
 		final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
 
 		final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
index 401ea19..a64f030 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 
+import java.net.MalformedURLException;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.Function;
 
@@ -80,7 +81,7 @@ public class LocalExecutor implements PipelineExecutor {
 		return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
 	}
 
-	private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) {
+	private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
 		// This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
 		// to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
 		// for now.
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
index 7fec53d..0647091 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 
 import javax.annotation.Nonnull;
 
+import java.net.MalformedURLException;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -44,7 +46,7 @@ public class PipelineExecutorUtils {
 	 *                         savepoint settings used to bootstrap its state.
 	 * @return the corresponding {@link JobGraph}.
 	 */
-	public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) {
+	public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws MalformedURLException {
 		checkNotNull(pipeline);
 		checkNotNull(configuration);
 
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
index 4101918..acdc6c2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
@@ -48,6 +48,7 @@ import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.net.MalformedURLException;
 import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
@@ -328,7 +329,7 @@ public class ClassPathPackagedProgramRetrieverTest extends TestLogger {
 			containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray()));
 	}
 
-	private JobGraph retrieveJobGraph(ClassPathPackagedProgramRetriever retrieverUnderTest, Configuration configuration) throws FlinkException, ProgramInvocationException {
+	private JobGraph retrieveJobGraph(ClassPathPackagedProgramRetriever retrieverUnderTest, Configuration configuration) throws FlinkException, ProgramInvocationException, MalformedURLException {
 		final PackagedProgram packagedProgram = retrieverUnderTest.getPackagedProgram();
 
 		final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
index bb7f470..117a316 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
@@ -19,13 +19,13 @@
 package org.apache.flink.configuration;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.FunctionWithException;
 
 import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 import java.util.Objects;
 import java.util.function.Function;
@@ -108,19 +108,25 @@ public class ConfigUtils {
 	 * @param mapper the transformation function from {@code IN} to {@code OUT}.
 	 * @return the transformed values in a list of type {@code OUT}.
 	 */
-	public static <IN, OUT> List<OUT> decodeListFromConfig(
+	public static <IN, OUT, E extends Throwable> List<OUT> decodeListFromConfig(
 			final ReadableConfig configuration,
 			final ConfigOption<List<IN>> key,
-			final Function<IN, OUT> mapper) {
+			final FunctionWithException<IN, OUT, E> mapper) throws E {
 
 		checkNotNull(configuration);
 		checkNotNull(key);
 		checkNotNull(mapper);
 
 		final List<IN> encodedString = configuration.get(key);
-		return encodedString != null
-				? encodedString.stream().map(mapper).collect(Collectors.toList())
-				: Collections.emptyList();
+		if (encodedString == null || encodedString.isEmpty()) {
+			return new ArrayList<>();
+		}
+
+		final List<OUT> result = new ArrayList<>(encodedString.size());
+		for (IN input : encodedString) {
+			result.add(mapper.apply(input));
+		}
+		return result;
 	}
 
 	private ConfigUtils() {