You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/19 14:52:17 UTC
[1/4] flink git commit: [FLINK-7102] improve ClassLoaderITCase
Repository: flink
Updated Branches:
refs/heads/master 8a43b9c0b -> 9beccec45
[FLINK-7102] improve ClassLoaderITCase
* ClassLoaderITCase unnecessarily runs multiple tests in a single test case
* ClassLoaderITCase#testDisposeSavepointWithCustomKvState() does not cancel its
job (thus the order of execution of test cases defines the outcome)
* ClassLoaderITCase uses e.getCause().getCause(), which may throw a
NullPointerException that hides the original error
This closes #4255.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ac1324b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ac1324b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ac1324b
Branch: refs/heads/master
Commit: 5ac1324b9eb89afb87de7810e7bd4b1209b544b6
Parents: 8a43b9c
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jul 4 13:49:55 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 19 11:33:35 2017 +0200
----------------------------------------------------------------------
.../test/classloading/ClassLoaderITCase.java | 255 +++++++++++--------
1 file changed, 145 insertions(+), 110 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/5ac1324b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 31b6bcc..98bb0ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobCancellationException;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -42,9 +43,13 @@ import org.apache.flink.test.util.SuccessException;
import org.apache.flink.test.util.TestEnvironment;
import org.apache.flink.util.TestLogger;
+import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -61,8 +66,11 @@ import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.hasProperty;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
/**
* Test job classloader.
@@ -87,16 +95,18 @@ public class ClassLoaderITCase extends TestLogger {
private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
+ @ClassRule
public static final TemporaryFolder FOLDER = new TemporaryFolder();
+ @Rule
+ public ExpectedException expectedException = ExpectedException.none();
+
private static TestingCluster testCluster;
private static int parallelism;
@BeforeClass
public static void setUp() throws Exception {
- FOLDER.create();
-
Configuration config = new Configuration();
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
@@ -121,133 +131,150 @@ public class ClassLoaderITCase extends TestLogger {
testCluster.shutdown();
}
- FOLDER.delete();
-
TestStreamEnvironment.unsetAsContext();
TestEnvironment.unsetAsContext();
}
@Test
- public void testJobsWithCustomClassLoader() throws IOException, ProgramInvocationException {
- try {
- int port = testCluster.getLeaderRPCPort();
+ public void testCustomSplitJobWithCustomClassLoaderJar() throws IOException, ProgramInvocationException {
- PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+ PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
- TestEnvironment.setAsContext(
- testCluster,
- parallelism,
- Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
- Collections.<URL>emptyList());
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
- inputSplitTestProg.invokeInteractiveModeForExecution();
+ inputSplitTestProg.invokeInteractiveModeForExecution();
+ }
- PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
+ @Test
+ public void testStreamingCustomSplitJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
- TestStreamEnvironment.setAsContext(
- testCluster,
- parallelism,
- Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
- Collections.<URL>emptyList());
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
- streamingInputSplitTestProg.invokeInteractiveModeForExecution();
+ streamingInputSplitTestProg.invokeInteractiveModeForExecution();
+ }
- URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
- PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+ @Test
+ public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, ProgramInvocationException {
+ URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
+ PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
- TestEnvironment.setAsContext(
- testCluster,
- parallelism,
- Collections.<Path>emptyList(),
- Collections.singleton(classpath));
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.<Path>emptyList(),
+ Collections.singleton(classpath));
- inputSplitTestProg2.invokeInteractiveModeForExecution();
+ inputSplitTestProg2.invokeInteractiveModeForExecution();
+ }
- // regular streaming job
- PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
+ @Test
+ public void testStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ // regular streaming job
+ PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
- TestStreamEnvironment.setAsContext(
- testCluster,
- parallelism,
- Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
- Collections.<URL>emptyList());
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
- streamingProg.invokeInteractiveModeForExecution();
+ streamingProg.invokeInteractiveModeForExecution();
+ }
- // checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
- // the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
- try {
- PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
+ @Test
+ public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ // checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
+ // the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
+ PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
- TestStreamEnvironment.setAsContext(
- testCluster,
- parallelism,
- Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
- Collections.<URL>emptyList());
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
+ Collections.<URL>emptyList());
+
+ // Program should terminate with a 'SuccessException':
+ // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
+ expectedException.expectCause(
+ Matchers.<Throwable>hasProperty("cause",
+ hasProperty("class",
+ hasProperty("canonicalName", equalTo(
+ "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
+
+ streamingCheckpointedProg.invokeInteractiveModeForExecution();
+ }
- streamingCheckpointedProg.invokeInteractiveModeForExecution();
- } catch (Exception e) {
- // we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
- assertEquals("Program should terminate with a 'SuccessException'",
- "org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException",
- e.getCause().getCause().getClass().getCanonicalName());
- }
+ @Test
+ public void testKMeansJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ PackagedProgram kMeansProg = new PackagedProgram(
+ new File(KMEANS_JAR_PATH),
+ new String[] {
+ KMeansData.DATAPOINTS,
+ KMeansData.INITIAL_CENTERS,
+ "25"
+ });
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(KMEANS_JAR_PATH)),
+ Collections.<URL>emptyList());
- PackagedProgram kMeansProg = new PackagedProgram(
- new File(KMEANS_JAR_PATH),
- new String[] {
- KMeansData.DATAPOINTS,
- KMeansData.INITIAL_CENTERS,
- "25"
- });
-
- TestEnvironment.setAsContext(
- testCluster,
- parallelism,
- Collections.singleton(new Path(KMEANS_JAR_PATH)),
- Collections.<URL>emptyList());
-
- kMeansProg.invokeInteractiveModeForExecution();
-
- // test FLINK-3633
- final PackagedProgram userCodeTypeProg = new PackagedProgram(
- new File(USERCODETYPE_JAR_PATH),
- new String[] { USERCODETYPE_JAR_PATH,
- "localhost",
- String.valueOf(port),
- });
-
- TestEnvironment.setAsContext(
- testCluster,
- parallelism,
- Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
- Collections.<URL>emptyList());
-
- userCodeTypeProg.invokeInteractiveModeForExecution();
-
- File checkpointDir = FOLDER.newFolder();
- File outputDir = FOLDER.newFolder();
-
- final PackagedProgram program = new PackagedProgram(
- new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
- new String[] {
- checkpointDir.toURI().toString(),
- outputDir.toURI().toString()
- });
-
- TestStreamEnvironment.setAsContext(
- testCluster,
- parallelism,
- Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
- Collections.<URL>emptyList());
-
- program.invokeInteractiveModeForExecution();
-
- } catch (Exception e) {
- if (!(e.getCause().getCause() instanceof SuccessException)) {
- throw e;
- }
- }
+ kMeansProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ int port = testCluster.getLeaderRPCPort();
+
+ // test FLINK-3633
+ final PackagedProgram userCodeTypeProg = new PackagedProgram(
+ new File(USERCODETYPE_JAR_PATH),
+ new String[] { USERCODETYPE_JAR_PATH,
+ "localhost",
+ String.valueOf(port),
+ });
+
+ TestEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
+ Collections.<URL>emptyList());
+
+ userCodeTypeProg.invokeInteractiveModeForExecution();
+ }
+
+ @Test
+ public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+ File checkpointDir = FOLDER.newFolder();
+ File outputDir = FOLDER.newFolder();
+
+ final PackagedProgram program = new PackagedProgram(
+ new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
+ new String[] {
+ checkpointDir.toURI().toString(),
+ outputDir.toURI().toString()
+ });
+
+ TestStreamEnvironment.setAsContext(
+ testCluster,
+ parallelism,
+ Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
+ Collections.<URL>emptyList());
+
+ expectedException.expectCause(
+ Matchers.<Throwable>hasProperty("cause", isA(SuccessException.class)));
+
+ program.invokeInteractiveModeForExecution();
}
/**
@@ -283,7 +310,10 @@ public class ClassLoaderITCase extends TestLogger {
try {
program.invokeInteractiveModeForExecution();
} catch (ProgramInvocationException ignored) {
- ignored.printStackTrace();
+ if (ignored.getCause() == null ||
+ !(ignored.getCause() instanceof JobCancellationException)) {
+ ignored.printStackTrace();
+ }
}
}
});
@@ -356,5 +386,10 @@ public class ClassLoaderITCase extends TestLogger {
} else {
throw new IllegalStateException("Unexpected response to DisposeSavepoint");
}
+
+ // Cancel job, wait for success
+ Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
+ Object response = Await.result(cancelFuture, deadline.timeLeft());
+ assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
}
}
[2/4] flink git commit: [FLINK-7222] [kafka] Fix invalid symbol *
when creating a directory on Windows
Posted by ch...@apache.org.
[FLINK-7222] [kafka] Fix invalid symbol * when creating a directory on Windows
This closes #4361.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ee6ff59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ee6ff59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ee6ff59
Branch: refs/heads/master
Commit: 2ee6ff59819b6a8f9035fd0abeb5ffedc8052f4b
Parents: 5ac1324
Author: zjureel <zj...@gmail.com>
Authored: Wed Jul 19 09:44:53 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 19 11:35:16 2017 +0200
----------------------------------------------------------------------
.../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 2 +-
.../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/2ee6ff59/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c7e793a..051d91e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -223,7 +223,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
- tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
+ tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
tmpKafkaDirs = new ArrayList<>(numKafkaServers);
http://git-wip-us.apache.org/repos/asf/flink/blob/2ee6ff59/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ab82ef3..710d917 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -214,7 +214,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
- tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
+ tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
tmpKafkaDirs = new ArrayList<>(numKafkaServers);
[4/4] flink git commit: [FLINK-5541] Missing null check for localJar
in FlinkSubmitter#submitTopology
Posted by ch...@apache.org.
[FLINK-5541] Missing null check for localJar in FlinkSubmitter#submitTopology
This closes #4315.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9beccec4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9beccec4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9beccec4
Branch: refs/heads/master
Commit: 9beccec4501e84cf8c9fa5e8ee0a491afb39f4ac
Parents: ae826a6
Author: zhangminglei <zm...@163.com>
Authored: Wed Jul 19 10:46:21 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 19 11:39:19 2017 +0200
----------------------------------------------------------------------
.../src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9beccec4/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
index c36942e..5b3f609 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.program.ContextEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.Preconditions;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
@@ -120,7 +121,7 @@ public class FlinkSubmitter {
// ignore
}
}
-
+ Preconditions.checkNotNull(localJar, "LocalJar must not be null.");
LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
client.submitTopologyWithOpts(name, localJar, topology);
} catch (final InvalidTopologyException e) {
[3/4] flink git commit: [FLINK-7197] [gelly] Missing call to
GraphAlgorithmWrappingBase#canMergeConfigurationWith()
Posted by ch...@apache.org.
[FLINK-7197] [gelly] Missing call to GraphAlgorithmWrappingBase#canMergeConfigurationWith()
Fix for methods calling the incorrect super function.
This closes #4345.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae826a6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae826a6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae826a6f
Branch: refs/heads/master
Commit: ae826a6f784277c85067a3855a023835a924f459
Parents: 2ee6ff5
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jul 14 15:23:29 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 19 11:37:58 2017 +0200
----------------------------------------------------------------------
.../org/apache/flink/graph/asm/translate/TranslateGraphIds.java | 4 +++-
.../apache/flink/graph/asm/translate/TranslateVertexValues.java | 4 +++-
2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ae826a6f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 5568775..70df655 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -56,7 +56,9 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
@Override
protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
- super.mergeConfiguration(other);
+ if (!super.canMergeConfigurationWith(other)) {
+ return false;
+ }
TranslateGraphIds rhs = (TranslateGraphIds) other;
http://git-wip-us.apache.org/repos/asf/flink/blob/ae826a6f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index c9c94d7..99ea701 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -54,7 +54,9 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
@Override
protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
- super.mergeConfiguration(other);
+ if (!super.canMergeConfigurationWith(other)) {
+ return false;
+ }
TranslateVertexValues rhs = (TranslateVertexValues) other;