You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2016/12/20 05:05:39 UTC
[3/7] flink git commit: [FLINK-4391] Polish asynchronous I/O
operations
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
index 560ee5a..10ee14f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperatorTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.api.operators.async;
import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -36,8 +37,10 @@ import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
-import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueEntry;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
@@ -45,17 +48,15 @@ import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
+import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
-import java.util.ArrayList;
+import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
-import java.util.List;
import java.util.Queue;
-import java.util.Random;
-import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
@@ -74,19 +75,16 @@ import static org.junit.Assert.assertEquals;
* <li>Snapshot state and restore state</li>
* </ul>
*/
-public class AsyncWaitOperatorTest {
-
- // hold sink result
- private static Queue<Object> sinkResult;
+public class AsyncWaitOperatorTest extends TestLogger {
private static class MyAsyncFunction extends RichAsyncFunction<Integer, Integer> {
- final int SLEEP_FACTOR = 100;
- final int THREAD_POOL_SIZE = 10;
+ private static final long serialVersionUID = 8522411971886428444L;
- transient static ExecutorService executorService;
- static int counter = 0;
+ private static final long TIMEOUT = 5000L;
+ private static final int THREAD_POOL_SIZE = 10;
- static Random random = new Random();
+ static ExecutorService executorService;
+ static int counter = 0;
@Override
public void open(Configuration parameters) throws Exception {
@@ -105,33 +103,35 @@ public class AsyncWaitOperatorTest {
public void close() throws Exception {
super.close();
+ freeExecutor();
+ }
+
+ private void freeExecutor() {
synchronized (MyAsyncFunction.class) {
--counter;
if (counter == 0) {
executorService.shutdown();
- executorService.awaitTermination(SLEEP_FACTOR * THREAD_POOL_SIZE, TimeUnit.MILLISECONDS);
+
+ try {
+ if (!executorService.awaitTermination(TIMEOUT, TimeUnit.MILLISECONDS)) {
+ executorService.shutdownNow();
+ }
+ } catch (InterruptedException interrupted) {
+ executorService.shutdownNow();
+
+ Thread.currentThread().interrupt();
+ }
}
}
}
@Override
public void asyncInvoke(final Integer input, final AsyncCollector<Integer> collector) throws Exception {
- this.executorService.submit(new Runnable() {
+ executorService.submit(new Runnable() {
@Override
public void run() {
- // wait for while to simulate async operation here
- int sleep = (int) (random.nextFloat() * SLEEP_FACTOR);
-
- try {
- Thread.sleep(sleep);
- List<Integer> ret = new ArrayList<>();
- ret.add(input*2);
- collector.collect(ret);
- }
- catch (InterruptedException e) {
- // do nothing
- }
+ collector.collect(Collections.singletonList(input * 2));
}
});
}
@@ -141,11 +141,13 @@ public class AsyncWaitOperatorTest {
* A special {@link org.apache.flink.streaming.api.functions.async.AsyncFunction} without issuing
* {@link AsyncCollector#collect} until the latch counts to zero.
* This function is used in the testStateSnapshotAndRestore, ensuring
- * that {@link org.apache.flink.streaming.api.functions.async.buffer.StreamElementEntry} can stay
- * in the {@link org.apache.flink.streaming.api.functions.async.buffer.AsyncCollectorBuffer} to be
+ * that {@link StreamElementQueueEntry} can stay
+ * in the {@link StreamElementQueue} to be
* snapshotted while checkpointing.
*/
private static class LazyAsyncFunction extends MyAsyncFunction {
+ private static final long serialVersionUID = 3537791752703154670L;
+
private static CountDownLatch latch;
public LazyAsyncFunction() {
@@ -200,17 +202,23 @@ public class AsyncWaitOperatorTest {
}
}
+ /**
+ * Test the AsyncWaitOperator with ordered mode and event time.
+ */
@Test
- public void testWaterMarkOrdered() throws Exception {
- testWithWatermark(AsyncDataStream.OutputMode.ORDERED);
+ public void testEventTimeOrdered() throws Exception {
+ testEventTime(AsyncDataStream.OutputMode.ORDERED);
}
+ /**
+ * Test the AsyncWaitOperator with unordered mode and event time.
+ */
@Test
public void testWaterMarkUnordered() throws Exception {
- testWithWatermark(AsyncDataStream.OutputMode.UNORDERED);
+ testEventTime(AsyncDataStream.OutputMode.UNORDERED);
}
- private void testWithWatermark(AsyncDataStream.OutputMode mode) throws Exception {
+ private void testEventTime(AsyncDataStream.OutputMode mode) throws Exception {
final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 2, mode);
final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness =
@@ -255,40 +263,42 @@ public class AsyncWaitOperatorTest {
}
}
+ /**
+ * Test the AsyncWaitOperator with ordered mode and processing time.
+ */
@Test
- public void testOrdered() throws Exception {
- testRun(AsyncDataStream.OutputMode.ORDERED);
+ public void testProcessingTimeOrdered() throws Exception {
+ testProcessingTime(AsyncDataStream.OutputMode.ORDERED);
}
+ /**
+ * Test the AsyncWaitOperator with unordered mode and processing time.
+ */
@Test
- public void testUnordered() throws Exception {
- testRun(AsyncDataStream.OutputMode.UNORDERED);
+ public void testProcessingUnordered() throws Exception {
+ testProcessingTime(AsyncDataStream.OutputMode.UNORDERED);
}
- private void testRun(AsyncDataStream.OutputMode mode) throws Exception {
- final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
- final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
- new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-
+ private void testProcessingTime(AsyncDataStream.OutputMode mode) throws Exception {
final AsyncWaitOperator<Integer, Integer> operator = new AsyncWaitOperator<>(new MyAsyncFunction(), 6, mode);
- final StreamConfig streamConfig = testHarness.getStreamConfig();
- streamConfig.setStreamOperator(operator);
-
- testHarness.invoke();
- testHarness.waitForTaskRunning();
+ final OneInputStreamOperatorTestHarness<Integer, Integer> testHarness = new OneInputStreamOperatorTestHarness<>(operator, IntSerializer.INSTANCE);
final long initialTime = 0L;
- final ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
+ final Queue<Object> expectedOutput = new ArrayDeque<>();
- testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
- testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
- testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
- testHarness.processElement(new StreamRecord<>(4, initialTime + 4));
- testHarness.processElement(new StreamRecord<>(5, initialTime + 5));
- testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
- testHarness.processElement(new StreamRecord<>(7, initialTime + 7));
- testHarness.processElement(new StreamRecord<>(8, initialTime + 8));
+ testHarness.open();
+
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.processElement(new StreamRecord<>(1, initialTime + 1));
+ testHarness.processElement(new StreamRecord<>(2, initialTime + 2));
+ testHarness.processElement(new StreamRecord<>(3, initialTime + 3));
+ testHarness.processElement(new StreamRecord<>(4, initialTime + 4));
+ testHarness.processElement(new StreamRecord<>(5, initialTime + 5));
+ testHarness.processElement(new StreamRecord<>(6, initialTime + 6));
+ testHarness.processElement(new StreamRecord<>(7, initialTime + 7));
+ testHarness.processElement(new StreamRecord<>(8, initialTime + 8));
+ }
expectedOutput.add(new StreamRecord<>(2, initialTime + 1));
expectedOutput.add(new StreamRecord<>(4, initialTime + 2));
@@ -299,11 +309,9 @@ public class AsyncWaitOperatorTest {
expectedOutput.add(new StreamRecord<>(14, initialTime + 7));
expectedOutput.add(new StreamRecord<>(16, initialTime + 8));
- testHarness.waitForInputProcessing();
-
- testHarness.endInput();
-
- testHarness.waitForTaskCompletion();
+ synchronized (testHarness.getCheckpointLock()) {
+ testHarness.close();
+ }
if (mode == AsyncDataStream.OutputMode.ORDERED) {
TestHarnessUtil.assertOutputEquals("ORDERED Output was not correct.", expectedOutput, testHarness.getOutput());
@@ -317,6 +325,54 @@ public class AsyncWaitOperatorTest {
}
}
+ /**
+ * Tests that the AsyncWaitOperator works together with chaining
+ */
+ @Test
+ public void testOperatorChainWithProcessingTime() throws Exception {
+
+ JobVertex chainedVertex = createChainedVertex(false);
+
+ final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
+ final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
+ new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
+
+ testHarness.taskConfig = chainedVertex.getConfiguration();
+
+ final StreamConfig streamConfig = testHarness.getStreamConfig();
+ final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
+ final AsyncWaitOperator<Integer, Integer> headOperator =
+ operatorChainStreamConfig.getStreamOperator(AsyncWaitOperatorTest.class.getClassLoader());
+ streamConfig.setStreamOperator(headOperator);
+
+ testHarness.invoke();
+ testHarness.waitForTaskRunning();
+
+ long initialTimestamp = 0L;
+
+ testHarness.processElement(new StreamRecord<>(5, initialTimestamp));
+ testHarness.processElement(new StreamRecord<>(6, initialTimestamp + 1L));
+ testHarness.processElement(new StreamRecord<>(7, initialTimestamp + 2L));
+ testHarness.processElement(new StreamRecord<>(8, initialTimestamp + 3L));
+ testHarness.processElement(new StreamRecord<>(9, initialTimestamp + 4L));
+
+ testHarness.endInput();
+ testHarness.waitForTaskCompletion();
+
+ ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+ expectedOutput.add(new StreamRecord<>(22, initialTimestamp));
+ expectedOutput.add(new StreamRecord<>(26, initialTimestamp + 1L));
+ expectedOutput.add(new StreamRecord<>(30, initialTimestamp + 2L));
+ expectedOutput.add(new StreamRecord<>(34, initialTimestamp + 3L));
+ expectedOutput.add(new StreamRecord<>(38, initialTimestamp + 4L));
+
+ TestHarnessUtil.assertOutputEqualsSorted(
+ "Test for chained operator with AsyncWaitOperator failed",
+ expectedOutput,
+ testHarness.getOutput(),
+ new StreamRecordComparator());
+ }
+
private JobVertex createChainedVertex(boolean withLazyFunction) {
StreamExecutionEnvironment chainEnv = StreamExecutionEnvironment.getExecutionEnvironment();
@@ -353,144 +409,23 @@ public class AsyncWaitOperatorTest {
input = AsyncDataStream.unorderedWait(input, new MyAsyncFunction(), 3);
- input.addSink(new SinkFunction<Integer>() {
- private static final long serialVersionUID = 1L;
+ input.map(new MapFunction<Integer, Integer>() {
+ private static final long serialVersionUID = 5162085254238405527L;
@Override
- public void invoke(Integer value) throws Exception {
- sinkResult.add(value);
+ public Integer map(Integer value) throws Exception {
+ return value;
}
- });
+ }).startNewChain().addSink(new DiscardingSink<Integer>());
// be build our own OperatorChain
final JobGraph jobGraph = chainEnv.getStreamGraph().getJobGraph();
- Assert.assertTrue(jobGraph.getVerticesSortedTopologicallyFromSources().size() == 2);
+ Assert.assertTrue(jobGraph.getVerticesSortedTopologicallyFromSources().size() == 3);
return jobGraph.getVerticesSortedTopologicallyFromSources().get(1);
}
- /**
- * Get the {@link SubtaskState} for the operator chain. The state will keep several inputs.
- *
- * @return A {@link SubtaskState}
- * @throws Exception
- */
- private SubtaskState createTaskState() throws Exception {
- sinkResult = new ConcurrentLinkedDeque<>();
-
- final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
- final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
- new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-
- JobVertex chainedVertex = createChainedVertex(true);
-
- testHarness.taskConfig = chainedVertex.getConfiguration();
-
- final AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment(
- testHarness.jobConfig,
- testHarness.taskConfig,
- testHarness.getExecutionConfig(),
- testHarness.memorySize,
- new MockInputSplitProvider(),
- testHarness.bufferSize);
-
- final StreamConfig streamConfig = testHarness.getStreamConfig();
- final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
- final AsyncWaitOperator<Integer, Integer> headOperator =
- operatorChainStreamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
- streamConfig.setStreamOperator(headOperator);
-
- testHarness.invoke(env);
- testHarness.waitForTaskRunning();
-
- testHarness.processElement(new StreamRecord<>(1));
- testHarness.processElement(new StreamRecord<>(2));
- testHarness.processElement(new StreamRecord<>(3));
- testHarness.processElement(new StreamRecord<>(4));
-
- testHarness.waitForInputProcessing();
-
- final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(1L, 1L);
-
- task.triggerCheckpoint(checkpointMetaData);
-
- env.getCheckpointLatch().await();
-
- assertEquals(1L, env.getCheckpointId());
-
- LazyAsyncFunction.countDown();
-
- testHarness.endInput();
- testHarness.waitForTaskCompletion();
-
- return env.getCheckpointStateHandles();
- }
-
- @Test
- public void testOperatorChain() throws Exception {
-
- JobVertex chainedVertex = createChainedVertex(false);
-
- final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
- final OneInputStreamTaskTestHarness<Integer, Integer> testHarness =
- new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-
- task.setInitialState(new TaskStateHandles(createTaskState()));
-
- sinkResult = new ConcurrentLinkedDeque<>();
-
- testHarness.taskConfig = chainedVertex.getConfiguration();
-
- final AcknowledgeStreamMockEnvironment env = new AcknowledgeStreamMockEnvironment(
- testHarness.jobConfig,
- testHarness.taskConfig,
- testHarness.getExecutionConfig(),
- testHarness.memorySize,
- new MockInputSplitProvider(),
- testHarness.bufferSize);
-
- final StreamConfig streamConfig = testHarness.getStreamConfig();
- final StreamConfig operatorChainStreamConfig = new StreamConfig(chainedVertex.getConfiguration());
- final AsyncWaitOperator<Integer, Integer> headOperator =
- operatorChainStreamConfig.getStreamOperator(Thread.currentThread().getContextClassLoader());
- streamConfig.setStreamOperator(headOperator);
-
- testHarness.invoke(env);
- testHarness.waitForTaskRunning();
-
- testHarness.processElement(new StreamRecord<>(5));
- testHarness.processElement(new StreamRecord<>(6));
- testHarness.processElement(new StreamRecord<>(7));
- testHarness.processElement(new StreamRecord<>(8));
- testHarness.processElement(new StreamRecord<>(9));
-
- testHarness.endInput();
- testHarness.waitForTaskCompletion();
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
- expectedOutput.add(6);
- expectedOutput.add(10);
- expectedOutput.add(14);
- expectedOutput.add(18);
- expectedOutput.add(22);
- expectedOutput.add(26);
- expectedOutput.add(30);
- expectedOutput.add(34);
- expectedOutput.add(38);
-
- TestHarnessUtil.assertOutputEqualsSorted(
- "Test for chained operator with AsyncWaitOperator failed",
- expectedOutput,
- sinkResult,
- new Comparator<Object>() {
- @Override
- public int compare(Object o1, Object o2) {
- return (Integer)o1 - (Integer)o2;
- }
- });
- }
-
@Test
public void testStateSnapshotAndRestore() throws Exception {
final OneInputStreamTask<Integer, Integer> task = new OneInputStreamTask<>();
@@ -498,7 +433,7 @@ public class AsyncWaitOperatorTest {
new OneInputStreamTaskTestHarness<>(task, 1, 1, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
AsyncWaitOperator<Integer, Integer> operator =
- new AsyncWaitOperator<>(new LazyAsyncFunction(), 6, AsyncDataStream.OutputMode.ORDERED);
+ new AsyncWaitOperator<>(new LazyAsyncFunction(), 3, AsyncDataStream.OutputMode.ORDERED);
final StreamConfig streamConfig = testHarness.getStreamConfig();
streamConfig.setStreamOperator(operator);
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
new file mode 100644
index 0000000..c3a47aa
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/EmitterTest.java
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async;
+
+import org.apache.flink.streaming.api.functions.async.collector.AsyncCollector;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.operators.async.queue.OrderedStreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.StreamElementQueue;
+import org.apache.flink.streaming.api.operators.async.queue.StreamRecordQueueEntry;
+import org.apache.flink.streaming.api.operators.async.queue.WatermarkQueueEntry;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.CollectorOutput;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+public class EmitterTest extends TestLogger {
+
+ private static final long timeout = 10000L;
+ private static ExecutorService executor;
+
+ @BeforeClass
+ public static void setup() {
+ executor = Executors.newFixedThreadPool(3);
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ executor.shutdown();
+
+ try {
+ if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException interrupted) {
+ executor.shutdownNow();
+
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Tests that the emitter outputs completed stream element queue entries.
+ */
+ @Test
+ public void testEmitterWithOrderedQueue() throws Exception {
+ Object lock = new Object();
+ List<StreamElement> list = new ArrayList<>();
+ Output<StreamRecord<Integer>> output = new CollectorOutput<>(list);
+
+ List<StreamElement> expected = Arrays.asList(
+ new StreamRecord<>(1, 0L),
+ new StreamRecord<>(2, 0L),
+ new StreamRecord<>(3, 1L),
+ new StreamRecord<>(4, 1L),
+ new Watermark(3L),
+ new StreamRecord<>(5, 4L),
+ new StreamRecord<>(6, 4L));
+
+ OperatorActions operatorActions = mock(OperatorActions.class);
+
+ final int capacity = 5;
+
+ StreamElementQueue queue = new OrderedStreamElementQueue(capacity, executor, operatorActions);
+
+ final Emitter<Integer> emitter = new Emitter<>(lock, output, queue, operatorActions);
+
+ final Thread emitterThread = new Thread(emitter);
+ emitterThread.start();
+
+ try {
+ StreamRecordQueueEntry<Integer> record1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
+ StreamRecordQueueEntry<Integer> record2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
+ WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(3L));
+ StreamRecordQueueEntry<Integer> record3 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 4L));
+
+ queue.put(record1);
+ queue.put(record2);
+ queue.put(watermark1);
+ queue.put(record3);
+
+ record2.collect(Arrays.asList(3, 4));
+ record1.collect(Arrays.asList(1, 2));
+ record3.collect(Arrays.asList(5, 6));
+
+ synchronized (lock) {
+ while (!queue.isEmpty()) {
+ lock.wait();
+ }
+ }
+
+ Assert.assertEquals(expected, list);
+ } finally {
+ emitter.stop();
+ emitterThread.interrupt();
+ }
+ }
+
+ /**
+ * Tests that the emitter handles exceptions occurring in the {@link AsyncCollector} correctly.
+ */
+ @Test
+ public void testEmitterWithExceptions() throws Exception {
+ Object lock = new Object();
+ List<StreamElement> list = new ArrayList<>();
+ Output<StreamRecord<Integer>> output = new CollectorOutput<>(list);
+
+ List<StreamElement> expected = Arrays.asList(
+ new StreamRecord<>(1, 0L),
+ new Watermark(3L));
+
+ OperatorActions operatorActions = mock(OperatorActions.class);
+
+ final int capacity = 3;
+
+ StreamElementQueue queue = new OrderedStreamElementQueue(capacity, executor, operatorActions);
+
+ final Emitter<Integer> emitter = new Emitter<>(lock, output, queue, operatorActions);
+
+ final Thread emitterThread = new Thread(emitter);
+ emitterThread.start();
+
+ final Exception testException = new Exception("Test exception");
+
+ try {
+ StreamRecordQueueEntry<Integer> record1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
+ StreamRecordQueueEntry<Integer> record2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
+ WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(3L));
+
+ queue.put(record1);
+ queue.put(record2);
+ queue.put(watermark1);
+
+ record2.collect(testException);
+ record1.collect(Arrays.asList(1));
+
+ synchronized (lock) {
+ while (!queue.isEmpty()) {
+ lock.wait();
+ }
+ }
+
+ Assert.assertEquals(expected, list);
+
+ ArgumentCaptor<Throwable> argumentCaptor = ArgumentCaptor.forClass(Throwable.class);
+
+ verify(operatorActions).failOperator(argumentCaptor.capture());
+
+ Throwable failureCause = argumentCaptor.getValue();
+
+ Assert.assertNotNull(failureCause.getCause());
+ Assert.assertTrue(failureCause.getCause() instanceof ExecutionException);
+
+ Assert.assertNotNull(failureCause.getCause().getCause());
+ Assert.assertEquals(testException, failureCause.getCause().getCause());
+ } finally {
+ emitter.stop();
+ emitterThread.interrupt();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
new file mode 100644
index 0000000..0380512
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueueTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * {@link OrderedStreamElementQueue} specific tests
+ */
+public class OrderedStreamElementQueueTest extends TestLogger {
+
+ private static final long timeout = 10000L;
+ private static ExecutorService executor;
+
+ @BeforeClass
+ public static void setup() {
+ executor = Executors.newFixedThreadPool(3);
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ executor.shutdown();
+
+ try {
+ if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException interrupted) {
+ executor.shutdownNow();
+
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Tests that only the head element is pulled from the ordered queue if it has been
+ * completed.
+ */
+ @Test
+ public void testCompletionOrder() throws Exception {
+ OperatorActions operatorActions = mock(OperatorActions.class);
+ final OrderedStreamElementQueue queue = new OrderedStreamElementQueue(4, executor, operatorActions);
+
+ StreamRecordQueueEntry<Integer> entry1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
+ StreamRecordQueueEntry<Integer> entry2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
+ WatermarkQueueEntry entry3 = new WatermarkQueueEntry(new Watermark(2L));
+ StreamRecordQueueEntry<Integer> entry4 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
+
+ List<StreamElementQueueEntry<?>> expected = Arrays.asList(entry1, entry2, entry3, entry4);
+
+ for (StreamElementQueueEntry<?> entry : expected) {
+ queue.put(entry);
+ }
+
+ Future<List<AsyncResult>> pollOperation = FlinkFuture.supplyAsync(new Callable<List<AsyncResult>>() {
+ @Override
+ public List<AsyncResult> call() throws Exception {
+ List<AsyncResult> result = new ArrayList<>(4);
+ while (!queue.isEmpty()) {
+ result.add(queue.poll());
+ }
+
+ return result;
+ }
+ }, executor);
+
+ Thread.sleep(10L);
+
+ Assert.assertFalse(pollOperation.isDone());
+
+ entry2.collect(Collections.<Integer>emptyList());
+
+ entry4.collect(Collections.<Integer>emptyList());
+
+ Thread.sleep(10L);
+
+ Assert.assertEquals(4, queue.size());
+
+ entry1.collect(Collections.<Integer>emptyList());
+
+ Assert.assertEquals(expected, pollOperation.get());
+
+ verify(operatorActions, never()).failOperator(any(Exception.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
new file mode 100644
index 0000000..c9e59c7
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/StreamElementQueueTest.java
@@ -0,0 +1,263 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.OrderedStreamElementQueueType;
+import static org.apache.flink.streaming.api.operators.async.queue.StreamElementQueueTest.StreamElementQueueType.UnorderedStreamElementQueueType;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests for the basic functionality of {@link StreamElementQueue}. The basic operations consist
+ * of putting and polling elements from the queue.
+ */
+@RunWith(Parameterized.class)
+public class StreamElementQueueTest extends TestLogger {
+
+ private static final long timeout = 10000L;
+ private static ExecutorService executor;
+
+ @BeforeClass
+ public static void setup() {
+ executor = Executors.newFixedThreadPool(3);
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ executor.shutdown();
+
+ try {
+ if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException interrupted) {
+ executor.shutdownNow();
+
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ enum StreamElementQueueType {
+ OrderedStreamElementQueueType,
+ UnorderedStreamElementQueueType
+ }
+
+ @Parameterized.Parameters
+ public static Collection<StreamElementQueueType> streamElementQueueTypes() {
+ return Arrays.asList(OrderedStreamElementQueueType, UnorderedStreamElementQueueType);
+ }
+
+ private final StreamElementQueueType streamElementQueueType;
+
+ public StreamElementQueueTest(StreamElementQueueType streamElementQueueType) {
+ this.streamElementQueueType = Preconditions.checkNotNull(streamElementQueueType);
+ }
+
+ public StreamElementQueue createStreamElementQueue(int capacity, OperatorActions operatorActions) {
+ switch (streamElementQueueType) {
+ case OrderedStreamElementQueueType:
+ return new OrderedStreamElementQueue(capacity, executor, operatorActions);
+ case UnorderedStreamElementQueueType:
+ return new UnorderedStreamElementQueue(capacity, executor, operatorActions);
+ default:
+ throw new IllegalStateException("Unknown stream element queue type: " + streamElementQueueType);
+ }
+ }
+
+ @Test
+ public void testPut() throws InterruptedException {
+ OperatorActions operatorActions = mock(OperatorActions.class);
+ StreamElementQueue queue = createStreamElementQueue(2, operatorActions);
+
+ final Watermark watermark = new Watermark(0L);
+ final StreamRecord<Integer> streamRecord = new StreamRecord<>(42, 1L);
+ final Watermark nextWatermark = new Watermark(2L);
+
+ final WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(watermark);
+ final StreamRecordQueueEntry<Integer> streamRecordQueueEntry = new StreamRecordQueueEntry<>(streamRecord);
+
+ queue.put(watermarkQueueEntry);
+ queue.put(streamRecordQueueEntry);
+
+ Assert.assertEquals(2, queue.size());
+
+ Assert.assertFalse(queue.tryPut(new WatermarkQueueEntry(nextWatermark)));
+
+ Collection<StreamElementQueueEntry<?>> actualValues = queue.values();
+
+ List<StreamElementQueueEntry<?>> expectedValues = Arrays.asList(watermarkQueueEntry, streamRecordQueueEntry);
+
+ Assert.assertEquals(expectedValues, actualValues);
+
+ verify(operatorActions, never()).failOperator(any(Exception.class));
+ }
+
+ @Test
+ public void testPoll() throws InterruptedException {
+ OperatorActions operatorActions = mock(OperatorActions.class);
+ StreamElementQueue queue = createStreamElementQueue(2, operatorActions);
+
+ WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(0L));
+ StreamRecordQueueEntry<Integer> streamRecordQueueEntry = new StreamRecordQueueEntry<>(new StreamRecord<>(42, 1L));
+
+ queue.put(watermarkQueueEntry);
+ queue.put(streamRecordQueueEntry);
+
+ Assert.assertEquals(watermarkQueueEntry, queue.peekBlockingly());
+ Assert.assertEquals(2, queue.size());
+
+ Assert.assertEquals(watermarkQueueEntry, queue.poll());
+ Assert.assertEquals(1, queue.size());
+
+ streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+
+ Assert.assertEquals(streamRecordQueueEntry, queue.poll());
+
+ Assert.assertEquals(0, queue.size());
+ Assert.assertTrue(queue.isEmpty());
+
+ verify(operatorActions, never()).failOperator(any(Exception.class));
+ }
+
+ /**
+ * Tests that a put operation blocks if the queue is full.
+ */
+ @Test
+ public void testBlockingPut() throws Exception {
+ OperatorActions operatorActions = mock(OperatorActions.class);
+ final StreamElementQueue queue = createStreamElementQueue(1, operatorActions);
+
+ StreamRecordQueueEntry<Integer> streamRecordQueueEntry = new StreamRecordQueueEntry<>(new StreamRecord<>(42, 0L));
+ final StreamRecordQueueEntry<Integer> streamRecordQueueEntry2 = new StreamRecordQueueEntry<>(new StreamRecord<>(43, 1L));
+
+ queue.put(streamRecordQueueEntry);
+
+ Assert.assertEquals(1, queue.size());
+
+ Future<Void> putOperation = FlinkFuture.supplyAsync(new Callable<Void>() {
+ @Override
+ public Void call() throws Exception {
+ queue.put(streamRecordQueueEntry2);
+
+ return null;
+ }
+ }, executor);
+
+ // give the future a chance to complete
+ Thread.sleep(10L);
+
+ // but it shouldn't ;-)
+ Assert.assertFalse(putOperation.isDone());
+
+ streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+
+ // polling the completed head element frees the queue again
+ Assert.assertEquals(streamRecordQueueEntry, queue.poll());
+
+ // now the put operation should complete
+ putOperation.get();
+
+ verify(operatorActions, never()).failOperator(any(Exception.class));
+ }
+
+ /**
+ * Test that a poll operation on an empty queue blocks.
+ */
+ @Test
+ public void testBlockingPoll() throws Exception {
+ OperatorActions operatorActions = mock(OperatorActions.class);
+ final StreamElementQueue queue = createStreamElementQueue(1, operatorActions);
+
+ WatermarkQueueEntry watermarkQueueEntry = new WatermarkQueueEntry(new Watermark(1L));
+ StreamRecordQueueEntry<Integer> streamRecordQueueEntry = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 2L));
+
+ Assert.assertTrue(queue.isEmpty());
+
+ Future<AsyncResult> peekOperation = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
+ @Override
+ public AsyncResult call() throws Exception {
+ return queue.peekBlockingly();
+ }
+ }, executor);
+
+ Thread.sleep(10L);
+
+ Assert.assertFalse(peekOperation.isDone());
+
+ queue.put(watermarkQueueEntry);
+
+ AsyncResult watermarkResult = peekOperation.get();
+
+ Assert.assertEquals(watermarkQueueEntry, watermarkResult);
+ Assert.assertEquals(1, queue.size());
+
+ Assert.assertEquals(watermarkQueueEntry, queue.poll());
+ Assert.assertTrue(queue.isEmpty());
+
+ Future<AsyncResult> pollOperation = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
+ @Override
+ public AsyncResult call() throws Exception {
+ return queue.poll();
+ }
+ }, executor);
+
+ Thread.sleep(10L);
+
+ Assert.assertFalse(pollOperation.isDone());
+
+ queue.put(streamRecordQueueEntry);
+
+ Thread.sleep(10L);
+
+ Assert.assertFalse(pollOperation.isDone());
+
+ streamRecordQueueEntry.collect(Collections.<Integer>emptyList());
+
+ Assert.assertEquals(streamRecordQueueEntry, pollOperation.get());
+
+ Assert.assertTrue(queue.isEmpty());
+
+ verify(operatorActions, never()).failOperator(any(Exception.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
new file mode 100644
index 0000000..0a57f92
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/async/queue/UnorderedStreamElementQueueTest.java
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.async.queue;
+
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.streaming.api.operators.async.OperatorActions;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.TestLogger;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+/**
+ * {@link UnorderedStreamElementQueue} specific tests
+ */
+public class UnorderedStreamElementQueueTest extends TestLogger {
+ private static final long timeout = 10000L;
+ private static ExecutorService executor;
+
+ @BeforeClass
+ public static void setup() {
+ executor = Executors.newFixedThreadPool(3);
+ }
+
+ @AfterClass
+ public static void shutdown() {
+ executor.shutdown();
+
+ try {
+ if (!executor.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
+ executor.shutdownNow();
+ }
+ } catch (InterruptedException interrupted) {
+ executor.shutdownNow();
+
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Tests that only elements before the oldest watermark are returned if they are completed.
+ */
+ @Test
+ public void testCompletionOrder() throws Exception {
+ OperatorActions operatorActions = mock(OperatorActions.class);
+
+ final UnorderedStreamElementQueue queue = new UnorderedStreamElementQueue(8, executor, operatorActions);
+
+ StreamRecordQueueEntry<Integer> record1 = new StreamRecordQueueEntry<>(new StreamRecord<>(1, 0L));
+ StreamRecordQueueEntry<Integer> record2 = new StreamRecordQueueEntry<>(new StreamRecord<>(2, 1L));
+ WatermarkQueueEntry watermark1 = new WatermarkQueueEntry(new Watermark(2L));
+ StreamRecordQueueEntry<Integer> record3 = new StreamRecordQueueEntry<>(new StreamRecord<>(3, 3L));
+ StreamRecordQueueEntry<Integer> record4 = new StreamRecordQueueEntry<>(new StreamRecord<>(4, 4L));
+ WatermarkQueueEntry watermark2 = new WatermarkQueueEntry(new Watermark(5L));
+ StreamRecordQueueEntry<Integer> record5 = new StreamRecordQueueEntry<>(new StreamRecord<>(5, 6L));
+ StreamRecordQueueEntry<Integer> record6 = new StreamRecordQueueEntry<>(new StreamRecord<>(6, 7L));
+
+ List<StreamElementQueueEntry<?>> entries = Arrays.asList(record1, record2, watermark1, record3,
+ record4, watermark2, record5, record6);
+
+ // The queue should look like R1, R2, W1, R3, R4, W2, R5, R6
+ for (StreamElementQueueEntry<?> entry : entries) {
+ queue.put(entry);
+ }
+
+ Assert.assertTrue(8 == queue.size());
+
+ Future<AsyncResult> firstPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
+ @Override
+ public AsyncResult call() throws Exception {
+ return queue.poll();
+ }
+ }, executor);
+
+ // this should not fulfill the poll, because R3 is behind W1
+ record3.collect(Collections.<Integer>emptyList());
+
+ Thread.sleep(10L);
+
+ Assert.assertFalse(firstPoll.isDone());
+
+ record2.collect(Collections.<Integer>emptyList());
+
+ Assert.assertEquals(record2, firstPoll.get());
+
+ Future<AsyncResult> secondPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
+ @Override
+ public AsyncResult call() throws Exception {
+ return queue.poll();
+ }
+ }, executor);
+
+ record6.collect(Collections.<Integer>emptyList());
+ record4.collect(Collections.<Integer>emptyList());
+
+ Thread.sleep(10L);
+
+ // The future should not be completed because R1 has not been completed yet
+ Assert.assertFalse(secondPoll.isDone());
+
+ record1.collect(Collections.<Integer>emptyList());
+
+ Assert.assertEquals(record1, secondPoll.get());
+
+ // Now W1, R3, R4 and W2 are completed and should be pollable
+ Assert.assertEquals(watermark1, queue.poll());
+
+ // The order of R3 and R4 is not specified
+ Set<AsyncResult> expected = new HashSet<>(2);
+ expected.add(record3);
+ expected.add(record4);
+
+ Set<AsyncResult> actual = new HashSet<>(2);
+
+ actual.add(queue.poll());
+ actual.add(queue.poll());
+
+ Assert.assertEquals(expected, actual);
+
+ Assert.assertEquals(watermark2, queue.poll());
+
+ // since R6 has been completed before and W2 has been consumed, we should be able to poll
+ // this record as well
+ Assert.assertEquals(record6, queue.poll());
+
+ // only R5 left in the queue
+ Assert.assertTrue(1 == queue.size());
+
+ Future<AsyncResult> thirdPoll = FlinkFuture.supplyAsync(new Callable<AsyncResult>() {
+ @Override
+ public AsyncResult call() throws Exception {
+ return queue.poll();
+ }
+ }, executor);
+
+ Thread.sleep(10L);
+
+ Assert.assertFalse(thirdPoll.isDone());
+
+ record5.collect(Collections.<Integer>emptyList());
+
+ Assert.assertEquals(record5, thirdPoll.get());
+
+ Assert.assertTrue(queue.isEmpty());
+
+ verify(operatorActions, never()).failOperator(any(Exception.class));
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
index f87b5ef..e600420 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorTest.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.streaming.util.CollectorOutput;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -340,33 +341,4 @@ public class StreamSourceOperatorTest {
running = false;
}
}
-
- // ------------------------------------------------------------------------
-
- private static class CollectorOutput<T> implements Output<StreamRecord<T>> {
-
- private final List<StreamElement> list;
-
- private CollectorOutput(List<StreamElement> list) {
- this.list = list;
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- list.add(mark);
- }
-
- @Override
- public void emitLatencyMarker(LatencyMarker latencyMarker) {
- list.add(latencyMarker);
- }
-
- @Override
- public void collect(StreamRecord<T> record) {
- list.add(record);
- }
-
- @Override
- public void close() {}
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
new file mode 100644
index 0000000..fcc8a6c
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/CollectorOutput.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.io.Serializable;
+import java.util.List;
+
+public class CollectorOutput<T> implements Output<StreamRecord<T>> {
+
+ private final List<StreamElement> list;
+
+ public CollectorOutput(List<StreamElement> list) {
+ this.list = list;
+ }
+
+ @Override
+ public void emitWatermark(Watermark mark) {
+ list.add(mark);
+ }
+
+ @Override
+ public void emitLatencyMarker(LatencyMarker latencyMarker) {
+ list.add(latencyMarker);
+ }
+
+ @Override
+ public void collect(StreamRecord<T> record) {
+ T copied = SerializationUtils.deserialize(SerializationUtils.serialize((Serializable) record.getValue()));
+ list.add(record.copy(copied));
+ }
+
+ @Override
+ public void close() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/ad603d59/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
index ea99fe3..3631965 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/StreamingOperatorsITCase.java
@@ -38,6 +38,7 @@ import org.apache.flink.util.MathUtils;
import org.junit.*;
import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -94,6 +95,9 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
+ final List<Integer> actualResult1 = new ArrayList<>();
+ MemorySinkFunction.registerCollection(0, actualResult1);
+
splittedResult.select("0").map(new MapFunction<Tuple2<Integer,Integer>, Integer>() {
private static final long serialVersionUID = 2114608668010092995L;
@@ -103,9 +107,11 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
}
}).addSink(sinkFunction1);
-
final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);
+ final List<Integer> actualResult2 = new ArrayList<>();
+ MemorySinkFunction.registerCollection(1, actualResult2);
+
splittedResult.select("1").map(new MapFunction<Tuple2<Integer, Integer>, Integer>() {
private static final long serialVersionUID = 5631104389744681308L;
@@ -132,13 +138,11 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
env.execute();
- Collection<Integer> result1 = sinkFunction1.getResult();
- Collections.sort((ArrayList)result1);
- Collection<Integer> result2 = sinkFunction2.getResult();
- Collections.sort((ArrayList)result2);
+ Collections.sort(actualResult1);
+ Collections.sort(actualResult2);
- Assert.assertArrayEquals(result1.toArray(), expected1.toArray());
- Assert.assertArrayEquals(result2.toArray(), expected2.toArray());
+ Assert.assertEquals(expected1, actualResult1);
+ Assert.assertEquals(expected2, actualResult2);
MemorySinkFunction.clear();
}
@@ -155,6 +159,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
final MemorySinkFunction sinkFunction = new MemorySinkFunction(0);
+ final ArrayList<Integer> actualResult = new ArrayList<>();
+ MemorySinkFunction.registerCollection(0, actualResult);
input
.keyBy(0)
@@ -186,23 +192,28 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
env.execute();
- Collection<Integer> result = sinkFunction.getResult();
- Collections.sort((ArrayList)result);
+ Collections.sort(actualResult);
- Assert.assertArrayEquals(result.toArray(), expected.toArray());
+ Assert.assertEquals(expected, actualResult);
MemorySinkFunction.clear();
}
+ /**
+ * Tests the basic functionality of the AsyncWaitOperator: Processing a limited stream of
+ * elements by doubling their value. This is tested in for the ordered and unordered mode.
+ */
@Test
public void testAsyncWaitOperator() throws Exception {
- final int numElements = 10;
+ final int numElements = 5;
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple2<Integer, NonSerializable>> input = env.addSource(new NonSerializableTupleSource(numElements));
AsyncFunction<Tuple2<Integer, NonSerializable>, Integer> function = new RichAsyncFunction<Tuple2<Integer, NonSerializable>, Integer>() {
+ private static final long serialVersionUID = 7000343199829487985L;
+
transient ExecutorService executorService;
@Override
@@ -214,26 +225,16 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
@Override
public void close() throws Exception {
super.close();
- executorService.shutdown();
+ executorService.shutdownNow();
}
@Override
public void asyncInvoke(final Tuple2<Integer, NonSerializable> input,
final AsyncCollector<Integer> collector) throws Exception {
- this.executorService.submit(new Runnable() {
+ executorService.submit(new Runnable() {
@Override
public void run() {
- // wait for while to simulate async operation here
- int sleep = (int) (new Random().nextFloat() * 10);
- try {
- Thread.sleep(sleep);
- List<Integer> ret = new ArrayList<>();
- ret.add(input.f0+input.f0);
- collector.collect(ret);
- }
- catch (InterruptedException e) {
- collector.collect(new ArrayList<Integer>(0));
- }
+ collector.collect(Collections.singletonList(input.f0 + input.f0));
}
});
}
@@ -243,6 +244,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
// save result from ordered process
final MemorySinkFunction sinkFunction1 = new MemorySinkFunction(0);
+ final List<Integer> actualResult1 = new ArrayList<>(numElements);
+ MemorySinkFunction.registerCollection(0, actualResult1);
orderedResult.addSink(sinkFunction1).setParallelism(1);
@@ -251,6 +254,8 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
// save result from unordered process
final MemorySinkFunction sinkFunction2 = new MemorySinkFunction(1);
+ final List<Integer> actualResult2 = new ArrayList<>(numElements);
+ MemorySinkFunction.registerCollection(1, actualResult2);
unorderedResult.addSink(sinkFunction2);
@@ -263,11 +268,10 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
env.execute();
- Assert.assertArrayEquals(expected.toArray(), sinkFunction1.getResult().toArray());
+ Assert.assertEquals(expected, actualResult1);
- Collection<Integer> result = sinkFunction2.getResult();
- Collections.sort((ArrayList)result);
- Assert.assertArrayEquals(expected.toArray(), result.toArray());
+ Collections.sort(actualResult2);
+ Assert.assertEquals(expected, actualResult2);
MemorySinkFunction.clear();
}
@@ -331,43 +335,31 @@ public class StreamingOperatorsITCase extends StreamingMultipleProgramsTestBase
}
private static class MemorySinkFunction implements SinkFunction<Integer> {
- private final static Collection<Integer> collection1 = new ArrayList<>(10);
+ private static Map<Integer, Collection<Integer>> collections = new ConcurrentHashMap<>();
- private final static Collection<Integer> collection2 = new ArrayList<>(10);
+ private static final long serialVersionUID = -8815570195074103860L;
- private final long serialVersionUID = -8815570195074103860L;
+ private final int key;
- private final int idx;
-
- public MemorySinkFunction(int idx) {
- this.idx = idx;
+ public MemorySinkFunction(int key) {
+ this.key = key;
}
@Override
public void invoke(Integer value) throws Exception {
- if (idx == 0) {
- synchronized (collection1) {
- collection1.add(value);
- }
- }
- else {
- synchronized (collection2) {
- collection2.add(value);
- }
- }
- }
+ Collection<Integer> collection = collections.get(key);
- public Collection<Integer> getResult() {
- if (idx == 0) {
- return collection1;
+ synchronized (collection) {
+ collection.add(value);
}
+ }
- return collection2;
+ public static void registerCollection(int key, Collection<Integer> collection) {
+ collections.put(key, collection);
}
public static void clear() {
- collection1.clear();
- collection2.clear();
+ collections.clear();
}
}
}