You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2017/12/31 16:14:07 UTC
flink git commit: [FLINK-8171] [flip6] Remove work arounds from
Flip6LocalStreamEnvironment
Repository: flink
Updated Branches:
refs/heads/master b5db8d908 -> 8e7a71c05
[FLINK-8171] [flip6] Remove work arounds from Flip6LocalStreamEnvironment
It is no longer necessary to wait for the registration of task managers or
to avoid slot sharing when submitting jobs to the Flip-6 MiniCluster.
Therefore, we can remove these workarounds from the
Flip6LocalStreamEnvironment.
Adapt comment in RestClusterClient
This closes #5101.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e7a71c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e7a71c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e7a71c0
Branch: refs/heads/master
Commit: 8e7a71c053135fdce4073eaf8022d5727d87b5fa
Parents: b5db8d9
Author: Till Rohrmann <tr...@apache.org>
Authored: Wed Nov 29 16:53:59 2017 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Dec 29 18:29:53 2017 +0100
----------------------------------------------------------------------
.../client/program/rest/RestClusterClient.java | 2 +-
.../environment/Flip6LocalStreamEnvironment.java | 16 ++--------------
.../LocalStreamEnvironmentITCase.java | 19 +++----------------
3 files changed, 6 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8e7a71c0/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 8c0462d..eedcee1 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -98,7 +98,7 @@ public class RestClusterClient extends ClusterClient {
protected JobSubmissionResult submitJob(JobGraph jobGraph, ClassLoader classLoader) throws ProgramInvocationException {
log.info("Submitting job.");
try {
- // temporary hack for FLIP-6 since slot-sharing isn't implemented yet
+ // we have to enable queued scheduling because slot will be allocated lazily
jobGraph.setAllowQueuedScheduling(true);
submitJob(jobGraph);
} catch (JobSubmissionException e) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8e7a71c0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index e276ac7..8720e7a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -20,12 +20,10 @@ package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -86,9 +84,6 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
- // TODO - temp fix to enforce restarts due to a bug in the allocation protocol
- streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 5));
-
JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
@@ -99,16 +94,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
// add (and override) the settings with what the user defined
configuration.addAll(this.conf);
- // Currently we do not reuse slot anymore,
- // so we need to sum up the parallelism of all vertices
- int slotsCount = 0;
- for (JobVertex jobVertex : jobGraph.getVertices()) {
- slotsCount += jobVertex.getParallelism();
- }
-
MiniClusterConfiguration cfg = new MiniClusterConfiguration.Builder()
.setConfiguration(configuration)
- .setNumSlotsPerTaskManager(slotsCount)
+ .setNumSlotsPerTaskManager(jobGraph.getMaximumParallelism())
.build();
if (LOG.isInfoEnabled()) {
@@ -116,9 +104,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
}
MiniCluster miniCluster = new MiniCluster(cfg);
+
try {
miniCluster.start();
- miniCluster.waitUntilTaskManagerRegistrationsComplete();
return miniCluster.runJobBlocking(jobGraph);
}
finally {
http://git-wip-us.apache.org/repos/asf/flink/blob/8e7a71c0/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
index c053598..f302eda 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.environment;
-import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.util.TestLogger;
@@ -63,23 +62,11 @@ public class LocalStreamEnvironmentITCase extends TestLogger {
// ------------------------------------------------------------------------
private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
- DataStream<Long> stream = env
- .generateSequence(1, 100)
- .setParallelism(parallelism)
- .slotSharingGroup("group_1");
+ DataStream<Long> stream = env.generateSequence(1, 100).setParallelism(parallelism);
stream
- .filter(new FilterFunction<Long>() {
- @Override
- public boolean filter(Long value) {
- return false;
- }
- })
- .setParallelism(parallelism)
+ .filter(ignored -> false).setParallelism(parallelism)
.startNewChain()
- .slotSharingGroup("group_2")
-
- .print()
- .setParallelism(parallelism);
+ .print().setParallelism(parallelism);
}
}