You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by kk...@apache.org on 2020/05/18 16:38:18 UTC
[flink] 01/02: [hotfix][client] Make
ConfigUtils.decodeListFromConfig return new array list and throw exception
This is an automated email from the ASF dual-hosted git repository.
kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
commit dea6dace89da6b7363a997667553c1da625dddd9
Author: wangyang0918 <da...@alibaba-inc.com>
AuthorDate: Mon May 18 20:15:56 2020 +0800
[hotfix][client] Make ConfigUtils.decodeListFromConfig return new array list and throw exception
---
.../flink/client/cli/ExecutionConfigAccessor.java | 19 ++++---------------
.../application/executors/EmbeddedExecutor.java | 5 +++--
.../client/deployment/executors/LocalExecutor.java | 3 ++-
.../deployment/executors/PipelineExecutorUtils.java | 4 +++-
.../ClassPathPackagedProgramRetrieverTest.java | 3 ++-
.../org/apache/flink/configuration/ConfigUtils.java | 18 ++++++++++++------
6 files changed, 26 insertions(+), 26 deletions(-)
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
index d1a93a8..3894a92 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/ExecutionConfigAccessor.java
@@ -19,7 +19,6 @@
package org.apache.flink.client.cli;
import org.apache.flink.annotation.Internal;
-import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
@@ -72,22 +71,12 @@ public class ExecutionConfigAccessor {
return baseConfiguration;
}
- public List<URL> getJars() {
- return decodeUrlList(configuration, PipelineOptions.JARS);
+ public List<URL> getJars() throws MalformedURLException {
+ return ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URL::new);
}
- public List<URL> getClasspaths() {
- return decodeUrlList(configuration, PipelineOptions.CLASSPATHS);
- }
-
- private List<URL> decodeUrlList(final Configuration configuration, final ConfigOption<List<String>> configOption) {
- return ConfigUtils.decodeListFromConfig(configuration, configOption, url -> {
- try {
- return new URL(url);
- } catch (MalformedURLException e) {
- throw new IllegalArgumentException("Invalid URL", e);
- }
- });
+ public List<URL> getClasspaths() throws MalformedURLException {
+ return ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.CLASSPATHS, URL::new);
}
public int getParallelism() {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
index 9c14e60..febf3ae 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/application/executors/EmbeddedExecutor.java
@@ -38,6 +38,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
@@ -81,7 +82,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
}
@Override
- public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) {
+ public CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
checkNotNull(pipeline);
checkNotNull(configuration);
@@ -101,7 +102,7 @@ public class EmbeddedExecutor implements PipelineExecutor {
return CompletableFuture.completedFuture(jobClientCreator.getJobClient(jobId));
}
- private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) {
+ private CompletableFuture<JobClient> submitAndGetJobClientFuture(final Pipeline pipeline, final Configuration configuration) throws MalformedURLException {
final Time timeout = Time.milliseconds(configuration.get(ExecutionOptions.EMBEDDED_RPC_TIMEOUT).toMillis());
final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
index 401ea19..a64f030 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/LocalExecutor.java
@@ -32,6 +32,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import java.net.MalformedURLException;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
@@ -80,7 +81,7 @@ public class LocalExecutor implements PipelineExecutor {
return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory).submitJob(jobGraph);
}
- private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) {
+ private JobGraph getJobGraph(Pipeline pipeline, Configuration configuration) throws MalformedURLException {
// This is a quirk in how LocalEnvironment used to work. It sets the default parallelism
// to <num taskmanagers> * <num task slots>. Might be questionable but we keep the behaviour
// for now.
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
index 7fec53d..0647091 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/executors/PipelineExecutorUtils.java
@@ -28,6 +28,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
import javax.annotation.Nonnull;
+import java.net.MalformedURLException;
+
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
@@ -44,7 +46,7 @@ public class PipelineExecutorUtils {
* savepoint settings used to bootstrap its state.
* @return the corresponding {@link JobGraph}.
*/
- public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) {
+ public static JobGraph getJobGraph(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws MalformedURLException {
checkNotNull(pipeline);
checkNotNull(configuration);
diff --git a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
index 4101918..acdc6c2 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/deployment/application/ClassPathPackagedProgramRetrieverTest.java
@@ -48,6 +48,7 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.IOException;
+import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -328,7 +329,7 @@ public class ClassPathPackagedProgramRetrieverTest extends TestLogger {
containsInAnyOrder(expectedURLs.stream().map(URL::toString).toArray()));
}
- private JobGraph retrieveJobGraph(ClassPathPackagedProgramRetriever retrieverUnderTest, Configuration configuration) throws FlinkException, ProgramInvocationException {
+ private JobGraph retrieveJobGraph(ClassPathPackagedProgramRetriever retrieverUnderTest, Configuration configuration) throws FlinkException, ProgramInvocationException, MalformedURLException {
final PackagedProgram packagedProgram = retrieverUnderTest.getPackagedProgram();
final int defaultParallelism = configuration.getInteger(CoreOptions.DEFAULT_PARALLELISM);
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
index bb7f470..117a316 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigUtils.java
@@ -19,13 +19,13 @@
package org.apache.flink.configuration;
import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.function.FunctionWithException;
import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
@@ -108,19 +108,25 @@ public class ConfigUtils {
* @param mapper the transformation function from {@code IN} to {@code OUT}.
* @return the transformed values in a list of type {@code OUT}.
*/
- public static <IN, OUT> List<OUT> decodeListFromConfig(
+ public static <IN, OUT, E extends Throwable> List<OUT> decodeListFromConfig(
final ReadableConfig configuration,
final ConfigOption<List<IN>> key,
- final Function<IN, OUT> mapper) {
+ final FunctionWithException<IN, OUT, E> mapper) throws E {
checkNotNull(configuration);
checkNotNull(key);
checkNotNull(mapper);
final List<IN> encodedString = configuration.get(key);
- return encodedString != null
- ? encodedString.stream().map(mapper).collect(Collectors.toList())
- : Collections.emptyList();
+ if (encodedString == null || encodedString.isEmpty()) {
+ return new ArrayList<>();
+ }
+
+ final List<OUT> result = new ArrayList<>(encodedString.size());
+ for (IN input : encodedString) {
+ result.add(mapper.apply(input));
+ }
+ return result;
}
private ConfigUtils() {