You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/12/23 20:22:51 UTC
[32/52] [abbrv] flink git commit: [FLINK-5141] [runtime] Add
'waitUntilTaskManagerRegistrationsComplete()' to MiniCluster
[FLINK-5141] [runtime] Add 'waitUntilTaskManagerRegistrationsComplete()' to MiniCluster
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b5bdc35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b5bdc35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b5bdc35
Branch: refs/heads/master
Commit: 3b5bdc35c354e535534ec0c48c2d42f86f7f14a1
Parents: 73d27d7
Author: Stephan Ewen <se...@apache.org>
Authored: Wed Nov 30 17:35:47 2016 +0100
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Dec 23 20:54:26 2016 +0100
----------------------------------------------------------------------
.../leaderelection/LeaderAddressAndId.java | 73 ++++++++++++++++++
.../flink/runtime/minicluster/MiniCluster.java | 58 +++++++++++++-
.../minicluster/MiniClusterJobDispatcher.java | 2 +-
.../OneTimeLeaderListenerFuture.java | 60 +++++++++++++++
.../resourcemanager/ResourceManager.java | 11 +++
.../resourcemanager/ResourceManagerGateway.java | 8 ++
.../runtime/minicluster/MiniClusterITCase.java | 8 ++
.../Flip6LocalStreamEnvironment.java | 23 +++---
.../LocalStreamEnvironmentITCase.java | 81 ++++++++++++++++++++
9 files changed, 307 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5bdc35/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
new file mode 100644
index 0000000..23cd34b
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/leaderelection/LeaderAddressAndId.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.leaderelection;
+
+import java.util.UUID;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A combination of a leader address and leader id.
+ */
+public class LeaderAddressAndId {
+
+ private final String leaderAddress;
+ private final UUID leaderId;
+
+ public LeaderAddressAndId(String leaderAddress, UUID leaderId) {
+ this.leaderAddress = checkNotNull(leaderAddress);
+ this.leaderId = checkNotNull(leaderId);
+ }
+
+ // ------------------------------------------------------------------------
+
+ public String leaderAddress() {
+ return leaderAddress;
+ }
+
+ public UUID leaderId() {
+ return leaderId;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public int hashCode() {
+ return 31 * leaderAddress.hashCode()+ leaderId.hashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ else if (o != null && o.getClass() == LeaderAddressAndId.class) {
+ final LeaderAddressAndId that = (LeaderAddressAndId) o;
+ return this.leaderAddress.equals(that.leaderAddress) && this.leaderId.equals(that.leaderId);
+ }
+ else {
+ return false;
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "LeaderAddressAndId (" + leaderAddress + " / " + leaderId + ')';
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5bdc35/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index 3ede5b5..1b9f265 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -27,11 +27,15 @@ import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
@@ -169,6 +173,7 @@ public class MiniCluster {
final boolean singleRpc = config.getUseSingleRpcSystem();
try {
+ LOG.info("Starting Metrics Registry");
metricRegistry = createMetricRegistry(configuration);
RpcService[] jobManagerRpcServices = new RpcService[numJobManagers];
@@ -176,10 +181,12 @@ public class MiniCluster {
RpcService[] resourceManagerRpcServices = new RpcService[numResourceManagers];
// bring up all the RPC services
- if (singleRpc) {
- // one common RPC for all
- commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+ LOG.info("Starting RPC Service(s)");
+
+ // we always need the 'commonRpcService' for auxiliary calls
+ commonRpcService = createRpcService(configuration, rpcTimeout, false, null);
+ if (singleRpc) {
// set that same RPC service for all JobManagers and TaskManagers
for (int i = 0; i < numJobManagers; i++) {
jobManagerRpcServices[i] = commonRpcService;
@@ -236,7 +243,7 @@ public class MiniCluster {
configuration, haServices, metricRegistry, numTaskManagers, taskManagerRpcServices);
// bring up the dispatcher that launches JobManagers when jobs submitted
- LOG.info("Starting job dispatcher for {} JobManger(s)", numJobManagers);
+ LOG.info("Starting job dispatcher(s) for {} JobManger(s)", numJobManagers);
jobDispatcher = new MiniClusterJobDispatcher(
configuration, haServices, metricRegistry, numJobManagers, jobManagerRpcServices);
}
@@ -357,6 +364,49 @@ public class MiniCluster {
}
}
+ public void waitUntilTaskManagerRegistrationsComplete() throws Exception {
+ LeaderRetrievalService rmMasterListener = null;
+ Future<LeaderAddressAndId> addressAndIdFuture;
+
+ try {
+ synchronized (lock) {
+ checkState(running, "FlinkMiniCluster is not running");
+
+ OneTimeLeaderListenerFuture listenerFuture = new OneTimeLeaderListenerFuture();
+ rmMasterListener = haServices.getResourceManagerLeaderRetriever();
+ rmMasterListener.start(listenerFuture);
+ addressAndIdFuture = listenerFuture.future();
+ }
+
+ final LeaderAddressAndId addressAndId = addressAndIdFuture.get();
+
+ final ResourceManagerGateway resourceManager =
+ commonRpcService.connect(addressAndId.leaderAddress(), ResourceManagerGateway.class).get();
+
+ final int numTaskManagersToWaitFor = taskManagerRunners.length;
+
+ // poll and wait until enough TaskManagers are available
+ while (true) {
+ int numTaskManagersAvailable =
+ resourceManager.getNumberOfRegisteredTaskManagers(addressAndId.leaderId()).get();
+
+ if (numTaskManagersAvailable >= numTaskManagersToWaitFor) {
+ break;
+ }
+ Thread.sleep(2);
+ }
+ }
+ finally {
+ try {
+ if (rmMasterListener != null) {
+ rmMasterListener.stop();
+ }
+ } catch (Exception e) {
+ LOG.warn("Error shutting down leader listener for ResourceManager");
+ }
+ }
+ }
+
// ------------------------------------------------------------------------
// running jobs
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5bdc35/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
index 8ac8eba..7fffaee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniClusterJobDispatcher.java
@@ -143,7 +143,7 @@ public class MiniClusterJobDispatcher {
if (!shutdown) {
shutdown = true;
- LOG.info("Shutting down the dispatcher");
+ LOG.info("Shutting down the job dispatcher");
// in this shutdown code we copy the references to the stack first,
// to avoid concurrent modification
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5bdc35/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
new file mode 100644
index 0000000..b0157d8
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/OneTimeLeaderListenerFuture.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.minicluster;
+
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.leaderelection.LeaderAddressAndId;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
+
+import java.util.UUID;
+
+/**
+ * A leader listener that exposes a future for the first leader notification.
+ *
+ * <p>The future can be obtained via the {@link #future()} method.
+ */
+public class OneTimeLeaderListenerFuture implements LeaderRetrievalListener {
+
+ private final FlinkCompletableFuture<LeaderAddressAndId> future;
+
+ public OneTimeLeaderListenerFuture() {
+ this.future = new FlinkCompletableFuture<>();
+ }
+
+ /**
+ * Gets the future that is completed with the leader address and ID.
+ * @return The future.
+ */
+ public FlinkFuture<LeaderAddressAndId> future() {
+ return future;
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
+ future.complete(new LeaderAddressAndId(leaderAddress, leaderSessionID));
+ }
+
+ @Override
+ public void handleError(Exception exception) {
+ future.completeExceptionally(exception);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5bdc35/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 145cc40..76b4a86 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.concurrent.BiFunction;
import org.apache.flink.runtime.concurrent.Future;
import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.LeaderIdMismatchException;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.leaderelection.LeaderContender;
@@ -502,6 +503,16 @@ public abstract class ResourceManager<WorkerType extends Serializable>
shutDownApplication(finalStatus, optionalDiagnostics);
}
+ @RpcMethod
+ public Integer getNumberOfRegisteredTaskManagers(UUID leaderSessionId) throws LeaderIdMismatchException {
+ if (this.leaderSessionId != null && this.leaderSessionId.equals(leaderSessionId)) {
+ return taskExecutors.size();
+ }
+ else {
+ throw new LeaderIdMismatchException(this.leaderSessionId, leaderSessionId);
+ }
+ }
+
// ------------------------------------------------------------------------
// Testing methods
// ------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5bdc35/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index 0a37bb9..8235ea7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -122,4 +122,12 @@ public interface ResourceManagerGateway extends RpcGateway {
* @param optionalDiagnostics
*/
void shutDownCluster(final ApplicationStatus finalStatus, final String optionalDiagnostics);
+
+ /**
+ * Gets the currently registered number of TaskManagers.
+ *
+ * @param leaderSessionId The leader session ID with which to address the ResourceManager.
+ * @return The future to the number of registered TaskManagers.
+ */
+ Future<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5bdc35/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
index 2cf2d4d..d9a1896 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/minicluster/MiniClusterITCase.java
@@ -33,6 +33,10 @@ import org.junit.Test;
*/
public class MiniClusterITCase extends TestLogger {
+ // ------------------------------------------------------------------------
+ // Simple Job Running Tests
+ // ------------------------------------------------------------------------
+
@Test
public void runJobWithSingleRpcService() throws Exception {
MiniClusterConfiguration cfg = new MiniClusterConfiguration();
@@ -63,6 +67,10 @@ public class MiniClusterITCase extends TestLogger {
executeJob(miniCluster);
}
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
private static void executeJob(MiniCluster miniCluster) throws Exception {
miniCluster.start();
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5bdc35/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
index a0c128e..2007d35 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/Flip6LocalStreamEnvironment.java
@@ -18,7 +18,6 @@
package org.apache.flink.streaming.api.environment;
import org.apache.flink.annotation.Public;
-import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
@@ -30,6 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
import org.apache.flink.streaming.api.graph.StreamGraph;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,8 +67,9 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
"The Flip6LocalStreamEnvironment cannot be used when submitting a program through a client, " +
"or running in a TestEnvironment context.");
}
-
+
this.conf = config == null ? new Configuration() : config;
+ setParallelism(1);
}
/**
@@ -85,17 +86,12 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
StreamGraph streamGraph = getStreamGraph();
streamGraph.setJobName(jobName);
- JobGraph jobGraph = streamGraph.getJobGraph();
+ // TODO - temp fix to enforce restarts due to a bug in the allocation protocol
+ streamGraph.getExecutionConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 5));
+ JobGraph jobGraph = streamGraph.getJobGraph();
jobGraph.setAllowQueuedScheduling(true);
- // As jira FLINK-5140 described,
- // we have to set restart strategy to handle NoResourceAvailableException.
- ExecutionConfig executionConfig = new ExecutionConfig();
- executionConfig.setRestartStrategy(
- RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 1000));
- jobGraph.setExecutionConfig(executionConfig);
-
Configuration configuration = new Configuration();
configuration.addAll(jobGraph.getJobConfiguration());
configuration.setLong(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, -1L);
@@ -105,7 +101,8 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
MiniClusterConfiguration cfg = new MiniClusterConfiguration(configuration);
- // Currently we do not reuse slot anymore, so we need to sum all parallelism of vertices up.
+ // Currently we do not reuse slots anymore,
+ // so we need to sum up the parallelism of all vertices
int slotsCount = 0;
for (JobVertex jobVertex : jobGraph.getVertices()) {
slotsCount += jobVertex.getParallelism();
@@ -119,8 +116,10 @@ public class Flip6LocalStreamEnvironment extends StreamExecutionEnvironment {
MiniCluster miniCluster = new MiniCluster(cfg);
try {
miniCluster.start();
+ miniCluster.waitUntilTaskManagerRegistrationsComplete();
return miniCluster.runJobBlocking(jobGraph);
- } finally {
+ }
+ finally {
transformations.clear();
miniCluster.shutdown();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/3b5bdc35/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
new file mode 100644
index 0000000..a360d0e
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironmentITCase.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+@SuppressWarnings("serial")
+public class LocalStreamEnvironmentITCase {
+
+ /**
+ * This test verifies that the execution environment can be used to execute a
+ * single job with multiple slots.
+ */
+ @Test
+ public void testRunIsolatedJob() throws Exception {
+ Flip6LocalStreamEnvironment env = new Flip6LocalStreamEnvironment();
+ assertEquals(1, env.getParallelism());
+
+ addSmallBoundedJob(env, 3);
+ env.execute();
+ }
+
+ /**
+ * This test verifies that the execution environment can be used to execute multiple
+ * bounded streaming jobs after one another.
+ */
+ @Test
+ public void testMultipleJobsAfterAnother() throws Exception {
+ Flip6LocalStreamEnvironment env = new Flip6LocalStreamEnvironment();
+
+ addSmallBoundedJob(env, 3);
+ env.execute();
+
+ addSmallBoundedJob(env, 5);
+ env.execute();
+ }
+
+ // ------------------------------------------------------------------------
+
+ private static void addSmallBoundedJob(StreamExecutionEnvironment env, int parallelism) {
+ DataStream<Long> stream = env
+ .generateSequence(1, 100)
+ .setParallelism(parallelism)
+ .slotSharingGroup("group_1");
+
+ stream
+ .filter(new FilterFunction<Long>() {
+ @Override
+ public boolean filter(Long value) {
+ return false;
+ }
+ })
+ .setParallelism(parallelism)
+ .startNewChain()
+ .slotSharingGroup("group_2")
+
+ .print()
+ .setParallelism(parallelism);
+ }
+}