Posted to commits@flink.apache.org by ch...@apache.org on 2018/03/23 18:12:48 UTC

[1/7] flink git commit: [FLINK-8956][tests] Port RescalingITCase to flip6

Repository: flink
Updated Branches:
  refs/heads/release-1.5 c94b88aa4 -> 81d809a5f


[FLINK-8956][tests] Port RescalingITCase to flip6

This closes #5715.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0acc1e29
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0acc1e29
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0acc1e29

Branch: refs/heads/release-1.5
Commit: 0acc1e299fe34f9562c52e062d9759b7afe46dcc
Parents: c94b88a
Author: zentol <ch...@apache.org>
Authored: Mon Mar 19 11:36:39 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:12:28 2018 +0100

----------------------------------------------------------------------
 .../test/checkpointing/RescalingITCase.java     | 282 +++++++------------
 1 file changed, 97 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0acc1e29/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index a23c679..e4f4389 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -25,23 +25,24 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
 import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.common.typeutils.base.IntSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.client.JobStatusMessage;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
-import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import org.apache.flink.streaming.api.datastream.DataStream;
@@ -50,7 +51,9 @@ import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.AfterClass;
@@ -62,25 +65,20 @@ import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.io.File;
+import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import scala.Option;
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
 
 /**
  * Test savepoint rescaling.
@@ -106,7 +104,7 @@ public class RescalingITCase extends TestLogger {
 		NON_PARTITIONED, CHECKPOINTED_FUNCTION, CHECKPOINTED_FUNCTION_BROADCAST, LIST_CHECKPOINTED
 	}
 
-	private static TestingCluster cluster;
+	private static MiniClusterResource cluster;
 
 	@ClassRule
 	public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@@ -120,8 +118,6 @@ public class RescalingITCase extends TestLogger {
 			currentBackend = backend;
 
 			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, numTaskManagers);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, slotsPerTaskManager);
 
 			final File checkpointDir = temporaryFolder.newFolder();
 			final File savepointDir = temporaryFolder.newFolder();
@@ -130,15 +126,20 @@ public class RescalingITCase extends TestLogger {
 			config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir.toURI().toString());
 			config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir.toURI().toString());
 
-			cluster = new TestingCluster(config);
-			cluster.start();
+			cluster = new MiniClusterResource(
+				new MiniClusterResource.MiniClusterResourceConfiguration(
+					config,
+					numTaskManagers,
+					slotsPerTaskManager),
+				true);
+			cluster.before();
 		}
 	}
 
 	@AfterClass
 	public static void shutDownExistingCluster() {
 		if (cluster != null) {
-			cluster.stop();
+			cluster.after();
 			cluster = null;
 		}
 	}
@@ -175,20 +176,18 @@ public class RescalingITCase extends TestLogger {
 		final int parallelism2 = scaleOut ? numSlots : numSlots / 2;
 		final int maxParallelism = 13;
 
-		FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES);
-		Deadline deadline = timeout.fromNow();
+		Duration timeout = Duration.ofMinutes(3);
+		Deadline deadline = Deadline.now().plus(timeout);
 
-		ActorGateway jobManager = null;
-		JobID jobID = null;
+		ClusterClient<?> client = cluster.getClusterClient();
 
 		try {
-			jobManager = cluster.getLeaderGateway(deadline.timeLeft());
-
 			JobGraph jobGraph = createJobGraphWithKeyedState(parallelism, maxParallelism, numberKeys, numberElements, false, 100);
 
-			jobID = jobGraph.getJobID();
+			final JobID jobID = jobGraph.getJobID();
 
-			cluster.submitJobDetached(jobGraph);
+			client.setDetached(true);
+			client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
 
 			// wait til the sources have emitted numberElements for each key and completed a checkpoint
 			SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
@@ -210,22 +209,15 @@ public class RescalingITCase extends TestLogger {
 			// clear the CollectionSink set for the restarted job
 			CollectionSink.clearElementsSet();
 
-			Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
-
-			final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess)
-					Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-
-			Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
+			CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null);
 
-			Future<Object> cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
+			final String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-			Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
+			client.cancel(jobID);
 
-			assertTrue(cancellationResponse instanceof JobManagerMessages.CancellationSuccess);
-
-			Await.ready(jobRemovedFuture, deadline.timeLeft());
-
-			jobID = null;
+			while (!getRunningJobs(client).isEmpty()) {
+				Thread.sleep(50);
+			}
 
 			int restoreMaxParallelism = deriveMaxParallelism ? ExecutionJobVertex.VALUE_NOT_SET : maxParallelism;
 
@@ -233,11 +225,8 @@ public class RescalingITCase extends TestLogger {
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			jobID = scaledJobGraph.getJobID();
-
-			cluster.submitJobAndWait(scaledJobGraph, false);
-
-			jobID = null;
+			client.setDetached(false);
+			client.submitJob(scaledJobGraph, RescalingITCase.class.getClassLoader());
 
 			Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
 
@@ -253,17 +242,6 @@ public class RescalingITCase extends TestLogger {
 		} finally {
 			// clear the CollectionSink set for the restarted job
 			CollectionSink.clearElementsSet();
-
-			// clear any left overs from a possibly failed job
-			if (jobID != null && jobManager != null) {
-				Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), timeout);
-
-				try {
-					Await.ready(jobRemovedFuture, timeout);
-				} catch (TimeoutException | InterruptedException ie) {
-					fail("Failed while cleaning up the cluster.");
-				}
-			}
 		}
 	}
 
@@ -279,57 +257,39 @@ public class RescalingITCase extends TestLogger {
 		final int parallelism2 = numSlots;
 		final int maxParallelism = 13;
 
-		FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES);
-		Deadline deadline = timeout.fromNow();
+		Duration timeout = Duration.ofMinutes(3);
+		Deadline deadline = Deadline.now().plus(timeout);
 
-		JobID jobID = null;
-		ActorGateway jobManager = null;
+		ClusterClient<?> client = cluster.getClusterClient();
 
 		try {
-			jobManager = cluster.getLeaderGateway(deadline.timeLeft());
-
 			JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, OperatorCheckpointMethod.NON_PARTITIONED);
 
-			jobID = jobGraph.getJobID();
-
-			cluster.submitJobDetached(jobGraph);
+			final JobID jobID = jobGraph.getJobID();
 
-			Object savepointResponse = null;
+			client.setDetached(true);
+			client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
 
 			// wait until the operator is started
 			StateSourceBase.workStartedLatch.await();
 
-			Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
-			FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
-			savepointResponse = Await.result(savepointPathFuture, waitingTime);
+			CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null);
 
-			assertTrue(String.valueOf(savepointResponse), savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
+			final String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-			final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath();
+			client.cancel(jobID);
 
-			Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
-			Future<Object> cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
-
-			Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
-
-			assertTrue(cancellationResponse instanceof JobManagerMessages.CancellationSuccess);
-
-			Await.ready(jobRemovedFuture, deadline.timeLeft());
+			while (!getRunningJobs(client).isEmpty()) {
+				Thread.sleep(50);
+			}
 
 			// job successfully removed
-			jobID = null;
-
 			JobGraph scaledJobGraph = createJobGraphWithOperatorState(parallelism2, maxParallelism, OperatorCheckpointMethod.NON_PARTITIONED);
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			jobID = scaledJobGraph.getJobID();
-
-			cluster.submitJobAndWait(scaledJobGraph, false);
-
-			jobID = null;
-
+			client.setDetached(false);
+			client.submitJob(scaledJobGraph, RescalingITCase.class.getClassLoader());
 		} catch (JobExecutionException exception) {
 			if (exception.getCause() instanceof IllegalStateException) {
 				// we expect a IllegalStateException wrapped
@@ -338,17 +298,6 @@ public class RescalingITCase extends TestLogger {
 			} else {
 				throw exception;
 			}
-		} finally {
-			// clear any left overs from a possibly failed job
-			if (jobID != null && jobManager != null) {
-				Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), timeout);
-
-				try {
-					Await.ready(jobRemovedFuture, timeout);
-				} catch (TimeoutException | InterruptedException ie) {
-					fail("Failed while cleaning up the cluster.");
-				}
-			}
 		}
 	}
 
@@ -367,14 +316,12 @@ public class RescalingITCase extends TestLogger {
 		int parallelism2 = numSlots;
 		int maxParallelism = 13;
 
-		FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES);
-		Deadline deadline = timeout.fromNow();
+		Duration timeout = Duration.ofMinutes(3);
+		Deadline deadline = Deadline.now().plus(timeout);
 
-		ActorGateway jobManager = null;
-		JobID jobID = null;
+		ClusterClient<?> client = cluster.getClusterClient();
 
 		try {
-			jobManager = cluster.getLeaderGateway(deadline.timeLeft());
 
 			JobGraph jobGraph = createJobGraphWithKeyedAndNonPartitionedOperatorState(
 					parallelism,
@@ -385,9 +332,10 @@ public class RescalingITCase extends TestLogger {
 					false,
 					100);
 
-			jobID = jobGraph.getJobID();
+			final JobID jobID = jobGraph.getJobID();
 
-			cluster.submitJobDetached(jobGraph);
+			client.setDetached(true);
+			client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
 
 			// wait til the sources have emitted numberElements for each key and completed a checkpoint
 			SubtaskIndexFlatMapper.workCompletedLatch.await(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
@@ -409,22 +357,15 @@ public class RescalingITCase extends TestLogger {
 			// clear the CollectionSink set for the restarted job
 			CollectionSink.clearElementsSet();
 
-			Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
-
-			final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess)
-					Await.result(savepointPathFuture, deadline.timeLeft())).savepointPath();
-
-			Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
-			Future<Object> cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
+			CompletableFuture<String> savepointPathFuture = client.triggerSavepoint(jobID, null);
 
-			Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
+			final String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-			assertTrue(cancellationResponse instanceof JobManagerMessages.CancellationSuccess);
+			client.cancel(jobID);
 
-			Await.ready(jobRemovedFuture, deadline.timeLeft());
-
-			jobID = null;
+			while (!getRunningJobs(client).isEmpty()) {
+				Thread.sleep(50);
+			}
 
 			JobGraph scaledJobGraph = createJobGraphWithKeyedAndNonPartitionedOperatorState(
 					parallelism2,
@@ -437,11 +378,8 @@ public class RescalingITCase extends TestLogger {
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			jobID = scaledJobGraph.getJobID();
-
-			cluster.submitJobAndWait(scaledJobGraph, false);
-
-			jobID = null;
+			client.setDetached(false);
+			client.submitJob(scaledJobGraph, RescalingITCase.class.getClassLoader());
 
 			Set<Tuple2<Integer, Integer>> actualResult2 = CollectionSink.getElementsSet();
 
@@ -457,17 +395,6 @@ public class RescalingITCase extends TestLogger {
 		} finally {
 			// clear the CollectionSink set for the restarted job
 			CollectionSink.clearElementsSet();
-
-			// clear any left overs from a possibly failed job
-			if (jobID != null && jobManager != null) {
-				Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), timeout);
-
-				try {
-					Await.ready(jobRemovedFuture, timeout);
-				} catch (TimeoutException | InterruptedException ie) {
-					fail("Failed while cleaning up the cluster.");
-				}
-			}
 		}
 	}
 
@@ -510,11 +437,10 @@ public class RescalingITCase extends TestLogger {
 		final int parallelism2 = scaleOut ? numSlots / 2 : numSlots;
 		final int maxParallelism = 13;
 
-		FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES);
-		Deadline deadline = timeout.fromNow();
+		Duration timeout = Duration.ofMinutes(3);
+		Deadline deadline = Deadline.now().plus(timeout);
 
-		JobID jobID = null;
-		ActorGateway jobManager = null;
+		ClusterClient<?> client = cluster.getClusterClient();
 
 		int counterSize = Math.max(parallelism, parallelism2);
 
@@ -528,54 +454,44 @@ public class RescalingITCase extends TestLogger {
 		}
 
 		try {
-			jobManager = cluster.getLeaderGateway(deadline.timeLeft());
-
 			JobGraph jobGraph = createJobGraphWithOperatorState(parallelism, maxParallelism, checkpointMethod);
 
-			jobID = jobGraph.getJobID();
-
-			cluster.submitJobDetached(jobGraph);
+			final JobID jobID = jobGraph.getJobID();
 
-			Object savepointResponse = null;
+			client.setDetached(true);
+			client.submitJob(jobGraph, RescalingITCase.class.getClassLoader());
 
 			// wait until the operator is started
 			StateSourceBase.workStartedLatch.await();
 
-			while (deadline.hasTimeLeft()) {
-
-				Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID, Option.<String>empty()), deadline.timeLeft());
-				FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
-				savepointResponse = Await.result(savepointPathFuture, waitingTime);
-
-				if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) {
-					break;
-				}
-			}
-
-			assertTrue(savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess);
-
-			final String savepointPath = ((JobManagerMessages.TriggerSavepointSuccess) savepointResponse).savepointPath();
-
-			Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), deadline.timeLeft());
-
-			Future<Object> cancellationResponseFuture = jobManager.ask(new JobManagerMessages.CancelJob(jobID), deadline.timeLeft());
-
-			Object cancellationResponse = Await.result(cancellationResponseFuture, deadline.timeLeft());
+			CompletableFuture<String> savepointPathFuture = FutureUtils.retryWithDelay(
+				() -> {
+					try {
+						return client.triggerSavepoint(jobID, null);
+					} catch (FlinkException e) {
+						return FutureUtils.completedExceptionally(e);
+					}
+				},
+				(int) deadline.timeLeft().getSeconds() / 10,
+				Time.seconds(10),
+				(throwable) -> true,
+				TestingUtils.defaultScheduledExecutor()
+			);
 
-			assertTrue(cancellationResponse instanceof JobManagerMessages.CancellationSuccess);
+			final String savepointPath = savepointPathFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-			Await.ready(jobRemovedFuture, deadline.timeLeft());
+			client.cancel(jobID);
 
-			// job successfully removed
-			jobID = null;
+			while (!getRunningJobs(client).isEmpty()) {
+				Thread.sleep(50);
+			}
 
 			JobGraph scaledJobGraph = createJobGraphWithOperatorState(parallelism2, maxParallelism, checkpointMethod);
 
 			scaledJobGraph.setSavepointRestoreSettings(SavepointRestoreSettings.forPath(savepointPath));
 
-			jobID = scaledJobGraph.getJobID();
-
-			cluster.submitJobAndWait(scaledJobGraph, false);
+			client.setDetached(false);
+			client.submitJob(scaledJobGraph, RescalingITCase.class.getClassLoader());
 
 			int sumExp = 0;
 			int sumAct = 0;
@@ -609,19 +525,7 @@ public class RescalingITCase extends TestLogger {
 			}
 
 			assertEquals(sumExp, sumAct);
-			jobID = null;
-
 		} finally {
-			// clear any left overs from a possibly failed job
-			if (jobID != null && jobManager != null) {
-				Future<Object> jobRemovedFuture = jobManager.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobID), timeout);
-
-				try {
-					Await.ready(jobRemovedFuture, timeout);
-				} catch (TimeoutException | InterruptedException ie) {
-					fail("Failed while cleaning up the cluster.");
-				}
-			}
 		}
 	}
 
@@ -1028,4 +932,12 @@ public class RescalingITCase extends TestLogger {
 			}
 		}
 	}
+
+	private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+		Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+		return statusMessages.stream()
+			.filter(status -> !status.getJobState().isGloballyTerminalState())
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
+	}
 }
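
The pattern above recurs throughout this series: the actor-based TestingCluster/ActorGateway plumbing is replaced by a MiniClusterResource whose ClusterClient drives submission, savepoints, and cancellation. Below is a condensed sketch of the new lifecycle, assembled only from APIs visible in this diff; the NoOpInvokable job graph is a stand-in for the test-specific graphs (a real test uses a long-running source, so a NoOpInvokable job may finish before the savepoint is triggered).

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.test.util.MiniClusterResource;

public class Flip6LifecycleSketch {

	public static void main(String[] args) throws Exception {
		// was: cluster = new TestingCluster(config); cluster.start();
		MiniClusterResource cluster = new MiniClusterResource(
			new MiniClusterResource.MiniClusterResourceConfiguration(
				new Configuration(), 2, 2),
			true);
		cluster.before();

		try {
			ClusterClient<?> client = cluster.getClusterClient();
			Deadline deadline = Deadline.now().plus(Duration.ofMinutes(3));

			// stand-in for createJobGraphWithKeyedState(...)
			JobGraph jobGraph = noOpJob("sketch");

			// was: cluster.submitJobDetached(jobGraph);
			client.setDetached(true);
			client.submitJob(jobGraph, Flip6LifecycleSketch.class.getClassLoader());

			// was: jobManager.ask(new TriggerSavepoint(...)) + Await.result(...)
			CompletableFuture<String> savepointFuture =
				client.triggerSavepoint(jobGraph.getJobID(), null);
			String savepointPath =
				savepointFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

			// was: CancelJob message + CancellationSuccess check + NotifyWhenJobRemoved
			client.cancel(jobGraph.getJobID());

			// resubmit a rescaled graph from the savepoint, this time blocking
			JobGraph scaledJobGraph = noOpJob("sketch-rescaled");
			scaledJobGraph.setSavepointRestoreSettings(
				SavepointRestoreSettings.forPath(savepointPath));
			client.setDetached(false);
			client.submitJob(scaledJobGraph, Flip6LifecycleSketch.class.getClassLoader());
		} finally {
			// was: cluster.stop();
			cluster.after();
		}
	}

	private static JobGraph noOpJob(String name) {
		JobVertex vertex = new JobVertex(name);
		vertex.setInvokableClass(NoOpInvokable.class);
		return new JobGraph(name, vertex);
	}
}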


[4/7] flink git commit: [hotfix][utils] Add ExceptionUtils#findThrowable with predicate

[hotfix][utils] Add ExceptionUtils#findThrowable with predicate


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/555e5e8c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/555e5e8c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/555e5e8c

Branch: refs/heads/release-1.5
Commit: 555e5e8cd0a7b08457f65364ae62d13eb593fb44
Parents: 4ebbbe7
Author: zentol <ch...@apache.org>
Authored: Tue Mar 20 11:38:24 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:12:29 2018 +0100

----------------------------------------------------------------------
 .../org/apache/flink/util/ExceptionUtils.java   | 25 ++++++++++++++++++++
 1 file changed, 25 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/555e5e8c/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
index 6af16fc..459648f 100644
--- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java
@@ -35,6 +35,7 @@ import java.io.StringWriter;
 import java.util.Optional;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -326,6 +327,30 @@ public final class ExceptionUtils {
 	}
 
 	/**
+	 * Checks whether a throwable chain contains an exception matching a predicate and returns it.
+	 *
+	 * @param throwable the throwable chain to check.
+	 * @param predicate the predicate of the exception to search for in the chain.
+	 * @return Optional throwable of the requested type if available, otherwise empty
+	 */
+	public static Optional<Throwable> findThrowable(Throwable throwable, Predicate<Throwable> predicate) {
+		if (throwable == null || predicate == null) {
+			return Optional.empty();
+		}
+
+		Throwable t = throwable;
+		while (t != null) {
+			if (predicate.test(t)) {
+				return Optional.of(t);
+			} else {
+				t = t.getCause();
+			}
+		}
+
+		return Optional.empty();
+	}
+
+	/**
 	 * Checks whether a throwable chain contains a specific error message and returns the corresponding throwable.
 	 *
 	 * @param throwable the throwable chain to check.
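
Usage of the new predicate overload, as a minimal self-contained sketch (the wrapped exception is fabricated for illustration; the same idiom appears in the JobSubmissionFailsITCase port later in this series):

import java.util.Optional;

import org.apache.flink.util.ExceptionUtils;

public class FindThrowableExample {

	public static void main(String[] args) {
		// a cause chain: RuntimeException -> IllegalStateException("Test exception.")
		Exception wrapped = new RuntimeException("outer failure",
			new IllegalStateException("Test exception."));

		// walks the chain and returns the first throwable matching the predicate
		Optional<Throwable> match = ExceptionUtils.findThrowable(
			wrapped,
			candidate -> "Test exception.".equals(candidate.getMessage()));

		// prints: IllegalStateException
		match.ifPresent(t -> System.out.println(t.getClass().getSimpleName()));
	}
}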


[2/7] flink git commit: [FLINK-8965][tests] Port TimestampITCase to flip6

[FLINK-8965][tests] Port TimestampITCase to flip6

This closes #5728.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/81d809a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/81d809a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/81d809a5

Branch: refs/heads/release-1.5
Commit: 81d809a5f0030b14e0b7128d298cd9904e474ebd
Parents: f0bd7b6
Author: zentol <ch...@apache.org>
Authored: Mon Feb 26 17:19:15 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:12:29 2018 +0100

----------------------------------------------------------------------
 .../test/streaming/runtime/TimestampITCase.java | 73 ++++++++++----------
 1 file changed, 38 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/81d809a5/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index 5e08e8a..3b46c82 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -24,11 +24,11 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.StoppableFunction;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.core.testutils.MultiShotLatch;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
+import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -45,17 +45,18 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
@@ -75,34 +76,24 @@ public class TimestampITCase extends TestLogger {
 	// this is used in some tests to synchronize
 	static MultiShotLatch latch;
 
-	private static LocalFlinkMiniCluster cluster;
+	@ClassRule
+	public static final MiniClusterResource CLUSTER = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			getConfiguration(),
+			NUM_TASK_MANAGERS,
+			NUM_TASK_SLOTS),
+		true);
 
-	@Before
-	public void setupLatch() {
-		// ensure that we get a fresh latch for each test
-		latch = new MultiShotLatch();
-	}
-
-	@BeforeClass
-	public static void startCluster() {
+	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
 		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 12L);
-
-		cluster = new LocalFlinkMiniCluster(config, false);
-
-		cluster.start();
-
-		TestStreamEnvironment.setAsContext(cluster, PARALLELISM);
+		return config;
 	}
 
-	@AfterClass
-	public static void shutdownCluster() {
-		cluster.stop();
-		cluster = null;
-
-		TestStreamEnvironment.unsetAsContext();
+	@Before
+	public void setupLatch() {
+		// ensure that we get a fresh latch for each test
+		latch = new MultiShotLatch();
 	}
 
 	/**
@@ -162,7 +153,8 @@ public class TimestampITCase extends TestLogger {
 	public void testWatermarkPropagationNoFinalWatermarkOnStop() throws Exception {
 
 		// for this test to work, we need to be sure that no other jobs are being executed
-		while (!cluster.getCurrentlyRunningJobsJava().isEmpty()) {
+		final ClusterClient<?> clusterClient = CLUSTER.getClusterClient();
+		while (!getRunningJobs(clusterClient).isEmpty()) {
 			Thread.sleep(100);
 		}
 
@@ -185,14 +177,15 @@ public class TimestampITCase extends TestLogger {
 				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(true))
 				.addSink(new DiscardingSink<Integer>());
 
-		new Thread("stopper") {
+		Thread t = new Thread("stopper") {
 			@Override
 			public void run() {
 				try {
 					// try until we get the running jobs
-					List<JobID> running;
-					while ((running = cluster.getCurrentlyRunningJobsJava()).isEmpty()) {
+					List<JobID> running = getRunningJobs(clusterClient);
+					while (running.isEmpty()) {
 						Thread.sleep(10);
+						running = getRunningJobs(clusterClient);
 					}
 
 					JobID id = running.get(0);
@@ -200,7 +193,7 @@ public class TimestampITCase extends TestLogger {
 					// send stop until the job is stopped
 					do {
 						try {
-							cluster.stopJob(id);
+							clusterClient.stop(id);
 						}
 						catch (Exception e) {
 							if (e.getCause() instanceof IllegalStateException) {
@@ -214,13 +207,14 @@ public class TimestampITCase extends TestLogger {
 						}
 						Thread.sleep(10);
 					}
-					while (!cluster.getCurrentlyRunningJobsJava().isEmpty());
+					while (!getRunningJobs(clusterClient).isEmpty());
 				}
 				catch (Throwable t) {
 					t.printStackTrace();
 				}
 			}
-		}.start();
+		};
+		t.start();
 
 		env.execute();
 
@@ -246,6 +240,7 @@ public class TimestampITCase extends TestLogger {
 						subtaskWatermarks.get(subtaskWatermarks.size() - 1));
 			}
 		}
+		t.join();
 	}
 
 	/**
@@ -855,4 +850,12 @@ public class TimestampITCase extends TestLogger {
 		@Override
 		public void cancel() {}
 	}
+
+	private static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
+		Collection<JobStatusMessage> statusMessages = client.listJobs().get();
+		return statusMessages.stream()
+			.filter(status -> !status.getJobState().isGloballyTerminalState())
+			.map(JobStatusMessage::getJobId)
+			.collect(Collectors.toList());
+	}
 }
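
The getRunningJobs helper added here (and in the RescalingITCase port above) is the flip6 replacement for cluster.getCurrentlyRunningJobsJava(): it lists all jobs through the ClusterClient and keeps those that are not yet globally terminal. Pulled out as a standalone utility together with the busy-wait the tests use (a sketch; the RunningJobsUtil class name is ours, the calls inside are from the diff):

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.client.JobStatusMessage;

public final class RunningJobsUtil {

	// Lists all jobs known to the cluster and keeps the non-terminal ones.
	public static List<JobID> getRunningJobs(ClusterClient<?> client) throws Exception {
		Collection<JobStatusMessage> statusMessages = client.listJobs().get();
		return statusMessages.stream()
			.filter(status -> !status.getJobState().isGloballyTerminalState())
			.map(JobStatusMessage::getJobId)
			.collect(Collectors.toList());
	}

	// Busy-waits until no job is running, as the ported tests do after cancel().
	public static void waitForNoRunningJobs(ClusterClient<?> client) throws Exception {
		while (!getRunningJobs(client).isEmpty()) {
			Thread.sleep(50);
		}
	}

	private RunningJobsUtil() {}
}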


[6/7] flink git commit: [FLINK-8964][tests] Port JobSubmissionFailsITCase to flip6

[FLINK-8964][tests] Port JobSubmissionFailsITCase to flip6

This closes #5727.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f0bd7b67
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f0bd7b67
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f0bd7b67

Branch: refs/heads/release-1.5
Commit: f0bd7b67aa348597cda9c4d3cf920ffd5a320896
Parents: 555e5e8
Author: zentol <ch...@apache.org>
Authored: Tue Mar 20 11:40:48 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:12:29 2018 +0100

----------------------------------------------------------------------
 .../failing/JobSubmissionFailsITCase.java       | 169 ++++++-------------
 1 file changed, 55 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/f0bd7b67/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
index a647af9..ecd16a1 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/example/failing/JobSubmissionFailsITCase.java
@@ -19,30 +19,25 @@
 
 package org.apache.flink.test.example.failing;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.runtime.client.JobSubmissionException;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
-import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Optional;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -51,47 +46,32 @@ import static org.junit.Assert.fail;
 @RunWith(Parameterized.class)
 public class JobSubmissionFailsITCase extends TestLogger {
 
+	private static final int NUM_TM = 2;
 	private static final int NUM_SLOTS = 20;
 
-	private static LocalFlinkMiniCluster cluster;
-	private static JobGraph workingJobGraph;
-
-	@BeforeClass
-	public static void setup() {
-		try {
-			Configuration config = new Configuration();
-			config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_SLOTS / 2);
-
-			cluster = new LocalFlinkMiniCluster(config);
-
-			cluster.start();
-
-			final JobVertex jobVertex = new JobVertex("Working job vertex.");
-			jobVertex.setInvokableClass(NoOpInvokable.class);
-			workingJobGraph = new JobGraph("Working testing job", jobVertex);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			getConfiguration(),
+			NUM_TM,
+			NUM_SLOTS / NUM_TM),
+		true);
+
+	private static Configuration getConfiguration() {
+		Configuration config = new Configuration();
+		config.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, 4L);
+		return config;
 	}
 
-	@AfterClass
-	public static void teardown() {
-		try {
-			cluster.stop();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
+	private static JobGraph getWorkingJobGraph() {
+		final JobVertex jobVertex = new JobVertex("Working job vertex.");
+		jobVertex.setInvokableClass(NoOpInvokable.class);
+		return new JobGraph("Working testing job", jobVertex);
 	}
 
 	// --------------------------------------------------------------------------------------------
 
-	private boolean detached;
+	private final boolean detached;
 
 	public JobSubmissionFailsITCase(boolean detached) {
 		this.detached = detached;
@@ -105,90 +85,51 @@ public class JobSubmissionFailsITCase extends TestLogger {
 
 	// --------------------------------------------------------------------------------------------
 
-	private JobExecutionResult submitJob(JobGraph jobGraph) throws Exception {
-		if (detached) {
-			cluster.submitJobDetached(jobGraph);
-			return null;
-		}
-		else {
-			return cluster.submitJobAndWait(jobGraph, false, TestingUtils.TESTING_DURATION());
-		}
-	}
-
 	@Test
-	public void testExceptionInInitializeOnMaster() {
-		try {
-			final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
-			failingJobVertex.setInvokableClass(NoOpInvokable.class);
-
-			final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
+	public void testExceptionInInitializeOnMaster() throws Exception {
+		final JobVertex failingJobVertex = new FailingJobVertex("Failing job vertex");
+		failingJobVertex.setInvokableClass(NoOpInvokable.class);
 
-			try {
-				submitJob(failingJobGraph);
-				fail("Expected JobExecutionException.");
-			}
-			catch (JobExecutionException e) {
-				assertEquals("Test exception.", e.getCause().getMessage());
-			}
-			catch (Throwable t) {
-				t.printStackTrace();
-				fail("Caught wrong exception of type " + t.getClass() + ".");
-			}
+		final JobGraph failingJobGraph = new JobGraph("Failing testing job", failingJobVertex);
 
-			cluster.submitJobAndWait(workingJobGraph, false);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
-		}
-	}
+		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+		client.setDetached(detached);
 
-	@Test
-	public void testSubmitEmptyJobGraph() {
 		try {
-			final JobGraph jobGraph = new JobGraph("Testing job");
-
-			try {
-				submitJob(jobGraph);
-				fail("Expected JobSubmissionException.");
-			}
-			catch (JobSubmissionException e) {
-				assertTrue(e.getMessage() != null && e.getMessage().contains("empty"));
+			client.submitJob(failingJobGraph, JobSubmissionFailsITCase.class.getClassLoader());
+			fail("Job submission should have thrown an exception.");
+		} catch (Exception e) {
+			Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(e,
+				candidate -> "Test exception.".equals(candidate.getMessage()));
+			if (!expectedCause.isPresent()) {
+				throw e;
 			}
-			catch (Throwable t) {
-				t.printStackTrace();
-				fail("Caught wrong exception of type " + t.getClass() + ".");
-			}
-
-			cluster.submitJobAndWait(workingJobGraph, false);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
 		}
+
+		client.setDetached(false);
+		client.submitJob(getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader());
 	}
 
 	@Test
-	public void testSubmitNullJobGraph() {
+	public void testSubmitEmptyJobGraph() throws Exception {
+		final JobGraph jobGraph = new JobGraph("Testing job");
+
+		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+		client.setDetached(detached);
+
 		try {
-			try {
-				submitJob(null);
-				fail("Expected JobSubmissionException.");
-			}
-			catch (NullPointerException e) {
-				// yo!
+			client.submitJob(jobGraph, JobSubmissionFailsITCase.class.getClassLoader());
+			fail("Job submission should have thrown an exception.");
+		} catch (Exception e) {
+			Optional<Throwable> expectedCause = ExceptionUtils.findThrowable(e,
+				throwable -> throwable.getMessage() != null && throwable.getMessage().contains("empty"));
+			if (!expectedCause.isPresent()) {
+				throw e;
 			}
-			catch (Throwable t) {
-				t.printStackTrace();
-				fail("Caught wrong exception of type " + t.getClass() + ".");
-			}
-
-			cluster.submitJobAndWait(workingJobGraph, false);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail(e.getMessage());
 		}
+
+		client.setDetached(false);
+		client.submitJob(getWorkingJobGraph(), JobSubmissionFailsITCase.class.getClassLoader());
 	}
 
 	// --------------------------------------------------------------------------------------------
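
The assertion idiom introduced here — submit a job that must fail, accept only failures whose cause chain matches a predicate, and rethrow anything else — can be captured in a small helper. A sketch (ExpectedFailureUtil is a hypothetical name, not part of the commit; the calls inside mirror the diff):

import java.util.function.Predicate;

import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.util.ExceptionUtils;

import static org.junit.Assert.fail;

public final class ExpectedFailureUtil {

	// Submits a job that is expected to fail. Failures whose cause chain
	// matches the predicate are swallowed; anything else fails the test.
	public static void submitAndExpectFailure(
			ClusterClient<?> client,
			JobGraph jobGraph,
			Predicate<Throwable> expectedCause) throws Exception {
		try {
			client.submitJob(jobGraph, ExpectedFailureUtil.class.getClassLoader());
			fail("Job submission should have thrown an exception.");
		} catch (Exception e) {
			if (!ExceptionUtils.findThrowable(e, expectedCause).isPresent()) {
				throw e;
			}
		}
	}

	private ExpectedFailureUtil() {}
}

With it, the body of testSubmitEmptyJobGraph reduces to submitAndExpectFailure(client, jobGraph, t -> t.getMessage() != null && t.getMessage().contains("empty")).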


[7/7] flink git commit: [FLINK-8958][tests] Port TaskCancelAsyncProducerConsumerITCase to flip6

[FLINK-8958][tests] Port TaskCancelAsyncProducerConsumerITCase to flip6

This closes #5722.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4ebbbe7d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4ebbbe7d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4ebbbe7d

Branch: refs/heads/release-1.5
Commit: 4ebbbe7dc4bbf1da66449acba0b5af2feb409de6
Parents: ecdeb35
Author: zentol <ch...@apache.org>
Authored: Mon Mar 19 15:16:18 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:12:29 2018 +0100

----------------------------------------------------------------------
 ...cyTaskCancelAsyncProducerConsumerITCase.java | 287 +++++++++++++++++++
 .../TaskCancelAsyncProducerConsumerITCase.java  |  82 +++---
 2 files changed, 329 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4ebbbe7d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
new file mode 100644
index 0000000..ee0bfda
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/LegacyTaskCancelAsyncProducerConsumerITCase.java
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.execution.Environment;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.Deadline;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest.isInBlockingBufferRequest;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+public class LegacyTaskCancelAsyncProducerConsumerITCase extends TestLogger {
+
+	// The Exceptions thrown by the producer/consumer Threads
+	private static volatile Exception ASYNC_PRODUCER_EXCEPTION;
+	private static volatile Exception ASYNC_CONSUMER_EXCEPTION;
+
+	// The Threads producing/consuming the intermediate stream
+	private static volatile Thread ASYNC_PRODUCER_THREAD;
+	private static volatile Thread ASYNC_CONSUMER_THREAD;
+
+	/**
+	 * Tests that a task waiting on an async producer/consumer that is stuck
+	 * in a blocking buffer request can be properly cancelled.
+	 *
+	 * <p>This is currently required for the Flink Kafka sources, which spawn
+	 * a separate Thread consuming from Kafka and producing the intermediate
+	 * streams in the spawned Thread instead of the main task Thread.
+	 */
+	@Test
+	public void testCancelAsyncProducerAndConsumer() throws Exception {
+		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+		TestingCluster flink = null;
+
+		try {
+			// Cluster
+			Configuration config = new Configuration();
+			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
+
+			flink = new TestingCluster(config, true);
+			flink.start();
+
+			// Job with async producer and consumer
+			JobVertex producer = new JobVertex("AsyncProducer");
+			producer.setParallelism(1);
+			producer.setInvokableClass(AsyncProducer.class);
+
+			JobVertex consumer = new JobVertex("AsyncConsumer");
+			consumer.setParallelism(1);
+			consumer.setInvokableClass(AsyncConsumer.class);
+			consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
+
+			SlotSharingGroup slot = new SlotSharingGroup(producer.getID(), consumer.getID());
+			producer.setSlotSharingGroup(slot);
+			consumer.setSlotSharingGroup(slot);
+
+			JobGraph jobGraph = new JobGraph(producer, consumer);
+
+			// Submit job and wait until running
+			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
+			flink.submitJobDetached(jobGraph);
+
+			Object msg = new WaitForAllVerticesToBeRunning(jobGraph.getJobID());
+			Future<?> runningFuture = jobManager.ask(msg, deadline.timeLeft());
+			Await.ready(runningFuture, deadline.timeLeft());
+
+			// Wait for blocking requests, cancel and wait for cancellation
+			msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
+			Future<?> cancelledFuture = jobManager.ask(msg, deadline.timeLeft());
+
+			boolean producerBlocked = false;
+			for (int i = 0; i < 50; i++) {
+				Thread thread = ASYNC_PRODUCER_THREAD;
+
+				if (thread != null && thread.isAlive()) {
+					StackTraceElement[] stackTrace = thread.getStackTrace();
+					producerBlocked = isInBlockingBufferRequest(stackTrace);
+				}
+
+				if (producerBlocked) {
+					break;
+				} else {
+					// Retry
+					Thread.sleep(500L);
+				}
+			}
+
+			// Verify that async producer is in blocking request
+			assertTrue("Producer thread is not blocked: " + Arrays.toString(ASYNC_PRODUCER_THREAD.getStackTrace()), producerBlocked);
+
+			boolean consumerWaiting = false;
+			for (int i = 0; i < 50; i++) {
+				Thread thread = ASYNC_CONSUMER_THREAD;
+
+				if (thread != null && thread.isAlive()) {
+					consumerWaiting = thread.getState() == Thread.State.WAITING;
+				}
+
+				if (consumerWaiting) {
+					break;
+				} else {
+					// Retry
+					Thread.sleep(500L);
+				}
+			}
+
+			// Verify that async consumer is in blocking request
+			assertTrue("Consumer thread is not blocked.", consumerWaiting);
+
+			msg = new CancelJob(jobGraph.getJobID());
+			Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft());
+			Await.ready(cancelFuture, deadline.timeLeft());
+
+			Await.ready(cancelledFuture, deadline.timeLeft());
+
+			// Verify the expected Exceptions
+			assertNotNull(ASYNC_PRODUCER_EXCEPTION);
+			assertEquals(IllegalStateException.class, ASYNC_PRODUCER_EXCEPTION.getClass());
+
+			assertNotNull(ASYNC_CONSUMER_EXCEPTION);
+			assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
+		} finally {
+			if (flink != null) {
+				flink.stop();
+			}
+		}
+	}
+
+	/**
+	 * Invokable emitting records in a separate Thread (not the main Task
+	 * thread).
+	 */
+	public static class AsyncProducer extends AbstractInvokable {
+
+		public AsyncProducer(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			Thread producer = new ProducerThread(getEnvironment().getWriter(0));
+
+			// Publish the async producer for the main test Thread
+			ASYNC_PRODUCER_THREAD = producer;
+
+			producer.start();
+
+			// Wait for the producer Thread to finish. This is executed in the
+			// main Task thread and will be interrupted on cancellation.
+			while (producer.isAlive()) {
+				try {
+					producer.join();
+				} catch (InterruptedException ignored) {
+				}
+			}
+		}
+
+		/**
+		 * The Thread emitting the records.
+		 */
+		private static class ProducerThread extends Thread {
+
+			private final RecordWriter<LongValue> recordWriter;
+
+			public ProducerThread(ResultPartitionWriter partitionWriter) {
+				this.recordWriter = new RecordWriter<>(partitionWriter);
+			}
+
+			@Override
+			public void run() {
+				LongValue current = new LongValue(0);
+
+				try {
+					while (true) {
+						current.setValue(current.getValue() + 1);
+						recordWriter.emit(current);
+						recordWriter.flushAll();
+					}
+				} catch (Exception e) {
+					ASYNC_PRODUCER_EXCEPTION = e;
+				}
+			}
+		}
+	}
+
+	/**
+	 * Invokable consuming buffers in a separate Thread (not the main Task
+	 * thread).
+	 */
+	public static class AsyncConsumer extends AbstractInvokable {
+
+		public AsyncConsumer(Environment environment) {
+			super(environment);
+		}
+
+		@Override
+		public void invoke() throws Exception {
+			Thread consumer = new ConsumerThread(getEnvironment().getInputGate(0));
+
+			// Publish the async consumer for the main test Thread
+			ASYNC_CONSUMER_THREAD = consumer;
+
+			consumer.start();
+
+			// Wait for the consumer Thread to finish. This is executed in the
+			// main Task thread and will be interrupted on cancellation.
+			while (consumer.isAlive()) {
+				try {
+					consumer.join();
+				} catch (InterruptedException ignored) {
+				}
+			}
+		}
+
+		/**
+		 * The Thread consuming buffers.
+		 */
+		private static class ConsumerThread extends Thread {
+
+			private final InputGate inputGate;
+
+			public ConsumerThread(InputGate inputGate) {
+				this.inputGate = inputGate;
+			}
+
+			@Override
+			public void run() {
+				try {
+					while (true) {
+						inputGate.getNextBufferOrEvent();
+					}
+				} catch (Exception e) {
+					ASYNC_CONSUMER_EXCEPTION = e;
+				}
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4ebbbe7d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
index c63af83..4b73b09 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskCancelAsyncProducerConsumerITCase.java
@@ -18,11 +18,12 @@
 
 package org.apache.flink.runtime.taskmanager;
 
-import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
@@ -33,28 +34,26 @@ import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
-import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobStatus;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.WaitForAllVerticesToBeRunning;
+import org.apache.flink.runtime.minicluster.MiniCluster;
+import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.concurrent.TimeUnit;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.apache.flink.runtime.io.network.buffer.LocalBufferPoolDestroyTest.isInBlockingBufferRequest;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
+@Category(Flip6.class)
 public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 
 	// The Exceptions thrown by the producer/consumer Threads
@@ -75,18 +74,20 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 	 */
 	@Test
 	public void testCancelAsyncProducerAndConsumer() throws Exception {
-		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
-		TestingCluster flink = null;
-
-		try {
-			// Cluster
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-			config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
-			config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
-
-			flink = new TestingCluster(config, true);
+		Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));
+
+		// Cluster
+		Configuration config = new Configuration();
+		config.setInteger(TaskManagerOptions.MEMORY_SEGMENT_SIZE, 4096);
+		config.setInteger(TaskManagerOptions.NETWORK_NUM_BUFFERS, 9);
+
+		MiniClusterConfiguration miniClusterConfiguration = new MiniClusterConfiguration.Builder()
+			.setConfiguration(config)
+			.setNumTaskManagers(1)
+			.setNumSlotsPerTaskManager(1)
+			.build();
+
+		try (MiniCluster flink = new MiniCluster(miniClusterConfiguration)) {
 			flink.start();
 
 			// Job with async producer and consumer
@@ -106,16 +107,15 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 			JobGraph jobGraph = new JobGraph(producer, consumer);
 
 			// Submit job and wait until running
-			ActorGateway jobManager = flink.getLeaderGateway(deadline.timeLeft());
-			flink.submitJobDetached(jobGraph);
-
-			Object msg = new WaitForAllVerticesToBeRunning(jobGraph.getJobID());
-			Future<?> runningFuture = jobManager.ask(msg, deadline.timeLeft());
-			Await.ready(runningFuture, deadline.timeLeft());
+			flink.runDetached(jobGraph);
 
-			// Wait for blocking requests, cancel and wait for cancellation
-			msg = new NotifyWhenJobStatus(jobGraph.getJobID(), JobStatus.CANCELED);
-			Future<?> cancelledFuture = jobManager.ask(msg, deadline.timeLeft());
+			FutureUtils.retrySuccesfulWithDelay(
+				() -> flink.getJobStatus(jobGraph.getJobID()),
+				Time.milliseconds(10),
+				deadline,
+				status -> status == JobStatus.RUNNING,
+				TestingUtils.defaultScheduledExecutor()
+			).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 			boolean producerBlocked = false;
 			for (int i = 0; i < 50; i++) {
@@ -156,11 +156,17 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 			// Verify that async consumer is in blocking request
 			assertTrue("Consumer thread is not blocked.", consumerWaiting);
 
-			msg = new CancelJob(jobGraph.getJobID());
-			Future<?> cancelFuture = jobManager.ask(msg, deadline.timeLeft());
-			Await.ready(cancelFuture, deadline.timeLeft());
+			flink.cancelJob(jobGraph.getJobID())
+				.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
-			Await.ready(cancelledFuture, deadline.timeLeft());
+			// wait until the job is canceled
+			FutureUtils.retrySuccesfulWithDelay(
+				() -> flink.getJobStatus(jobGraph.getJobID()),
+				Time.milliseconds(10),
+				deadline,
+				status -> status == JobStatus.CANCELED,
+				TestingUtils.defaultScheduledExecutor()
+			).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 			// Verify the expected Exceptions
 			assertNotNull(ASYNC_PRODUCER_EXCEPTION);
@@ -168,10 +174,6 @@ public class TaskCancelAsyncProducerConsumerITCase extends TestLogger {
 
 			assertNotNull(ASYNC_CONSUMER_EXCEPTION);
 			assertEquals(IllegalStateException.class, ASYNC_CONSUMER_EXCEPTION.getClass());
-		} finally {
-			if (flink != null) {
-				flink.stop();
-			}
 		}
 	}
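
A note on the pattern above: instead of asking a JobManager ActorGateway to
wait for vertices, the flip6 ports poll the job status through the
MiniCluster until a predicate holds. A minimal standalone sketch of that
polling step (assuming, as in the test, a started MiniCluster `flink` and a
submitted JobGraph `jobGraph`; imports as at the top of the file):

    Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));

    // submit without waiting for the job result
    flink.runDetached(jobGraph);

    // retry getJobStatus every 10 ms until the job is RUNNING,
    // failing once the deadline has passed
    FutureUtils.retrySuccesfulWithDelay(
        () -> flink.getJobStatus(jobGraph.getJobID()),
        Time.milliseconds(10),
        deadline,
        status -> status == JobStatus.RUNNING,
        TestingUtils.defaultScheduledExecutor()
    ).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);

The same call with a JobStatus.CANCELED predicate doubles as the
wait-for-cancellation step, replacing the old NotifyWhenJobStatus message.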
 


[5/7] flink git commit: [FLINK-8957][tests] Port JMXJobManagerMetricTest to flip6

Posted by ch...@apache.org.
[FLINK-8957][tests] Port JMXJobManagerMetricTest to flip6

This closes #5720.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ecdeb35c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ecdeb35c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ecdeb35c

Branch: refs/heads/release-1.5
Commit: ecdeb35cf43b160b3ff86d8acfb9672e758bd2b7
Parents: 765bcc1
Author: zentol <ch...@apache.org>
Authored: Mon Mar 19 14:17:34 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:12:29 2018 +0100

----------------------------------------------------------------------
 flink-metrics/flink-metrics-jmx/pom.xml         |  6 ++
 .../jobmanager/JMXJobManagerMetricTest.java     | 69 +++++++++++---------
 2 files changed, 44 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ecdeb35c/flink-metrics/flink-metrics-jmx/pom.xml
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/pom.xml b/flink-metrics/flink-metrics-jmx/pom.xml
index 66de1b3..d90595c 100644
--- a/flink-metrics/flink-metrics-jmx/pom.xml
+++ b/flink-metrics/flink-metrics-jmx/pom.xml
@@ -85,5 +85,11 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+			<scope>test</scope>
+		</dependency>
 	</dependencies>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/ecdeb35c/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
----------------------------------------------------------------------
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
index 6770ec3..c00b5d3 100644
--- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
+++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java
@@ -18,38 +18,40 @@
 
 package org.apache.flink.runtime.jobmanager;
 
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.jmx.JMXReporter;
 import org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
 import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.test.util.MiniClusterResource;
 
 import org.junit.Assert;
+import org.junit.ClassRule;
 import org.junit.Test;
 
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
 import java.lang.management.ManagementFactory;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.Deadline;
-import scala.concurrent.duration.FiniteDuration;
-
 import static org.junit.Assert.assertEquals;
 
 /**
@@ -57,24 +59,31 @@ import static org.junit.Assert.assertEquals;
  */
 public class JMXJobManagerMetricTest {
 
-	/**
-	 * Tests that metrics registered on the JobManager are actually accessible via JMX.
-	 */
-	@Test
-	public void testJobManagerJMXMetricAccess() throws Exception {
-		Deadline deadline = new FiniteDuration(2, TimeUnit.MINUTES).fromNow();
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			getConfiguration(),
+			1,
+			1),
+		true);
+
+	private static Configuration getConfiguration() {
 		Configuration flinkConfiguration = new Configuration();
 
 		flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test." + ConfigConstants.METRICS_REPORTER_CLASS_SUFFIX, JMXReporter.class.getName());
-		flinkConfiguration.setString(ConfigConstants.METRICS_REPORTER_PREFIX + "test.port", "9060-9075");
-
 		flinkConfiguration.setString(MetricOptions.SCOPE_NAMING_JM_JOB, "jobmanager.<job_name>");
 
-		TestingCluster flink = new TestingCluster(flinkConfiguration);
+		return flinkConfiguration;
+	}
 
-		try {
-			flink.start();
+	/**
+	 * Tests that metrics registered on the JobManager are actually accessible via JMX.
+	 */
+	@Test
+	public void testJobManagerJMXMetricAccess() throws Exception {
+		Deadline deadline = Deadline.now().plus(Duration.ofMinutes(2));
 
+		try {
 			JobVertex sourceJobVertex = new JobVertex("Source");
 			sourceJobVertex.setInvokableClass(BlockingInvokable.class);
 
@@ -92,28 +101,26 @@ public class JMXJobManagerMetricTest {
 					true),
 				null));
 
-			flink.waitForActorsToBeAlive();
-
-			flink.submitJobDetached(jobGraph);
+			ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+			client.setDetached(true);
+			client.submitJob(jobGraph, JMXJobManagerMetricTest.class.getClassLoader());
 
-			Future<Object> jobRunning = flink.getLeaderGateway(deadline.timeLeft())
-				.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobGraph.getJobID()), deadline.timeLeft());
-			Await.ready(jobRunning, deadline.timeLeft());
+			FutureUtils.retrySuccesfulWithDelay(
+				() -> client.getJobStatus(jobGraph.getJobID()),
+				Time.milliseconds(10),
+				deadline,
+				status -> status == JobStatus.RUNNING,
+				TestingUtils.defaultScheduledExecutor()
+			).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
 
 			MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
 			Set<ObjectName> nameSet = mBeanServer.queryNames(new ObjectName("org.apache.flink.jobmanager.job.lastCheckpointSize:job_name=TestingJob,*"), null);
 			Assert.assertEquals(1, nameSet.size());
 			assertEquals(-1L, mBeanServer.getAttribute(nameSet.iterator().next(), "Value"));
 
-			Future<Object> jobFinished = flink.getLeaderGateway(deadline.timeLeft())
-				.ask(new TestingJobManagerMessages.NotifyWhenJobRemoved(jobGraph.getJobID()), deadline.timeLeft());
-
 			BlockingInvokable.unblock();
-
-			// wait til the job has finished
-			Await.ready(jobFinished, deadline.timeLeft());
 		} finally {
-			flink.stop();
+			BlockingInvokable.unblock();
 		}
 	}
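
The JMX lookup itself is unchanged by the port and can be read on its own; a
minimal sketch (assuming a running job named TestingJob, the JMXReporter
registered as in getConfiguration() above, and exception handling elided):

    MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
    ObjectName pattern = new ObjectName(
        "org.apache.flink.jobmanager.job.lastCheckpointSize:job_name=TestingJob,*");
    Set<ObjectName> names = mBeanServer.queryNames(pattern, null);
    for (ObjectName name : names) {
        // "Value" stays at -1 until the first checkpoint completes
        Object lastCheckpointSize = mBeanServer.getAttribute(name, "Value");
    }

The object name follows the jobmanager.<job_name> scope format configured
via MetricOptions.SCOPE_NAMING_JM_JOB.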
 


[3/7] flink git commit: [FLINK-8959][tests] Port AccumulatorLiveITCase to flip6

Posted by ch...@apache.org.
[FLINK-8959][tests] Port AccumulatorLiveITCase to flip6

This closes #5719.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/765bcc15
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/765bcc15
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/765bcc15

Branch: refs/heads/release-1.5
Commit: 765bcc154cc10133b2a9ceb884c62138560370e0
Parents: 0acc1e2
Author: zentol <ch...@apache.org>
Authored: Mon Mar 19 13:59:22 2018 +0100
Committer: zentol <ch...@apache.org>
Committed: Fri Mar 23 19:12:29 2018 +0100

----------------------------------------------------------------------
 .../accumulators/AccumulatorLiveITCase.java     | 336 +++++-----------
 .../LegacyAccumulatorLiveITCase.java            | 386 +++++++++++++++++++
 2 files changed, 482 insertions(+), 240 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/765bcc15/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
index 756b81e..ff362dd 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/AccumulatorLiveITCase.java
@@ -18,292 +18,199 @@
 
 package org.apache.flink.test.accumulators;
 
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.IntCounter;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.AkkaOptions;
-import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HeartbeatManagerOptions;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
-import org.apache.flink.runtime.akka.AkkaUtils;
-import org.apache.flink.runtime.akka.ListeningBehaviour;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
-import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.testutils.category.Flip6;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.pattern.Patterns;
-import akka.testkit.JavaTestKit;
-import akka.util.Timeout;
-import org.junit.After;
 import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
-import scala.concurrent.Await;
-import scala.concurrent.Future;
-import scala.concurrent.duration.FiniteDuration;
-
-import static org.junit.Assert.fail;
-
 /**
- * Tests the availability of accumulator results during runtime. The test case tests a user-defined
- * accumulator and Flink's internal accumulators for two consecutive tasks.
- *
- * <p>CHAINED[Source -> Map] -> Sink
- *
- * <p>Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
- * the task to the task manager which notifies the job manager and sends the current accumulators.
- * The task blocks until the test has been notified about the current accumulator values.
- *
- * <p>A barrier between the operators ensures that that pipelining is disabled for the streaming test.
- * The batch job reads the records one at a time. The streaming code buffers the records beforehand;
- * that's why exact guarantees about the number of records read are very hard to make. Thus, why we
- * check for an upper bound of the elements read.
+ * Tests the availability of accumulator results during runtime, polled via the {@link ClusterClient}.
  */
+@Category(Flip6.class)
 public class AccumulatorLiveITCase extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(AccumulatorLiveITCase.class);
 
-	private static ActorSystem system;
-	private static ActorGateway jobManagerGateway;
-	private static ActorRef taskManager;
-
-	private static JobID jobID;
-	private static JobGraph jobGraph;
-
 	// name of user accumulator
 	private static final String ACCUMULATOR_NAME = "test";
 
+	private static final long HEARTBEAT_INTERVAL = 50L;
+
 	// number of heartbeat intervals to check
 	private static final int NUM_ITERATIONS = 5;
 
-	private static List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
+	private static final List<Integer> inputData = new ArrayList<>(NUM_ITERATIONS);
 
-	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+	static {
+		// generate test data
+		for (int i = 0; i < NUM_ITERATIONS; i++) {
+			inputData.add(i);
+		}
+	}
 
-	@Before
-	public void before() throws Exception {
-		system = AkkaUtils.createLocalActorSystem(new Configuration());
+	@ClassRule
+	public static final MiniClusterResource MINI_CLUSTER_RESOURCE = new MiniClusterResource(
+		new MiniClusterResource.MiniClusterResourceConfiguration(
+			getConfiguration(),
+			1,
+			1),
+		true);
 
+	private static Configuration getConfiguration() {
 		Configuration config = new Configuration();
-		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
-		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
 		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
-		TestingCluster testingCluster = new TestingCluster(config, false, true);
-		testingCluster.start();
-
-		jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
-		taskManager = testingCluster.getTaskManagersAsJava().get(0);
-
-		// generate test data
-		for (int i = 0; i < NUM_ITERATIONS; i++) {
-			inputData.add(i, String.valueOf(i + 1));
-		}
+		config.setLong(HeartbeatManagerOptions.HEARTBEAT_INTERVAL, HEARTBEAT_INTERVAL);
 
-		NotifyingMapper.finished = false;
+		return config;
 	}
 
-	@After
-	public void after() throws Exception {
-		JavaTestKit.shutdownActorSystem(system);
-		inputData.clear();
+	@Before
+	public void resetLatches() throws InterruptedException {
+		NotifyingMapper.reset();
 	}
 
 	@Test
 	public void testBatch() throws Exception {
-
-		/** The program **/
-		ExecutionEnvironment env = new BatchPlanExtractor();
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 
-		DataSet<String> input = env.fromCollection(inputData);
+		DataSet<Integer> input = env.fromCollection(inputData);
 		input
 				.flatMap(new NotifyingMapper())
-				.output(new NotifyingOutputFormat());
-
-		env.execute();
+				.output(new DummyOutputFormat());
 
 		// Extract job graph and set job id for the task to notify of accumulator changes.
-		jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
-		jobID = jobGraph.getJobID();
+		JobGraph jobGraph = getJobGraph(env.createProgramPlan());
 
-		verifyResults();
+		submitJobAndVerifyResults(jobGraph);
 	}
 
 	@Test
 	public void testStreaming() throws Exception {
 
-		StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
 		env.setParallelism(1);
 
-		DataStream<String> input = env.fromCollection(inputData);
+		DataStream<Integer> input = env.fromCollection(inputData);
 		input
 				.flatMap(new NotifyingMapper())
-				.writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
+				.writeUsingOutputFormat(new DummyOutputFormat()).disableChaining();
 
-		jobGraph = env.getStreamGraph().getJobGraph();
-		jobID = jobGraph.getJobID();
+		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
 
-		verifyResults();
+		submitJobAndVerifyResults(jobGraph);
 	}
 
-	private static void verifyResults() {
-		new JavaTestKit(system) {{
-
-			ActorGateway selfGateway = new AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
-
-			// register for accumulator changes
-			jobManagerGateway.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), selfGateway);
-			expectMsgEquals(TIMEOUT, true);
-
-			// submit job
-
-			jobManagerGateway.tell(
-					new JobManagerMessages.SubmitJob(
-							jobGraph,
-							ListeningBehaviour.EXECUTION_RESULT),
-					selfGateway);
-			expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
-
-			TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-			Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
-
-			ExecutionAttemptID mapperTaskID = null;
-
-			ExecutionAttemptID sinkTaskID = null;
-
-			/* Check for accumulator values */
-			if (checkUserAccumulators(0, userAccumulators)) {
-				LOG.info("Passed initial check for map task.");
-			} else {
-				fail("Wrong accumulator results when map task begins execution.");
-			}
-
-			int expectedAccVal = 0;
-
-			/* for mapper task */
-			for (int i = 1; i <= NUM_ITERATIONS; i++) {
-				expectedAccVal += i;
-
-				// receive message
-				msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-				userAccumulators = msg.userAccumulators();
-
-				LOG.info("{}", userAccumulators);
-
-				if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
-					LOG.info("Passed round #" + i);
-				} else if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
-					// we determined the wrong task id and need to switch the two here
-					ExecutionAttemptID temp = mapperTaskID;
-					mapperTaskID = sinkTaskID;
-					sinkTaskID = temp;
-					LOG.info("Passed round #" + i);
-				} else {
-					fail("Failed in round #" + i);
-				}
-			}
-
-			msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-			userAccumulators = msg.userAccumulators();
-
-			if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
-				LOG.info("Passed initial check for sink task.");
-			} else {
-				fail("Wrong accumulator results when sink task begins execution.");
-			}
-
-			/* for sink task */
-			for (int i = 1; i <= NUM_ITERATIONS; i++) {
-
-				// receive message
-				msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
-
-				userAccumulators = msg.userAccumulators();
-
-				LOG.info("{}", userAccumulators);
-
-				if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
-					LOG.info("Passed round #" + i);
-				} else {
-					fail("Failed in round #" + i);
-				}
-			}
-
-			expectMsgClass(TIMEOUT, JobManagerMessages.JobResultSuccess.class);
-
-		}};
-	}
-
-	private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?, ?>> accumulatorMap) {
-		LOG.info("checking user accumulators");
-		return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter) accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
+	private static void submitJobAndVerifyResults(JobGraph jobGraph) throws Exception {
+		Deadline deadline = Deadline.now().plus(Duration.ofSeconds(30));
+
+		ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
+
+		client.setDetached(true);
+		client.submitJob(jobGraph, AccumulatorLiveITCase.class.getClassLoader());
+
+		try {
+			NotifyingMapper.notifyLatch.await();
+
+			FutureUtils.retrySuccesfulWithDelay(
+				() -> {
+					try {
+						return CompletableFuture.completedFuture(client.getAccumulators(jobGraph.getJobID()));
+					} catch (Exception e) {
+						return FutureUtils.completedExceptionally(e);
+					}
+				},
+				Time.milliseconds(20),
+				deadline,
+				accumulators -> accumulators.size() == 1
+					&& accumulators.containsKey(ACCUMULATOR_NAME)
+					&& (int) accumulators.get(ACCUMULATOR_NAME) == NUM_ITERATIONS,
+				TestingUtils.defaultScheduledExecutor()
+			).get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
+
+			NotifyingMapper.shutdownLatch.trigger();
+		} finally {
+			NotifyingMapper.shutdownLatch.trigger();
+		}
 	}
 
 	/**
 	 * UDF that notifies when it changes the accumulator values.
 	 */
-	private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
+	private static class NotifyingMapper extends RichFlatMapFunction<Integer, Integer> {
 		private static final long serialVersionUID = 1L;
 
-		private IntCounter counter = new IntCounter();
+		private static final OneShotLatch notifyLatch = new OneShotLatch();
+		private static final OneShotLatch shutdownLatch = new OneShotLatch();
 
-		private static boolean finished = false;
+		private final IntCounter counter = new IntCounter();
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
 			getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, counter);
-			notifyTaskManagerOfAccumulatorUpdate();
 		}
 
 		@Override
-		public void flatMap(String value, Collector<Integer> out) throws Exception {
-			int val = Integer.valueOf(value);
-			counter.add(val);
-			out.collect(val);
+		public void flatMap(Integer value, Collector<Integer> out) throws Exception {
+			counter.add(1);
+			out.collect(value);
 			LOG.debug("Emitting value {}.", value);
-			notifyTaskManagerOfAccumulatorUpdate();
+			if (counter.getLocalValuePrimitive() == 5) {
+				notifyLatch.trigger();
+			}
 		}
 
 		@Override
 		public void close() throws Exception {
-			finished = true;
+			shutdownLatch.await();
+		}
+
+		private static void reset() throws InterruptedException {
+			notifyLatch.reset();
+			shutdownLatch.reset();
 		}
 	}
 
 	/**
-	 * Outputs format which notifies of accumulator changes and waits for the previous mapper.
+	 * Output format which discards all records.
 	 */
-	private static class NotifyingOutputFormat implements OutputFormat<Integer> {
+	private static class DummyOutputFormat implements OutputFormat<Integer> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -312,17 +219,10 @@ public class AccumulatorLiveITCase extends TestLogger {
 
 		@Override
 		public void open(int taskNumber, int numTasks) throws IOException {
-			while (!NotifyingMapper.finished) {
-				try {
-					Thread.sleep(1000);
-				} catch (InterruptedException e) {}
-			}
-			notifyTaskManagerOfAccumulatorUpdate();
 		}
 
 		@Override
 		public void writeRecord(Integer record) throws IOException {
-			notifyTaskManagerOfAccumulatorUpdate();
 		}
 
 		@Override
@@ -331,56 +231,12 @@ public class AccumulatorLiveITCase extends TestLogger {
 	}
 
 	/**
-	 * Notify task manager of accumulator update and wait until the Heartbeat containing the message
-	 * has been reported.
-	 */
-	public static void notifyTaskManagerOfAccumulatorUpdate() {
-		new JavaTestKit(system) {{
-			Timeout timeout = new Timeout(TIMEOUT);
-			Future<Object> ask = Patterns.ask(taskManager, new TestingTaskManagerMessages.AccumulatorsChanged(jobID), timeout);
-			try {
-				Await.result(ask, timeout.duration());
-			} catch (Exception e) {
-				fail("Failed to notify task manager of accumulator update.");
-			}
-		}};
-	}
-
-	/**
 	 * Helpers to generate the JobGraph.
 	 */
-	private static JobGraph getOptimizedPlan(Plan plan) {
+	private static JobGraph getJobGraph(Plan plan) {
 		Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
 		JobGraphGenerator jgg = new JobGraphGenerator();
 		OptimizedPlan op = pc.compile(plan);
 		return jgg.compileJobGraph(op);
 	}
-
-	private static class BatchPlanExtractor extends LocalEnvironment {
-
-		private Plan plan = null;
-
-		@Override
-		public JobExecutionResult execute(String jobName) throws Exception {
-			plan = createProgramPlan();
-			return new JobExecutionResult(new JobID(), -1, null);
-		}
-	}
-
-	/**
-	 * This is used to for creating the example topology. {@link #execute} is never called, we
-	 * only use this to call {@link #getStreamGraph()}.
-	 */
-	private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
-
-		@Override
-		public JobExecutionResult execute() throws Exception {
-			return execute("default");
-		}
-
-		@Override
-		public JobExecutionResult execute(String jobName) throws Exception {
-			throw new RuntimeException("This should not be called.");
-		}
-	}
 }
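
The port replaces the actor-message handshake with two OneShotLatch fields; a
standalone sketch of that coordination (hypothetical class, same latch API as
NotifyingMapper above):

    import org.apache.flink.core.testutils.OneShotLatch;

    public class LatchHandshakeSketch {
        private static final OneShotLatch notifyLatch = new OneShotLatch();
        private static final OneShotLatch shutdownLatch = new OneShotLatch();

        public static void main(String[] args) throws Exception {
            Thread task = new Thread(() -> {
                notifyLatch.trigger();       // task side: signal progress
                try {
                    shutdownLatch.await();   // block until the test releases us
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            task.start();

            notifyLatch.await();             // test side: wait for the signal
            // ... assertions, e.g. polling ClusterClient#getAccumulators ...
            shutdownLatch.trigger();         // release the task
            task.join();
        }
    }

The accumulator check between the two latch calls reuses the same
retrySuccesfulWithDelay polling shown in submitJobAndVerifyResults above.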

http://git-wip-us.apache.org/repos/asf/flink/blob/765bcc15/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java b/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java
new file mode 100644
index 0000000..6595b10
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/accumulators/LegacyAccumulatorLiveITCase.java
@@ -0,0 +1,386 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.accumulators;
+
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.accumulators.Accumulator;
+import org.apache.flink.api.common.accumulators.IntCounter;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.LocalEnvironment;
+import org.apache.flink.configuration.AkkaOptions;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.DataStatistics;
+import org.apache.flink.optimizer.Optimizer;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.akka.ListeningBehaviour;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingCluster;
+import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingTaskManagerMessages;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.TestLogger;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.pattern.Patterns;
+import akka.testkit.JavaTestKit;
+import akka.util.Timeout;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import scala.concurrent.Await;
+import scala.concurrent.Future;
+import scala.concurrent.duration.FiniteDuration;
+
+import static org.junit.Assert.fail;
+
+/**
+ * Tests the availability of accumulator results during runtime. The test case tests a user-defined
+ * accumulator and Flink's internal accumulators for two consecutive tasks.
+ *
+ * <p>CHAINED[Source -> Map] -> Sink
+ *
+ * <p>Checks are performed as the elements arrive at the operators. Checks consist of a message sent by
+ * the task to the task manager which notifies the job manager and sends the current accumulators.
+ * The task blocks until the test has been notified about the current accumulator values.
+ *
+ * <p>A barrier between the operators ensures that pipelining is disabled for the streaming test.
+ * The batch job reads the records one at a time. The streaming code buffers the records beforehand,
+ * so exact guarantees about the number of records read are very hard to make. That is why we only
+ * check for an upper bound on the number of elements read.
+ */
+public class LegacyAccumulatorLiveITCase extends TestLogger {
+
+	private static final Logger LOG = LoggerFactory.getLogger(LegacyAccumulatorLiveITCase.class);
+
+	private static ActorSystem system;
+	private static ActorGateway jobManagerGateway;
+	private static ActorRef taskManager;
+
+	private static JobID jobID;
+	private static JobGraph jobGraph;
+
+	// name of user accumulator
+	private static final String ACCUMULATOR_NAME = "test";
+
+	// number of heartbeat intervals to check
+	private static final int NUM_ITERATIONS = 5;
+
+	private static List<String> inputData = new ArrayList<>(NUM_ITERATIONS);
+
+	private static final FiniteDuration TIMEOUT = new FiniteDuration(10, TimeUnit.SECONDS);
+
+	@Before
+	public void before() throws Exception {
+		system = AkkaUtils.createLocalActorSystem(new Configuration());
+
+		Configuration config = new Configuration();
+		config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);
+		config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
+		config.setString(AkkaOptions.ASK_TIMEOUT, TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT());
+		TestingCluster testingCluster = new TestingCluster(config, false, true);
+		testingCluster.start();
+
+		jobManagerGateway = testingCluster.getLeaderGateway(TestingUtils.TESTING_DURATION());
+		taskManager = testingCluster.getTaskManagersAsJava().get(0);
+
+		// generate test data
+		for (int i = 0; i < NUM_ITERATIONS; i++) {
+			inputData.add(i, String.valueOf(i + 1));
+		}
+
+		NotifyingMapper.finished = false;
+	}
+
+	@After
+	public void after() throws Exception {
+		JavaTestKit.shutdownActorSystem(system);
+		inputData.clear();
+	}
+
+	@Test
+	public void testBatch() throws Exception {
+
+		/** The program **/
+		ExecutionEnvironment env = new BatchPlanExtractor();
+		env.setParallelism(1);
+
+		DataSet<String> input = env.fromCollection(inputData);
+		input
+				.flatMap(new NotifyingMapper())
+				.output(new NotifyingOutputFormat());
+
+		env.execute();
+
+		// Extract job graph and set job id for the task to notify of accumulator changes.
+		jobGraph = getOptimizedPlan(((BatchPlanExtractor) env).plan);
+		jobID = jobGraph.getJobID();
+
+		verifyResults();
+	}
+
+	@Test
+	public void testStreaming() throws Exception {
+
+		StreamExecutionEnvironment env = new DummyStreamExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStream<String> input = env.fromCollection(inputData);
+		input
+				.flatMap(new NotifyingMapper())
+				.writeUsingOutputFormat(new NotifyingOutputFormat()).disableChaining();
+
+		jobGraph = env.getStreamGraph().getJobGraph();
+		jobID = jobGraph.getJobID();
+
+		verifyResults();
+	}
+
+	private static void verifyResults() {
+		new JavaTestKit(system) {{
+
+			ActorGateway selfGateway = new AkkaActorGateway(getRef(), jobManagerGateway.leaderSessionID());
+
+			// register for accumulator changes
+			jobManagerGateway.tell(new TestingJobManagerMessages.NotifyWhenAccumulatorChange(jobID), selfGateway);
+			expectMsgEquals(TIMEOUT, true);
+
+			// submit job
+
+			jobManagerGateway.tell(
+					new JobManagerMessages.SubmitJob(
+							jobGraph,
+							ListeningBehaviour.EXECUTION_RESULT),
+					selfGateway);
+			expectMsgClass(TIMEOUT, JobManagerMessages.JobSubmitSuccess.class);
+
+			TestingJobManagerMessages.UpdatedAccumulators msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
+			Map<String, Accumulator<?, ?>> userAccumulators = msg.userAccumulators();
+
+			ExecutionAttemptID mapperTaskID = null;
+
+			ExecutionAttemptID sinkTaskID = null;
+
+			/* Check for accumulator values */
+			if (checkUserAccumulators(0, userAccumulators)) {
+				LOG.info("Passed initial check for map task.");
+			} else {
+				fail("Wrong accumulator results when map task begins execution.");
+			}
+
+			int expectedAccVal = 0;
+
+			/* for mapper task */
+			for (int i = 1; i <= NUM_ITERATIONS; i++) {
+				expectedAccVal += i;
+
+				// receive message
+				msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
+				userAccumulators = msg.userAccumulators();
+
+				LOG.info("{}", userAccumulators);
+
+				if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
+					LOG.info("Passed round #" + i);
+				} else if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
+					// we determined the wrong task id and need to switch the two here
+					ExecutionAttemptID temp = mapperTaskID;
+					mapperTaskID = sinkTaskID;
+					sinkTaskID = temp;
+					LOG.info("Passed round #" + i);
+				} else {
+					fail("Failed in round #" + i);
+				}
+			}
+
+			msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
+			userAccumulators = msg.userAccumulators();
+
+			if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
+				LOG.info("Passed initial check for sink task.");
+			} else {
+				fail("Wrong accumulator results when sink task begins execution.");
+			}
+
+			/* for sink task */
+			for (int i = 1; i <= NUM_ITERATIONS; i++) {
+
+				// receive message
+				msg = (TestingJobManagerMessages.UpdatedAccumulators) receiveOne(TIMEOUT);
+
+				userAccumulators = msg.userAccumulators();
+
+				LOG.info("{}", userAccumulators);
+
+				if (checkUserAccumulators(expectedAccVal, userAccumulators)) {
+					LOG.info("Passed round #" + i);
+				} else {
+					fail("Failed in round #" + i);
+				}
+			}
+
+			expectMsgClass(TIMEOUT, JobManagerMessages.JobResultSuccess.class);
+
+		}};
+	}
+
+	private static boolean checkUserAccumulators(int expected, Map<String, Accumulator<?, ?>> accumulatorMap) {
+		LOG.info("checking user accumulators");
+		return accumulatorMap.containsKey(ACCUMULATOR_NAME) && expected == ((IntCounter) accumulatorMap.get(ACCUMULATOR_NAME)).getLocalValue();
+	}
+
+	/**
+	 * UDF that notifies when it changes the accumulator values.
+	 */
+	private static class NotifyingMapper extends RichFlatMapFunction<String, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		private IntCounter counter = new IntCounter();
+
+		private static boolean finished = false;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			getRuntimeContext().addAccumulator(ACCUMULATOR_NAME, counter);
+			notifyTaskManagerOfAccumulatorUpdate();
+		}
+
+		@Override
+		public void flatMap(String value, Collector<Integer> out) throws Exception {
+			int val = Integer.valueOf(value);
+			counter.add(val);
+			out.collect(val);
+			LOG.debug("Emitting value {}.", value);
+			notifyTaskManagerOfAccumulatorUpdate();
+		}
+
+		@Override
+		public void close() throws Exception {
+			finished = true;
+		}
+	}
+
+	/**
+	 * Output format which notifies of accumulator changes and waits for the previous mapper.
+	 */
+	private static class NotifyingOutputFormat implements OutputFormat<Integer> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void configure(Configuration parameters) {
+		}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {
+			while (!NotifyingMapper.finished) {
+				try {
+					Thread.sleep(1000);
+				} catch (InterruptedException e) {}
+			}
+			notifyTaskManagerOfAccumulatorUpdate();
+		}
+
+		@Override
+		public void writeRecord(Integer record) throws IOException {
+			notifyTaskManagerOfAccumulatorUpdate();
+		}
+
+		@Override
+		public void close() throws IOException {
+		}
+	}
+
+	/**
+	 * Notify task manager of accumulator update and wait until the Heartbeat containing the message
+	 * has been reported.
+	 */
+	public static void notifyTaskManagerOfAccumulatorUpdate() {
+		new JavaTestKit(system) {{
+			Timeout timeout = new Timeout(TIMEOUT);
+			Future<Object> ask = Patterns.ask(taskManager, new TestingTaskManagerMessages.AccumulatorsChanged(jobID), timeout);
+			try {
+				Await.result(ask, timeout.duration());
+			} catch (Exception e) {
+				fail("Failed to notify task manager of accumulator update.");
+			}
+		}};
+	}
+
+	/**
+	 * Helpers to generate the JobGraph.
+	 */
+	private static JobGraph getOptimizedPlan(Plan plan) {
+		Optimizer pc = new Optimizer(new DataStatistics(), new Configuration());
+		JobGraphGenerator jgg = new JobGraphGenerator();
+		OptimizedPlan op = pc.compile(plan);
+		return jgg.compileJobGraph(op);
+	}
+
+	private static class BatchPlanExtractor extends LocalEnvironment {
+
+		private Plan plan = null;
+
+		@Override
+		public JobExecutionResult execute(String jobName) throws Exception {
+			plan = createProgramPlan();
+			return new JobExecutionResult(new JobID(), -1, null);
+		}
+	}
+
+	/**
+	 * This is used for creating the example topology. {@link #execute} is never called; we
+	 * only use this to call {@link #getStreamGraph()}.
+	 */
+	private static class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment {
+
+		@Override
+		public JobExecutionResult execute() throws Exception {
+			return execute("default");
+		}
+
+		@Override
+		public JobExecutionResult execute(String jobName) throws Exception {
+			throw new RuntimeException("This should not be called.");
+		}
+	}
+}