You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/02 17:14:30 UTC

[2/8] flink git commit: [hotfix] Fix SourceStreamTaskTest to switch source to running before checkpointing

[hotfix] Fix SourceStreamTaskTest to switch source to running before checkpointing


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c58ba3da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c58ba3da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c58ba3da

Branch: refs/heads/master
Commit: c58ba3da90884846a52b174098c782eb08308c25
Parents: b0f2379
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Aug 2 15:51:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:27 2015 +0200

----------------------------------------------------------------------
 .../runtime/tasks/SourceStreamTaskTest.java     | 72 +++++++++++---------
 1 file changed, 39 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c58ba3da/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 0f6e5f1..232485d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -15,21 +15,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.streaming.runtime.tasks;
 
+package org.apache.flink.streaming.runtime.tasks;
 
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.streaming.api.checkpoint.Checkpointed;
 import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.util.TestHarnessUtil;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -98,38 +98,46 @@ public class SourceStreamTaskTest {
 		final int SOURCE_CHECKPOINT_DELAY = 1000; // how many random values we sum up in storeCheckpoint
 		final int SOURCE_READ_DELAY = 1; // in ms
 
-
-		final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-		final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
-		final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
-		streamConfig.setStreamOperator(sourceOperator);
-
-
 		ExecutorService executor = Executors.newFixedThreadPool(10);
-		Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
-		for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
-			checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
-		}
-
-		testHarness.invoke();
-		testHarness.waitForTaskCompletion();
-
-		// Get the result from the checkpointers, if these threw an exception it
-		// will be rethrown here
-		for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
-			if (!checkpointerResults[i].isDone()) {
-				checkpointerResults[i].cancel(true);
+		try {
+			final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+			final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
+			final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
+	
+			StreamConfig streamConfig = testHarness.getStreamConfig();
+			StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
+			streamConfig.setStreamOperator(sourceOperator);
+			
+			// prepare the 
+			
+			Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
+	
+			// invoke this first, so the tasks are actually running when the checkpoints are scheduled
+			testHarness.invoke();
+			
+			for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
+				checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
 			}
-			if (!checkpointerResults[i].isCancelled()) {
-				checkpointerResults[i].get();
+			
+			testHarness.waitForTaskCompletion();
+	
+			// Get the result from the checkpointers, if these threw an exception it
+			// will be rethrown here
+			for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
+				if (!checkpointerResults[i].isDone()) {
+					checkpointerResults[i].cancel(true);
+				}
+				if (!checkpointerResults[i].isCancelled()) {
+					checkpointerResults[i].get();
+				}
 			}
+	
+			List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+			Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
+		}
+		finally {
+			executor.shutdown();
 		}
-
-		List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
-		Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
 	}
 
 	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
@@ -267,9 +275,7 @@ public class SourceStreamTaskTest {
 		}
 
 		@Override
-		public void cancel() {
-
-		}
+		public void cancel() {}
 	}
 }