You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2019/01/17 16:28:16 UTC

[flink] branch master updated: [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new code base

This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f5cd0d  [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new code base
8f5cd0d is described below

commit 8f5cd0db28ade5a285211bc6949541d1cf4cfa70
Author: Till Rohrmann <tr...@apache.org>
AuthorDate: Sun Oct 28 03:16:15 2018 +0800

    [FLINK-10558][Yarn tests] Port YARNHighAvailabilityITCase to new code base
    
    This commit ports the YARNHighAvailabilityITCase to the new code base. It now uses
    the YarnClusterDescriptor and the RestClusterClient to directly interact with the
    deployed Flink cluster.
    
    Since the test uses the pkill command to kill the application master, the test only
    runs on Linux, MacOS, FreeBSD and Solaris. On Windows this test won't be executed.
    
    This closes #7509.
---
 .../flink/yarn/YARNHighAvailabilityITCase.java     | 310 +++++++++++----------
 .../java/org/apache/flink/yarn/YARNITCase.java     |   2 +-
 .../java/org/apache/flink/yarn/YarnTestBase.java   |  18 +-
 .../org/apache/flink/yarn/testjob/YarnTestJob.java |  73 +++++
 4 files changed, 249 insertions(+), 154 deletions(-)

diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
index 9a8f503..1a2eb92 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNHighAvailabilityITCase.java
@@ -18,43 +18,54 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobSubmissionResult;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.client.deployment.ClusterDeploymentException;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.concurrent.Executors;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
-import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.messages.Acknowledge;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.util.LeaderRetrievalUtils;
-
-import akka.actor.ActorSystem;
-import akka.actor.PoisonPill;
-import akka.testkit.JavaTestKit;
+import org.apache.flink.configuration.ResourceManagerOptions;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.util.OperatingSystem;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.entrypoint.YarnSessionClusterEntrypoint;
+import org.apache.flink.yarn.testjob.YarnTestJob;
+import org.apache.flink.yarn.util.YarnTestUtils;
+
 import org.apache.curator.test.TestingServer;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
-import org.junit.Rule;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
-import java.io.File;
-import java.util.Arrays;
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.FiniteDuration;
+import javax.annotation.Nonnull;
 
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assume.assumeTrue;
 
 /**
@@ -62,30 +73,26 @@ import static org.junit.Assume.assumeTrue;
  */
 public class YARNHighAvailabilityITCase extends YarnTestBase {
 
-	private static TestingServer zkServer;
-
-	private static ActorSystem actorSystem;
+	@ClassRule
+	public static final TemporaryFolder FOLDER = new TemporaryFolder();
 
-	private static final int numberApplicationAttempts = 3;
+	private static final String LOG_DIR = "flink-yarn-tests-ha";
+	private static final Duration TIMEOUT = Duration.ofSeconds(200L);
+	private static final long RETRY_TIMEOUT = 100L;
 
-	@Rule
-	public TemporaryFolder temp = new TemporaryFolder();
+	private static TestingServer zkServer;
+	private static String storageDir;
 
 	@BeforeClass
-	public static void setup() {
-		actorSystem = AkkaUtils.createDefaultActorSystem();
-
-		try {
-			zkServer = new TestingServer();
-			zkServer.start();
-		} catch (Exception e) {
-			e.printStackTrace();
-			Assert.fail("Could not start ZooKeeper testing cluster.");
-		}
+	public static void setup() throws Exception {
+		zkServer = new TestingServer();
 
-		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
-		YARN_CONFIGURATION.set(YarnConfiguration.RM_AM_MAX_ATTEMPTS, "" + numberApplicationAttempts);
+		storageDir = FOLDER.newFolder().getAbsolutePath();
 
+		// startYARNWithConfig should be implemented by subclass
+		YARN_CONFIGURATION.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class);
+		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, LOG_DIR);
+		YARN_CONFIGURATION.setInt(YarnConfiguration.NM_PMEM_MB, 4096);
 		startYARNWithConfig(YARN_CONFIGURATION);
 	}
 
@@ -93,123 +100,122 @@ public class YARNHighAvailabilityITCase extends YarnTestBase {
 	public static void teardown() throws Exception {
 		if (zkServer != null) {
 			zkServer.stop();
+			zkServer = null;
 		}
-
-		JavaTestKit.shutdownActorSystem(actorSystem);
-		actorSystem = null;
 	}
 
 	/**
-	 * Tests that the application master can be killed multiple times and that the surviving
-	 * TaskManager successfully reconnects to the newly started JobManager.
-	 * @throws Exception
+	 * Tests that Yarn will restart a killed {@link YarnSessionClusterEntrypoint} which will then resume
+	 * a persisted {@link JobGraph}.
 	 */
 	@Test
-	public void testMultipleAMKill() throws Exception {
-		assumeTrue("This test only works with the old actor based code.", !isNewMode);
-		final int numberKillingAttempts = numberApplicationAttempts - 1;
-		String confDirPath = System.getenv(ConfigConstants.ENV_FLINK_CONF_DIR);
-		final Configuration configuration = GlobalConfiguration.loadConfiguration();
-		TestingYarnClusterDescriptor flinkYarnClient = new TestingYarnClusterDescriptor(
-			configuration,
-			getYarnConfiguration(),
-			confDirPath,
-			getYarnClient(),
-			true);
-
-		Assert.assertNotNull("unable to get yarn client", flinkYarnClient);
-		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
-		flinkYarnClient.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
-
-		String fsStateHandlePath = temp.getRoot().getPath();
-
-		// load the configuration
-		File configDirectory = new File(confDirPath);
-		GlobalConfiguration.loadConfiguration(configDirectory.getAbsolutePath());
-
-		flinkYarnClient.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" +
-			zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts +
-			"@@" + CheckpointingOptions.STATE_BACKEND.key() + "=FILESYSTEM" +
-			"@@" + CheckpointingOptions.CHECKPOINTS_DIRECTORY + "=" + fsStateHandlePath + "/checkpoints" +
-			"@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + fsStateHandlePath + "/recovery");
-
-		ClusterClient<ApplicationId> yarnClusterClient = null;
-
-		final FiniteDuration timeout = new FiniteDuration(2, TimeUnit.MINUTES);
-
-		HighAvailabilityServices highAvailabilityServices = null;
-
-		final ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
-			.setMasterMemoryMB(768)
-			.setTaskManagerMemoryMB(1024)
-			.setNumberTaskManagers(1)
-			.setSlotsPerTaskManager(1)
-			.createClusterSpecification();
-
-		try {
-			yarnClusterClient = flinkYarnClient.deploySessionCluster(clusterSpecification);
-
-			highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(
-				yarnClusterClient.getFlinkConfiguration(),
-				Executors.directExecutor(),
-				HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
-
-			final HighAvailabilityServices finalHighAvailabilityServices = highAvailabilityServices;
-
-			new JavaTestKit(actorSystem) {{
-				for (int attempt = 0; attempt < numberKillingAttempts; attempt++) {
-					new Within(timeout) {
-						@Override
-						protected void run() {
-							try {
-								ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-									finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-									actorSystem,
-									timeout);
-								ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-								gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
-
-								expectMsgEquals(Acknowledge.get());
-
-								gateway.tell(PoisonPill.getInstance());
-							} catch (Exception e) {
-								throw new AssertionError("Could not complete test.", e);
-							}
-						}
-					};
-				}
-
-				new Within(timeout) {
-					@Override
-					protected void run() {
-						try {
-							ActorGateway gateway = LeaderRetrievalUtils.retrieveLeaderGateway(
-								finalHighAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
-								actorSystem,
-								timeout);
-							ActorGateway selfGateway = new AkkaActorGateway(getRef(), gateway.leaderSessionID());
-
-							gateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), selfGateway);
-
-							expectMsgEquals(Acknowledge.get());
-						} catch (Exception e) {
-							throw new AssertionError("Could not complete test.", e);
-						}
-					}
-				};
-
-			}};
-		} finally {
-			if (yarnClusterClient != null) {
-				log.info("Shutting down the Flink Yarn application.");
-				yarnClusterClient.shutDownCluster();
-				yarnClusterClient.shutdown();
-			}
-
-			if (highAvailabilityServices != null) {
-				highAvailabilityServices.closeAndCleanupAllData();
-			}
+	public void testKillYarnSessionClusterEntrypoint() throws Exception {
+		assumeTrue(
+			"This test kills processes via the pkill command. Thus, it only runs on Linux, Mac OS, Free BSD and Solaris.",
+			OperatingSystem.isLinux() || OperatingSystem.isMac() || OperatingSystem.isFreeBSD() || OperatingSystem.isSolaris());
+
+		final YarnClusterDescriptor yarnClusterDescriptor = setupYarnClusterDescriptor();
+
+		final RestClusterClient<ApplicationId> restClusterClient = deploySessionCluster(yarnClusterDescriptor);
+
+		final JobGraph job = createJobGraph();
+
+		final JobID jobId = submitJob(restClusterClient, job);
+
+		final ApplicationId id = restClusterClient.getClusterId();
+
+		waitUntilJobIsRunning(restClusterClient, jobId, RETRY_TIMEOUT);
+
+		killApplicationMaster(yarnClusterDescriptor.getYarnSessionClusterEntrypoint());
+
+		final YarnClient yarnClient = getYarnClient();
+		Assert.assertNotNull(yarnClient);
+
+		while (yarnClient.getApplicationReport(id).getCurrentApplicationAttemptId().getAttemptId() < 2) {
+			Thread.sleep(RETRY_TIMEOUT);
+		}
+
+		waitUntilJobIsRunning(restClusterClient, jobId, RETRY_TIMEOUT);
+
+		yarnClient.killApplication(id);
+
+		while (yarnClient.getApplications(EnumSet.of(YarnApplicationState.KILLED, YarnApplicationState.FINISHED)).isEmpty()) {
+			Thread.sleep(RETRY_TIMEOUT);
+		}
+	}
+
+	@Nonnull
+	private YarnClusterDescriptor setupYarnClusterDescriptor() {
+		final Configuration flinkConfiguration = new Configuration();
+		flinkConfiguration.setString(YarnConfigOptions.APPLICATION_ATTEMPTS, "10");
+		flinkConfiguration.setString(HighAvailabilityOptions.HA_MODE, "zookeeper");
+		flinkConfiguration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, storageDir);
+		flinkConfiguration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString());
+		flinkConfiguration.setInteger(HighAvailabilityOptions.ZOOKEEPER_SESSION_TIMEOUT, 1000);
+
+		final int minMemory = 100;
+		flinkConfiguration.setInteger(ResourceManagerOptions.CONTAINERIZED_HEAP_CUTOFF_MIN, minMemory);
+
+		return createYarnClusterDescriptor(flinkConfiguration);
+	}
+
+	private RestClusterClient<ApplicationId> deploySessionCluster(YarnClusterDescriptor yarnClusterDescriptor) throws ClusterDeploymentException {
+		final int containerMemory = 256;
+		final ClusterClient<ApplicationId> yarnClusterClient = yarnClusterDescriptor.deploySessionCluster(
+			new ClusterSpecification.ClusterSpecificationBuilder()
+				.setMasterMemoryMB(containerMemory)
+				.setTaskManagerMemoryMB(containerMemory)
+				.setSlotsPerTaskManager(1)
+				.createClusterSpecification());
+
+		assertThat(yarnClusterClient, is(instanceOf(RestClusterClient.class)));
+		return (RestClusterClient<ApplicationId>) yarnClusterClient;
+	}
+
+	private JobID submitJob(RestClusterClient<ApplicationId> restClusterClient, JobGraph job) throws InterruptedException, java.util.concurrent.ExecutionException {
+		final CompletableFuture<JobSubmissionResult> jobSubmissionResultCompletableFuture = restClusterClient.submitJob(job);
+
+		final JobSubmissionResult jobSubmissionResult = jobSubmissionResultCompletableFuture.get();
+		return jobSubmissionResult.getJobID();
+	}
+
+	private void killApplicationMaster(final String processName) throws IOException, InterruptedException {
+		final Process exec = Runtime.getRuntime().exec("pkill -f " + processName);
+		assertThat(exec.waitFor(), is(0));
+	}
+
+	@Nonnull
+	private JobGraph createJobGraph() {
+		final JobGraph job = YarnTestJob.createJob();
+		final File testingJar =
+			YarnTestBase.findFile("..", new YarnTestUtils.TestJarFinder("flink-yarn-tests"));
+
+		assertThat(testingJar, notNullValue());
+
+		job.addJar(new org.apache.flink.core.fs.Path(testingJar.toURI()));
+		return job;
+	}
+
+	private void waitUntilJobIsRunning(RestClusterClient<ApplicationId> restClusterClient, JobID jobId, long retryTimeout) throws Exception {
+		waitUntilCondition(
+			() -> {
+				final Collection<JobStatusMessage> jobStatusMessages = restClusterClient.listJobs().get();
+
+				return jobStatusMessages.stream()
+					.filter(jobStatusMessage -> jobStatusMessage.getJobId().equals(jobId))
+					.anyMatch(jobStatusMessage -> jobStatusMessage.getJobState() == JobStatus.RUNNING);
+			},
+			Deadline.fromNow(TIMEOUT),
+			retryTimeout);
+	}
+
+	private void waitUntilCondition(SupplierWithException<Boolean, Exception> condition, Deadline timeout, long retryTimeout) throws Exception {
+		while (timeout.hasTimeLeft() && !condition.get()) {
+			Thread.sleep(Math.min(retryTimeout, timeout.timeLeft().toMillis()));
+		}
+
+		if (!timeout.hasTimeLeft()) {
+			throw new TimeoutException("Condition was not met in given timeout.");
 		}
 	}
 }
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
index 814e808..7f9ea7e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNITCase.java
@@ -53,7 +53,7 @@ public class YARNITCase extends YarnTestBase {
 
 	@BeforeClass
 	public static void setup() {
-		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-ha");
+		YARN_CONFIGURATION.set(YarnTestBase.TEST_CLUSTER_NAME_KEY, "flink-yarn-tests-per-job");
 		startYARNWithConfig(YARN_CONFIGURATION);
 	}
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index 3763f65..b7b08ae 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -32,6 +32,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -55,6 +56,7 @@ import org.slf4j.LoggerFactory;
 import org.slf4j.Marker;
 import org.slf4j.MarkerFactory;
 
+import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
 import java.io.ByteArrayOutputStream;
@@ -70,6 +72,7 @@ import java.io.PrintStream;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -268,6 +271,19 @@ public abstract class YarnTestBase extends TestLogger {
 		return null;
 	}
 
+	@Nonnull
+	YarnClusterDescriptor createYarnClusterDescriptor(org.apache.flink.configuration.Configuration flinkConfiguration) {
+		final YarnClusterDescriptor yarnClusterDescriptor = new YarnClusterDescriptor(
+			flinkConfiguration,
+			YARN_CONFIGURATION,
+			CliFrontend.getConfigurationDirectoryFromEnv(),
+			yarnClient,
+			true);
+		yarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.toURI()));
+		yarnClusterDescriptor.addShipFiles(Collections.singletonList(flinkLibFolder));
+		return yarnClusterDescriptor;
+	}
+
 	/**
 	 * Filter to find root dir of the flink-yarn dist.
 	 */
@@ -583,7 +599,7 @@ public abstract class YarnTestBase extends TestLogger {
 	 * Default @BeforeClass impl. Overwrite this for passing a different configuration
 	 */
 	@BeforeClass
-	public static void setup() {
+	public static void setup() throws Exception {
 		startYARNWithConfig(YARN_CONFIGURATION);
 	}
 
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java
new file mode 100644
index 0000000..f86d6e4
--- /dev/null
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/testjob/YarnTestJob.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.testjob;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
+
+/**
+ * Testing job for {@link org.apache.flink.runtime.jobmaster.JobMaster} failover.
+ * Covers the streaming case that has an infinite source and a sink, scheduled in
+ * EAGER mode, with PIPELINED edges.
+ */
+public class YarnTestJob {
+
+	public static JobGraph createJob() {
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		env.addSource(new InfiniteSourceFunction())
+			.setParallelism(2)
+			.shuffle()
+			.addSink(new DiscardingSink<>())
+			.setParallelism(2);
+
+		return env.getStreamGraph().getJobGraph();
+	}
+
+	// *************************************************************************
+	//     USER FUNCTIONS
+	// *************************************************************************
+
+	private static final class InfiniteSourceFunction extends RichParallelSourceFunction<Integer> {
+		private static final long serialVersionUID = -8758033916372648233L;
+		private boolean running;
+
+		InfiniteSourceFunction() {
+			running = true;
+		}
+
+		@Override
+		public void run(SourceContext<Integer> ctx) throws Exception {
+			while (running) {
+				synchronized (ctx.getCheckpointLock()) {
+					ctx.collect(0);
+				}
+
+				Thread.sleep(5L);
+			}
+		}
+
+		@Override
+		public void cancel() {
+			running = false;
+		}
+	}
+}