You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2016/09/21 15:54:18 UTC

[5/6] flink git commit: [hotfix] [tests] Fix race condition in RescalingITCase that could leave the test stuck in a blocking call until timeout

[hotfix] [tests] Fix race condition in RescalingITCase that could leave the test stuck in a blocking call until timeout

This closes #2513


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5f67b54b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5f67b54b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5f67b54b

Branch: refs/heads/master
Commit: 5f67b54b2ca4f7ea79d184e65a99ef230dbdc660
Parents: 4d4eb64
Author: Stefan Richter <s....@data-artisans.com>
Authored: Mon Sep 19 17:54:23 2016 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Wed Sep 21 17:53:33 2016 +0200

----------------------------------------------------------------------
 .../test/checkpointing/RescalingITCase.java     | 51 ++++++++++++--------
 1 file changed, 30 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5f67b54b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
index 7f1d7f3..263bf79 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RescalingITCase.java
@@ -69,9 +69,9 @@ import static org.junit.Assert.fail;
 
 public class RescalingITCase extends TestLogger {
 
-	private static int numTaskManagers = 2;
-	private static int slotsPerTaskManager = 2;
-	private static int numSlots = numTaskManagers * slotsPerTaskManager;
+	private static final int numTaskManagers = 2;
+	private static final int slotsPerTaskManager = 2;
+	private static final int numSlots = numTaskManagers * slotsPerTaskManager;
 
 	private static TestingCluster cluster;
 
@@ -109,12 +109,12 @@ public class RescalingITCase extends TestLogger {
 	 */
 	@Test
 	public void testSavepointRescalingWithPartitionedState() throws Exception {
-		int numberKeys = 42;
-		int numberElements = 1000;
-		int numberElements2 = 500;
-		int parallelism = numSlots / 2;
-		int parallelism2 = numSlots;
-		int maxParallelism = 13;
+		final int numberKeys = 42;
+		final int numberElements = 1000;
+		final int numberElements2 = 500;
+		final int parallelism = numSlots / 2;
+		final int parallelism2 = numSlots;
+		final int maxParallelism = 13;
 
 		FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES);
 		Deadline deadline = timeout.fromNow();
@@ -214,9 +214,9 @@ public class RescalingITCase extends TestLogger {
 	 */
 	@Test
 	public void testSavepointRescalingFailureWithNonPartitionedState() throws Exception {
-		int parallelism = numSlots / 2;
-		int parallelism2 = numSlots;
-		int maxParallelism = 13;
+		final int parallelism = numSlots / 2;
+		final int parallelism2 = numSlots;
+		final int maxParallelism = 13;
 
 		FiniteDuration timeout = new FiniteDuration(3, TimeUnit.MINUTES);
 		Deadline deadline = timeout.fromNow();
@@ -235,12 +235,14 @@ public class RescalingITCase extends TestLogger {
 
 			Object savepointResponse = null;
 
-			// we might be too early for taking a savepoint if the operators have not been started yet
+			// wait until the operator is started
+			NonPartitionedStateSource.workStartedLatch.await();
+
 			while (deadline.hasTimeLeft()) {
 
 				Future<Object> savepointPathFuture = jobManager.ask(new JobManagerMessages.TriggerSavepoint(jobID), deadline.timeLeft());
-
-				savepointResponse = Await.result(savepointPathFuture, deadline.timeLeft());
+				FiniteDuration waitingTime = new FiniteDuration(10, TimeUnit.SECONDS);
+				savepointResponse = Await.result(savepointPathFuture, waitingTime);
 
 				if (savepointResponse instanceof JobManagerMessages.TriggerSavepointSuccess) {
 					break;
@@ -428,6 +430,8 @@ public class RescalingITCase extends TestLogger {
 		env.enableCheckpointing(checkpointInterval);
 		env.setRestartStrategy(RestartStrategies.noRestart());
 
+		NonPartitionedStateSource.workStartedLatch = new CountDownLatch(1);
+
 		DataStream<Integer> input = env.addSource(new NonPartitionedStateSource());
 
 		input.addSink(new DiscardingSink<Integer>());
@@ -466,7 +470,7 @@ public class RescalingITCase extends TestLogger {
 
 		DataStream<Tuple2<Integer, Integer>> result = input.flatMap(new SubtaskIndexFlatMapper(numberElements));
 
-		result.addSink(new CollectionSink());
+		result.addSink(new CollectionSink<Tuple2<Integer, Integer>>());
 
 		return env.getStreamGraph().getJobGraph();
 	}
@@ -504,7 +508,7 @@ public class RescalingITCase extends TestLogger {
 
 		DataStream<Tuple2<Integer, Integer>> result = input.flatMap(new SubtaskIndexFlatMapper(numberElements));
 
-		result.addSink(new CollectionSink());
+		result.addSink(new CollectionSink<Tuple2<Integer, Integer>>());
 
 		return env.getStreamGraph().getJobGraph();
 	}
@@ -645,8 +649,10 @@ public class RescalingITCase extends TestLogger {
 
 		private static final long serialVersionUID = -8108185918123186841L;
 
-		private int counter = 0;
-		private boolean running = true;
+		private static volatile CountDownLatch workStartedLatch = new CountDownLatch(1);
+
+		private volatile int counter = 0;
+		private volatile boolean running = true;
 
 		@Override
 		public Integer snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
@@ -669,13 +675,16 @@ public class RescalingITCase extends TestLogger {
 					ctx.collect(counter * getRuntimeContext().getIndexOfThisSubtask());
 				}
 
-				Thread.sleep(100);
+				Thread.sleep(2);
+				if(counter == 10) {
+					workStartedLatch.countDown();
+				}
 			}
 		}
 
 		@Override
 		public void cancel() {
-			running = true;
+			running = false;
 		}
 	}
 }