You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:36 UTC
[20/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
deleted file mode 100644
index 232485d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * These tests verify that the RichFunction methods are called (in correct order). And that
- * checkpointing/element emission don't occur concurrently.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-public class SourceStreamTaskTest {
-
-
- /**
- * This test verifies that open() and close() are correctly called by the StreamTask.
- */
- @Test
- public void testOpenClose() throws Exception {
- final SourceStreamTask<String> sourceTask = new SourceStreamTask<String>();
- final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<String>(sourceTask, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamSource<String> sourceOperator = new StreamSource<String>(new OpenCloseTestSource());
- streamConfig.setStreamOperator(sourceOperator);
-
- testHarness.invoke();
- testHarness.waitForTaskCompletion();
-
- Assert.assertTrue("RichFunction methods where not called.", OpenCloseTestSource.closeCalled);
-
- List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
- Assert.assertEquals(10, resultElements.size());
- }
-
- /**
- * This test ensures that the SourceStreamTask properly serializes checkpointing
- * and element emission. This also verifies that there are no concurrent invocations
- * of the checkpoint method on the source operator.
- *
- * The source emits elements and performs checkpoints. We have several checkpointer threads
- * that fire checkpoint requests at the source task.
- *
- * If element emission and checkpointing are not in series the count of elements at the
- * beginning of a checkpoint and at the end of a checkpoint are not the same because the
- * source kept emitting elements while the checkpoint was ongoing.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testCheckpointing() throws Exception {
- final int NUM_ELEMENTS = 100;
- final int NUM_CHECKPOINTS = 100;
- final int NUM_CHECKPOINTERS = 1;
- final int CHECKPOINT_INTERVAL = 5; // in ms
- final int SOURCE_CHECKPOINT_DELAY = 1000; // how many random values we sum up in storeCheckpoint
- final int SOURCE_READ_DELAY = 1; // in ms
-
- ExecutorService executor = Executors.newFixedThreadPool(10);
- try {
- final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
- final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
- final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
- streamConfig.setStreamOperator(sourceOperator);
-
- // prepare the
-
- Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
-
- // invoke this first, so the tasks are actually running when the checkpoints are scheduled
- testHarness.invoke();
-
- for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
- checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
- }
-
- testHarness.waitForTaskCompletion();
-
- // Get the result from the checkpointers, if these threw an exception it
- // will be rethrown here
- for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
- if (!checkpointerResults[i].isDone()) {
- checkpointerResults[i].cancel(true);
- }
- if (!checkpointerResults[i].isCancelled()) {
- checkpointerResults[i].get();
- }
- }
-
- List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
- Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
- }
- finally {
- executor.shutdown();
- }
- }
-
- private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
- private static final long serialVersionUID = 1;
-
- private int maxElements;
- private int checkpointDelay;
- private int readDelay;
-
- private volatile int count;
- private volatile long lastCheckpointId = -1;
-
- private Semaphore semaphore;
-
- private volatile boolean isRunning = true;
-
- public MockSource(int maxElements, int checkpointDelay, int readDelay) {
- this.maxElements = maxElements;
- this.checkpointDelay = checkpointDelay;
- this.readDelay = readDelay;
- this.count = 0;
- semaphore = new Semaphore(1);
- }
-
- @Override
- public void run(SourceContext<Tuple2<Long, Integer>> ctx) {
- final Object lockObject = ctx.getCheckpointLock();
- while (isRunning && count < maxElements) {
- // simulate some work
- try {
- Thread.sleep(readDelay);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
-
- synchronized (lockObject) {
- ctx.collect(new Tuple2<Long, Integer>(lastCheckpointId, count));
- count++;
- }
- }
- }
-
- @Override
- public void cancel() {
- isRunning = false;
- }
-
- @Override
- public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
- if (!semaphore.tryAcquire()) {
- Assert.fail("Concurrent invocation of snapshotState.");
- }
- int startCount = count;
- lastCheckpointId = checkpointId;
-
- long sum = 0;
- for (int i = 0; i < checkpointDelay; i++) {
- sum += new Random().nextLong();
- }
-
- if (startCount != count) {
- semaphore.release();
- // This means that next() was invoked while the snapshot was ongoing
- Assert.fail("Count is different at start end end of snapshot.");
- }
- semaphore.release();
- return sum;
- }
-
- @Override
- public void restoreState(Serializable state) {
-
- }
- }
-
- /**
- * This calls triggerInterrupt on the given task with the given interval.
- */
- private static class Checkpointer implements Callable<Boolean> {
- private final int numCheckpoints;
- private final int checkpointInterval;
- private final AtomicLong checkpointId;
- private final StreamTask<Tuple2<Long, Integer>, ?> sourceTask;
-
- public Checkpointer(int numCheckpoints, int checkpointInterval, StreamTask<Tuple2<Long, Integer>, ?> task) {
- this.numCheckpoints = numCheckpoints;
- checkpointId = new AtomicLong(0);
- sourceTask = task;
- this.checkpointInterval = checkpointInterval;
- }
-
- @Override
- public Boolean call() throws Exception {
- for (int i = 0; i < numCheckpoints; i++) {
- long currentCheckpointId = checkpointId.getAndIncrement();
- sourceTask.triggerCheckpoint(currentCheckpointId, 0L);
- Thread.sleep(checkpointInterval);
- }
- return true;
- }
- }
-
- public static class OpenCloseTestSource extends RichSourceFunction<String> {
- private static final long serialVersionUID = 1L;
-
- public static boolean openCalled = false;
- public static boolean closeCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- if (closeCalled) {
- Assert.fail("Close called before open.");
- }
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (!openCalled) {
- Assert.fail("Open was not called before close.");
- }
- closeCalled = true;
- }
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- for (int i = 0; i < 10; i++) {
- ctx.collect("Hello" + i);
- }
- }
-
- @Override
- public void cancel() {}
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
deleted file mode 100644
index 090f7cb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class StreamMockEnvironment implements Environment {
-
- private final MemoryManager memManager;
-
- private final IOManager ioManager;
-
- private final InputSplitProvider inputSplitProvider;
-
- private final Configuration jobConfiguration;
-
- private final Configuration taskConfiguration;
-
- private final List<InputGate> inputs;
-
- private final List<ResultPartitionWriter> outputs;
-
- private final JobID jobID = new JobID();
-
- private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
-
- private final AccumulatorRegistry accumulatorRegistry;
-
- private final int bufferSize;
-
- public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
- MockInputSplitProvider inputSplitProvider, int bufferSize) {
- this.jobConfiguration = jobConfig;
- this.taskConfiguration = taskConfig;
- this.inputs = new LinkedList<InputGate>();
- this.outputs = new LinkedList<ResultPartitionWriter>();
-
- this.memManager = new MemoryManager(memorySize, 1);
- this.ioManager = new IOManagerAsync();
- this.inputSplitProvider = inputSplitProvider;
- this.bufferSize = bufferSize;
-
- this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
- }
-
- public void addInputGate(InputGate gate) {
- inputs.add(gate);
- }
-
- public <T> void addOutput(final Queue<Object> outputList, final TypeSerializer<T> serializer) {
- try {
- // The record-oriented writers wrap the buffer writer. We mock it
- // to collect the returned buffers and deserialize the content to
- // the output list
- BufferProvider mockBufferProvider = mock(BufferProvider.class);
- when(mockBufferProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
-
- @Override
- public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
- return new Buffer(
- MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
- mock(BufferRecycler.class));
- }
- });
-
- ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
- when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
- when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
-
- final RecordDeserializer<DeserializationDelegate<T>> recordDeserializer = new AdaptiveSpanningRecordDeserializer<DeserializationDelegate<T>>();
- final NonReusingDeserializationDelegate<T> delegate = new NonReusingDeserializationDelegate<T>(serializer);
-
- // Add records from the buffer to the output list
- doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
-
- recordDeserializer.setNextBuffer(buffer);
-
- while (recordDeserializer.hasUnfinishedData()) {
- RecordDeserializer.DeserializationResult result = recordDeserializer.getNextRecord(delegate);
-
- if (result.isFullRecord()) {
- outputList.add(delegate.getInstance());
- }
-
- if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
- || result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
- break;
- }
- }
-
- return null;
- }
- }).when(mockWriter).writeBuffer(any(Buffer.class), anyInt());
-
- // Add events to the output list
- doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
-
- outputList.add(event);
- return null;
- }
- }).when(mockWriter).writeEvent(any(AbstractEvent.class), anyInt());
-
- doAnswer(new Answer<Void>() {
-
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
-
- outputList.add(event);
- return null;
- }
- }).when(mockWriter).writeEventToAllChannels(any(AbstractEvent.class));
-
- outputs.add(mockWriter);
- }
- catch (Throwable t) {
- t.printStackTrace();
- fail(t.getMessage());
- }
- }
-
- @Override
- public Configuration getTaskConfiguration() {
- return this.taskConfiguration;
- }
-
- @Override
- public MemoryManager getMemoryManager() {
- return this.memManager;
- }
-
- @Override
- public IOManager getIOManager() {
- return this.ioManager;
- }
-
- @Override
- public JobID getJobID() {
- return this.jobID;
- }
-
- @Override
- public Configuration getJobConfiguration() {
- return this.jobConfiguration;
- }
-
- @Override
- public int getNumberOfSubtasks() {
- return 1;
- }
-
- @Override
- public int getIndexInSubtaskGroup() {
- return 0;
- }
-
- @Override
- public InputSplitProvider getInputSplitProvider() {
- return this.inputSplitProvider;
- }
-
- @Override
- public String getTaskName() {
- return "";
- }
-
- @Override
- public String getTaskNameWithSubtasks() {
- return "";
- }
-
- @Override
- public ClassLoader getUserClassLoader() {
- return getClass().getClassLoader();
- }
-
- @Override
- public Map<String, Future<Path>> getDistributedCacheEntries() {
- return Collections.emptyMap();
- }
-
- @Override
- public ResultPartitionWriter getWriter(int index) {
- return outputs.get(index);
- }
-
- @Override
- public ResultPartitionWriter[] getAllWriters() {
- return outputs.toArray(new ResultPartitionWriter[outputs.size()]);
- }
-
- @Override
- public InputGate getInputGate(int index) {
- return inputs.get(index);
- }
-
- @Override
- public InputGate[] getAllInputGates() {
- InputGate[] gates = new InputGate[inputs.size()];
- inputs.toArray(gates);
- return gates;
- }
-
- @Override
- public JobVertexID getJobVertexId() {
- return new JobVertexID(new byte[16]);
- }
-
- @Override
- public ExecutionAttemptID getExecutionId() {
- return new ExecutionAttemptID(0L, 0L);
- }
-
- @Override
- public BroadcastVariableManager getBroadcastVariableManager() {
- return this.bcVarManager;
- }
-
- @Override
- public AccumulatorRegistry getAccumulatorRegistry() {
- return accumulatorRegistry;
- }
-
- @Override
- public void acknowledgeCheckpoint(long checkpointId) {
- }
-
- @Override
- public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
- }
-
- @Override
- public TaskManagerRuntimeInfo getTaskManagerInfo() {
- return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
deleted file mode 100644
index 6c48668..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Assert;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
-/**
- * Test harness for testing a {@link StreamTask}.
- *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and allows pushing elements
- * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
- * and events. You are free to modify the retrieved list.
- *
- * <p>
- * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
- * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
- * thread to finish.
- *
- * <p>
- * When using this you need to add the following line to your test class to setup Powermock:
- * {@code @PrepareForTest({ResultPartitionWriter.class})}
- */
-public class StreamTaskTestHarness<OUT> {
-
- private static final int DEFAULT_MEMORY_MANAGER_SIZE = 1024 * 1024;
-
- private static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;
-
- protected long memorySize = 0;
- protected int bufferSize = 0;
-
- protected StreamMockEnvironment mockEnv;
- protected ExecutionConfig executionConfig;
- private Configuration jobConfig;
- private Configuration taskConfig;
- protected StreamConfig streamConfig;
-
- private AbstractInvokable task;
-
- private TypeSerializer<OUT> outputSerializer;
- private TypeSerializer<StreamElement> outputStreamRecordSerializer;
-
- private ConcurrentLinkedQueue<Object> outputList;
-
- protected TaskThread taskThread;
-
- // These don't get initialized, the one-input/two-input specific test harnesses
- // must initialize these if they want to simulate input. We have them here so that all the
- // input related methods only need to be implemented once, in generic form
- protected int numInputGates;
- protected int numInputChannelsPerGate;
- @SuppressWarnings("rawtypes")
- protected StreamTestSingleInputGate[] inputGates;
-
- public StreamTaskTestHarness(AbstractInvokable task, TypeInformation<OUT> outputType) {
- this.task = task;
- this.memorySize = DEFAULT_MEMORY_MANAGER_SIZE;
- this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
-
- this.jobConfig = new Configuration();
- this.taskConfig = new Configuration();
- this.executionConfig = new ExecutionConfig();
- executionConfig.enableTimestamps();
- try {
- InstantiationUtil.writeObjectToConfig(executionConfig, this.jobConfig, ExecutionConfig.CONFIG_KEY);
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- streamConfig = new StreamConfig(taskConfig);
- streamConfig.setChainStart();
- streamConfig.setBufferTimeout(0);
-
- outputSerializer = outputType.createSerializer(executionConfig);
- outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
- }
-
- /**
- * This must be overwritten for OneInputStreamTask or TwoInputStreamTask test harnesses.
- */
- protected void initializeInputs() throws IOException, InterruptedException {}
-
- @SuppressWarnings("unchecked")
- private void initializeOutput() {
- outputList = new ConcurrentLinkedQueue<Object>();
-
- mockEnv.addOutput(outputList, outputStreamRecordSerializer);
-
- streamConfig.setOutputSelectorWrapper(new BroadcastOutputSelectorWrapper<Object>());
- streamConfig.setNumberOfOutputs(1);
-
- StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
- private static final long serialVersionUID = 1L;
- };
-
- List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
- StreamNode sourceVertexDummy = new StreamNode(null, 0, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
- StreamNode targetVertexDummy = new StreamNode(null, 1, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-
- outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
- streamConfig.setOutEdgesInOrder(outEdgesInOrder);
- streamConfig.setNonChainedOutputs(outEdgesInOrder);
- streamConfig.setTypeSerializerOut(outputSerializer);
- streamConfig.setVertexID(0);
-
- }
-
- /**
- * Invoke the Task. This resets the output of any previous invocation. This will start a new
- * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the
- * Task thread to finish running.
- */
- public void invoke() throws Exception {
- mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, memorySize, new MockInputSplitProvider(), bufferSize);
- task.setEnvironment(mockEnv);
-
- initializeInputs();
- initializeOutput();
-
- task.registerInputOutput();
-
- taskThread = new TaskThread(task);
- taskThread.start();
- }
-
- public void waitForTaskCompletion() throws Exception {
- if (taskThread == null) {
- throw new IllegalStateException("Task thread was not started.");
- }
-
- taskThread.join();
- if (taskThread.getError() != null) {
- throw new Exception("error in task", taskThread.getError());
- }
- }
-
- /**
- * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
- * {@link org.apache.flink.streaming.util.TestHarnessUtil#getRawElementsFromOutput(java.util.Queue)}}
- * to extract only the StreamRecords.
- */
- public ConcurrentLinkedQueue<Object> getOutput() {
- return outputList;
- }
-
- public StreamConfig getStreamConfig() {
- return streamConfig;
- }
-
- private void shutdownIOManager() throws Exception {
- this.mockEnv.getIOManager().shutdown();
- Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
- }
-
- private void shutdownMemoryManager() throws Exception {
- if (this.memorySize > 0) {
- MemoryManager memMan = this.mockEnv.getMemoryManager();
- if (memMan != null) {
- Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
- memMan.shutdown();
- }
- }
- }
-
- /**
- * Sends the element to input gate 0 on channel 0.
- */
- @SuppressWarnings("unchecked")
- public void processElement(Object element) {
- inputGates[0].sendElement(element, 0);
- }
-
- /**
- * Sends the element to the specified channel on the specified input gate.
- */
- @SuppressWarnings("unchecked")
- public void processElement(Object element, int inputGate, int channel) {
- inputGates[inputGate].sendElement(element, channel);
- }
-
- /**
- * Sends the event to input gate 0 on channel 0.
- */
- public void processEvent(AbstractEvent event) {
- inputGates[0].sendEvent(event, 0);
- }
-
- /**
- * Sends the event to the specified channel on the specified input gate.
- */
- public void processEvent(AbstractEvent event, int inputGate, int channel) {
- inputGates[inputGate].sendEvent(event, channel);
- }
-
- /**
- * This only returns after all input queues are empty.
- */
- public void waitForInputProcessing() {
-
-
- // first wait for all input queues to be empty
- try {
- Thread.sleep(1);
- } catch (InterruptedException ignored) {}
-
- while (true) {
- boolean allEmpty = true;
- for (int i = 0; i < numInputGates; i++) {
- if (!inputGates[i].allQueuesEmpty()) {
- allEmpty = false;
- }
- }
- try {
- Thread.sleep(10);
- } catch (InterruptedException ignored) {}
-
- if (allEmpty) {
- break;
- }
- }
-
- // then wait for the Task Thread to be in a blocked state
- // Check whether the state is blocked, this should be the case if it cannot
- // read more input, i.e. all currently available input has been processed.
- while (true) {
- Thread.State state = taskThread.getState();
- if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
- state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
- break;
- }
-
- try {
- Thread.sleep(1);
- } catch (InterruptedException ignored) {}
- }
- }
-
- /**
- * Notifies all input channels on all input gates that no more input will arrive. This
- * will usually make the Task exit from his internal loop.
- */
- public void endInput() {
- for (int i = 0; i < numInputGates; i++) {
- inputGates[i].endInput();
- }
- }
-
- // ------------------------------------------------------------------------
-
- private class TaskThread extends Thread {
-
- private final AbstractInvokable task;
-
- private volatile Throwable error;
-
-
- TaskThread(AbstractInvokable task) {
- super("Task Thread");
- this.task = task;
- }
-
- @Override
- public void run() {
- try {
- task.invoke();
- shutdownIOManager();
- shutdownMemoryManager();
- }
- catch (Throwable t) {
- this.error = t;
- }
- }
-
- public Throwable getError() {
- return error;
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
deleted file mode 100644
index cdc2c53..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.Semaphore;
-
-/**
- * Tests for the timer service of {@code StreamTask}.
- *
- * <p>
- * These tests ensure that exceptions are properly forwarded from the timer thread to
- * the task thread and that operator methods are not invoked concurrently.
- */
-public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
-
- /**
- * Note: this test fails if we don't have the synchronized block in
- * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
- *
- * <p>
- * This test never finishes if exceptions from the timer thread are not forwarded. Thus
- * a success here means that the exception forwarding works.
- */
- @Test
- public void testOperatorChainedToSource() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStream<String> source = env.addSource(new InfiniteTestSource());
-
- source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.ALWAYS));
-
- boolean testSuccess = false;
- try {
- env.execute("Timer test");
- } catch (JobExecutionException e) {
- if (e.getCause() instanceof TimerException) {
- TimerException te = (TimerException) e.getCause();
- if (te.getCause() instanceof RuntimeException) {
- RuntimeException re = (RuntimeException) te.getCause();
- if (re.getMessage().equals("TEST SUCCESS")) {
- testSuccess = true;
- } else {
- throw e;
- }
- } else {
- throw e;
- }
- } else {
- throw e;
- }
- }
- Assert.assertTrue(testSuccess);
- }
-
- /**
- * Note: this test fails if we don't have the synchronized block in
- * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
- */
- @Test
- public void testOneInputOperatorWithoutChaining() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStream<String> source = env.addSource(new InfiniteTestSource());
-
- source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.NEVER));
-
- boolean testSuccess = false;
- try {
- env.execute("Timer test");
- } catch (JobExecutionException e) {
- if (e.getCause() instanceof TimerException) {
- TimerException te = (TimerException) e.getCause();
- if (te.getCause() instanceof RuntimeException) {
- RuntimeException re = (RuntimeException) te.getCause();
- if (re.getMessage().equals("TEST SUCCESS")) {
- testSuccess = true;
- } else {
- throw e;
- }
- } else {
- throw e;
- }
- } else {
- throw e;
- }
- }
- Assert.assertTrue(testSuccess);
- }
-
- /**
- * Note: this test fails if we don't have the synchronized block in
- * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
- */
- @Test
- public void testTwoInputOperatorWithoutChaining() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- env.setParallelism(1);
-
- DataStream<String> source = env.addSource(new InfiniteTestSource());
-
- source.connect(source).transform(
- "Custom Operator",
- BasicTypeInfo.STRING_TYPE_INFO,
- new TwoInputTimerOperator(ChainingStrategy.NEVER));
-
- boolean testSuccess = false;
- try {
- env.execute("Timer test");
- } catch (JobExecutionException e) {
- if (e.getCause() instanceof TimerException) {
- TimerException te = (TimerException) e.getCause();
- if (te.getCause() instanceof RuntimeException) {
- RuntimeException re = (RuntimeException) te.getCause();
- if (re.getMessage().equals("TEST SUCCESS")) {
- testSuccess = true;
- } else {
- throw e;
- }
- } else {
- throw e;
- }
- } else {
- throw e;
- }
- }
- Assert.assertTrue(testSuccess);
- }
-
- public static class TimerOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, Triggerable {
- private static final long serialVersionUID = 1L;
-
- int numTimers = 0;
- int numElements = 0;
-
- private boolean first = true;
-
- private Semaphore semaphore = new Semaphore(1);
-
- public TimerOperator(ChainingStrategy chainingStrategy) {
- setChainingStrategy(chainingStrategy);
- }
-
- @Override
- public void processElement(StreamRecord<String> element) throws Exception {
- if (!semaphore.tryAcquire()) {
- Assert.fail("Concurrent invocation of operator functions.");
- }
-
- if (first) {
- registerTimer(System.currentTimeMillis() + 100, this);
- first = false;
- }
- numElements++;
-
- semaphore.release();
- }
-
- @Override
- public void trigger(long time) throws Exception {
- if (!semaphore.tryAcquire()) {
- Assert.fail("Concurrent invocation of operator functions.");
- }
-
- try {
- numTimers++;
- throwIfDone();
- registerTimer(System.currentTimeMillis() + 1, this);
- } finally {
- semaphore.release();
- }
- }
-
- private void throwIfDone() {
- if (numTimers > 1000 && numElements > 10_000) {
- throw new RuntimeException("TEST SUCCESS");
- }
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- //ignore
- }
- }
-
- public static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String>, Triggerable {
- private static final long serialVersionUID = 1L;
-
- int numTimers = 0;
- int numElements = 0;
-
- private boolean first = true;
-
- private Semaphore semaphore = new Semaphore(1);
-
- public TwoInputTimerOperator(ChainingStrategy chainingStrategy) {
- setChainingStrategy(chainingStrategy);
- }
-
- @Override
- public void processElement1(StreamRecord<String> element) throws Exception {
- if (!semaphore.tryAcquire()) {
- Assert.fail("Concurrent invocation of operator functions.");
- }
-
- if (first) {
- registerTimer(System.currentTimeMillis() + 100, this);
- first = false;
- }
- numElements++;
-
- semaphore.release();
- }
-
- @Override
- public void processElement2(StreamRecord<String> element) throws Exception {
- if (!semaphore.tryAcquire()) {
- Assert.fail("Concurrent invocation of operator functions.");
- }
-
- if (first) {
- registerTimer(System.currentTimeMillis() + 100, this);
- first = false;
- }
- numElements++;
-
- semaphore.release();
- }
-
-
- @Override
- public void trigger(long time) throws Exception {
- if (!semaphore.tryAcquire()) {
- Assert.fail("Concurrent invocation of operator functions.");
- }
-
- try {
- numTimers++;
- throwIfDone();
- registerTimer(System.currentTimeMillis() + 1, this);
- } finally {
- semaphore.release();
- }
- }
-
- private void throwIfDone() {
- if (numTimers > 1000 && numElements > 10_000) {
- throw new RuntimeException("TEST SUCCESS");
- }
- }
-
- @Override
- public void processWatermark1(Watermark mark) throws Exception {
- //ignore
- }
-
- @Override
- public void processWatermark2(Watermark mark) throws Exception {
- //ignore
- }
- }
-
-
- private static class InfiniteTestSource implements SourceFunction<String> {
- private static final long serialVersionUID = 1L;
- private volatile boolean running = true;
-
- @Override
- public void run(SourceContext<String> ctx) throws Exception {
- while (running) {
- ctx.collect("hello");
- }
- }
-
- @Override
- public void cancel() {
- running = false;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
deleted file mode 100644
index f87d7ea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * Tests for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. Theses tests
- * implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}.
- *
- * <p>
- * Note:<br>
- * We only use a {@link CoStreamMap} operator here. We also test the individual operators but Map is
- * used as a representative to test TwoInputStreamTask, since TwoInputStreamTask is used for all
- * TwoInputStreamOperators.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-public class TwoInputStreamTaskTest {
-
- /**
- * This test verifies that open() and close() are correctly called. This test also verifies
- * that timestamps of emitted elements are correct. {@link CoStreamMap} assigns the input
- * timestamp to emitted elements.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testOpenCloseAndTimestamps() throws Exception {
- final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
- final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction());
- streamConfig.setStreamOperator(coMapOperator);
-
- long initialTime = 0L;
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
- testHarness.invoke();
-
- testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1), 0, 0);
- expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
-
- // wait until the input is processed to ensure ordering of the output
- testHarness.waitForInputProcessing();
-
- testHarness.processElement(new StreamRecord<Integer>(1337, initialTime + 2), 1, 0);
-
- expectedOutput.add(new StreamRecord<String>("1337", initialTime + 2));
-
- testHarness.endInput();
-
- testHarness.waitForTaskCompletion();
-
- Assert.assertTrue("RichFunction methods where not called.", TestOpenCloseMapFunction.closeCalled);
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
- }
-
- /**
- * This test verifies that watermarks are correctly forwarded. This also checks whether
- * watermarks are forwarded only when we have received watermarks from all inputs. The
- * forwarded watermark must be the minimum of the watermarks of all inputs.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testWatermarkForwarding() throws Exception {
- final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
- final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
- streamConfig.setStreamOperator(coMapOperator);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
- long initialTime = 0L;
-
- testHarness.invoke();
-
- testHarness.processElement(new Watermark(initialTime), 0, 0);
- testHarness.processElement(new Watermark(initialTime), 0, 1);
-
- testHarness.processElement(new Watermark(initialTime), 1, 0);
-
-
- // now the output should still be empty
- testHarness.waitForInputProcessing();
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.processElement(new Watermark(initialTime), 1, 1);
-
- // now the watermark should have propagated, Map simply forward Watermarks
- testHarness.waitForInputProcessing();
- expectedOutput.add(new Watermark(initialTime));
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- // contrary to checkpoint barriers these elements are not blocked by watermarks
- testHarness.processElement(new StreamRecord<String>("Hello", initialTime), 0, 0);
- testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
- expectedOutput.add(new StreamRecord<String>("Hello", initialTime));
- expectedOutput.add(new StreamRecord<String>("42", initialTime));
-
- testHarness.waitForInputProcessing();
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
- testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
- testHarness.processElement(new Watermark(initialTime + 3), 1, 0);
- testHarness.processElement(new Watermark(initialTime + 2), 1, 1);
-
- // check whether we get the minimum of all the watermarks, this must also only occur in
- // the output after the two StreamRecords
- expectedOutput.add(new Watermark(initialTime + 2));
- testHarness.waitForInputProcessing();
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-
- // advance watermark from one of the inputs, now we should get a now one since the
- // minimum increases
- testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
- testHarness.waitForInputProcessing();
- expectedOutput.add(new Watermark(initialTime + 3));
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- // advance the other two inputs, now we should get a new one since the
- // minimum increases again
- testHarness.processElement(new Watermark(initialTime + 4), 0, 1);
- testHarness.processElement(new Watermark(initialTime + 4), 1, 0);
- testHarness.waitForInputProcessing();
- expectedOutput.add(new Watermark(initialTime + 4));
- TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
- testHarness.endInput();
-
- testHarness.waitForTaskCompletion();
-
- List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
- Assert.assertEquals(2, resultElements.size());
- }
-
- /**
- * This test verifies that checkpoint barriers are correctly forwarded.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testCheckpointBarriers() throws Exception {
- final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
- final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
- streamConfig.setStreamOperator(coMapOperator);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
- long initialTime = 0L;
-
- testHarness.invoke();
-
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
-
- // This element should be buffered since we received a checkpoint barrier on
- // this input
- testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
-
- // This one should go through
- testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 1);
- expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-
- // These elements should be forwarded, since we did not yet receive a checkpoint barrier
- // on that input, only add to same input, otherwise we would not know the ordering
- // of the output since the Task might read the inputs in any order
- testHarness.processElement(new StreamRecord<Integer>(11, initialTime), 1, 1);
- testHarness.processElement(new StreamRecord<Integer>(111, initialTime), 1, 1);
- expectedOutput.add(new StreamRecord<String>("11", initialTime));
- expectedOutput.add(new StreamRecord<String>("111", initialTime));
-
- testHarness.waitForInputProcessing();
- // we should not yet see the barrier, only the two elements from non-blocked input
- TestHarnessUtil.assertOutputEquals("Output was not correct.",
- testHarness.getOutput(),
- expectedOutput);
-
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
-
- testHarness.waitForInputProcessing();
-
- // now we should see the barrier and after that the buffered elements
- expectedOutput.add(new CheckpointBarrier(0, 0));
- expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
- TestHarnessUtil.assertOutputEquals("Output was not correct.",
- testHarness.getOutput(),
- expectedOutput);
-
- testHarness.endInput();
-
- testHarness.waitForTaskCompletion();
-
- List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
- Assert.assertEquals(4, resultElements.size());
- }
-
- /**
- * This test verifies that checkpoint barriers and barrier buffers work correctly with
- * concurrent checkpoint barriers where one checkpoint is "overtaking" another checkpoint, i.e.
- * some inputs receive barriers from an earlier checkpoint, thereby blocking,
- * then all inputs receive barriers from a later checkpoint.
- */
- @Test
- @SuppressWarnings("unchecked")
- public void testOvertakingCheckpointBarriers() throws Exception {
- final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
- final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
- StreamConfig streamConfig = testHarness.getStreamConfig();
- CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
- streamConfig.setStreamOperator(coMapOperator);
-
- ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
- long initialTime = 0L;
-
- testHarness.invoke();
-
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
-
- // These elements should be buffered until we receive barriers from
- // all inputs
- testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
- testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
-
- // These elements should be forwarded, since we did not yet receive a checkpoint barrier
- // on that input, only add to same input, otherwise we would not know the ordering
- // of the output since the Task might read the inputs in any order
- testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
- testHarness.processElement(new StreamRecord<Integer>(1337, initialTime), 1, 1);
- expectedOutput.add(new StreamRecord<String>("42", initialTime));
- expectedOutput.add(new StreamRecord<String>("1337", initialTime));
-
- testHarness.waitForInputProcessing();
- // we should not yet see the barrier, only the two elements from non-blocked input
- TestHarnessUtil.assertOutputEquals("Output was not correct.",
- expectedOutput,
- testHarness.getOutput());
-
- // Now give a later barrier to all inputs, this should unblock the first channel,
- // thereby allowing the two blocked elements through
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
-
- expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
- expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
- expectedOutput.add(new CheckpointBarrier(1, 1));
-
- testHarness.waitForInputProcessing();
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.",
- expectedOutput,
- testHarness.getOutput());
-
-
- // Then give the earlier barrier, these should be ignored
- testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
- testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
-
- testHarness.waitForInputProcessing();
-
-
- testHarness.endInput();
-
- testHarness.waitForTaskCompletion();
-
- TestHarnessUtil.assertOutputEquals("Output was not correct.",
- expectedOutput,
- testHarness.getOutput());
- }
-
- // This must only be used in one test, otherwise the static fields will be changed
- // by several tests concurrently
- private static class TestOpenCloseMapFunction extends RichCoMapFunction<String, Integer, String> {
- private static final long serialVersionUID = 1L;
-
- public static boolean openCalled = false;
- public static boolean closeCalled = false;
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- if (closeCalled) {
- Assert.fail("Close called before open.");
- }
- openCalled = true;
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- if (!openCalled) {
- Assert.fail("Open was not called before close.");
- }
- closeCalled = true;
- }
-
- @Override
- public String map1(String value) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- return value;
- }
-
- @Override
- public String map2(Integer value) throws Exception {
- if (!openCalled) {
- Assert.fail("Open was not called before run.");
- }
- return value.toString();
- }
- }
-
- private static class IdentityMap implements CoMapFunction<String, Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map1(String value) throws Exception {
- return value;
- }
-
- @Override
- public String map2(Integer value) throws Exception {
-
- return value.toString();
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
deleted file mode 100644
index 2b20101..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-
-/**
- * Test harness for testing a {@link TwoInputStreamTask}.
- *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and allows pushing elements
- * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
- * and events. You are free to modify the retrieved list.
- *
- * <p>
- * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
- * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
- * thread to finish. Use {@link #processElement}
- * to send elements to the task. Use
- * {@link #processEvent(org.apache.flink.runtime.event.AbstractEvent)} to send events to the task.
- * Before waiting for the task to finish you must call {@link #endInput()} to signal to the task
- * that data entry is finished.
- *
- * <p>
- * When Elements or Events are offered to the Task they are put into a queue. The input gates
- * of the Task read from this queue. Use {@link #waitForInputProcessing()} to wait until all
- * queues are empty. This must be used after entering some elements before checking the
- * desired output.
- *
- * <p>
- * When using this you need to add the following line to your test class to setup Powermock:
- * {@code @PrepareForTest({ResultPartitionWriter.class})}
- */
-public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTestHarness<OUT> {
-
- private TypeInformation<IN1> inputType1;
- private TypeSerializer<IN1> inputSerializer1;
-
- private TypeInformation<IN2> inputType2;
- private TypeSerializer<IN2> inputSerializer2;
-
- private int[] inputGateAssignment;
-
- /**
- * Creates a test harness with the specified number of input gates and specified number
- * of channels per input gate. Parameter inputGateAssignment specifies for each gate whether
- * it should be assigned to the first (1), or second (2) input of the task.
- */
- public TwoInputStreamTaskTestHarness(TwoInputStreamTask<IN1, IN2, OUT> task,
- int numInputGates,
- int numInputChannelsPerGate,
- int[] inputGateAssignment,
- TypeInformation<IN1> inputType1,
- TypeInformation<IN2> inputType2,
- TypeInformation<OUT> outputType) {
- super(task, outputType);
-
- this.inputType1 = inputType1;
- inputSerializer1 = inputType1.createSerializer(executionConfig);
-
- this.inputType2 = inputType2;
- inputSerializer2 = inputType2.createSerializer(executionConfig);
-
- this.numInputGates = numInputGates;
- this.numInputChannelsPerGate = numInputChannelsPerGate;
- this.inputGateAssignment = inputGateAssignment;
- }
-
- /**
- * Creates a test harness with one input gate (that has one input channel) per input. The first
- * input gate is assigned to the first task input, the second input gate is assigned to the
- * second task input.
- */
- public TwoInputStreamTaskTestHarness(TwoInputStreamTask<IN1, IN2, OUT> task,
- TypeInformation<IN1> inputType1,
- TypeInformation<IN2> inputType2,
- TypeInformation<OUT> outputType) {
- this(task, 2, 1, new int[] {1, 2}, inputType1, inputType2, outputType);
- }
-
- @Override
- protected void initializeInputs() throws IOException, InterruptedException {
-
- inputGates = new StreamTestSingleInputGate[numInputGates];
- List<StreamEdge> inPhysicalEdges = new LinkedList<StreamEdge>();
-
- StreamOperator<IN1> dummyOperator = new AbstractStreamOperator<IN1>() {
- private static final long serialVersionUID = 1L;
- };
-
- StreamNode sourceVertexDummy = new StreamNode(null, 0, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
- StreamNode targetVertexDummy = new StreamNode(null, 1, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-
- for (int i = 0; i < numInputGates; i++) {
-
- switch (inputGateAssignment[i]) {
- case 1: {
- inputGates[i] = new StreamTestSingleInputGate<IN1>(
- numInputChannelsPerGate,
- bufferSize,
- inputSerializer1);
-
-
- StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
- targetVertexDummy,
- 1,
- new LinkedList<String>(),
- new BroadcastPartitioner<Object>());
-
- inPhysicalEdges.add(streamEdge);
- break;
- }
- case 2: {
- inputGates[i] = new StreamTestSingleInputGate<IN2>(
- numInputChannelsPerGate,
- bufferSize,
- inputSerializer2);
-
- StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
- targetVertexDummy,
- 2,
- new LinkedList<String>(),
- new BroadcastPartitioner<Object>());
-
- inPhysicalEdges.add(streamEdge);
- break;
- }
- default:
- throw new IllegalStateException("Wrong input gate assignment.");
- }
-
- this.mockEnv.addInputGate(inputGates[i].getInputGate());
- }
-
- streamConfig.setInPhysicalEdges(inPhysicalEdges);
- streamConfig.setNumberOfInputs(numInputGates);
- streamConfig.setTypeSerializerIn1(inputSerializer1);
- streamConfig.setTypeSerializerIn2(inputSerializer2);
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
deleted file mode 100644
index 749e1dd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.timestamp;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.taskmanager.MultiShotLatch;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.fail;
-
-/**
- * Tests for timestamps, watermarks, and event-time sources.
- */
-@SuppressWarnings("serial")
-public class TimestampITCase {
-
- private static final int NUM_TASK_MANAGERS = 2;
- private static final int NUM_TASK_SLOTS = 3;
- private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
- // this is used in some tests to synchronize
- static MultiShotLatch latch;
-
-
- private static ForkableFlinkMiniCluster cluster;
-
- @Before
- public void setupLatch() {
- // ensure that we get a fresh latch for each test
- latch = new MultiShotLatch();
- }
-
-
- @BeforeClass
- public static void startCluster() {
- try {
- Configuration config = new Configuration();
- config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
- config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
- config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
- config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
- cluster = new ForkableFlinkMiniCluster(config, false);
-
- cluster.start();
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to start test cluster: " + e.getMessage());
- }
- }
-
- @AfterClass
- public static void shutdownCluster() {
- try {
- cluster.shutdown();
- cluster = null;
- }
- catch (Exception e) {
- e.printStackTrace();
- fail("Failed to stop test cluster: " + e.getMessage());
- }
- }
-
- /**
- * These check whether custom timestamp emission works at sources and also whether timestamps
- * arrive at operators throughout a topology.
- *
- * <p>
- * This also checks whether watermarks keep propagating if a source closes early.
- *
- * <p>
- * This only uses map to test the workings of watermarks in a complete, running topology. All
- * tasks and stream operators have dedicated tests that test the watermark propagation
- * behaviour.
- */
- @Test
- public void testWatermarkPropagation() throws Exception {
- final int NUM_WATERMARKS = 10;
-
- long initialTime = 0L;
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
- env.getConfig().enableTimestamps();
-
-
- DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
- DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS / 2));
-
- source1.union(source2)
- .map(new IdentityMap())
- .connect(source2).map(new IdentityCoMap())
- .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
- .addSink(new NoOpSink<Integer>());
-
- env.execute();
-
- // verify that all the watermarks arrived at the final custom operator
- for (int i = 0; i < PARALLELISM; i++) {
- // There can be two cases, either we get NUM_WATERMARKS + 1 watermarks or
- // (NUM_WATERMARKS / 2) + 1 watermarks. This depends on which source get's to run first.
- // If source1 runs first we jump directly to +Inf and skip all the intermediate
- // watermarks. If source2 runs first we see the intermediate watermarks from
- // NUM_WATERMARKS/2 to +Inf.
- if (CustomOperator.finalWatermarks[i].size() == NUM_WATERMARKS + 1) {
- for (int j = 0; j < NUM_WATERMARKS; j++) {
- if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
- System.err.println("All Watermarks: ");
- for (int k = 0; k <= NUM_WATERMARKS; k++) {
- System.err.println(CustomOperator.finalWatermarks[i].get(k));
- }
-
- Assert.fail("Wrong watermark.");
- }
- }
- if (!CustomOperator.finalWatermarks[i].get(NUM_WATERMARKS).equals(new Watermark(Long.MAX_VALUE))) {
- System.err.println("All Watermarks: ");
- for (int k = 0; k <= NUM_WATERMARKS; k++) {
- System.err.println(CustomOperator.finalWatermarks[i].get(k));
- }
-
- Assert.fail("Wrong watermark.");
- }
- } else {
- for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
- if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
- System.err.println("All Watermarks: ");
- for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
- System.err.println(CustomOperator.finalWatermarks[i].get(k));
- }
-
- Assert.fail("Wrong watermark.");
- }
- }
- if (!CustomOperator.finalWatermarks[i].get(NUM_WATERMARKS / 2).equals(new Watermark(Long.MAX_VALUE))) {
- System.err.println("All Watermarks: ");
- for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
- System.err.println(CustomOperator.finalWatermarks[i].get(k));
- }
-
- Assert.fail("Wrong watermark.");
- }
-
- }
-
- }
- }
-
-
-
- /**
- * These check whether timestamps are properly assigned at the sources and handled in
- * network transmission and between chained operators when timestamps are enabled.
- */
- @Test
- public void testTimestampHandling() throws Exception {
- final int NUM_ELEMENTS = 10;
-
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
- env.getConfig().enableTimestamps();
-
-
- DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
- DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
-
- source1
- .map(new IdentityMap())
- .connect(source2).map(new IdentityCoMap())
- .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator())
- .addSink(new NoOpSink<Integer>());
-
-
- env.execute();
- }
-
- /**
- * These check whether timestamps are properly ignored when they are disabled.
- */
- @Test
- public void testDisabledTimestamps() throws Exception {
- final int NUM_ELEMENTS = 10;
-
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
- "localhost", cluster.getLeaderRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
- Assert.assertEquals("Timestamps are not disabled by default.",
- false,
- env.getConfig().areTimestampsEnabled());
- env.getConfig().disableTimestamps();
-
-
- DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
- DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
-
- source1
- .map(new IdentityMap())
- .connect(source2).map(new IdentityCoMap())
- .transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator())
- .addSink(new NoOpSink<Integer>());
-
-
- env.execute();
- }
-
- /**
- * This thests whether timestamps are properly extracted in the timestamp
- * extractor and whether watermarks are also correctly forwared from this with the auto watermark
- * interval.
- */
- @Test
- public void testTimestampExtractorWithAutoInterval() throws Exception {
- final int NUM_ELEMENTS = 10;
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
- env.setParallelism(1);
- env.getConfig().disableSysoutLogging();
- env.getConfig().enableTimestamps();
- env.getConfig().setAutoWatermarkInterval(10);
-
-
- DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- int index = 0;
- while (index < NUM_ELEMENTS) {
- ctx.collect(index);
- latch.await();
- index++;
- }
- }
-
- @Override
- public void cancel() {
-
- }
- });
-
- DataStream<Integer> extractOp = source1.assignTimestamps(
- new AscendingTimestampExtractor<Integer>() {
- @Override
- public long extractAscendingTimestamp(Integer element, long currentTimestamp) {
- return element;
- }
- });
-
- extractOp
- .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
- .transform("Timestamp Check",
- BasicTypeInfo.INT_TYPE_INFO,
- new TimestampCheckingOperator());
-
- // verify that extractor picks up source parallelism
- Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());
-
- env.execute();
-
- // verify that we get NUM_ELEMENTS watermarks
- for (int j = 0; j < NUM_ELEMENTS; j++) {
- if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
- Assert.fail("Wrong watermark.");
- }
- }
- if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
- Assert.fail("Wrong watermark.");
- }
- }
-
- /**
- * This thests whether timestamps are properly extracted in the timestamp
- * extractor and whether watermark are correctly forwarded from the custom watermark emit
- * function.
- */
- @Test
- public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception {
- final int NUM_ELEMENTS = 10;
-
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
- env.setParallelism(1);
- env.getConfig().disableSysoutLogging();
- env.getConfig().enableTimestamps();
-
-
- DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- int index = 0;
- while (index < NUM_ELEMENTS) {
- ctx.collect(index);
- latch.await();
- index++;
- }
- }
-
- @Override
- public void cancel() {
-
- }
- });
-
- source1.assignTimestamps(new TimestampExtractor<Integer>() {
- @Override
- public long extractTimestamp(Integer element, long currentTimestamp) {
- return element;
- }
-
- @Override
- public long extractWatermark(Integer element, long currentTimestamp) {
- return element - 1;
- }
-
- @Override
- public long getCurrentWatermark() {
- return Long.MIN_VALUE;
- }
- })
- .transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
- .transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
-
-
- env.execute();
-
- // verify that we get NUM_ELEMENTS watermarks
- for (int j = 0; j < NUM_ELEMENTS; j++) {
- if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
- Assert.fail("Wrong watermark.");
- }
- }
- if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
- Assert.fail("Wrong watermark.");
- }
- }
-
- /**
- * This tests whether the program throws an exception when an event-time source tries
- * to emit without timestamp.
- */
- @Test(expected = ProgramInvocationException.class)
- public void testEventTimeSourceEmitWithoutTimestamp() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
-
- DataStream<Integer> source1 = env.addSource(new MyErroneousTimestampSource());
-
- source1
- .map(new IdentityMap())
- .addSink(new NoOpSink<Integer>());
-
- env.execute();
- }
-
- /**
- * This tests whether the program throws an exception when a regular source tries
- * to emit with timestamp.
- */
- @Test(expected = ProgramInvocationException.class)
- public void testSourceEmitWithTimestamp() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
-
- DataStream<Integer> source1 = env.addSource(new MyErroneousSource());
-
- source1
- .map(new IdentityMap())
- .addSink(new NoOpSink<Integer>());
-
- env.execute();
- }
-
- /**
- * This tests whether the program throws an exception when a regular source tries
- * to emit a watermark.
- */
- @Test(expected = ProgramInvocationException.class)
- public void testSourceEmitWatermark() throws Exception {
- StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
- env.setParallelism(PARALLELISM);
- env.getConfig().disableSysoutLogging();
-
- DataStream<Integer> source1 = env.addSource(new MyErroneousWatermarkSource());
-
- source1
- .map(new IdentityMap())
- .addSink(new NoOpSink<Integer>());
-
- env.execute();
- }
-
- @SuppressWarnings("unchecked")
- public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
- List<Watermark> watermarks;
- public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
- private long oldTimestamp;
-
- @Override
- public void processElement(StreamRecord<Integer> element) throws Exception {
- if (element.getTimestamp() != element.getValue()) {
- Assert.fail("Timestamps are not properly handled.");
- }
- oldTimestamp = element.getTimestamp();
- output.collect(element);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- watermarks.add(mark);
- latch.trigger();
- output.emitWatermark(mark);
- }
-
- @Override
- public void open() throws Exception {
- super.open();
- watermarks = new ArrayList<Watermark>();
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = watermarks;
- }
- }
-
- public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
- @Override
- public void processElement(StreamRecord<Integer> element) throws Exception {
- if (element.getTimestamp() != element.getValue()) {
- Assert.fail("Timestamps are not properly handled.");
- }
- output.collect(element);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- }
- }
-
- public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
- @Override
- public void processElement(StreamRecord<Integer> element) throws Exception {
- if (element.getTimestamp() != 0) {
- Assert.fail("Timestamps are not properly handled.");
- }
- output.collect(element);
- }
-
- @Override
- public void processWatermark(Watermark mark) throws Exception {
- }
- }
-
- public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {
- @Override
- public Integer map1(Integer value) throws Exception {
- return value;
- }
-
- @Override
- public Integer map2(Integer value) throws Exception {
- return value;
- }
- }
-
- public static class IdentityMap implements MapFunction<Integer, Integer> {
- @Override
- public Integer map(Integer value) throws Exception {
- return value;
- }
- }
-
- public static class MyTimestampSource implements EventTimeSourceFunction<Integer> {
-
- long initialTime;
- int numWatermarks;
-
- public MyTimestampSource(long initialTime, int numWatermarks) {
- this.initialTime = initialTime;
- this.numWatermarks = numWatermarks;
- }
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- for (int i = 0; i < numWatermarks; i++) {
- ctx.collectWithTimestamp(i, initialTime + i);
- ctx.emitWatermark(new Watermark(initialTime + i));
- }
- }
-
- @Override
- public void cancel() {
-
- }
- }
-
- public static class MyNonWatermarkingSource implements SourceFunction<Integer> {
-
- int numWatermarks;
-
- public MyNonWatermarkingSource(int numWatermarks) {
- this.numWatermarks = numWatermarks;
- }
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- for (int i = 0; i < numWatermarks; i++) {
- ctx.collect(i);
- }
- }
-
- @Override
- public void cancel() {
-
- }
- }
-
- // This is a event-time source. This should only emit elements with timestamps. The test should
- // therefore throw an exception
- public static class MyErroneousTimestampSource implements EventTimeSourceFunction<Integer> {
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- for (int i = 0; i < 10; i++) {
- ctx.collect(i);
- }
- }
-
- @Override
- public void cancel() {
-
- }
- }
-
- // This is a normal source. This should only emit elements without timestamps. The test should
- // therefore throw an exception
- public static class MyErroneousSource implements SourceFunction<Integer> {
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- for (int i = 0; i < 10; i++) {
- ctx.collectWithTimestamp(i, 0L);
- }
- }
-
- @Override
- public void cancel() {
-
- }
- }
-
- // This is a normal source. This should only emit elements without timestamps. This also
- // must not emit watermarks. The test should therefore throw an exception
- public static class MyErroneousWatermarkSource implements SourceFunction<Integer> {
-
- @Override
- public void run(SourceContext<Integer> ctx) throws Exception {
- for (int i = 0; i < 10; i++) {
- ctx.collect(i);
- ctx.emitWatermark(new Watermark(0L));
- }
- }
-
- @Override
- public void cancel() {
-
- }
- }
-}