You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/07/19 14:52:17 UTC

[1/4] flink git commit: [FLINK-7102] improve ClassLoaderITCase

Repository: flink
Updated Branches:
  refs/heads/master 8a43b9c0b -> 9beccec45


[FLINK-7102] improve ClassLoaderITCase

* ClassLoaderITCase unnecessarily runs multiple tests in a single test case
* ClassLoaderITCase#testDisposeSavepointWithCustomKvState() does not cancel its
  job (thus the order of execution of test cases defines the outcome)
* ClassLoaderITCase uses e.getCause().getCause() which may cause
  {{NullPointerException}}s hiding the original error

This closes #4255.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5ac1324b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5ac1324b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5ac1324b

Branch: refs/heads/master
Commit: 5ac1324b9eb89afb87de7810e7bd4b1209b544b6
Parents: 8a43b9c
Author: Nico Kruber <ni...@data-artisans.com>
Authored: Tue Jul 4 13:49:55 2017 +0200
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 19 11:33:35 2017 +0200

----------------------------------------------------------------------
 .../test/classloading/ClassLoaderITCase.java    | 255 +++++++++++--------
 1 file changed, 145 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5ac1324b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
index 31b6bcc..98bb0ea 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/classloading/ClassLoaderITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.client.JobCancellationException;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.messages.JobManagerMessages;
@@ -42,9 +43,13 @@ import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.test.util.TestEnvironment;
 import org.apache.flink.util.TestLogger;
 
+import org.hamcrest.Matchers;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 import org.junit.rules.TemporaryFolder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -61,8 +66,11 @@ import scala.concurrent.Future;
 import scala.concurrent.duration.Deadline;
 import scala.concurrent.duration.FiniteDuration;
 
-import static org.junit.Assert.assertEquals;
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.CoreMatchers.isA;
+import static org.hamcrest.Matchers.hasProperty;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Test job classloader.
@@ -87,16 +95,18 @@ public class ClassLoaderITCase extends TestLogger {
 
 	private static final String CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH = "checkpointing_custom_kv_state-test-jar.jar";
 
+	@ClassRule
 	public static final TemporaryFolder FOLDER = new TemporaryFolder();
 
+	@Rule
+	public ExpectedException expectedException = ExpectedException.none();
+
 	private static TestingCluster testCluster;
 
 	private static int parallelism;
 
 	@BeforeClass
 	public static void setUp() throws Exception {
-		FOLDER.create();
-
 		Configuration config = new Configuration();
 		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
 		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 2);
@@ -121,133 +131,150 @@ public class ClassLoaderITCase extends TestLogger {
 			testCluster.shutdown();
 		}
 
-		FOLDER.delete();
-
 		TestStreamEnvironment.unsetAsContext();
 		TestEnvironment.unsetAsContext();
 	}
 
 	@Test
-	public void testJobsWithCustomClassLoader() throws IOException, ProgramInvocationException {
-		try {
-			int port = testCluster.getLeaderRPCPort();
+	public void testCustomSplitJobWithCustomClassLoaderJar() throws IOException, ProgramInvocationException {
 
-			PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+		PackagedProgram inputSplitTestProg = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
 
-			TestEnvironment.setAsContext(
-				testCluster,
-				parallelism,
-				Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
-				Collections.<URL>emptyList());
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(INPUT_SPLITS_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
 
-			inputSplitTestProg.invokeInteractiveModeForExecution();
+		inputSplitTestProg.invokeInteractiveModeForExecution();
+	}
 
-			PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
+	@Test
+	public void testStreamingCustomSplitJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		PackagedProgram streamingInputSplitTestProg = new PackagedProgram(new File(STREAMING_INPUT_SPLITS_PROG_JAR_FILE));
 
-			TestStreamEnvironment.setAsContext(
-				testCluster,
-				parallelism,
-				Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
-				Collections.<URL>emptyList());
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(STREAMING_INPUT_SPLITS_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
 
-			streamingInputSplitTestProg.invokeInteractiveModeForExecution();
+		streamingInputSplitTestProg.invokeInteractiveModeForExecution();
+	}
 
-			URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
-			PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
+	@Test
+	public void testCustomSplitJobWithCustomClassLoaderPath() throws IOException, ProgramInvocationException {
+		URL classpath = new File(INPUT_SPLITS_PROG_JAR_FILE).toURI().toURL();
+		PackagedProgram inputSplitTestProg2 = new PackagedProgram(new File(INPUT_SPLITS_PROG_JAR_FILE));
 
-			TestEnvironment.setAsContext(
-				testCluster,
-				parallelism,
-				Collections.<Path>emptyList(),
-				Collections.singleton(classpath));
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.<Path>emptyList(),
+			Collections.singleton(classpath));
 
-			inputSplitTestProg2.invokeInteractiveModeForExecution();
+		inputSplitTestProg2.invokeInteractiveModeForExecution();
+	}
 
-			// regular streaming job
-			PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
+	@Test
+	public void testStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		// regular streaming job
+		PackagedProgram streamingProg = new PackagedProgram(new File(STREAMING_PROG_JAR_FILE));
 
-			TestStreamEnvironment.setAsContext(
-				testCluster,
-				parallelism,
-				Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
-				Collections.<URL>emptyList());
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(STREAMING_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
 
-			streamingProg.invokeInteractiveModeForExecution();
+		streamingProg.invokeInteractiveModeForExecution();
+	}
 
-			// checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
-			// the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
-			try {
-				PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
+	@Test
+	public void testCheckpointedStreamingClassloaderJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		// checkpointed streaming job with custom classes for the checkpoint (FLINK-2543)
+		// the test also ensures that user specific exceptions are serializable between JobManager <--> JobClient.
+		PackagedProgram streamingCheckpointedProg = new PackagedProgram(new File(STREAMING_CHECKPOINTED_PROG_JAR_FILE));
 
-				TestStreamEnvironment.setAsContext(
-					testCluster,
-					parallelism,
-					Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
-					Collections.<URL>emptyList());
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(STREAMING_CHECKPOINTED_PROG_JAR_FILE)),
+			Collections.<URL>emptyList());
+
+		// Program should terminate with a 'SuccessException':
+		// we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
+		expectedException.expectCause(
+			Matchers.<Throwable>hasProperty("cause",
+				hasProperty("class",
+					hasProperty("canonicalName", equalTo(
+						"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException")))));
+
+		streamingCheckpointedProg.invokeInteractiveModeForExecution();
+	}
 
-				streamingCheckpointedProg.invokeInteractiveModeForExecution();
-			} catch (Exception e) {
-				// we can not access the SuccessException here when executing the tests with maven, because its not available in the jar.
-				assertEquals("Program should terminate with a 'SuccessException'",
-						"org.apache.flink.test.classloading.jar.CheckpointedStreamingProgram.SuccessException",
-						e.getCause().getCause().getClass().getCanonicalName());
-			}
+	@Test
+	public void testKMeansJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		PackagedProgram kMeansProg = new PackagedProgram(
+			new File(KMEANS_JAR_PATH),
+			new String[] {
+				KMeansData.DATAPOINTS,
+				KMeansData.INITIAL_CENTERS,
+				"25"
+			});
+
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(KMEANS_JAR_PATH)),
+			Collections.<URL>emptyList());
 
-			PackagedProgram kMeansProg = new PackagedProgram(
-					new File(KMEANS_JAR_PATH),
-					new String[] {
-						KMeansData.DATAPOINTS,
-						KMeansData.INITIAL_CENTERS,
-						"25"
-					});
-
-			TestEnvironment.setAsContext(
-				testCluster,
-				parallelism,
-				Collections.singleton(new Path(KMEANS_JAR_PATH)),
-				Collections.<URL>emptyList());
-
-			kMeansProg.invokeInteractiveModeForExecution();
-
-			// test FLINK-3633
-			final PackagedProgram userCodeTypeProg = new PackagedProgram(
-					new File(USERCODETYPE_JAR_PATH),
-					new String[] { USERCODETYPE_JAR_PATH,
-							"localhost",
-							String.valueOf(port),
-					});
-
-			TestEnvironment.setAsContext(
-				testCluster,
-				parallelism,
-				Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
-				Collections.<URL>emptyList());
-
-			userCodeTypeProg.invokeInteractiveModeForExecution();
-
-			File checkpointDir = FOLDER.newFolder();
-			File outputDir = FOLDER.newFolder();
-
-			final PackagedProgram program = new PackagedProgram(
-					new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
-					new String[] {
-							checkpointDir.toURI().toString(),
-							outputDir.toURI().toString()
-					});
-
-			TestStreamEnvironment.setAsContext(
-				testCluster,
-				parallelism,
-				Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
-				Collections.<URL>emptyList());
-
-			program.invokeInteractiveModeForExecution();
-
-		} catch (Exception e) {
-			if (!(e.getCause().getCause() instanceof SuccessException)) {
-				throw e;
-			}
-		}
+		kMeansProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testUserCodeTypeJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		int port = testCluster.getLeaderRPCPort();
+
+		// test FLINK-3633
+		final PackagedProgram userCodeTypeProg = new PackagedProgram(
+			new File(USERCODETYPE_JAR_PATH),
+			new String[] { USERCODETYPE_JAR_PATH,
+				"localhost",
+				String.valueOf(port),
+			});
+
+		TestEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(USERCODETYPE_JAR_PATH)),
+			Collections.<URL>emptyList());
+
+		userCodeTypeProg.invokeInteractiveModeForExecution();
+	}
+
+	@Test
+	public void testCheckpointingCustomKvStateJobWithCustomClassLoader() throws IOException, ProgramInvocationException {
+		File checkpointDir = FOLDER.newFolder();
+		File outputDir = FOLDER.newFolder();
+
+		final PackagedProgram program = new PackagedProgram(
+			new File(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH),
+			new String[] {
+				checkpointDir.toURI().toString(),
+				outputDir.toURI().toString()
+			});
+
+		TestStreamEnvironment.setAsContext(
+			testCluster,
+			parallelism,
+			Collections.singleton(new Path(CHECKPOINTING_CUSTOM_KV_STATE_JAR_PATH)),
+			Collections.<URL>emptyList());
+
+		expectedException.expectCause(
+			Matchers.<Throwable>hasProperty("cause", isA(SuccessException.class)));
+
+		program.invokeInteractiveModeForExecution();
 	}
 
 	/**
@@ -283,7 +310,10 @@ public class ClassLoaderITCase extends TestLogger {
 				try {
 					program.invokeInteractiveModeForExecution();
 				} catch (ProgramInvocationException ignored) {
-					ignored.printStackTrace();
+					if (ignored.getCause() == null ||
+						!(ignored.getCause() instanceof JobCancellationException)) {
+						ignored.printStackTrace();
+					}
 				}
 			}
 		});
@@ -356,5 +386,10 @@ public class ClassLoaderITCase extends TestLogger {
 		} else {
 			throw new IllegalStateException("Unexpected response to DisposeSavepoint");
 		}
+
+		// Cancel job, wait for success
+		Future<?> cancelFuture = jm.ask(new JobManagerMessages.CancelJob(jobId), deadline.timeLeft());
+		Object response = Await.result(cancelFuture, deadline.timeLeft());
+		assertTrue("Unexpected response: " + response, response instanceof JobManagerMessages.CancellationSuccess);
 	}
 }


[2/4] flink git commit: [FLINK-7222] [kafka] Fix invalid symbol * when create directory on windows

Posted by ch...@apache.org.
[FLINK-7222] [kafka] Fix invalid symbol * when create directory on windows

This closes #4361.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ee6ff59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ee6ff59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ee6ff59

Branch: refs/heads/master
Commit: 2ee6ff59819b6a8f9035fd0abeb5ffedc8052f4b
Parents: 5ac1324
Author: zjureel <zj...@gmail.com>
Authored: Wed Jul 19 09:44:53 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 19 11:35:16 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 2 +-
 .../flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2ee6ff59/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index c7e793a..051d91e 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -223,7 +223,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
 
-		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
+		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
 
 		tmpKafkaDirs = new ArrayList<>(numKafkaServers);

http://git-wip-us.apache.org/repos/asf/flink/blob/2ee6ff59/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ab82ef3..710d917 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -214,7 +214,7 @@ public class KafkaTestEnvironmentImpl extends KafkaTestEnvironment {
 		tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create zookeeper temp dir", tmpZkDir.mkdirs());
 
-		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir*" + (UUID.randomUUID().toString()));
+		tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + (UUID.randomUUID().toString()));
 		assertTrue("cannot create kafka temp dir", tmpKafkaParent.mkdirs());
 
 		tmpKafkaDirs = new ArrayList<>(numKafkaServers);


[4/4] flink git commit: [FLINK-5541] Missing null check for localJar in FlinkSubmitter#submitTopology

Posted by ch...@apache.org.
[FLINK-5541] Missing null check for localJar in FlinkSubmitter#submitTopology

This closes #4315.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9beccec4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9beccec4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9beccec4

Branch: refs/heads/master
Commit: 9beccec4501e84cf8c9fa5e8ee0a491afb39f4ac
Parents: ae826a6
Author: zhangminglei <zm...@163.com>
Authored: Wed Jul 19 10:46:21 2017 +0800
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 19 11:39:19 2017 +0200

----------------------------------------------------------------------
 .../src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java  | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9beccec4/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
----------------------------------------------------------------------
diff --git a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
index c36942e..5b3f609 100644
--- a/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
+++ b/flink-contrib/flink-storm/src/main/java/org/apache/flink/storm/api/FlinkSubmitter.java
@@ -22,6 +22,7 @@ import org.apache.flink.client.program.ContextEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.util.Preconditions;
 
 import org.apache.storm.Config;
 import org.apache.storm.StormSubmitter;
@@ -120,7 +121,7 @@ public class FlinkSubmitter {
 					// ignore
 				}
 			}
-
+			Preconditions.checkNotNull(localJar, "LocalJar must not be null.");
 			LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
 			client.submitTopologyWithOpts(name, localJar, topology);
 		} catch (final InvalidTopologyException e) {


[3/4] flink git commit: [FLINK-7197] [gelly] Missing call to GraphAlgorithmWrappingBase#canMergeConfigurationWith()

Posted by ch...@apache.org.
[FLINK-7197] [gelly] Missing call to GraphAlgorithmWrappingBase#canMergeConfigurationWith()

Fix for methods calling the incorrect super function.

This closes #4345.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ae826a6f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ae826a6f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ae826a6f

Branch: refs/heads/master
Commit: ae826a6f784277c85067a3855a023835a924f459
Parents: 2ee6ff5
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jul 14 15:23:29 2017 -0400
Committer: zentol <ch...@apache.org>
Committed: Wed Jul 19 11:37:58 2017 +0200

----------------------------------------------------------------------
 .../org/apache/flink/graph/asm/translate/TranslateGraphIds.java  | 4 +++-
 .../apache/flink/graph/asm/translate/TranslateVertexValues.java  | 4 +++-
 2 files changed, 6 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ae826a6f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 5568775..70df655 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -56,7 +56,9 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 
 	@Override
 	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
-		super.mergeConfiguration(other);
+		if (!super.canMergeConfigurationWith(other)) {
+			return false;
+		}
 
 		TranslateGraphIds rhs = (TranslateGraphIds) other;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ae826a6f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index c9c94d7..99ea701 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -54,7 +54,9 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
 
 	@Override
 	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
-		super.mergeConfiguration(other);
+		if (!super.canMergeConfigurationWith(other)) {
+			return false;
+		}
 
 		TranslateVertexValues rhs = (TranslateVertexValues) other;