You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2015/08/02 17:14:30 UTC
[2/8] flink git commit: [hotfix] Fix SourceStreamTaskTest to switch source to running before checkpointing
[hotfix] Fix SourceStreamTaskTest to switch source to running before checkpointing
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c58ba3da
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c58ba3da
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c58ba3da
Branch: refs/heads/master
Commit: c58ba3da90884846a52b174098c782eb08308c25
Parents: b0f2379
Author: Stephan Ewen <se...@apache.org>
Authored: Sun Aug 2 15:51:14 2015 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Sun Aug 2 15:58:27 2015 +0200
----------------------------------------------------------------------
.../runtime/tasks/SourceStreamTaskTest.java | 72 +++++++++++---------
1 file changed, 39 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/c58ba3da/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 0f6e5f1..232485d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -15,21 +15,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.streaming.runtime.tasks;
+package org.apache.flink.streaming.runtime.tasks;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.util.TestHarnessUtil;
+
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -98,38 +98,46 @@ public class SourceStreamTaskTest {
final int SOURCE_CHECKPOINT_DELAY = 1000; // how many random values we sum up in storeCheckpoint
final int SOURCE_READ_DELAY = 1; // in ms
-
- final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
- final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
- final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
- streamConfig.setStreamOperator(sourceOperator);
-
-
ExecutorService executor = Executors.newFixedThreadPool(10);
- Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
- for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
- checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
- }
-
- testHarness.invoke();
- testHarness.waitForTaskCompletion();
-
- // Get the result from the checkpointers, if these threw an exception it
- // will be rethrown here
- for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
- if (!checkpointerResults[i].isDone()) {
- checkpointerResults[i].cancel(true);
+ try {
+ final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+ final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
+ final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
+ streamConfig.setStreamOperator(sourceOperator);
+
+ // prepare the array that holds the result futures of the checkpointer threads
+
+ Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
+
+ // invoke this first, so the tasks are actually running when the checkpoints are scheduled
+ testHarness.invoke();
+
+ for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
+ checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
}
- if (!checkpointerResults[i].isCancelled()) {
- checkpointerResults[i].get();
+
+ testHarness.waitForTaskCompletion();
+
+ // Get the result from the checkpointers, if these threw an exception it
+ // will be rethrown here
+ for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
+ if (!checkpointerResults[i].isDone()) {
+ checkpointerResults[i].cancel(true);
+ }
+ if (!checkpointerResults[i].isCancelled()) {
+ checkpointerResults[i].get();
+ }
}
+
+ List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+ Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
+ }
+ finally {
+ executor.shutdown();
}
-
- List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
- Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
}
private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
@@ -267,9 +275,7 @@ public class SourceStreamTaskTest {
}
@Override
- public void cancel() {
-
- }
+ public void cancel() {}
}
}