Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:36 UTC

[20/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
deleted file mode 100644
index 232485d..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.checkpoint.Checkpointed;
-import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.Random;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * These tests verify that the RichFunction methods are called in the correct
- * order, and that checkpointing and element emission do not occur concurrently.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-public class SourceStreamTaskTest {
-
-
-	/**
-	 * This test verifies that open() and close() are correctly called by the StreamTask.
-	 */
-	@Test
-	public void testOpenClose() throws Exception {
-		final SourceStreamTask<String> sourceTask = new SourceStreamTask<String>();
-		final StreamTaskTestHarness<String> testHarness = new StreamTaskTestHarness<String>(sourceTask, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		StreamSource<String> sourceOperator = new StreamSource<String>(new OpenCloseTestSource());
-		streamConfig.setStreamOperator(sourceOperator);
-
-		testHarness.invoke();
-		testHarness.waitForTaskCompletion();
-
-		Assert.assertTrue("RichFunction methods were not called.", OpenCloseTestSource.closeCalled);
-
-		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
-		Assert.assertEquals(10, resultElements.size());
-	}
-
-	/**
-	 * This test ensures that the SourceStreamTask properly serializes checkpointing
-	 * and element emission. This also verifies that there are no concurrent invocations
-	 * of the checkpoint method on the source operator.
-	 *
-	 * The source emits elements and performs checkpoints. We have several checkpointer threads
-	 * that fire checkpoint requests at the source task.
-	 *
- * If element emission and checkpointing are not serialized, the element count at the
- * beginning of a checkpoint differs from the count at the end of that checkpoint,
- * because the source kept emitting elements while the checkpoint was ongoing.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testCheckpointing() throws Exception {
-		final int NUM_ELEMENTS = 100;
-		final int NUM_CHECKPOINTS = 100;
-		final int NUM_CHECKPOINTERS = 1;
-		final int CHECKPOINT_INTERVAL = 5; // in ms
-		final int SOURCE_CHECKPOINT_DELAY = 1000; // how many random values we sum up in snapshotState
-		final int SOURCE_READ_DELAY = 1; // in ms
-
-		ExecutorService executor = Executors.newFixedThreadPool(10);
-		try {
-			final TupleTypeInfo<Tuple2<Long, Integer>> typeInfo = new TupleTypeInfo<Tuple2<Long, Integer>>(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO);
-			final SourceStreamTask<Tuple2<Long, Integer>> sourceTask = new SourceStreamTask<Tuple2<Long, Integer>>();
-			final StreamTaskTestHarness<Tuple2<Long, Integer>> testHarness = new StreamTaskTestHarness<Tuple2<Long, Integer>>(sourceTask, typeInfo);
-	
-			StreamConfig streamConfig = testHarness.getStreamConfig();
-			StreamSource<Tuple2<Long, Integer>> sourceOperator = new StreamSource<Tuple2<Long, Integer>>(new MockSource(NUM_ELEMENTS, SOURCE_CHECKPOINT_DELAY, SOURCE_READ_DELAY));
-			streamConfig.setStreamOperator(sourceOperator);
-			
-			// prepare the futures that will receive the checkpointer results
-			
-			Future<Boolean>[] checkpointerResults = new Future[NUM_CHECKPOINTERS];
-	
-			// invoke this first, so the tasks are actually running when the checkpoints are scheduled
-			testHarness.invoke();
-			
-			for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
-				checkpointerResults[i] = executor.submit(new Checkpointer(NUM_CHECKPOINTS, CHECKPOINT_INTERVAL, sourceTask));
-			}
-			
-			testHarness.waitForTaskCompletion();
-	
-			// Get the results from the checkpointers; if any of them threw an exception,
-			// it will be rethrown here
-			for (int i = 0; i < NUM_CHECKPOINTERS; i++) {
-				if (!checkpointerResults[i].isDone()) {
-					checkpointerResults[i].cancel(true);
-				}
-				if (!checkpointerResults[i].isCancelled()) {
-					checkpointerResults[i].get();
-				}
-			}
-	
-			List<Tuple2<Long, Integer>> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
-			Assert.assertEquals(NUM_ELEMENTS, resultElements.size());
-		}
-		finally {
-			executor.shutdown();
-		}
-	}
-
-	private static class MockSource implements SourceFunction<Tuple2<Long, Integer>>, Checkpointed<Serializable> {
-		private static final long serialVersionUID = 1;
-
-		private int maxElements;
-		private int checkpointDelay;
-		private int readDelay;
-
-		private volatile int count;
-		private volatile long lastCheckpointId = -1;
-
-		private Semaphore semaphore;
-
-		private volatile boolean isRunning = true;
-
-		public MockSource(int maxElements, int checkpointDelay, int readDelay) {
-			this.maxElements = maxElements;
-			this.checkpointDelay = checkpointDelay;
-			this.readDelay = readDelay;
-			this.count = 0;
-			semaphore = new Semaphore(1);
-		}
-
-		@Override
-		public void run(SourceContext<Tuple2<Long, Integer>> ctx) {
-			final Object lockObject = ctx.getCheckpointLock();
-			while (isRunning && count < maxElements) {
-				// simulate some work
-				try {
-					Thread.sleep(readDelay);
-				} catch (InterruptedException e) {
-					e.printStackTrace();
-				}
-
-				synchronized (lockObject) {
-					ctx.collect(new Tuple2<Long, Integer>(lastCheckpointId, count));
-					count++;
-				}
-			}
-		}
-
-		@Override
-		public void cancel() {
-			isRunning = false;
-		}
-
-		@Override
-		public Serializable snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
-			if (!semaphore.tryAcquire()) {
-				Assert.fail("Concurrent invocation of snapshotState.");
-			}
-			int startCount = count;
-			lastCheckpointId = checkpointId;
-
-			long sum = 0;
-			for (int i = 0; i < checkpointDelay; i++) {
-				sum += new Random().nextLong();
-			}
-
-			if (startCount != count) {
-				semaphore.release();
-				// This means that the source emitted elements while the snapshot was ongoing
-				Assert.fail("Count is different at start and end of snapshot.");
-			}
-			semaphore.release();
-			return sum;
-		}
-
-		@Override
-		public void restoreState(Serializable state) {
-
-		}
-	}
-
-	/**
-	 * This calls triggerCheckpoint on the given task at the given interval.
-	 */
-	private static class Checkpointer implements Callable<Boolean> {
-		private final int numCheckpoints;
-		private final int checkpointInterval;
-		private final AtomicLong checkpointId;
-		private final StreamTask<Tuple2<Long, Integer>, ?> sourceTask;
-
-		public Checkpointer(int numCheckpoints, int checkpointInterval, StreamTask<Tuple2<Long, Integer>, ?> task) {
-			this.numCheckpoints = numCheckpoints;
-			checkpointId = new AtomicLong(0);
-			sourceTask = task;
-			this.checkpointInterval = checkpointInterval;
-		}
-
-		@Override
-		public Boolean call() throws Exception {
-			for (int i = 0; i < numCheckpoints; i++) {
-				long currentCheckpointId = checkpointId.getAndIncrement();
-				sourceTask.triggerCheckpoint(currentCheckpointId, 0L);
-				Thread.sleep(checkpointInterval);
-			}
-			return true;
-		}
-	}
-
-	public static class OpenCloseTestSource extends RichSourceFunction<String> {
-		private static final long serialVersionUID = 1L;
-
-		public static boolean openCalled = false;
-		public static boolean closeCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			if (closeCalled) {
-				Assert.fail("Close called before open.");
-			}
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
-			}
-			closeCalled = true;
-		}
-
-		@Override
-		public void run(SourceContext<String> ctx) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			for (int i = 0; i < 10; i++) {
-				ctx.collect("Hello" + i);
-			}
-		}
-
-		@Override
-		public void cancel() {}
-	}
-}
-
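
The property this deleted test pins down is the source checkpoint-lock contract: the task triggers snapshotState() while holding the lock returned by SourceContext.getCheckpointLock(), so a source that emits under the same lock can never interleave emission with a checkpoint. Below is a minimal sketch of a source honoring that contract, written against the same pre-1.0 APIs the file imports (SourceFunction, Checkpointed); the class itself is illustrative and not part of this commit.

    import org.apache.flink.streaming.api.checkpoint.Checkpointed;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    /** Emits under the checkpoint lock so emission and snapshotState() are mutually exclusive. */
    public class LockedCountingSource implements SourceFunction<Long>, Checkpointed<Long> {
        private static final long serialVersionUID = 1L;

        private volatile boolean running = true;
        private long count;

        @Override
        public void run(SourceContext<Long> ctx) throws Exception {
            final Object lock = ctx.getCheckpointLock();
            while (running) {
                synchronized (lock) {
                    // element emission and the state update happen atomically
                    // with respect to snapshotState()
                    ctx.collect(count);
                    count++;
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public Long snapshotState(long checkpointId, long checkpointTimestamp) {
            // invoked by the task while it holds the same lock,
            // so 'count' cannot change underneath us
            return count;
        }

        @Override
        public void restoreState(Long state) {
            count = state;
        }
    }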

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
deleted file mode 100644
index 090f7cb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamMockEnvironment.java
+++ /dev/null
@@ -1,304 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.UnmodifiableConfiguration;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.core.memory.MemorySegmentFactory;
-import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
-import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.disk.iomanager.IOManager;
-import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
-import org.apache.flink.runtime.io.network.api.serialization.AdaptiveSpanningRecordDeserializer;
-import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
-import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.plugable.DeserializationDelegate;
-import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
-import org.apache.flink.runtime.state.StateHandle;
-import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import java.util.Collections;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class StreamMockEnvironment implements Environment {
-
-	private final MemoryManager memManager;
-
-	private final IOManager ioManager;
-
-	private final InputSplitProvider inputSplitProvider;
-
-	private final Configuration jobConfiguration;
-
-	private final Configuration taskConfiguration;
-
-	private final List<InputGate> inputs;
-
-	private final List<ResultPartitionWriter> outputs;
-
-	private final JobID jobID = new JobID();
-
-	private final BroadcastVariableManager bcVarManager = new BroadcastVariableManager();
-
-	private final AccumulatorRegistry accumulatorRegistry;
-
-	private final int bufferSize;
-
-	public StreamMockEnvironment(Configuration jobConfig, Configuration taskConfig, long memorySize,
-									MockInputSplitProvider inputSplitProvider, int bufferSize) {
-		this.jobConfiguration = jobConfig;
-		this.taskConfiguration = taskConfig;
-		this.inputs = new LinkedList<InputGate>();
-		this.outputs = new LinkedList<ResultPartitionWriter>();
-
-		this.memManager = new MemoryManager(memorySize, 1);
-		this.ioManager = new IOManagerAsync();
-		this.inputSplitProvider = inputSplitProvider;
-		this.bufferSize = bufferSize;
-
-		this.accumulatorRegistry = new AccumulatorRegistry(jobID, getExecutionId());
-	}
-
-	public void addInputGate(InputGate gate) {
-		inputs.add(gate);
-	}
-
-	public <T> void addOutput(final Queue<Object> outputList, final TypeSerializer<T> serializer) {
-		try {
-			// The record-oriented writers wrap the buffer writer. We mock it
-			// to collect the returned buffers and deserialize their content into
-			// the output list
-			BufferProvider mockBufferProvider = mock(BufferProvider.class);
-			when(mockBufferProvider.requestBufferBlocking()).thenAnswer(new Answer<Buffer>() {
-
-				@Override
-				public Buffer answer(InvocationOnMock invocationOnMock) throws Throwable {
-					return new Buffer(
-							MemorySegmentFactory.allocateUnpooledSegment(bufferSize),
-							mock(BufferRecycler.class));
-				}
-			});
-
-			ResultPartitionWriter mockWriter = mock(ResultPartitionWriter.class);
-			when(mockWriter.getNumberOfOutputChannels()).thenReturn(1);
-			when(mockWriter.getBufferProvider()).thenReturn(mockBufferProvider);
-
-			final RecordDeserializer<DeserializationDelegate<T>> recordDeserializer = new AdaptiveSpanningRecordDeserializer<DeserializationDelegate<T>>();
-			final NonReusingDeserializationDelegate<T> delegate = new NonReusingDeserializationDelegate<T>(serializer);
-
-			// Add records from the buffer to the output list
-			doAnswer(new Answer<Void>() {
-
-				@Override
-				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					Buffer buffer = (Buffer) invocationOnMock.getArguments()[0];
-
-					recordDeserializer.setNextBuffer(buffer);
-
-					while (recordDeserializer.hasUnfinishedData()) {
-						RecordDeserializer.DeserializationResult result = recordDeserializer.getNextRecord(delegate);
-
-						if (result.isFullRecord()) {
-							outputList.add(delegate.getInstance());
-						}
-
-						if (result == RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER
-								|| result == RecordDeserializer.DeserializationResult.PARTIAL_RECORD) {
-							break;
-						}
-					}
-
-					return null;
-				}
-			}).when(mockWriter).writeBuffer(any(Buffer.class), anyInt());
-
-			// Add events to the output list
-			doAnswer(new Answer<Void>() {
-
-				@Override
-				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
-
-					outputList.add(event);
-					return null;
-				}
-			}).when(mockWriter).writeEvent(any(AbstractEvent.class), anyInt());
-
-			doAnswer(new Answer<Void>() {
-
-				@Override
-				public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-					AbstractEvent event = (AbstractEvent) invocationOnMock.getArguments()[0];
-
-					outputList.add(event);
-					return null;
-				}
-			}).when(mockWriter).writeEventToAllChannels(any(AbstractEvent.class));
-
-			outputs.add(mockWriter);
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail(t.getMessage());
-		}
-	}
-
-	@Override
-	public Configuration getTaskConfiguration() {
-		return this.taskConfiguration;
-	}
-
-	@Override
-	public MemoryManager getMemoryManager() {
-		return this.memManager;
-	}
-
-	@Override
-	public IOManager getIOManager() {
-		return this.ioManager;
-	}
-
-	@Override
-	public JobID getJobID() {
-		return this.jobID;
-	}
-
-	@Override
-	public Configuration getJobConfiguration() {
-		return this.jobConfiguration;
-	}
-
-	@Override
-	public int getNumberOfSubtasks() {
-		return 1;
-	}
-
-	@Override
-	public int getIndexInSubtaskGroup() {
-		return 0;
-	}
-
-	@Override
-	public InputSplitProvider getInputSplitProvider() {
-		return this.inputSplitProvider;
-	}
-
-	@Override
-	public String getTaskName() {
-		return "";
-	}
-
-	@Override
-	public String getTaskNameWithSubtasks() {
-		return "";
-	}
-
-	@Override
-	public ClassLoader getUserClassLoader() {
-		return getClass().getClassLoader();
-	}
-
-	@Override
-	public Map<String, Future<Path>> getDistributedCacheEntries() {
-		return Collections.emptyMap();
-	}
-
-	@Override
-	public ResultPartitionWriter getWriter(int index) {
-		return outputs.get(index);
-	}
-
-	@Override
-	public ResultPartitionWriter[] getAllWriters() {
-		return outputs.toArray(new ResultPartitionWriter[outputs.size()]);
-	}
-
-	@Override
-	public InputGate getInputGate(int index) {
-		return inputs.get(index);
-	}
-
-	@Override
-	public InputGate[] getAllInputGates() {
-		InputGate[] gates = new InputGate[inputs.size()];
-		inputs.toArray(gates);
-		return gates;
-	}
-
-	@Override
-	public JobVertexID getJobVertexId() {
-		return new JobVertexID(new byte[16]);
-	}
-
-	@Override
-	public ExecutionAttemptID getExecutionId() {
-		return new ExecutionAttemptID(0L, 0L);
-	}
-
-	@Override
-	public BroadcastVariableManager getBroadcastVariableManager() {
-		return this.bcVarManager;
-	}
-
-	@Override
-	public AccumulatorRegistry getAccumulatorRegistry() {
-		return accumulatorRegistry;
-	}
-
-	@Override
-	public void acknowledgeCheckpoint(long checkpointId) {
-	}
-
-	@Override
-	public void acknowledgeCheckpoint(long checkpointId, StateHandle<?> state) {
-	}
-
-	@Override
-	public TaskManagerRuntimeInfo getTaskManagerInfo() {
-		return new TaskManagerRuntimeInfo("localhost", new UnmodifiableConfiguration(new Configuration()));
-	}
-}
-
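
StreamMockEnvironment's central trick, visible above, is to replace the network stack with Mockito answers that funnel everything "written" into an in-memory collection the test can inspect. Here is the same mock-and-capture idiom stripped of the Flink-specific types; RecordWriter is a hypothetical stand-in for ResultPartitionWriter, and the class is an illustration, not part of this commit.

    import static org.mockito.Matchers.any;
    import static org.mockito.Mockito.doAnswer;
    import static org.mockito.Mockito.mock;

    import java.util.Queue;
    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    public class MockCaptureSketch {

        /** Hypothetical stand-in for the writer interface being mocked. */
        public interface RecordWriter {
            void write(String record);
        }

        public static RecordWriter captureInto(final Queue<String> outputList) {
            RecordWriter mockWriter = mock(RecordWriter.class);
            // Divert every write(...) call into the queue instead of a network stack.
            doAnswer(new Answer<Void>() {
                @Override
                public Void answer(InvocationOnMock invocation) throws Throwable {
                    outputList.add((String) invocation.getArguments()[0]);
                    return null;
                }
            }).when(mockWriter).write(any(String.class));
            return mockWriter;
        }

        public static void main(String[] args) {
            Queue<String> output = new ConcurrentLinkedQueue<String>();
            RecordWriter writer = captureInto(output);
            writer.write("hello");
            System.out.println(output); // [hello]
        }
    }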

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
deleted file mode 100644
index 6c48668..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ /dev/null
@@ -1,321 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.event.AbstractEvent;
-import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
-import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.memory.MemoryManager;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.collector.selector.BroadcastOutputSelectorWrapper;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.util.InstantiationUtil;
-import org.junit.Assert;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-
-/**
- * Test harness for testing a {@link StreamTask}.
- *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and allows pushing elements
- * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
- * and events. You are free to modify the retrieved list.
- *
- * <p>
- * After setting up everything the Task can be invoked using {@link #invoke()}. This will start
- * a new Thread to execute the Task. Use {@link #waitForTaskCompletion()} to wait for the Task
- * thread to finish.
- *
- * <p>
- * When using this you need to add the following line to your test class to set up PowerMock:
- * {@code @PrepareForTest({ResultPartitionWriter.class})}
- */
-public class StreamTaskTestHarness<OUT> {
-
-	private static final int DEFAULT_MEMORY_MANAGER_SIZE = 1024 * 1024;
-
-	private static final int DEFAULT_NETWORK_BUFFER_SIZE = 1024;
-
-	protected long memorySize = 0;
-	protected int bufferSize = 0;
-
-	protected StreamMockEnvironment mockEnv;
-	protected ExecutionConfig executionConfig;
-	private Configuration jobConfig;
-	private Configuration taskConfig;
-	protected StreamConfig streamConfig;
-
-	private AbstractInvokable task;
-
-	private TypeSerializer<OUT> outputSerializer;
-	private TypeSerializer<StreamElement> outputStreamRecordSerializer;
-
-	private ConcurrentLinkedQueue<Object> outputList;
-
-	protected TaskThread taskThread;
-
-	// These are not initialized here; the one-input/two-input specific test harnesses
-	// must initialize them if they want to simulate input. They live here so that all the
-	// input-related methods need to be implemented only once, in generic form
-	protected int numInputGates;
-	protected int numInputChannelsPerGate;
-	@SuppressWarnings("rawtypes")
-	protected StreamTestSingleInputGate[] inputGates;
-
-	public StreamTaskTestHarness(AbstractInvokable task, TypeInformation<OUT> outputType) {
-		this.task = task;
-		this.memorySize = DEFAULT_MEMORY_MANAGER_SIZE;
-		this.bufferSize = DEFAULT_NETWORK_BUFFER_SIZE;
-
-		this.jobConfig = new Configuration();
-		this.taskConfig = new Configuration();
-		this.executionConfig = new ExecutionConfig();
-		executionConfig.enableTimestamps();
-		try {
-			InstantiationUtil.writeObjectToConfig(executionConfig, this.jobConfig, ExecutionConfig.CONFIG_KEY);
-		} catch (IOException e) {
-			throw new RuntimeException(e);
-		}
-
-		streamConfig = new StreamConfig(taskConfig);
-		streamConfig.setChainStart();
-		streamConfig.setBufferTimeout(0);
-
-		outputSerializer = outputType.createSerializer(executionConfig);
-		outputStreamRecordSerializer = new MultiplexingStreamRecordSerializer<OUT>(outputSerializer);
-	}
-
-	/**
-	 * This must be overridden for OneInputStreamTask or TwoInputStreamTask test harnesses.
-	 */
-	protected void initializeInputs() throws IOException, InterruptedException {}
-
-	@SuppressWarnings("unchecked")
-	private void initializeOutput() {
-		outputList = new ConcurrentLinkedQueue<Object>();
-
-		mockEnv.addOutput(outputList, outputStreamRecordSerializer);
-
-		streamConfig.setOutputSelectorWrapper(new BroadcastOutputSelectorWrapper<Object>());
-		streamConfig.setNumberOfOutputs(1);
-
-		StreamOperator<OUT> dummyOperator = new AbstractStreamOperator<OUT>() {
-			private static final long serialVersionUID = 1L;
-		};
-
-		List<StreamEdge> outEdgesInOrder = new LinkedList<StreamEdge>();
-		StreamNode sourceVertexDummy = new StreamNode(null, 0, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-		StreamNode targetVertexDummy = new StreamNode(null, 1, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-
-		outEdgesInOrder.add(new StreamEdge(sourceVertexDummy, targetVertexDummy, 0, new LinkedList<String>(), new BroadcastPartitioner<Object>()));
-		streamConfig.setOutEdgesInOrder(outEdgesInOrder);
-		streamConfig.setNonChainedOutputs(outEdgesInOrder);
-		streamConfig.setTypeSerializerOut(outputSerializer);
-		streamConfig.setVertexID(0);
-	}
-
-	/**
-	 * Invoke the Task. This resets the output of any previous invocation. This will start a new
-	 * Thread to execute the Task in. Use {@link #waitForTaskCompletion()} to wait for the
-	 * Task thread to finish running.
-	 */
-	public void invoke() throws Exception {
-		mockEnv = new StreamMockEnvironment(jobConfig, taskConfig, memorySize, new MockInputSplitProvider(), bufferSize);
-		task.setEnvironment(mockEnv);
-
-		initializeInputs();
-		initializeOutput();
-
-		task.registerInputOutput();
-
-		taskThread = new TaskThread(task);
-		taskThread.start();
-	}
-
-	public void waitForTaskCompletion() throws Exception {
-		if (taskThread == null) {
-			throw new IllegalStateException("Task thread was not started.");
-		}
-
-		taskThread.join();
-		if (taskThread.getError() != null) {
-			throw new Exception("error in task", taskThread.getError());
-		}
-	}
-
-	/**
-	 * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
- * {@link org.apache.flink.streaming.util.TestHarnessUtil#getRawElementsFromOutput(java.util.Queue)}
-	 * to extract only the StreamRecords.
-	 */
-	public ConcurrentLinkedQueue<Object> getOutput() {
-		return outputList;
-	}
-
-	public StreamConfig getStreamConfig() {
-		return streamConfig;
-	}
-
-	private void shutdownIOManager() throws Exception {
-		this.mockEnv.getIOManager().shutdown();
-		Assert.assertTrue("IO Manager has not properly shut down.", this.mockEnv.getIOManager().isProperlyShutDown());
-	}
-
-	private void shutdownMemoryManager() throws Exception {
-		if (this.memorySize > 0) {
-			MemoryManager memMan = this.mockEnv.getMemoryManager();
-			if (memMan != null) {
-				Assert.assertTrue("Memory Manager managed memory was not completely freed.", memMan.verifyEmpty());
-				memMan.shutdown();
-			}
-		}
-	}
-
-	/**
-	 * Sends the element to input gate 0 on channel 0.
-	 */
-	@SuppressWarnings("unchecked")
-	public void processElement(Object element) {
-		inputGates[0].sendElement(element, 0);
-	}
-
-	/**
-	 * Sends the element to the specified channel on the specified input gate.
-	 */
-	@SuppressWarnings("unchecked")
-	public void processElement(Object element, int inputGate, int channel) {
-		inputGates[inputGate].sendElement(element, channel);
-	}
-
-	/**
-	 * Sends the event to input gate 0 on channel 0.
-	 */
-	public void processEvent(AbstractEvent event) {
-		inputGates[0].sendEvent(event, 0);
-	}
-
-	/**
-	 * Sends the event to the specified channel on the specified input gate.
-	 */
-	public void processEvent(AbstractEvent event, int inputGate, int channel) {
-		inputGates[inputGate].sendEvent(event, channel);
-	}
-
-	/**
-	 * This only returns after all input queues are empty.
-	 */
-	public void waitForInputProcessing() {
-
-
-		// first wait for all input queues to be empty
-		try {
-			Thread.sleep(1);
-		} catch (InterruptedException ignored) {}
-		
-		while (true) {
-			boolean allEmpty = true;
-			for (int i = 0; i < numInputGates; i++) {
-				if (!inputGates[i].allQueuesEmpty()) {
-					allEmpty = false;
-				}
-			}
-			try {
-				Thread.sleep(10);
-			} catch (InterruptedException ignored) {}
-			
-			if (allEmpty) {
-				break;
-			}
-		}
-
-		// then wait for the Task Thread to be in a blocked state.
-		// Check whether the state is blocked; this should be the case if it cannot
-		// read more input, i.e. all currently available input has been processed.
-		while (true) {
-			Thread.State state = taskThread.getState();
-			if (state == Thread.State.BLOCKED || state == Thread.State.TERMINATED ||
-					state == Thread.State.WAITING || state == Thread.State.TIMED_WAITING) {
-				break;
-			}
-
-			try {
-				Thread.sleep(1);
-			} catch (InterruptedException ignored) {}
-		}
-	}
-
-	/**
-	 * Notifies all input channels on all input gates that no more input will arrive. This
-	 * will usually make the Task exit from its internal loop.
-	 */
-	public void endInput() {
-		for (int i = 0; i < numInputGates; i++) {
-			inputGates[i].endInput();
-		}
-	}
-	
-	// ------------------------------------------------------------------------
-	
-	private class TaskThread extends Thread {
-		
-		private final AbstractInvokable task;
-		
-		private volatile Throwable error;
-
-
-		TaskThread(AbstractInvokable task) {
-			super("Task Thread");
-			this.task = task;
-		}
-
-		@Override
-		public void run() {
-			try {
-				task.invoke();
-				shutdownIOManager();
-				shutdownMemoryManager();
-			}
-			catch (Throwable t) {
-				this.error = t;
-			}
-		}
-
-		public Throwable getError() {
-			return error;
-		}
-	}
-}
-
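
Driving the harness follows the lifecycle its javadoc describes: configure the operator on the StreamConfig, invoke() to start the task thread, waitForTaskCompletion() to join it, then inspect getOutput(). Below is a condensed usage sketch modeled on SourceStreamTaskTest.testOpenClose() above; it assumes placement in the same package as the harness (org.apache.flink.streaming.runtime.tasks), and OneShotSource is our illustrative source, not part of this commit.

    import java.util.concurrent.ConcurrentLinkedQueue;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import org.apache.flink.streaming.api.operators.StreamSource;
    import org.junit.Assert;
    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.powermock.core.classloader.annotations.PrepareForTest;
    import org.powermock.modules.junit4.PowerMockRunner;

    @RunWith(PowerMockRunner.class)
    @PrepareForTest({ResultPartitionWriter.class}) // required, per the harness javadoc
    public class HarnessUsageSketch {

        @Test
        public void runSourceTaskAndInspectOutput() throws Exception {
            SourceStreamTask<String> task = new SourceStreamTask<String>();
            StreamTaskTestHarness<String> harness =
                    new StreamTaskTestHarness<String>(task, BasicTypeInfo.STRING_TYPE_INFO);

            harness.getStreamConfig().setStreamOperator(
                    new StreamSource<String>(new OneShotSource()));

            harness.invoke();                // starts the task in its own thread
            harness.waitForTaskCompletion(); // joins and rethrows task errors

            // StreamRecords and events, interleaved in emission order
            ConcurrentLinkedQueue<Object> output = harness.getOutput();
            Assert.assertFalse(output.isEmpty());
        }

        /** Trivial finite source used only for this sketch. */
        private static class OneShotSource implements SourceFunction<String> {
            private static final long serialVersionUID = 1L;

            @Override
            public void run(SourceContext<String> ctx) {
                ctx.collect("hello");
            }

            @Override
            public void cancel() {}
        }
    }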

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
deleted file mode 100644
index cdc2c53..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTimerITCase.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.concurrent.Semaphore;
-
-/**
- * Tests for the timer service of {@code StreamTask}.
- *
- * <p>
- * These tests ensure that exceptions are properly forwarded from the timer thread to
- * the task thread and that operator methods are not invoked concurrently.
- */
-public class StreamTaskTimerITCase extends StreamingMultipleProgramsTestBase {
-
-	/**
-	 * Note: this test fails if we don't have the synchronized block in
-	 * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
-	 *
-	 * <p>
-	 * This test never finishes if exceptions from the timer thread are not forwarded. Thus
-	 * a success here means that the exception forwarding works.
-	 */
-	@Test
-	public void testOperatorChainedToSource() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataStream<String> source = env.addSource(new InfiniteTestSource());
-
-		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.ALWAYS));
-
-		boolean testSuccess = false;
-		try {
-			env.execute("Timer test");
-		} catch (JobExecutionException e) {
-			if (e.getCause() instanceof TimerException) {
-				TimerException te = (TimerException) e.getCause();
-				if (te.getCause() instanceof RuntimeException) {
-					RuntimeException re = (RuntimeException) te.getCause();
-					if (re.getMessage().equals("TEST SUCCESS")) {
-						testSuccess = true;
-					} else {
-						throw e;
-					}
-				} else {
-					throw e;
-				}
-			} else {
-				throw e;
-			}
-		}
-		Assert.assertTrue(testSuccess);
-	}
-
-	/**
-	 * Note: this test fails if we don't have the synchronized block in
-	 * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
-	 */
-	@Test
-	public void testOneInputOperatorWithoutChaining() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataStream<String> source = env.addSource(new InfiniteTestSource());
-
-		source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.NEVER));
-
-		boolean testSuccess = false;
-		try {
-			env.execute("Timer test");
-		} catch (JobExecutionException e) {
-			if (e.getCause() instanceof TimerException) {
-				TimerException te = (TimerException) e.getCause();
-				if (te.getCause() instanceof RuntimeException) {
-					RuntimeException re = (RuntimeException) te.getCause();
-					if (re.getMessage().equals("TEST SUCCESS")) {
-						testSuccess = true;
-					} else {
-						throw e;
-					}
-				} else {
-					throw e;
-				}
-			} else {
-				throw e;
-			}
-		}
-		Assert.assertTrue(testSuccess);
-	}
-
-	/**
-	 * Note: this test fails if we don't have the synchronized block in
-	 * {@link org.apache.flink.streaming.runtime.tasks.SourceStreamTask.SourceOutput}
-	 */
-	@Test
-	public void testTwoInputOperatorWithoutChaining() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-		env.setParallelism(1);
-
-		DataStream<String> source = env.addSource(new InfiniteTestSource());
-
-		source.connect(source).transform(
-				"Custom Operator",
-				BasicTypeInfo.STRING_TYPE_INFO,
-				new TwoInputTimerOperator(ChainingStrategy.NEVER));
-
-		boolean testSuccess = false;
-		try {
-			env.execute("Timer test");
-		} catch (JobExecutionException e) {
-			if (e.getCause() instanceof TimerException) {
-				TimerException te = (TimerException) e.getCause();
-				if (te.getCause() instanceof RuntimeException) {
-					RuntimeException re = (RuntimeException) te.getCause();
-					if (re.getMessage().equals("TEST SUCCESS")) {
-						testSuccess = true;
-					} else {
-						throw e;
-					}
-				} else {
-					throw e;
-				}
-			} else {
-				throw e;
-			}
-		}
-		Assert.assertTrue(testSuccess);
-	}
-
-	public static class TimerOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String>, Triggerable {
-		private static final long serialVersionUID = 1L;
-
-		int numTimers = 0;
-		int numElements = 0;
-
-		private boolean first = true;
-
-		private Semaphore semaphore = new Semaphore(1);
-
-		public TimerOperator(ChainingStrategy chainingStrategy) {
-			setChainingStrategy(chainingStrategy);
-		}
-
-		@Override
-		public void processElement(StreamRecord<String> element) throws Exception {
-			if (!semaphore.tryAcquire()) {
-				Assert.fail("Concurrent invocation of operator functions.");
-			}
-
-			if (first) {
-				registerTimer(System.currentTimeMillis() + 100, this);
-				first = false;
-			}
-			numElements++;
-
-			semaphore.release();
-		}
-
-		@Override
-		public void trigger(long time) throws Exception {
-			if (!semaphore.tryAcquire()) {
-				Assert.fail("Concurrent invocation of operator functions.");
-			}
-
-			try {
-				numTimers++;
-				throwIfDone();
-				registerTimer(System.currentTimeMillis() + 1, this);
-			} finally {
-				semaphore.release();
-			}
-		}
-
-		private void throwIfDone() {
-			if (numTimers > 1000 && numElements > 10_000) {
-				throw new RuntimeException("TEST SUCCESS");
-			}
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			//ignore
-		}
-	}
-
-	public static class TwoInputTimerOperator extends AbstractStreamOperator<String> implements TwoInputStreamOperator<String, String, String>, Triggerable {
-		private static final long serialVersionUID = 1L;
-
-		int numTimers = 0;
-		int numElements = 0;
-
-		private boolean first = true;
-
-		private Semaphore semaphore = new Semaphore(1);
-
-		public TwoInputTimerOperator(ChainingStrategy chainingStrategy) {
-			setChainingStrategy(chainingStrategy);
-		}
-
-		@Override
-		public void processElement1(StreamRecord<String> element) throws Exception {
-			if (!semaphore.tryAcquire()) {
-				Assert.fail("Concurrent invocation of operator functions.");
-			}
-
-			if (first) {
-				registerTimer(System.currentTimeMillis() + 100, this);
-				first = false;
-			}
-			numElements++;
-
-			semaphore.release();
-		}
-
-		@Override
-		public void processElement2(StreamRecord<String> element) throws Exception {
-			if (!semaphore.tryAcquire()) {
-				Assert.fail("Concurrent invocation of operator functions.");
-			}
-
-			if (first) {
-				registerTimer(System.currentTimeMillis() + 100, this);
-				first = false;
-			}
-			numElements++;
-
-			semaphore.release();
-		}
-
-
-		@Override
-		public void trigger(long time) throws Exception {
-			if (!semaphore.tryAcquire()) {
-				Assert.fail("Concurrent invocation of operator functions.");
-			}
-
-			try {
-				numTimers++;
-				throwIfDone();
-				registerTimer(System.currentTimeMillis() + 1, this);
-			} finally {
-				semaphore.release();
-			}
-		}
-
-		private void throwIfDone() {
-			if (numTimers > 1000 && numElements > 10_000) {
-				throw new RuntimeException("TEST SUCCESS");
-			}
-		}
-
-		@Override
-		public void processWatermark1(Watermark mark) throws Exception {
-			//ignore
-		}
-
-		@Override
-		public void processWatermark2(Watermark mark) throws Exception {
-			//ignore
-		}
-	}
-
-
-	private static class InfiniteTestSource implements SourceFunction<String> {
-		private static final long serialVersionUID = 1L;
-		private volatile boolean running = true;
-
-		@Override
-		public void run(SourceContext<String> ctx) throws Exception {
-			while (running) {
-				ctx.collect("hello");
-			}
-		}
-
-		@Override
-		public void cancel() {
-			running = false;
-		}
-	}
-}
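
The three tests above repeat the same nested cause-chain check. One way that check could be stated once is sketched below; this is a hypothetical refactoring, not part of the commit, and it assumes TimerException lives in the same package as the test, as the unqualified references suggest.

    // Would sit inside StreamTaskTimerITCase.
    /**
     * True iff the job failed with the deliberate "TEST SUCCESS" RuntimeException
     * thrown from the timer thread and wrapped in a TimerException by the task.
     */
    private static boolean isDeliberateSuccess(JobExecutionException e) {
        Throwable cause = e.getCause();
        if (!(cause instanceof TimerException)) {
            return false;
        }
        Throwable inner = cause.getCause();
        return inner instanceof RuntimeException
                && "TEST SUCCESS".equals(inner.getMessage());
    }

Each test's catch block would then reduce to:

    try {
        env.execute("Timer test");
    } catch (JobExecutionException e) {
        if (!isDeliberateSuccess(e)) {
            throw e;
        }
        testSuccess = true;
    }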

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
deleted file mode 100644
index f87d7ea..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTest.java
+++ /dev/null
@@ -1,373 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
-import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.co.RichCoMapFunction;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.co.CoStreamMap;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.TestHarnessUtil;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-
-import java.util.List;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-/**
- * Tests for {@link org.apache.flink.streaming.runtime.tasks.TwoInputStreamTask}. These tests
- * implicitly also test the {@link org.apache.flink.streaming.runtime.io.StreamTwoInputProcessor}.
- *
- * <p>
- * Note:<br>
- * We only use a {@link CoStreamMap} operator here. The individual operators are tested
- * elsewhere; Map serves as a representative for testing TwoInputStreamTask, since
- * TwoInputStreamTask is used for all TwoInputStreamOperators.
- */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest({ResultPartitionWriter.class})
-public class TwoInputStreamTaskTest {
-
-	/**
-	 * This test verifies that open() and close() are correctly called. This test also verifies
-	 * that timestamps of emitted elements are correct. {@link CoStreamMap} assigns the input
-	 * timestamp to emitted elements.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testOpenCloseAndTimestamps() throws Exception {
-		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
-		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new TestOpenCloseMapFunction());
-		streamConfig.setStreamOperator(coMapOperator);
-
-		long initialTime = 0L;
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-
-		testHarness.invoke();
-
-		testHarness.processElement(new StreamRecord<String>("Hello", initialTime + 1), 0, 0);
-		expectedOutput.add(new StreamRecord<String>("Hello", initialTime + 1));
-
-		// wait until the input is processed to ensure ordering of the output
-		testHarness.waitForInputProcessing();
-
-		testHarness.processElement(new StreamRecord<Integer>(1337, initialTime + 2), 1, 0);
-
-		expectedOutput.add(new StreamRecord<String>("1337", initialTime + 2));
-
-		testHarness.endInput();
-
-		testHarness.waitForTaskCompletion();
-
-		Assert.assertTrue("RichFunction methods were not called.", TestOpenCloseMapFunction.closeCalled);
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-	}
-
-	/**
-	 * This test verifies that watermarks are correctly forwarded. This also checks whether
-	 * watermarks are forwarded only when we have received watermarks from all inputs. The
-	 * forwarded watermark must be the minimum of the watermarks of all inputs.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testWatermarkForwarding() throws Exception {
-		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
-		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
-		streamConfig.setStreamOperator(coMapOperator);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-		long initialTime = 0L;
-
-		testHarness.invoke();
-
-		testHarness.processElement(new Watermark(initialTime), 0, 0);
-		testHarness.processElement(new Watermark(initialTime), 0, 1);
-
-		testHarness.processElement(new Watermark(initialTime), 1, 0);
-
-
-		// now the output should still be empty
-		testHarness.waitForInputProcessing();
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.processElement(new Watermark(initialTime), 1, 1);
-
-		// now the watermark should have propagated; Map simply forwards Watermarks
-		testHarness.waitForInputProcessing();
-		expectedOutput.add(new Watermark(initialTime));
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		// contrary to checkpoint barriers, these elements are not blocked by watermarks
-		testHarness.processElement(new StreamRecord<String>("Hello", initialTime), 0, 0);
-		testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
-		expectedOutput.add(new StreamRecord<String>("Hello", initialTime));
-		expectedOutput.add(new StreamRecord<String>("42", initialTime));
-
-		testHarness.waitForInputProcessing();
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.processElement(new Watermark(initialTime + 4), 0, 0);
-		testHarness.processElement(new Watermark(initialTime + 3), 0, 1);
-		testHarness.processElement(new Watermark(initialTime + 3), 1, 0);
-		testHarness.processElement(new Watermark(initialTime + 2), 1, 1);
-
-		// check whether we get the minimum of all the watermarks; this must also only occur in
-		// the output after the two StreamRecords
-		expectedOutput.add(new Watermark(initialTime + 2));
-		testHarness.waitForInputProcessing();
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-
-		// advance the watermark from one of the inputs; now we should get a new one since the
-		// minimum increases
-		testHarness.processElement(new Watermark(initialTime + 4), 1, 1);
-		testHarness.waitForInputProcessing();
-		expectedOutput.add(new Watermark(initialTime + 3));
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		// advance the other two inputs, now we should get a new one since the
-		// minimum increases again
-		testHarness.processElement(new Watermark(initialTime + 4), 0, 1);
-		testHarness.processElement(new Watermark(initialTime + 4), 1, 0);
-		testHarness.waitForInputProcessing();
-		expectedOutput.add(new Watermark(initialTime + 4));
-		TestHarnessUtil.assertOutputEquals("Output was not correct.", expectedOutput, testHarness.getOutput());
-
-		testHarness.endInput();
-
-		testHarness.waitForTaskCompletion();
-
-		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
-		Assert.assertEquals(2, resultElements.size());
-	}
-
-	/**
-	 * This test verifies that checkpoint barriers are correctly forwarded.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testCheckpointBarriers() throws Exception {
-		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
-		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
-		streamConfig.setStreamOperator(coMapOperator);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-		long initialTime = 0L;
-
-		testHarness.invoke();
-
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
-
-		// This element should be buffered since we received a checkpoint barrier on
-		// this input
-		testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
-
-		// This one should go through
-		testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 1);
-		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-
-		// These elements should be forwarded, since we did not yet receive a checkpoint barrier
-		// on that input. Only add to the same input; otherwise we would not know the ordering
-		// of the output, since the Task might read the inputs in any order
-		testHarness.processElement(new StreamRecord<Integer>(11, initialTime), 1, 1);
-		testHarness.processElement(new StreamRecord<Integer>(111, initialTime), 1, 1);
-		expectedOutput.add(new StreamRecord<String>("11", initialTime));
-		expectedOutput.add(new StreamRecord<String>("111", initialTime));
-
-		testHarness.waitForInputProcessing();
-		// we should not yet see the barrier, only the two elements from non-blocked input
-		TestHarnessUtil.assertOutputEquals("Output was not correct.",
-				testHarness.getOutput(),
-				expectedOutput);
-
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
-
-		testHarness.waitForInputProcessing();
-
-		// now we should see the barrier and after that the buffered elements
-		expectedOutput.add(new CheckpointBarrier(0, 0));
-		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
-		TestHarnessUtil.assertOutputEquals("Output was not correct.",
-				testHarness.getOutput(),
-				expectedOutput);
-
-		testHarness.endInput();
-
-		testHarness.waitForTaskCompletion();
-
-		List<String> resultElements = TestHarnessUtil.getRawElementsFromOutput(testHarness.getOutput());
-		Assert.assertEquals(4, resultElements.size());
-	}
-
-	/**
-	 * This test verifies that checkpoint barriers and barrier buffers work correctly with
-	 * concurrent checkpoint barriers where one checkpoint is "overtaking" another checkpoint, i.e.
-	 * some inputs receive barriers from an earlier checkpoint, thereby blocking,
-	 * then all inputs receive barriers from a later checkpoint.
-	 */
-	@Test
-	@SuppressWarnings("unchecked")
-	public void testOvertakingCheckpointBarriers() throws Exception {
-		final TwoInputStreamTask<String, Integer, String> coMapTask = new TwoInputStreamTask<String, Integer, String>();
-		final TwoInputStreamTaskTestHarness<String, Integer, String> testHarness = new TwoInputStreamTaskTestHarness<String, Integer, String>(coMapTask, 2, 2, new int[] {1, 2}, BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.INT_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);
-
-		StreamConfig streamConfig = testHarness.getStreamConfig();
-		CoStreamMap<String, Integer, String> coMapOperator = new CoStreamMap<String, Integer, String>(new IdentityMap());
-		streamConfig.setStreamOperator(coMapOperator);
-
-		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<Object>();
-		long initialTime = 0L;
-
-		testHarness.invoke();
-
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 0);
-
-		// These elements should be buffered until we receive barriers from
-		// all inputs
-		testHarness.processElement(new StreamRecord<String>("Hello-0-0", initialTime), 0, 0);
-		testHarness.processElement(new StreamRecord<String>("Ciao-0-0", initialTime), 0, 0);
-
-		// These elements should be forwarded, since we did not yet receive a checkpoint barrier
-		// on that input. We only add elements to the same input channel; otherwise we would not
-		// know the ordering of the output, since the Task might read the inputs in any order
-		testHarness.processElement(new StreamRecord<Integer>(42, initialTime), 1, 1);
-		testHarness.processElement(new StreamRecord<Integer>(1337, initialTime), 1, 1);
-		expectedOutput.add(new StreamRecord<String>("42", initialTime));
-		expectedOutput.add(new StreamRecord<String>("1337", initialTime));
-
-		testHarness.waitForInputProcessing();
-		// we should not yet see the barrier, only the two elements from non-blocked input
-		TestHarnessUtil.assertOutputEquals("Output was not correct.",
-				expectedOutput,
-				testHarness.getOutput());
-
-		// Now give a later barrier to all inputs; this should unblock the first channel,
-		// thereby allowing the two blocked elements through
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(1, 1), 1, 1);
-
-		expectedOutput.add(new StreamRecord<String>("Hello-0-0", initialTime));
-		expectedOutput.add(new StreamRecord<String>("Ciao-0-0", initialTime));
-		expectedOutput.add(new CheckpointBarrier(1, 1));
-
-		testHarness.waitForInputProcessing();
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.",
-				expectedOutput,
-				testHarness.getOutput());
-
-		// Then give the barriers for the earlier checkpoint; these should be ignored
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 0, 1);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 0);
-		testHarness.processEvent(new CheckpointBarrier(0, 0), 1, 1);
-
-		testHarness.waitForInputProcessing();
-
-		testHarness.endInput();
-
-		testHarness.waitForTaskCompletion();
-
-		TestHarnessUtil.assertOutputEquals("Output was not correct.",
-				expectedOutput,
-				testHarness.getOutput());
-	}
-
-	// This must only be used in one test; otherwise the static fields would be changed
-	// by several tests running concurrently
-	private static class TestOpenCloseMapFunction extends RichCoMapFunction<String, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		public static boolean openCalled = false;
-		public static boolean closeCalled = false;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-			if (closeCalled) {
-				Assert.fail("Close called before open.");
-			}
-			openCalled = true;
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			if (!openCalled) {
-				Assert.fail("Open was not called before close.");
-			}
-			closeCalled = true;
-		}
-
-		@Override
-		public String map1(String value) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			return value;
-		}
-
-		@Override
-		public String map2(Integer value) throws Exception {
-			if (!openCalled) {
-				Assert.fail("Open was not called before run.");
-			}
-			return value.toString();
-		}
-	}
-
-	private static class IdentityMap implements CoMapFunction<String, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map1(String value) throws Exception {
-			return value;
-		}
-
-		@Override
-		public String map2(Integer value) throws Exception {
-			return value.toString();
-		}
-	}
-}
-
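
The two deleted tests above drive the checkpoint barrier alignment of a two-input task.
As a rough, conceptual sketch of the rule they verify -- an illustration only, not
Flink's actual BarrierBuffer implementation, with all names invented:

    // Sketch: barrier alignment that tolerates an "overtaking" checkpoint.
    class BarrierAlignmentSketch {
        private long pendingCheckpointId = -1;
        private final java.util.Set<Integer> blockedChannels = new java.util.HashSet<Integer>();

        void onBarrier(long checkpointId, int channel, int totalChannels) {
            if (checkpointId > pendingCheckpointId) {
                // A later checkpoint overtakes the pending one: abandon the old
                // alignment, release the elements buffered for it, and align on
                // the newer checkpoint instead.
                blockedChannels.clear();
                releaseBufferedElements();
                pendingCheckpointId = checkpointId;
            } else if (checkpointId < pendingCheckpointId) {
                // Barriers of an already-superseded checkpoint are ignored, as in
                // testOvertakingCheckpointBarriers above.
                return;
            }
            blockedChannels.add(channel);
            if (blockedChannels.size() == totalChannels) {
                // Alignment complete: forward the barrier, then the buffered elements.
                forwardBarrierAndUnblock(checkpointId);
                blockedChannels.clear();
            }
        }

        private void releaseBufferedElements() { /* illustration only */ }
        private void forwardBarrierAndUnblock(long checkpointId) { /* illustration only */ }
    }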

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
deleted file mode 100644
index 2b20101..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/runtime/tasks/TwoInputStreamTaskTestHarness.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.runtime.tasks;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.io.network.partition.consumer.StreamTestSingleInputGate;
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-import org.apache.flink.streaming.api.graph.StreamEdge;
-import org.apache.flink.streaming.api.graph.StreamNode;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.StreamOperator;
-import org.apache.flink.streaming.runtime.partitioner.BroadcastPartitioner;
-
-import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
-
-
-/**
- * Test harness for testing a {@link TwoInputStreamTask}.
- *
- * <p>
- * This mock Invokable provides the task with a basic runtime context and allows pushing elements
- * and watermarks into the task. {@link #getOutput()} can be used to get the emitted elements
- * and events. You are free to modify the retrieved list.
- *
- * <p>
- * After setting up everything, the Task can be invoked using {@link #invoke()}. This starts
- * a new Thread to execute the Task. Use {@link #processElement} to send elements to the
- * task and {@link #processEvent(org.apache.flink.runtime.event.AbstractEvent)} to send
- * events. Before waiting for the task to finish with {@link #waitForTaskCompletion()} you
- * must call {@link #endInput()} to signal to the task that data entry is finished. (A
- * hypothetical usage sketch follows this file's listing.)
- *
- * <p>
- * When elements or events are offered to the Task they are put into a queue, from which
- * the input gates of the Task read. Use {@link #waitForInputProcessing()} to wait until
- * all queues are empty; this must be called after entering some elements and before
- * checking the desired output.
- *
- * <p>
- * When using this you need to add the following line to your test class to setup Powermock:
- * {@code @PrepareForTest({ResultPartitionWriter.class})}
- */
-public class TwoInputStreamTaskTestHarness<IN1, IN2, OUT> extends StreamTaskTestHarness<OUT> {
-
-	private TypeInformation<IN1> inputType1;
-	private TypeSerializer<IN1> inputSerializer1;
-
-	private TypeInformation<IN2> inputType2;
-	private TypeSerializer<IN2> inputSerializer2;
-
-	private int[] inputGateAssignment;
-
-	/**
-	 * Creates a test harness with the specified number of input gates and specified number
-	 * of channels per input gate. Parameter inputGateAssignment specifies for each gate whether
- * it should be assigned to the first (1) or second (2) input of the task.
-	 */
-	public TwoInputStreamTaskTestHarness(TwoInputStreamTask<IN1, IN2, OUT> task,
-			int numInputGates,
-			int numInputChannelsPerGate,
-			int[] inputGateAssignment,
-			TypeInformation<IN1> inputType1,
-			TypeInformation<IN2> inputType2,
-			TypeInformation<OUT> outputType) {
-		super(task, outputType);
-
-		this.inputType1 = inputType1;
-		inputSerializer1 = inputType1.createSerializer(executionConfig);
-
-		this.inputType2 = inputType2;
-		inputSerializer2 = inputType2.createSerializer(executionConfig);
-
-		this.numInputGates = numInputGates;
-		this.numInputChannelsPerGate = numInputChannelsPerGate;
-		this.inputGateAssignment = inputGateAssignment;
-	}
-
-	/**
-	 * Creates a test harness with one input gate (that has one input channel) per input. The first
-	 * input gate is assigned to the first task input, the second input gate is assigned to the
-	 * second task input.
-	 */
-	public TwoInputStreamTaskTestHarness(TwoInputStreamTask<IN1, IN2, OUT> task,
-			TypeInformation<IN1> inputType1,
-			TypeInformation<IN2> inputType2,
-			TypeInformation<OUT> outputType) {
-		this(task, 2, 1, new int[] {1, 2}, inputType1, inputType2, outputType);
-	}
-
-	@Override
-	protected void initializeInputs() throws IOException, InterruptedException {
-		inputGates = new StreamTestSingleInputGate[numInputGates];
-		List<StreamEdge> inPhysicalEdges = new LinkedList<StreamEdge>();
-
-		StreamOperator<IN1> dummyOperator = new AbstractStreamOperator<IN1>() {
-			private static final long serialVersionUID = 1L;
-		};
-
-		StreamNode sourceVertexDummy = new StreamNode(null, 0, dummyOperator, "source dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-		StreamNode targetVertexDummy = new StreamNode(null, 1, dummyOperator, "target dummy", new LinkedList<OutputSelector<?>>(), SourceStreamTask.class);
-
-		for (int i = 0; i < numInputGates; i++) {
-
-			switch (inputGateAssignment[i]) {
-				case 1: {
-					inputGates[i] = new StreamTestSingleInputGate<IN1>(
-							numInputChannelsPerGate,
-							bufferSize,
-							inputSerializer1);
-
-					StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
-							targetVertexDummy,
-							1,
-							new LinkedList<String>(),
-							new BroadcastPartitioner<Object>());
-
-					inPhysicalEdges.add(streamEdge);
-					break;
-				}
-				case 2: {
-					inputGates[i] = new StreamTestSingleInputGate<IN2>(
-							numInputChannelsPerGate,
-							bufferSize,
-							inputSerializer2);
-
-					StreamEdge streamEdge = new StreamEdge(sourceVertexDummy,
-							targetVertexDummy,
-							2,
-							new LinkedList<String>(),
-							new BroadcastPartitioner<Object>());
-
-					inPhysicalEdges.add(streamEdge);
-					break;
-				}
-				default:
-					throw new IllegalStateException("Wrong input gate assignment: " + inputGateAssignment[i]);
-			}
-
-			this.mockEnv.addInputGate(inputGates[i].getInputGate());
-		}
-
-		streamConfig.setInPhysicalEdges(inPhysicalEdges);
-		streamConfig.setNumberOfInputs(numInputGates);
-		streamConfig.setTypeSerializerIn1(inputSerializer1);
-		streamConfig.setTypeSerializerIn2(inputSerializer2);
-	}
-
-}
-
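
For orientation, a minimal sketch of how the tests above drive this harness, following
the sequence described in its Javadoc. This is a hypothetical test body assembled from
calls that appear in the deleted tests; the operator and element values are invented:

    // Hypothetical harness-driving sequence; illustration only.
    TwoInputStreamTask<String, Integer, String> task =
            new TwoInputStreamTask<String, Integer, String>();
    TwoInputStreamTaskTestHarness<String, Integer, String> harness =
            new TwoInputStreamTaskTestHarness<String, Integer, String>(
                    task,
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.INT_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO);

    StreamConfig streamConfig = harness.getStreamConfig();
    streamConfig.setStreamOperator(
            new CoStreamMap<String, Integer, String>(new IdentityMap()));

    harness.invoke();                                // start the task in its own thread
    harness.processElement(
            new StreamRecord<String>("hello", 0L), 0, 0);   // input gate 0, channel 0
    harness.processElement(
            new StreamRecord<Integer>(42, 0L), 1, 0);       // input gate 1, channel 0
    harness.waitForInputProcessing();                // wait until input queues are drained
    harness.endInput();                              // signal that data entry is finished
    harness.waitForTaskCompletion();                 // join the task thread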

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
deleted file mode 100644
index 749e1dd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/timestamp/TimestampITCase.java
+++ /dev/null
@@ -1,619 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.timestamp;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.client.program.ProgramInvocationException;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.taskmanager.MultiShotLatch;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.AscendingTimestampExtractor;
-import org.apache.flink.streaming.api.functions.TimestampExtractor;
-import org.apache.flink.streaming.api.functions.co.CoMapFunction;
-import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.util.NoOpSink;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.fail;
-
-/**
- * Tests for timestamps, watermarks, and event-time sources.
- */
-@SuppressWarnings("serial")
-public class TimestampITCase {
-
-	private static final int NUM_TASK_MANAGERS = 2;
-	private static final int NUM_TASK_SLOTS = 3;
-	private static final int PARALLELISM = NUM_TASK_MANAGERS * NUM_TASK_SLOTS;
-
-	// this is used in some tests to synchronize
-	// this is used in some tests to synchronize the sources with watermark arrival downstream
-
-
-	private static ForkableFlinkMiniCluster cluster;
-
-	@Before
-	public void setupLatch() {
-		// ensure that we get a fresh latch for each test
-		latch = new MultiShotLatch();
-	}
-
-	@BeforeClass
-	public static void startCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TASK_MANAGERS);
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, NUM_TASK_SLOTS);
-			config.setString(ConfigConstants.DEFAULT_EXECUTION_RETRY_DELAY_KEY, "0 ms");
-			config.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 12);
-
-			cluster = new ForkableFlinkMiniCluster(config, false);
-
-			cluster.start();
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to start test cluster: " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void shutdownCluster() {
-		try {
-			cluster.shutdown();
-			cluster = null;
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Failed to stop test cluster: " + e.getMessage());
-		}
-	}
-
-	/**
- * This test checks whether custom timestamp emission works at sources and whether
- * timestamps arrive at operators throughout a topology.
-	 *
-	 * <p>
-	 * This also checks whether watermarks keep propagating if a source closes early.
-	 *
-	 * <p>
-	 * This only uses map to test the workings of watermarks in a complete, running topology. All
-	 * tasks and stream operators have dedicated tests that test the watermark propagation
-	 * behaviour.
-	 */
-	@Test
-	public void testWatermarkPropagation() throws Exception {
-		final int NUM_WATERMARKS = 10;
-
-		long initialTime = 0L;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().enableTimestamps();
-
-		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS));
-		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(initialTime, NUM_WATERMARKS / 2));
-
-		source1.union(source2)
-				.map(new IdentityMap())
-				.connect(source2).map(new IdentityCoMap())
-				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
-				.addSink(new NoOpSink<Integer>());
-
-		env.execute();
-
-		// verify that all the watermarks arrived at the final custom operator
-		for (int i = 0; i < PARALLELISM; i++) {
-			// There can be two cases: either we get NUM_WATERMARKS + 1 watermarks or
-			// (NUM_WATERMARKS / 2) + 1 watermarks, depending on which source gets to run first.
-			// If source1 runs first we jump directly to +Inf and skip all the intermediate
-			// watermarks. If source2 runs first we see the intermediate watermarks from
-			// NUM_WATERMARKS/2 to +Inf.
-			if (CustomOperator.finalWatermarks[i].size() == NUM_WATERMARKS + 1) {
-				for (int j = 0; j < NUM_WATERMARKS; j++) {
-					if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
-						System.err.println("All Watermarks: ");
-						for (int k = 0; k <= NUM_WATERMARKS; k++) {
-							System.err.println(CustomOperator.finalWatermarks[i].get(k));
-						}
-
-						Assert.fail("Wrong watermark.");
-					}
-				}
-				if (!CustomOperator.finalWatermarks[i].get(NUM_WATERMARKS).equals(new Watermark(Long.MAX_VALUE))) {
-					System.err.println("All Watermarks: ");
-					for (int k = 0; k <= NUM_WATERMARKS; k++) {
-						System.err.println(CustomOperator.finalWatermarks[i].get(k));
-					}
-
-					Assert.fail("Wrong watermark.");
-				}
-			} else {
-				for (int j = 0; j < NUM_WATERMARKS / 2; j++) {
-					if (!CustomOperator.finalWatermarks[i].get(j).equals(new Watermark(initialTime + j))) {
-						System.err.println("All Watermarks: ");
-						for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
-							System.err.println(CustomOperator.finalWatermarks[i].get(k));
-						}
-
-						Assert.fail("Wrong watermark.");
-					}
-				}
-				if (!CustomOperator.finalWatermarks[i].get(NUM_WATERMARKS / 2).equals(new Watermark(Long.MAX_VALUE))) {
-					System.err.println("All Watermarks: ");
-					for (int k = 0; k <= NUM_WATERMARKS / 2; k++) {
-						System.err.println(CustomOperator.finalWatermarks[i].get(k));
-					}
-
-					Assert.fail("Wrong watermark.");
-				}
-
-			}
-
-		}
-	}
-
-	/**
- * This test checks whether timestamps are properly assigned at the sources and handled in
-	 * network transmission and between chained operators when timestamps are enabled.
-	 */
-	@Test
-	public void testTimestampHandling() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().enableTimestamps();
-
-		DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
-		DataStream<Integer> source2 = env.addSource(new MyTimestampSource(0L, NUM_ELEMENTS));
-
-		source1
-				.map(new IdentityMap())
-				.connect(source2).map(new IdentityCoMap())
-				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator())
-				.addSink(new NoOpSink<Integer>());
-
-		env.execute();
-	}
-
-	/**
- * This test checks whether timestamps are properly ignored when they are disabled.
-	 */
-	@Test
-	public void testDisabledTimestamps() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-		Assert.assertFalse("Timestamps are not disabled by default.",
-				env.getConfig().areTimestampsEnabled());
-		env.getConfig().disableTimestamps();
-
-		DataStream<Integer> source1 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
-		DataStream<Integer> source2 = env.addSource(new MyNonWatermarkingSource(NUM_ELEMENTS));
-
-		source1
-				.map(new IdentityMap())
-				.connect(source2).map(new IdentityCoMap())
-				.transform("Custom Operator", BasicTypeInfo.INT_TYPE_INFO, new DisabledTimestampCheckingOperator())
-				.addSink(new NoOpSink<Integer>());
-
-		env.execute();
-	}
-
-	/**
- * This tests whether timestamps are properly extracted by the timestamp extractor and
- * whether watermarks are also correctly forwarded from it with the auto watermark
- * interval.
-	 */
-	@Test
-	public void testTimestampExtractorWithAutoInterval() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(1);
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().enableTimestamps();
-		env.getConfig().setAutoWatermarkInterval(10);
-
-		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				int index = 0;
-				while (index < NUM_ELEMENTS) {
-					ctx.collect(index);
-					latch.await();
-					index++;
-				}
-			}
-
-			@Override
-			public void cancel() {
-
-			}
-		});
-
-		DataStream<Integer> extractOp = source1.assignTimestamps(
-				new AscendingTimestampExtractor<Integer>() {
-					@Override
-					public long extractAscendingTimestamp(Integer element, long currentTimestamp) {
-						return element;
-					}
-				});
-
-		extractOp
-				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
-				.transform("Timestamp Check",
-						BasicTypeInfo.INT_TYPE_INFO,
-						new TimestampCheckingOperator());
-
-		// verify that extractor picks up source parallelism
-		Assert.assertEquals(extractOp.getTransformation().getParallelism(), source1.getTransformation().getParallelism());
-
-		env.execute();
-
-		// verify that we get NUM_ELEMENTS watermarks plus the final Long.MAX_VALUE watermark
-		for (int j = 0; j < NUM_ELEMENTS; j++) {
-			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
-				Assert.fail("Wrong watermark.");
-			}
-		}
-		if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
-			Assert.fail("Wrong watermark.");
-		}
-	}
-
-	/**
- * This tests whether timestamps are properly extracted by the timestamp extractor and
- * whether watermarks are correctly forwarded from the custom watermark emit
- * function.
-	 */
-	@Test
-	public void testTimestampExtractorWithCustomWatermarkEmit() throws Exception {
-		final int NUM_ELEMENTS = 10;
-
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(1);
-		env.getConfig().disableSysoutLogging();
-		env.getConfig().enableTimestamps();
-
-		DataStream<Integer> source1 = env.addSource(new SourceFunction<Integer>() {
-			@Override
-			public void run(SourceContext<Integer> ctx) throws Exception {
-				int index = 0;
-				while (index < NUM_ELEMENTS) {
-					ctx.collect(index);
-					latch.await();
-					index++;
-				}
-			}
-
-			@Override
-			public void cancel() {
-
-			}
-		});
-
-		source1.assignTimestamps(new TimestampExtractor<Integer>() {
-			@Override
-			public long extractTimestamp(Integer element, long currentTimestamp) {
-				return element;
-			}
-
-			@Override
-			public long extractWatermark(Integer element, long currentTimestamp) {
-				return element - 1;
-			}
-
-			@Override
-			public long getCurrentWatermark() {
-				return Long.MIN_VALUE;
-			}
-		})
-				.transform("Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator())
-				.transform("Timestamp Check", BasicTypeInfo.INT_TYPE_INFO, new TimestampCheckingOperator());
-
-		env.execute();
-
-		// verify that we get NUM_ELEMENTS watermarks plus the final Long.MAX_VALUE watermark
-		for (int j = 0; j < NUM_ELEMENTS; j++) {
-			if (!CustomOperator.finalWatermarks[0].get(j).equals(new Watermark(j - 1))) {
-				Assert.fail("Wrong watermark.");
-			}
-		}
-		if (!CustomOperator.finalWatermarks[0].get(NUM_ELEMENTS).equals(new Watermark(Long.MAX_VALUE))) {
-			Assert.fail("Wrong watermark.");
-		}
-	}
-
-	/**
-	 * This tests whether the program throws an exception when an event-time source tries
- * to emit without a timestamp.
-	 */
-	@Test(expected = ProgramInvocationException.class)
-	public void testEventTimeSourceEmitWithoutTimestamp() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		DataStream<Integer> source1 = env.addSource(new MyErroneousTimestampSource());
-
-		source1
-				.map(new IdentityMap())
-				.addSink(new NoOpSink<Integer>());
-
-		env.execute();
-	}
-
-	/**
-	 * This tests whether the program throws an exception when a regular source tries
- * to emit with a timestamp.
-	 */
-	@Test(expected = ProgramInvocationException.class)
-	public void testSourceEmitWithTimestamp() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		DataStream<Integer> source1 = env.addSource(new MyErroneousSource());
-
-		source1
-				.map(new IdentityMap())
-				.addSink(new NoOpSink<Integer>());
-
-		env.execute();
-	}
-
-	/**
-	 * This tests whether the program throws an exception when a regular source tries
-	 * to emit a watermark.
-	 */
-	@Test(expected = ProgramInvocationException.class)
-	public void testSourceEmitWatermark() throws Exception {
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", cluster.getLeaderRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		DataStream<Integer> source1 = env.addSource(new MyErroneousWatermarkSource());
-
-		source1
-				.map(new IdentityMap())
-				.addSink(new NoOpSink<Integer>());
-
-		env.execute();
-	}
-
-	@SuppressWarnings("unchecked")
-	public static class CustomOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
-		List<Watermark> watermarks;
-		public static List<Watermark>[] finalWatermarks = new List[PARALLELISM];
-		private long oldTimestamp;
-
-		@Override
-		public void processElement(StreamRecord<Integer> element) throws Exception {
-			if (element.getTimestamp() != element.getValue()) {
-				Assert.fail("Timestamps are not properly handled.");
-			}
-			oldTimestamp = element.getTimestamp();
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-			watermarks.add(mark);
-			latch.trigger();
-			output.emitWatermark(mark);
-		}
-
-		@Override
-		public void open() throws Exception {
-			super.open();
-			watermarks = new ArrayList<Watermark>();
-		}
-
-		@Override
-		public void close() throws Exception {
-			super.close();
-			finalWatermarks[getRuntimeContext().getIndexOfThisSubtask()] = watermarks;
-		}
-	}
-
-	public static class TimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
-		@Override
-		public void processElement(StreamRecord<Integer> element) throws Exception {
-			if (element.getTimestamp() != element.getValue()) {
-				Assert.fail("Timestamps are not properly handled.");
-			}
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-		}
-	}
-
-	public static class DisabledTimestampCheckingOperator extends AbstractStreamOperator<Integer> implements OneInputStreamOperator<Integer, Integer> {
-
-		@Override
-		public void processElement(StreamRecord<Integer> element) throws Exception {
-			if (element.getTimestamp() != 0) {
-				Assert.fail("Timestamps are not properly handled.");
-			}
-			output.collect(element);
-		}
-
-		@Override
-		public void processWatermark(Watermark mark) throws Exception {
-		}
-	}
-
-	public static class IdentityCoMap implements CoMapFunction<Integer, Integer, Integer> {
-		@Override
-		public Integer map1(Integer value) throws Exception {
-			return value;
-		}
-
-		@Override
-		public Integer map2(Integer value) throws Exception {
-			return value;
-		}
-	}
-
-	public static class IdentityMap implements MapFunction<Integer, Integer> {
-		@Override
-		public Integer map(Integer value) throws Exception {
-			return value;
-		}
-	}
-
-	public static class MyTimestampSource implements EventTimeSourceFunction<Integer> {
-
-		long initialTime;
-		int numWatermarks;
-
-		public MyTimestampSource(long initialTime, int numWatermarks) {
-			this.initialTime = initialTime;
-			this.numWatermarks = numWatermarks;
-		}
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			for (int i = 0; i < numWatermarks; i++) {
-				ctx.collectWithTimestamp(i, initialTime + i);
-				ctx.emitWatermark(new Watermark(initialTime + i));
-			}
-		}
-
-		@Override
-		public void cancel() {
-
-		}
-	}
-
-	public static class MyNonWatermarkingSource implements SourceFunction<Integer> {
-
-		int numElements;
-
-		public MyNonWatermarkingSource(int numElements) {
-			this.numElements = numElements;
-		}
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			for (int i = 0; i < numElements; i++) {
-				ctx.collect(i);
-			}
-		}
-
-		@Override
-		public void cancel() {
-
-		}
-	}
-
-	// This is an event-time source, which must only emit elements with timestamps. Since it
-	// emits elements without timestamps, the test should throw an exception
-	public static class MyErroneousTimestampSource implements EventTimeSourceFunction<Integer> {
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			for (int i = 0; i < 10; i++) {
-				ctx.collect(i);
-			}
-		}
-
-		@Override
-		public void cancel() {
-
-		}
-	}
-
-	// This is a normal source, which must only emit elements without timestamps. Since it
-	// attaches timestamps, the test should throw an exception
-	public static class MyErroneousSource implements SourceFunction<Integer> {
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			for (int i = 0; i < 10; i++) {
-				ctx.collectWithTimestamp(i, 0L);
-			}
-		}
-
-		@Override
-		public void cancel() {
-
-		}
-	}
-
-	// This is a normal source, which must only emit elements without timestamps and must not
-	// emit watermarks. Since it emits watermarks, the test should throw an exception
-	public static class MyErroneousWatermarkSource implements SourceFunction<Integer> {
-
-		@Override
-		public void run(SourceContext<Integer> ctx) throws Exception {
-			for (int i = 0; i < 10; i++) {
-				ctx.collect(i);
-				ctx.emitWatermark(new Watermark(0L));
-			}
-		}
-
-		@Override
-		public void cancel() {
-
-		}
-	}
-}
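
To make the TimestampExtractor contract exercised above concrete: extractTimestamp
assigns each element's timestamp, extractWatermark may produce a watermark right after
an element (the tests above return Long.MIN_VALUE where no per-element watermark is
wanted), and getCurrentWatermark is polled at the configured auto watermark interval.
A sketch of a bounded-lateness extractor under these assumptions -- the five-unit
lateness bound is invented for illustration:

    // Illustration only: a bounded-lateness extractor against the contract above.
    TimestampExtractor<Integer> boundedLateness = new TimestampExtractor<Integer>() {
        private long maxSeenTimestamp = Long.MIN_VALUE;

        @Override
        public long extractTimestamp(Integer element, long currentTimestamp) {
            long timestamp = element;  // as in the tests, the element is its own timestamp
            maxSeenTimestamp = Math.max(maxSeenTimestamp, timestamp);
            return timestamp;
        }

        @Override
        public long extractWatermark(Integer element, long currentTimestamp) {
            return Long.MIN_VALUE;     // no per-element watermarks
        }

        @Override
        public long getCurrentWatermark() {
            // polled at the auto watermark interval; allow five units of lateness
            return maxSeenTimestamp == Long.MIN_VALUE
                    ? Long.MIN_VALUE
                    : maxSeenTimestamp - 5;
        }
    };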