You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/31 16:14:07 UTC

flink git commit: [FLINK-8171] [flip6] Remove work arounds from Flip6LocalStreamEnvironment

Repository: flink
Updated Branches:
  refs/heads/master b5db8d908 -> 8e7a71c05


[FLINK-8171] [flip6] Remove work arounds from Flip6LocalStreamEnvironment

It is no longer needed to wait for the registration of task managers and
to not use slot sharing when submitting jobs to the Flip-6 MiniCluster.
Therefore, we can remove these work arounds from the
Flip6LocalStreamEnvironment.

Adapt comment in RestClusterClient

This closes #5101.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e7a71c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e7a71c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e7a71c0

Branch: refs/heads/master
Commit: 8e7a71c053135fdce4073eaf8022d5727d87b5fa
Parents: b5db8d9
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 29 16:53:59 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 29 18:29:53 2017 +0100

----------------------------------------------------------------------
 .../client/program/rest/RestClusterClient.java   |  2 +-
 .../environment/Flip6LocalStreamEnvironment.java | 16 ++--------------
 .../LocalStreamEnvironmentITCase.java            | 19 +++----------------
 3 files changed, 6 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8e7a71c0/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8c0462d..eedcee1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -98,7 +98,7 @@ public class RestClusterClient extends ClusterClient {
 	protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
 		log.info("Submitting job.");
 		try {
-			// temporary hack for FLIP-6 since slot-sharing isn't implemented yet
+			// we have to enable queued scheduling because slot will be allocated lazily
 			jobGraph.setAllowQueuedScheduling(true);
 			submitJob(jobGraph);
 		} catch (JobSubmissionException e) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e7a71c0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index e276ac7..8720e7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -20,12 +20,10 @@ package org.apache.flink.streaming.api.environment;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.minicluster.MiniCluster;
 import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -86,9 +84,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 		StreamGraph streamGraph = getStreamGraph();
 		streamGraph.setJobName(jobName);
 
-		// TODO - temp fix to enforce restarts due to a bug in the allocation protocol
-		streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 5));
-
 		JobGraph jobGraph = streamGraph.getJobGraph();
 		jobGraph.setAllowQueuedScheduling(true);
 
@@ -99,16 +94,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 		// add (and override) the settings with what the user defined
 		configuration.addAll(this.conf);
 
-		// Currently we do not reuse slot anymore,
-		// so we need to sum up the parallelism of all vertices
-		int slotsCount = 0;
-		for (JobVertex jobVertex : jobGraph.getVertices()) {
-			slotsCount += jobVertex.getParallelism();
-		}
-
 		MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
 			.setConfiguration(configuration)
-			.setNumSlotsPerTaskManager(slotsCount)
+			.setNumSlotsPerTaskManager(jobGraph.getMaximumParallelism())
 			.build();
 
 		if (LOG.isInfoEnabled()) {
@@ -116,9 +104,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
 		}
 
 		MiniCluster miniCluster = new MiniCluster(cfg);
+
 		try {
 			miniCluster.start();
-			miniCluster.waitUntilTaskManagerRegistrationsComplete();
 			return miniCluster.runJobBlocking(jobGraph);
 		}
 		finally {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e7a71c0/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
index c053598..f302eda 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.environment;
 
-import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.util.TestLogger;
 
@@ -63,23 +62,11 @@ public class LocalStreamEnvironmentITCase extends TestLogger {
 	// ------------------------------------------------------------------------
 
 	private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
-		DataStream<Long> stream = env
-				.generateSequence(1, 100)
-					.setParallelism(parallelism)
-					.slotSharingGroup("group_1");
+		DataStream<Long> stream = env.generateSequence(1, 100).setParallelism(parallelism);
 
 		stream
-				.filter(new FilterFunction<Long>() {
-					@Override
-					public boolean filter(Long value) {
-						return false;
-					}
-				})
-					.setParallelism(parallelism)
+				.filter(ignored -> false).setParallelism(parallelism)
 					.startNewChain()
-					.slotSharingGroup("group_2")
-
-				.print()
-					.setParallelism(parallelism);
+					.print().setParallelism(parallelism);
 	}
 }