You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/07/21 12:45:12 UTC
[2/8] flink git commit: [FLINK-1967] Introduce (Event)time in
Streaming
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
index 405a28e..856f7aa 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ForwardPartitionerTest.java
@@ -28,7 +28,7 @@ import org.junit.Test;
public class ForwardPartitionerTest {
private RebalancePartitioner<Tuple> forwardPartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
null);
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
index 69c00cd..6ae3730 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/GlobalPartitionerTest.java
@@ -21,7 +21,6 @@ import static org.junit.Assert.assertArrayEquals;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.GlobalPartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Before;
import org.junit.Test;
@@ -29,7 +28,7 @@ import org.junit.Test;
public class GlobalPartitionerTest {
private GlobalPartitioner<Tuple> globalPartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
null);
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
index d99a21e..aff177c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/partitioner/ShufflePartitionerTest.java
@@ -22,7 +22,6 @@ import static org.junit.Assert.assertTrue;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.junit.Before;
import org.junit.Test;
@@ -30,7 +29,7 @@ import org.junit.Test;
public class ShufflePartitionerTest {
private ShufflePartitioner<Tuple> shufflePartitioner;
- private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>();
+ private StreamRecord<Tuple> streamRecord = new StreamRecord<Tuple>(null);
private SerializationDelegate<StreamRecord<Tuple>> sd = new SerializationDelegate<StreamRecord<Tuple>>(
null);
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
new file mode 100644
index 0000000..d623dd8
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTest.java
@@ -0,0 +1,339 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.StreamMap;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Tests for {@link OneInputStreamTask}.
+ *
+ * <p>
+ * Note:<br>
+ * We only use a {@link StreamMap} operator here. We also test the individual operators but Map is
+ * used as a representative to test OneInputStreamTask, since OneInputStreamTask is used for all
+ * OneInputStreamOperators.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+public class OneInputStreamTaskTest {
+
+ /**
+ * This test verifies that open() and close() are correctly called. This test also verifies
+ * that timestamps of emitted elements are correct. {@link StreamMap} assigns the input
+ * timestamp to emitted elements.
+ */
+ @Test
+ public void testOpenCloseAndTimestamps() throws Exception {
+ final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
+ final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ StreamMap<String, String> mapOperator = new StreamMap<String, String>(new TestOpenCloseMapFunction());
+ streamConfig.setStreamOperator(mapOperator);
+
+ long initialTime = 0L;
+ ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+ testHarness.invoke();
+
+ testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1));
+ testHarness.processElement(new StreamRecord<String>("Ciao", initialTime + 2));
+ expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
+ expectedOutput.add(new StreamRecord<String>("Ciao", initialTime + 2));
+
+ testHarness.endInput();
+
+ testHarness.waitForTaskCompletion();
+
+ Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.",
+ expectedOutput,
+ testHarness.getOutput());
+ }
+
+ /**
+ * This test verifies that watermarks are correctly forwarded. This also checks whether
+ * watermarks are forwarded only when we have received watermarks from all inputs. The
+ * forwarded watermark must be the minimum of the watermarks of all inputs.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWatermarkForwarding() throws Exception {
+ final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
+ final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
+ streamConfig.setStreamOperator(mapOperator);
+
+ ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+ long initialTime = 0L;
+
+ testHarness.invoke();
+
+ testHarness.processElement(new Watermark(initialTime), 0, 0);
+ testHarness.processElement(new Watermark(initialTime), 0, 1);
+ testHarness.processElement(new Watermark(initialTime), 1, 0);
+
+ // now the output should still be empty
+ testHarness.waitForInputProcessing();
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processElement(new Watermark(initialTime), 1, 1);
+
+ // now the watermark should have propagated, Map simply forwards Watermarks
+ testHarness.waitForInputProcessing();
+ expectedOutput.add(new Watermark(initialTime));
+ TestHarnessUtil.assertOutputEquals("Output was not correct.",
+ expectedOutput,
+ testHarness.getOutput());
+
+ // contrary to checkpoint barriers these elements are not blocked by watermarks
+ testHarness.processElement(new StreamRecord<String>("Hello", initialTime));
+ testHarness.processElement(new StreamRecord<String>("Ciao", initialTime));
+ expectedOutput.add(new StreamRecord<String>("Hello", initialTime));
+ expectedOutput.add(new StreamRecord<String>("Ciao", initialTime));
+
+ testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
+ testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
+ testHarness.processElement(new Watermark(initialTime + 3), 1, 0);
+ testHarness.processElement(new Watermark(initialTime + 2), 1, 1);
+
+ // check whether we get the minimum of all the watermarks, this must also only occur in
+ // the output after the two StreamRecords
+ testHarness.waitForInputProcessing();
+ expectedOutput.add(new Watermark(initialTime + 2));
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+
+ // advance watermark from one of the inputs, now we should get a new one since the
+ // minimum increases
+ testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
+ testHarness.waitForInputProcessing();
+ expectedOutput.add(new Watermark(initialTime + 3));
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ // advance the other two inputs, now we should get a new one since the
+ // minimum increases again
+ testHarness.processElement(new Watermark(initialTime + 4), 0, 1);
+ testHarness.processElement(new Watermark(initialTime + 4), 1, 0);
+ testHarness.waitForInputProcessing();
+ expectedOutput.add(new Watermark(initialTime + 4));
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.endInput();
+
+ testHarness.waitForTaskCompletion();
+
+ List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+ Assert.assertEquals(2, resultElements.size());
+ }
+
+ /**
+ * This test verifies that checkpoint barriers are correctly forwarded.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCheckpointBarriers() throws Exception {
+ final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
+ final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
+ streamConfig.setStreamOperator(mapOperator);
+
+ Queue expectedOutput = new ConcurrentLinkedQueue();
+ long initialTime = 0L;
+
+ testHarness.invoke();
+
+ testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+
+ // These elements should be buffered until we receive barriers from
+ // all inputs
+ testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
+ testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
+
+ // These elements should be forwarded, since we did not yet receive a checkpoint barrier
+ // on that input, only add to same input, otherwise we would not know the ordering
+ // of the output since the Task might read the inputs in any order
+ testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
+ testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1);
+ expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime));
+ expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime));
+
+ testHarness.waitForInputProcessing();
+ // we should not yet see the barrier, only the two elements from non-blocked input
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+
+ testHarness.waitForInputProcessing();
+
+ // now we should see the barrier and after that the buffered elements
+ expectedOutput.add(new CheckpointBarrier(0, 0));
+ expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
+ expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
+
+ testHarness.endInput();
+
+ testHarness.waitForTaskCompletion();
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+ }
+
+ /**
+ * This test verifies that checkpoint barriers and barrier buffers work correctly with
+ * concurrent checkpoint barriers where one checkpoint is "overtaking" another checkpoint, i.e.
+ * some inputs receive barriers from an earlier checkpoint, thereby blocking,
+ * then all inputs receive barriers from a later checkpoint.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOvertakingCheckpointBarriers() throws Exception {
+ final OneInputStreamTask<String, String> mapTask = new OneInputStreamTask<String, String>();
+ final OneInputStreamTaskTestHarness<String, String> testHarness = new OneInputStreamTaskTestHarness<String, String>(mapTask, 2, 2, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ StreamMap<String, String> mapOperator = new StreamMap<String, String>(new IdentityMap());
+ streamConfig.setStreamOperator(mapOperator);
+
+ Queue expectedOutput = new ConcurrentLinkedQueue();
+ long initialTime = 0L;
+
+ testHarness.invoke();
+
+ testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+
+ // These elements should be buffered until we receive barriers from
+ // all inputs
+ testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
+ testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
+
+ // These elements should be forwarded, since we did not yet receive a checkpoint barrier
+ // on that input, only add to same input, otherwise we would not know the ordering
+ // of the output since the Task might read the inputs in any order
+ testHarness.processElement(new StreamRecord<String>("Hello-1-1", initialTime), 1, 1);
+ testHarness.processElement(new StreamRecord<String>("Ciao-1-1", initialTime), 1, 1);
+ expectedOutput.add(new StreamRecord<String>("Hello-1-1", initialTime));
+ expectedOutput.add(new StreamRecord<String>("Ciao-1-1", initialTime));
+
+ testHarness.waitForInputProcessing();
+ // we should not yet see the barrier, only the two elements from non-blocked input
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+ // Now give a later barrier to all inputs, this should unblock the first channel,
+ // thereby allowing the two blocked elements through
+ testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
+ testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+
+ expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
+ expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
+ expectedOutput.add(new CheckpointBarrier(1, 1));
+
+ testHarness.waitForInputProcessing();
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+
+
+ // Then give the earlier barrier, these should be ignored
+ testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
+ testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
+ testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+
+ testHarness.waitForInputProcessing();
+
+
+ testHarness.endInput();
+
+ testHarness.waitForTaskCompletion();
+
+ TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+ }
+
+ // This must only be used in one test, otherwise the static fields will be changed
+ // by several tests concurrently
+ private static class TestOpenCloseMapFunction extends RichMapFunction<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ public static boolean openCalled = false;
+ public static boolean closeCalled = false;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ if (closeCalled) {
+ Assert.fail("Close called before open.");
+ }
+ openCalled = true;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (!openCalled) {
+ Assert.fail("Open was not called before close.");
+ }
+ closeCalled = true;
+ }
+
+ @Override
+ public String map(String value) throws Exception {
+ if (!openCalled) {
+ Assert.fail("Open was not called before run.");
+ }
+ return value;
+ }
+ }
+
+ private static class IdentityMap implements MapFunction<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public String map(String value) throws Exception {
+ return value;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
new file mode 100644
index 0000000..a8029e6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/OneInputStreamTaskTestHarness.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+
+import java.io.IOException;
+
+
+/**
+ * Test harness for testing a {@link org.apache.flink.streaming.runtime.tasks.OneInputStreamTask}.
+ *
+ * <p>
+ * This mock Invokable provides the task with a basic runtime context and allows pushing elements
+ * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
+ * and events. You are free to modify the retrieved list.
+ *
+ * <p>
+ * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
+ * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
+ * thread to finish. Use {@link #processElement} to send elements to the task. Use
+ * {@link #processEvent(AbstractEvent)} to send events to the task.
+ * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task
+ * that data entry is finished.
+ *
+ * <p>
+ * When Elements or Events are offered to the Task they are put into a queue. The input gates
+ * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
+ * queues are empty. This must be used after entering some elements before checking the
+ * desired output.
+ *
+ * <p>
+ * When using this you need to add the following line to your test class to setup Powermock:
+ * {@code @PrepareForTest({ResultPartitionWriter.class})}
+ */
+public class OneInputStreamTaskTestHarness<IN, OUT> extends StreamTaskTestHarness<OUT> {
+
+ private TypeInformation<IN> inputType;
+ private TypeSerializer<IN> inputSerializer;
+
+ /**
+ * Creates a test harness with the specified number of input gates and specified number
+ * of channels per input gate.
+ */
+ public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
+ int numInputGates,
+
+ int numInputChannelsPerGate,
+ TypeInformation<IN> inputType,
+ TypeInformation<OUT> outputType) {
+ super(task, outputType);
+
+ this.inputType = inputType;
+ inputSerializer = inputType.createSerializer(executionConfig);
+
+ this.numInputGates = numInputGates;
+ this.numInputChannelsPerGate = numInputChannelsPerGate;
+ }
+
+ /**
+ * Creates a test harness with one input gate that has one input channel.
+ */
+ public OneInputStreamTaskTestHarness(OneInputStreamTask<IN, OUT> task,
+ TypeInformation<IN> inputType,
+ TypeInformation<OUT> outputType) {
+ this(task, 1, 1, inputType, outputType);
+ }
+
+ @Override
+ protected void initializeInputs() throws IOException, InterruptedException {
+ inputGates = new StreamTestSingleInputGate[numInputGates];
+
+ for (int i = 0; i < numInputGates; i++) {
+ inputGates[i] = new StreamTestSingleInputGate<IN>(
+ numInputChannelsPerGate,
+ bufferSize,
+ inputSerializer);
+ this.mockEnv.addInputGate(inputGates[i].getInputGate());
+ }
+
+
+ streamConfig.setNumberOfInputs(1);
+ streamConfig.setTypeSerializerIn1(inputSerializer);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index b4877c6..f34eafe 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -18,50 +18,63 @@
package org.apache.flink.streaming.runtime.tasks;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.state.OperatorState;
-import org.apache.flink.api.common.state.StateCheckpointer;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.StringUtils;
+import org.apache.flink.streaming.util.TestHarnessUtil;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Random;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * These tests verify that the RichFunction methods are called (in the correct order) and that
+ * checkpointing and element emission don't occur concurrently.
+ */
@RunWith(PowerMockRunner.class)
-@PrepareForTest({Task.class, ResultPartitionWriter.class})
-public class SourceStreamTaskTest extends StreamTaskTestBase {
+@PrepareForTest({ResultPartitionWriter.class})
+public class SourceStreamTaskTest {
- private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
- private static final int NETWORK_BUFFER_SIZE = 1024;
+ /**
+ * This test verifies that open() and close() are correctly called by the StreamTask.
+ */
+ @Test
+ public void testOpenClose() throws Exception {
+ final SourceStreamTask<String> sourceTask = new SourceStreamTask<String>();
+ final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<String>(sourceTask, BasicTypeInfo.STRING_TYPE_INFO);
+
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ StreamSource<String> sourceOperator = new StreamSource<String>(new OpenCloseTestSource());
+ streamConfig.setStreamOperator(sourceOperator);
+
+ testHarness.invoke();
+ testHarness.waitForTaskCompletion();
+
+ Assert.assertTrue("RichFunction methods where not called.", OpenCloseTestSource.closeCalled);
+
+ List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+ Assert.assertEquals(10, resultElements.size());
+ }
/**
* This test ensures that the SourceStreamTask properly serializes checkpointing
@@ -76,7 +89,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
* source kept emitting elements while the checkpoint was ongoing.
*/
@Test
- public void testDataSourceTask() throws Exception {
+ public void testCheckpointing() throws Exception {
final int NUM_ELEMENTS = 100;
final int NUM_CHECKPOINTS = 100;
final int NUM_CHECKPOINTERS = 1;
@@ -84,38 +97,15 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
final int SOURCE_CHECKPOINT_DELAY = 1000; // how many random values we sum up in storeCheckpoint
final int SOURCE_READ_DELAY = 1; // in ms
- List<Tuple2<Long, Integer>> outList = new ArrayList<Tuple2<Long, Integer>>();
-
- super.initEnvironment(MEMORY_MANAGER_SIZE, NETWORK_BUFFER_SIZE);
-
- StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
+ final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
+ final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
- TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
- TypeSerializer<Tuple2<Long, Integer>> serializer = typeInfo.createSerializer(new ExecutionConfig());
- StreamRecordSerializer<Tuple2<Long, Integer>> streamSerializer = new StreamRecordSerializer<Tuple2<Long, Integer>>(typeInfo, new ExecutionConfig());
-
- super.addOutput(outList, serializer);
-
- StreamConfig streamConfig = super.getStreamConfig();
-
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
streamConfig.setStreamOperator(sourceOperator);
- streamConfig.setChainStart();
- streamConfig.setOutputSelectorWrapper(new BroadcastOutputSelectorWrapper<Object>());
- streamConfig.setNumberOfOutputs(1);
-
- List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
- StreamNode sourceVertex = new StreamNode(null, 0, sourceOperator, "source", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
- StreamNode targetVertexDummy = new StreamNode(null, 1, sourceOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
- outEdgesInOrder.add(new StreamEdge(sourceVertex, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
- streamConfig.setOutEdgesInOrder(outEdgesInOrder);
- streamConfig.setNonChainedOutputs(outEdgesInOrder);
- streamConfig.setTypeSerializerOut1(streamSerializer);
- streamConfig.setVertexID(0);
-
- super.registerTask(sourceTask);
ExecutorService executor = Executors.newFixedThreadPool(10);
Future[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
@@ -123,13 +113,8 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
}
-
- try {
- sourceTask.invoke();
- } catch (Exception e) {
- System.err.println(StringUtils.stringifyException(e));
- Assert.fail("Invoke method caused exception.");
- }
+ testHarness.invoke();
+ testHarness.waitForTaskCompletion();
// Get the result from the checkpointers, if these threw an exception it
// will be rethrown here
@@ -142,11 +127,11 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
}
}
- Assert.assertEquals(NUM_ELEMENTS, outList.size());
+ List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+ Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
}
- private static class MockSource extends RichSourceFunction<Tuple2<Long, Integer>> implements StateCheckpointer<Integer, Integer> {
-
+ private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed {
private static final long serialVersionUID = 1;
private int maxElements;
@@ -157,7 +142,6 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
private volatile long lastCheckpointId = -1;
private Semaphore semaphore;
- private OperatorState<Integer> state;
private volatile boolean isRunning = true;
@@ -166,7 +150,7 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
this.checkpointDelay = checkpointDelay;
this.readDelay = readDelay;
this.count = 0;
- this.semaphore = new Semaphore(1);
+ semaphore = new Semaphore(1);
}
@Override
@@ -191,33 +175,32 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
public void cancel() {
isRunning = false;
}
-
- @Override
- public void open(Configuration conf) throws IOException{
- state = getRuntimeContext().getOperatorState("state", 1, false, this);
- }
-
@Override
- public Integer snapshotState(Integer state, long checkpointId, long checkpointTimestamp) {
+ public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
if (!semaphore.tryAcquire()) {
Assert.fail("Concurrent invocation of snapshotState.");
- } else {
- int startCount = count;
-
- if (startCount != count) {
- semaphore.release();
- // This means that next() was invoked while the snapshot was ongoing
- Assert.fail("Count is different at start end end of snapshot.");
- }
+ }
+ int startCount = count;
+ lastCheckpointId = checkpointId;
+
+ long sum = 0;
+ for (int i = 0; i < checkpointDelay; i++) {
+ sum += new Random().nextLong();
+ }
+
+ if (startCount != count) {
semaphore.release();
+ // This means that next() was invoked while the snapshot was ongoing
+ Assert.fail("Count is different at start end end of snapshot.");
}
- return 0;
+ semaphore.release();
+ return sum;
}
@Override
- public Integer restoreState(Integer stateSnapshot) {
- return stateSnapshot;
+ public void restoreState(Serializable state) {
+
}
}
@@ -247,5 +230,45 @@ public class SourceStreamTaskTest extends StreamTaskTestBase {
return true;
}
}
+
+ public static class OpenCloseTestSource extends RichSourceFunction<String> {
+ private static final long serialVersionUID = 1L;
+
+ public static boolean openCalled = false;
+ public static boolean closeCalled = false;
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
+ if (closeCalled) {
+ Assert.fail("Close called before open.");
+ }
+ openCalled = true;
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (!openCalled) {
+ Assert.fail("Open was not called before close.");
+ }
+ closeCalled = true;
+ }
+
+ @Override
+ public void run(SourceContext<String> ctx) throws Exception {
+ if (!openCalled) {
+ Assert.fail("Open was not called before run.");
+ }
+ for (int i = 0; i < 10; i++) {
+ ctx.collect("Hello" + i);
+ }
+ }
+
+ @Override
+ public void cancel() {
+
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
index 2092d83..df0c9ee 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
@@ -25,6 +25,7 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.event.task.AbstractEvent;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -36,7 +37,6 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
import org.apache.flink.runtime.memorymanager.DefaultMemoryManager;
@@ -45,8 +45,6 @@ import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.plugable.DeserializationDelegate;
import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -54,6 +52,7 @@ import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.Queue;
import java.util.concurrent.Future;
import static org.junit.Assert.fail;
@@ -87,9 +86,9 @@ public class StreamMockEnvironment implements Environment {
private final int bufferSize;
- public StreamMockEnvironment(long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
- this.jobConfiguration = new Configuration();
- this.taskConfiguration = new Configuration();
+ public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize, MockInputSplitProvider inputSplitProvider, int bufferSize) {
+ this.jobConfiguration = jobConfig;
+ this.taskConfiguration = taskConfig;
this.inputs = new LinkedList<InputGate>();
this.outputs = new LinkedList<ResultPartitionWriter>();
@@ -101,20 +100,11 @@ public class StreamMockEnvironment implements Environment {
this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
}
- public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> inputIterator) {
- try {
- final IteratorWrappingTestSingleInputGate<Record> reader = new IteratorWrappingTestSingleInputGate<Record>(bufferSize, Record.class, inputIterator);
-
- inputs.add(reader.getInputGate());
-
- return reader;
- }
- catch (Throwable t) {
- throw new RuntimeException("Error setting up mock readers: " + t.getMessage(), t);
- }
+ public void addInputGate(InputGate gate) {
+ inputs.add(gate);
}
- public <T> void addOutput(final List<T> outputList, final TypeSerializer<T> serializer) {
+ public <T> void addOutput(final Queue<Object> outputList, final TypeSerializer<T> serializer) {
try {
// The record-oriented writers wrap the buffer writer. We mock it
// to collect the returned buffers and deserialize the content to
@@ -161,6 +151,29 @@ public class StreamMockEnvironment implements Environment {
}
}).when(mockWriter).writeBuffer(any(Buffer.class), anyInt());
+ // Add events to the output list
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
+
+ outputList.add(event);
+ return null;
+ }
+ }).when(mockWriter).writeEvent(any(AbstractEvent.class), anyInt());
+
+ doAnswer(new Answer<Void>() {
+
+ @Override
+ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
+ AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
+
+ outputList.add(event);
+ return null;
+ }
+ }).when(mockWriter).writeEventToAllChannels(any(AbstractEvent.class));
+
outputs.add(mockWriter);
}
catch (Throwable t) {
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java
deleted file mode 100644
index f1a36c8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestBase.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.common.typeutils.record.RecordSerializerFactory;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.partition.consumer.IteratorWrappingTestSingleInputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memorymanager.MemoryManager;
-import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.operators.util.TaskConfig;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.types.Record;
-import org.apache.flink.util.MutableObjectIterator;
-import org.junit.After;
-import org.junit.Assert;
-
-import java.util.List;
-
-
-public abstract class StreamTaskTestBase {
-
- protected long memorySize = 0;
-
- protected StreamMockEnvironment mockEnv;
-
- public void initEnvironment(long memorySize, int bufferSize) {
- this.memorySize = memorySize;
- this.mockEnv = new StreamMockEnvironment(this.memorySize, new MockInputSplitProvider(), bufferSize);
- }
-
- public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId) {
- final IteratorWrappingTestSingleInputGate<Record> reader = addInput(input, groupId, true);
-
- return reader;
- }
-
- public IteratorWrappingTestSingleInputGate<Record> addInput(MutableObjectIterator<Record> input, int groupId, boolean read) {
- final IteratorWrappingTestSingleInputGate<Record> reader = this.mockEnv.addInput(input);
- TaskConfig conf = new TaskConfig(this.mockEnv.getTaskConfiguration());
- conf.addInputToGroup(groupId);
- conf.setInputSerializer(RecordSerializerFactory.get(), groupId);
-
- if (read) {
- reader.read();
- }
-
- return reader;
- }
-
- public <T> void addOutput(List<T> output, TypeSerializer<T> serializer) {
- this.mockEnv.addOutput(output, serializer);
- TaskConfig conf = new TaskConfig(this.mockEnv.getTaskConfiguration());
- conf.addOutputShipStrategy(ShipStrategyType.FORWARD);
- conf.setOutputSerializer(RecordSerializerFactory.get());
- }
-
- public Configuration getConfiguration() {
- return this.mockEnv.getTaskConfiguration();
- }
-
- public StreamConfig getStreamConfig() {
- return new StreamConfig(this.mockEnv.getTaskConfiguration());
- }
-
- public void registerTask(AbstractInvokable task) {
- task.setEnvironment(this.mockEnv);
- task.registerInputOutput();
- }
-
- public MemoryManager getMemoryManager() {
- return this.mockEnv.getMemoryManager();
- }
-
- @After
- public void shutdownIOManager() throws Exception {
- this.mockEnv.getIOManager().shutdown();
- Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
- }
-
- @After
- public void shutdownMemoryManager() throws Exception {
- if (this.memorySize > 0) {
- MemoryManager memMan = getMemoryManager();
- if (memMan != null) {
- Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
- memMan.shutdown();
- }
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
new file mode 100644
index 0000000..a4cc0d3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.event.task.AbstractEvent;
+import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.memorymanager.MemoryManager;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecordSerializer;
+import org.apache.flink.util.InstantiationUtil;
+import org.junit.Assert;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+
+/**
+ * Test harness for testing a {@link StreamTask}.
+ *
+ * <p>
+ * This mock Invokable provides the task with a basic runtime context and allows pushing elements
+ * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
+ * and events. You are free to modify the retrieved queue.
+ *
+ * <p>
+ * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
+ * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
+ * thread to finish.
+ *
+ * <p>
+ * When using this you need to add the following line to your test class to set up PowerMock:
+ * {@code @PrepareForTest({ResultPartitionWriter.class})}
+ */
+public class StreamTaskTestHarness<OUT> {
+
+ private static final int DEFAULT_MEMORY_MANAGER_SIZE = 1024 * 1024;
+
+ private static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;
+
+ protected long memorySize = 0;
+ protected int bufferSize = 0;
+
+ protected StreamMockEnvironment mockEnv;
+ protected ExecutionConfig executionConfig;
+ private Configuration jobConfig;
+ private Configuration taskConfig;
+ protected StreamConfig streamConfig;
+
+ private AbstractInvokable task;
+
+ private TypeInformation<OUT> outputType;
+ private TypeSerializer<OUT> outputSerializer;
+ private StreamRecordSerializer<OUT> outputStreamRecordSerializer;
+
+ private ConcurrentLinkedQueue outputList;
+
+ protected Thread taskThread;
+
+ // These are not initialized here; the one-input/two-input specific test harnesses
+ // must initialize them if they want to simulate input. We have them here so that all the
+ // input-related methods only need to be implemented once, in generic form.
+ protected int numInputGates;
+ protected int numInputChannelsPerGate;
+ protected StreamTestSingleInputGate[] inputGates;
+
+ public StreamTaskTestHarness(AbstractInvokable task, TypeInformation<OUT> outputType) {
+ this.task = task;
+ this.memorySize = DEFAULT_MEMORY_MANAGER_SIZE;
+ this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
+
+ this.jobConfig = new Configuration();
+ this.taskConfig = new Configuration();
+ this.executionConfig = new ExecutionConfig();
+ executionConfig.enableTimestamps();
+ try {
+ InstantiationUtil.writeObjectToConfig(executionConfig, this.jobConfig, ExecutionConfig.CONFIG_KEY);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ streamConfig = new StreamConfig(taskConfig);
+ streamConfig.setChainStart();
+ streamConfig.setBufferTimeout(0);
+
+ this.outputType = outputType;
+ outputSerializer = outputType.createSerializer(executionConfig);
+ outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
+ }
+
+ /**
+ * This must be overridden for OneInputStreamTask or TwoInputStreamTask test harnesses.
+ */
+ protected void initializeInputs() throws IOException, InterruptedException {
+ }
+
+ @SuppressWarnings("unchecked")
+ private void initializeOutput() {
+ outputList = new ConcurrentLinkedQueue();
+
+ mockEnv.addOutput(outputList, outputStreamRecordSerializer);
+
+ streamConfig.setOutputSelectorWrapper(new BroadcastOutputSelectorWrapper<Object>());
+ streamConfig.setNumberOfOutputs(1);
+
+ StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
+ private static final long serialVersionUID = 1L;
+ };
+
+ List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
+ StreamNode sourceVertexDummy = new StreamNode(null, 0, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+ StreamNode targetVertexDummy = new StreamNode(null, 1, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+
+ outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
+ streamConfig.setOutEdgesInOrder(outEdgesInOrder);
+ streamConfig.setNonChainedOutputs(outEdgesInOrder);
+ streamConfig.setTypeSerializerOut1(outputSerializer);
+ streamConfig.setVertexID(0);
+
+ }
+
+ /**
+ * Invoke the Task. This resets the output of any previous invocation. This will start a new
+ * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the
+ * Task thread to finish running.
+ */
+ public void invoke() throws Exception {
+ mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, memorySize, new MockInputSplitProvider(), bufferSize);
+ task.setEnvironment(mockEnv);
+
+ initializeInputs();
+ initializeOutput();
+
+ task.registerInputOutput();
+
+ taskThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+
+
+
+ try {
+ task.invoke();
+ shutdownIOManager();
+ shutdownMemoryManager();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+
+ }
+ });
+
+ taskThread.start();
+ }
+
+ public void waitForTaskCompletion() throws InterruptedException {
+ if (taskThread == null) {
+ throw new IllegalStateException("Task thread was not started.");
+ }
+
+ taskThread.join();
+ }
+
+ /**
+ * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
+ * {@link org.apache.flink.streaming.util.TestHarnessUtil#getRawElementsFromOutput(java.util.Queue)}
+ * to extract only the StreamRecords.
+ */
+ public Queue getOutput() {
+ return outputList;
+ }
+
+ public StreamConfig getStreamConfig() {
+ return streamConfig;
+ }
+
+ private void shutdownIOManager() throws Exception {
+ this.mockEnv.getIOManager().shutdown();
+ Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
+ }
+
+ private void shutdownMemoryManager() throws Exception {
+ if (this.memorySize > 0) {
+ MemoryManager memMan = this.mockEnv.getMemoryManager();
+ if (memMan != null) {
+ Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
+ memMan.shutdown();
+ }
+ }
+ }
+
+ /**
+ * Sends the element to input gate 0 on channel 0.
+ */
+ @SuppressWarnings("unchecked")
+ public void processElement(Object element) {
+ inputGates[0].sendElement(element, 0);
+ }
+
+ /**
+ * Sends the element to the specified channel on the specified input gate.
+ */
+ @SuppressWarnings("unchecked")
+ public void processElement(Object element, int inputGate, int channel) {
+ inputGates[inputGate].sendElement(element, channel);
+ }
+
+ /**
+ * Sends the event to input gate 0 on channel 0.
+ */
+ public void processEvent(AbstractEvent event) {
+ inputGates[0].sendEvent(event, 0);
+ }
+
+ /**
+ * Sends the event to the specified channel on the specified input gate.
+ */
+ public void processEvent(AbstractEvent event, int inputGate, int channel) {
+ inputGates[inputGate].sendEvent(event, channel);
+ }
+
+ /**
+ * This only returns after all input queues are empty.
+ */
+ public void waitForInputProcessing() {
+
+
+ // first wait for all input queues to be empty
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ }
+ while (true) {
+ boolean allEmpty = true;
+ for (int i = 0; i < numInputGates; i++) {
+ if (!inputGates[i].allQueuesEmpty()) {
+ allEmpty = false;
+ }
+ }
+ try {
+ Thread.sleep(10);
+ } catch (InterruptedException e) {
+ }
+ if (allEmpty) {
+ break;
+ }
+ }
+
+ // then wait for the Task Thread to be in a blocked state
+ // Check whether the state is blocked, this should be the case if it cannot
+ // read more input, i.e. all currently available input has been processed.
+ while (true) {
+ Thread.State state = taskThread.getState();
+ if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
+ state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
+ break;
+ }
+
+ try {
+ Thread.sleep(1);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+
+ /**
+ * Notifies all input channels on all input gates that no more input will arrive. This
+ * will usually make the Task exit from its internal loop.
+ */
+ public void endInput() {
+ for (int i = 0; i < numInputGates; i++) {
+ inputGates[i].endInput();
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
new file mode 100644
index 0000000..3b113ab
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.co.CoStreamMap;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+
+/**
+ * Tests for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. These tests
+ * implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}.
+ *
+ * <p>
+ * Note:<br>
+ * We only use a {@link CoStreamMap} operator here. We also test the individual operators but Map is
+ * used as a representative to test TwoInputStreamTask, since TwoInputStreamTask is used for all
+ * TwoInputStreamOperators.
+ */
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({ResultPartitionWriter.class})
+public class TwoInputStreamTaskTest {
+
+ /**
+  * This test verifies that open() and close() are correctly called. This test also verifies
+  * that timestamps of emitted elements are correct: the expected output below requires each
+  * emitted element to carry the timestamp of the input element it was created from.
+  */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOpenCloseAndTimestamps() throws Exception {
+     final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
+     final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+     StreamConfig streamConfig = testHarness.getStreamConfig();
+     CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction());
+     streamConfig.setStreamOperator(coMapOperator);
+
+     long initialTime = 0L;
+     ConcurrentLinkedQueue expectedOutput = new ConcurrentLinkedQueue();
+
+     testHarness.invoke();
+
+     // first input, gate 0, channel 0
+     testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1), 0, 0);
+     expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
+
+     // wait until the input is processed to ensure ordering of the output
+     testHarness.waitForInputProcessing();
+
+     // second input, gate 1, channel 0; map2 turns the Integer into its String form
+     testHarness.processElement(new StreamRecord<Integer>(1337, initialTime + 2), 1, 0);
+     expectedOutput.add(new StreamRecord<String>("1337", initialTime + 2));
+
+     testHarness.endInput();
+
+     testHarness.waitForTaskCompletion();
+
+     // Check both lifecycle flags explicitly so that a failure names the missing call.
+     Assert.assertTrue("RichFunction open() was not called.", TestOpenCloseMapFunction.openCalled);
+     Assert.assertTrue("RichFunction close() was not called.", TestOpenCloseMapFunction.closeCalled);
+
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
+ }
+
+ /**
+  * Checks that watermarks are forwarded correctly: nothing is emitted until every input
+  * channel has delivered a watermark, and the emitted watermark is the minimum over the
+  * watermarks of all inputs.
+  */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testWatermarkForwarding() throws Exception {
+     final TwoInputStreamTask<String, Integer, String> task =
+             new TwoInputStreamTask<String, Integer, String>();
+     final TwoInputStreamTaskTestHarness<String, Integer, String> harness =
+             new TwoInputStreamTaskTestHarness<String, Integer, String>(
+                     task,
+                     2,
+                     2,
+                     new int[] {1, 2},
+                     BasicTypeInfo.STRING_TYPE_INFO,
+                     BasicTypeInfo.INT_TYPE_INFO,
+                     BasicTypeInfo.STRING_TYPE_INFO);
+
+     harness.getStreamConfig().setStreamOperator(
+             new CoStreamMap<String, Integer, String>(new IdentityMap()));
+
+     final long initialTime = 0L;
+     final ConcurrentLinkedQueue expected = new ConcurrentLinkedQueue();
+
+     harness.invoke();
+
+     // three of the four channels deliver a watermark ...
+     harness.processElement(new Watermark(initialTime), 0, 0);
+     harness.processElement(new Watermark(initialTime), 0, 1);
+     harness.processElement(new Watermark(initialTime), 1, 0);
+
+     // ... which must not produce any output yet
+     harness.waitForInputProcessing();
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     // the last channel catches up, so the watermark is expected in the output
+     harness.processElement(new Watermark(initialTime), 1, 1);
+     harness.waitForInputProcessing();
+     expected.add(new Watermark(initialTime));
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     // unlike checkpoint barriers, watermarks do not hold back regular elements
+     expected.add(new StreamRecord<String>("Hello", initialTime));
+     expected.add(new StreamRecord<String>("42", initialTime));
+     harness.processElement(new StreamRecord<String>("Hello", initialTime), 0, 0);
+     harness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
+
+     harness.waitForInputProcessing();
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     // different watermarks on all channels: the minimum (initialTime + 2) is expected,
+     // and it must appear after the two records above
+     expected.add(new Watermark(initialTime + 2));
+     harness.processElement(new Watermark(initialTime + 4), 0, 0);
+     harness.processElement(new Watermark(initialTime + 3), 0, 1);
+     harness.processElement(new Watermark(initialTime + 3), 1, 0);
+     harness.processElement(new Watermark(initialTime + 2), 1, 1);
+
+     harness.waitForInputProcessing();
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     // advancing the slowest channel raises the minimum, so a new watermark is expected
+     harness.processElement(new Watermark(initialTime + 4), 1, 1);
+     harness.waitForInputProcessing();
+     expected.add(new Watermark(initialTime + 3));
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     // advancing the remaining two channels raises the minimum once more
+     harness.processElement(new Watermark(initialTime + 4), 0, 1);
+     harness.processElement(new Watermark(initialTime + 4), 1, 0);
+     harness.waitForInputProcessing();
+     expected.add(new Watermark(initialTime + 4));
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     harness.endInput();
+     harness.waitForTaskCompletion();
+
+     // the raw elements are expected to be just the two records sent above
+     List<String> rawElements = TestHarnessUtil.getRawElementsFromOutput(harness.getOutput());
+     Assert.assertEquals(2, rawElements.size());
+ }
+
+ /**
+  * This test verifies that checkpoint barriers are correctly forwarded: elements arriving on
+  * a channel after its barrier are expected only after the barrier has been received on all
+  * channels, while elements on the other channels are expected immediately.
+  */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testCheckpointBarriers() throws Exception {
+     final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
+     final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
+
+     StreamConfig streamConfig = testHarness.getStreamConfig();
+     CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
+     streamConfig.setStreamOperator(coMapOperator);
+
+     Queue expectedOutput = new ConcurrentLinkedQueue();
+     long initialTime = 0L;
+
+     testHarness.invoke();
+
+     testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+
+     // This element should be buffered since we received a checkpoint barrier on
+     // this input
+     testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
+
+     // This one should go through
+     testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 1);
+     expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
+
+     // These elements should be forwarded, since we did not yet receive a checkpoint barrier
+     // on that input, only add to same input, otherwise we would not know the ordering
+     // of the output since the Task might read the inputs in any order
+     testHarness.processElement(new StreamRecord<Integer>(11, initialTime), 1, 1);
+     testHarness.processElement(new StreamRecord<Integer>(111, initialTime), 1, 1);
+     expectedOutput.add(new StreamRecord<String>("11", initialTime));
+     expectedOutput.add(new StreamRecord<String>("111", initialTime));
+
+     testHarness.waitForInputProcessing();
+     // we should not yet see the barrier, only the three elements from the non-blocked
+     // channels; arguments are (expected, actual) like in the other tests of this class
+     TestHarnessUtil.assertOutputEquals("Output was not correct.",
+             expectedOutput,
+             testHarness.getOutput());
+
+     testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
+     testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
+     testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+
+     testHarness.waitForInputProcessing();
+
+     // now we should see the barrier and after that the buffered elements
+     expectedOutput.add(new CheckpointBarrier(0, 0));
+     expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
+     TestHarnessUtil.assertOutputEquals("Output was not correct.",
+             expectedOutput,
+             testHarness.getOutput());
+
+     testHarness.endInput();
+
+     testHarness.waitForTaskCompletion();
+
+     // Ciao-0-0, 11, 111 and Hello-0-0
+     List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
+     Assert.assertEquals(4, resultElements.size());
+ }
+
+ /**
+  * Checks barrier handling when a later checkpoint "overtakes" an earlier one: one channel
+  * first receives a barrier of an earlier checkpoint (and thereby blocks), then all channels
+  * receive barriers of a later checkpoint, and finally the remaining barriers of the earlier
+  * checkpoint arrive.
+  */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testOvertakingCheckpointBarriers() throws Exception {
+     final TwoInputStreamTask<String, Integer, String> task =
+             new TwoInputStreamTask<String, Integer, String>();
+     final TwoInputStreamTaskTestHarness<String, Integer, String> harness =
+             new TwoInputStreamTaskTestHarness<String, Integer, String>(
+                     task,
+                     2,
+                     2,
+                     new int[] {1, 2},
+                     BasicTypeInfo.STRING_TYPE_INFO,
+                     BasicTypeInfo.INT_TYPE_INFO,
+                     BasicTypeInfo.STRING_TYPE_INFO);
+
+     harness.getStreamConfig().setStreamOperator(
+             new CoStreamMap<String, Integer, String>(new IdentityMap()));
+
+     final Queue expected = new ConcurrentLinkedQueue();
+     final long initialTime = 0L;
+
+     harness.invoke();
+
+     // barrier of the earlier checkpoint on gate 0, channel 0
+     harness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
+
+     // elements on that channel should now be held back until barriers arrived on all inputs
+     harness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
+     harness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
+
+     // A channel without a barrier should forward its elements. Only a single channel is
+     // used here because the task may read its inputs in any order, which would make the
+     // output order unpredictable.
+     expected.add(new StreamRecord<String>("42", initialTime));
+     expected.add(new StreamRecord<String>("1337", initialTime));
+     harness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
+     harness.processElement(new StreamRecord<Integer>(1337, initialTime), 1, 1);
+
+     harness.waitForInputProcessing();
+     // no barrier expected yet, only the two elements from the non-blocked channel
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     // A later barrier on all channels should release the blocked channel, so the two held
+     // back elements are expected before the barrier itself.
+     expected.add(new StreamRecord<String>("Hello-0-0", initialTime));
+     expected.add(new StreamRecord<String>("Ciao-0-0", initialTime));
+     expected.add(new CheckpointBarrier(1, 1));
+     harness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
+     harness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
+     harness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
+     harness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
+
+     harness.waitForInputProcessing();
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+
+     // the remaining barriers of the earlier checkpoint should be ignored
+     harness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
+     harness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
+     harness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
+
+     harness.waitForInputProcessing();
+
+     harness.endInput();
+     harness.waitForTaskCompletion();
+
+     // the expected output is unchanged by the late barriers
+     TestHarnessUtil.assertOutputEquals("Output was not correct.", expected, harness.getOutput());
+ }
+
+ // Co-map function that records in static flags whether open() and close() were called and
+ // fails when the calls arrive in the wrong order.
+ // Use this in a single test only: the flags are static and never reset, so several tests
+ // would modify them concurrently.
+ private static class TestOpenCloseMapFunction extends RichCoMapFunction<String, Integer, String> {
+     private static final long serialVersionUID = 1L;
+
+     public static boolean openCalled = false;
+     public static boolean closeCalled = false;
+
+     @Override
+     public void open(Configuration parameters) throws Exception {
+         super.open(parameters);
+         Assert.assertFalse("Close called before open.", closeCalled);
+         openCalled = true;
+     }
+
+     @Override
+     public void close() throws Exception {
+         super.close();
+         Assert.assertTrue("Open was not called before close.", openCalled);
+         closeCalled = true;
+     }
+
+     // returns the first-input element unchanged
+     @Override
+     public String map1(String value) throws Exception {
+         Assert.assertTrue("Open was not called before run.", openCalled);
+         return value;
+     }
+
+     // returns the String form of the second-input element
+     @Override
+     public String map2(Integer value) throws Exception {
+         Assert.assertTrue("Open was not called before run.", openCalled);
+         return value.toString();
+     }
+ }
+
+ /**
+  * Co-map function without state: elements of the first input are returned unchanged,
+  * elements of the second input are returned as their decimal String form.
+  */
+ private static class IdentityMap implements CoMapFunction<String, Integer, String> {
+     private static final long serialVersionUID = 1L;
+
+     @Override
+     public String map1(String value) throws Exception {
+         return value;
+     }
+
+     @Override
+     public String map2(Integer value) throws Exception {
+         // unboxing keeps the NullPointerException for a null value
+         return Integer.toString(value);
+     }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/flink/blob/a2eb6cc8/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
new file mode 100644
index 0000000..f37eb66
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamNode;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+
+
+/**
+ * Test harness for testing a {@link TwoInputStreamTask}.
+ *
+ * <p>
+ * This mock Invokable provides the task with a basic runtime context and allows pushing elements
+ * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
+ * and events. You are free to modify the retrieved list.
+ *
+ * <p>
+ * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
+ * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
+ * thread to finish. Use {@link #processElement}
+ * to send elements to the task. Use
+ * {@link #processEvent(org.apache.flink.runtime.event.task.AbstractEvent)} to send events to the task.
+ * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task
+ * that data entry is finished.
+ *
+ * <p>
+ * When Elements or Events are offered to the Task they are put into a queue. The input gates
+ * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
+ * queues are empty. This must be used after entering some elements before checking the
+ * desired output.
+ *
+ * <p>
+ * When using this you need to add the following line to your test class to setup Powermock:
+ * {@code @PrepareForTest({ResultPartitionWriter.class})}
+ */
+public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTestHarness<OUT> {
+
+ private TypeInformation<IN1> inputType1;
+ private TypeSerializer<IN1> inputSerializer1;
+
+ private TypeInformation<IN2> inputType2;
+ private TypeSerializer<IN2> inputSerializer2;
+
+ private int[] inputGateAssignment;
+
+ /**
+ * Creates a test harness with the specified number of input gates and specified number
+ * of channels per input gate. Parameter inputGateAssignment specifies for each gate whether
+ * it should be assigned to the first (1), or second (2) input of the task.
+ */
+ public TwoInputStreamTaskTestHarness(TwoInputStreamTask<IN1, IN2, OUT> task,
+ int numInputGates,
+ int numInputChannelsPerGate,
+ int[] inputGateAssignment,
+ TypeInformation<IN1> inputType1,
+ TypeInformation<IN2> inputType2,
+ TypeInformation<OUT> outputType) {
+ super(task, outputType);
+
+ this.inputType1 = inputType1;
+ inputSerializer1 = inputType1.createSerializer(executionConfig);
+
+ this.inputType2 = inputType2;
+ inputSerializer2 = inputType2.createSerializer(executionConfig);
+
+ this.numInputGates = numInputGates;
+ this.numInputChannelsPerGate = numInputChannelsPerGate;
+ this.inputGateAssignment = inputGateAssignment;
+ }
+
+ /**
+ * Creates a test harness with one input gate (that has one input channel) per input. The first
+ * input gate is assigned to the first task input, the second input gate is assigned to the
+ * second task input.
+ */
+ public TwoInputStreamTaskTestHarness(TwoInputStreamTask<IN1, IN2, OUT> task,
+ TypeInformation<IN1> inputType1,
+ TypeInformation<IN2> inputType2,
+ TypeInformation<OUT> outputType) {
+ this(task, 2, 1, new int[] {1, 2}, inputType1, inputType2, outputType);
+ }
+
+ @Override
+ protected void initializeInputs() throws IOException, InterruptedException {
+
+ inputGates = new StreamTestSingleInputGate[numInputGates];
+ List<StreamEdge> inPhysicalEdges = new LinkedList<StreamEdge>();
+
+ StreamOperator<IN1> dummyOperator = new AbstractStreamOperator<IN1>() {
+ private static final long serialVersionUID = 1L;
+ };
+
+ StreamNode sourceVertexDummy = new StreamNode(null, 0, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+ StreamNode targetVertexDummy = new StreamNode(null, 1, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
+
+ for (int i = 0; i < numInputGates; i++) {
+
+ switch (inputGateAssignment[i]) {
+ case 1: {
+ inputGates[i] = new StreamTestSingleInputGate<IN1>(
+ numInputChannelsPerGate,
+ bufferSize,
+ inputSerializer1);
+
+
+ StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
+ targetVertexDummy,
+ 1,
+ new LinkedList<String>(),
+ new BroadcastPartitioner<Object>());
+
+ inPhysicalEdges.add(streamEdge);
+ break;
+ }
+ case 2: {
+ inputGates[i] = new StreamTestSingleInputGate<IN2>(
+ numInputChannelsPerGate,
+ bufferSize,
+ inputSerializer2);
+
+ StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
+ targetVertexDummy,
+ 2,
+ new LinkedList<String>(),
+ new BroadcastPartitioner<Object>());
+
+ inPhysicalEdges.add(streamEdge);
+ break;
+ }
+ default:
+ throw new IllegalStateException("Wrong input gate assignment.");
+ }
+
+ this.mockEnv.addInputGate(inputGates[i].getInputGate());
+ }
+
+ streamConfig.setInPhysicalEdges(inPhysicalEdges);
+ streamConfig.setNumberOfInputs(numInputGates);
+ streamConfig.setTypeSerializerIn1(inputSerializer1);
+ streamConfig.setTypeSerializerIn2(inputSerializer2);
+ }
+
+}
+