You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:35 UTC

[19/51] [partial] flink git commit: [FLINK-2877] Move Streaming API out of Staging package

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
deleted file mode 100644
index 9820ef8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import java.util.Arrays;
-
-public class EvenOddOutputSelector implements OutputSelector<Integer> {
-	private static final long serialVersionUID = 1L;
-
-	@Override
-	public Iterable<String> select(Integer value) {
-		return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldAccessorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldAccessorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldAccessorTest.java
deleted file mode 100644
index d35089a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldAccessorTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-// This only tests a fraction of FieldAccessor. The other parts are tested indirectly by AggregationFunctionTest.
-public class FieldAccessorTest {
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void arrayFieldAccessorTest() {
-		int[] a = new int[]{3,5};
-		FieldAccessor<int[], Integer> fieldAccessor =
-				(FieldAccessor<int[], Integer>) (Object)
-						FieldAccessor.create(1, PrimitiveArrayTypeInfo.getInfoFor(a.getClass()), null);
-
-		assertEquals(Integer.class, fieldAccessor.getFieldType().getTypeClass());
-
-		assertEquals((Integer)a[1], fieldAccessor.get(a));
-
-		a = fieldAccessor.set(a, 6);
-		assertEquals((Integer)a[1], fieldAccessor.get(a));
-
-
-
-		Integer[] b = new Integer[]{3,5};
-		FieldAccessor<Integer[], Integer> fieldAccessor2 =
-				(FieldAccessor<Integer[], Integer>) (Object)
-						FieldAccessor.create(1, BasicArrayTypeInfo.getInfoFor(b.getClass()), null);
-
-		assertEquals(Integer.class, fieldAccessor2.getFieldType().getTypeClass());
-
-		assertEquals((Integer)b[1], fieldAccessor2.get(b));
-
-		b = fieldAccessor2.set(b, 6);
-		assertEquals((Integer)b[1], fieldAccessor2.get(b));
-	}
-
-	@Test
-	@SuppressWarnings("unchecked")
-	public void tupleFieldAccessorOutOfBoundsTest() {
-		try {
-			FieldAccessor<Tuple2<Integer, Integer>, Integer> fieldAccessor =
-					(FieldAccessor<Tuple2<Integer, Integer>, Integer>) (Object)
-							FieldAccessor.create(2, TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class),
-									null);
-			fail();
-		} catch (IndexOutOfBoundsException e) {
-			// Nothing to do here
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
deleted file mode 100644
index 0c708c6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class MockContext<IN, OUT> {
-	
-	private List<OUT> outputs;
-
-	private MockOutput<OUT> output;
-
-	public MockContext(Collection<IN> inputs) {
-		if (inputs.isEmpty()) {
-			throw new RuntimeException("Inputs must not be empty");
-		}
-
-		outputs = new ArrayList<OUT>();
-		output = new MockOutput<OUT>(outputs);
-	}
-
-	public List<OUT> getOutputs() {
-		return outputs;
-	}
-
-	public Output<StreamRecord<OUT>> getOutput() {
-		return output;
-	}
-
-	public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
-		return createAndExecuteForKeyedStream(operator, inputs, null, null);
-	}
-	
-	public static <IN, OUT, KEY> List<OUT> createAndExecuteForKeyedStream(
-				OneInputStreamOperator<IN, OUT> operator, List<IN> inputs,
-				KeySelector<IN, KEY> keySelector, TypeInformation<KEY> keyType) {
-		
-		MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
-
-		StreamConfig config = new StreamConfig(new Configuration());
-		if (keySelector != null && keyType != null) {
-			config.setStateKeySerializer(keyType.createSerializer(new ExecutionConfig()));
-			config.setStatePartitioner(keySelector);
-		}
-		
-		final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
-		final Object lock = new Object();
-		final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-				
-		operator.setup(mockTask, config, mockContext.output);
-		try {
-			operator.open();
-
-			StreamRecord<IN> record = new StreamRecord<IN>(null);
-			for (IN in: inputs) {
-				record = record.replace(in);
-				synchronized (lock) {
-					operator.setKeyContextElement(record);
-					operator.processElement(record);
-				}
-			}
-
-			operator.close();
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot invoke operator.", e);
-		} finally {
-			timerService.shutdownNow();
-		}
-
-		return mockContext.getOutputs();
-	}
-
-	private static StreamTask<?, ?> createMockTaskWithTimer(
-			final ScheduledExecutorService timerService, final Object lock)
-	{
-		StreamTask<?, ?> task = mock(StreamTask.class);
-		when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
-		when(task.getName()).thenReturn("Test task name");
-		when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
-		when(task.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024));
-		when(task.getCheckpointLock()).thenReturn(lock);
-
-		doAnswer(new Answer<Void>() {
-			@Override
-			public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
-				final Long timestamp = (Long) invocationOnMock.getArguments()[0];
-				final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
-				timerService.schedule(
-						new Callable<Object>() {
-							@Override
-							public Object call() throws Exception {
-								synchronized (lock) {
-									target.trigger(timestamp);
-								}
-								return null;
-							}
-						},
-						timestamp - System.currentTimeMillis(),
-						TimeUnit.MILLISECONDS);
-				return null;
-			}
-		}).when(task).registerTimer(anyLong(), any(Triggerable.class));
-
-		// ugly Java generic hacks to get the generic state backend into the mock
-		@SuppressWarnings("unchecked")
-		OngoingStubbing<StateBackend<?>> stubbing =
-				(OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(task.getStateBackend());
-		stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-		
-		return task;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
deleted file mode 100644
index 5371ba0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class MockOutput<T> implements Output<StreamRecord<T>> {
-	private Collection<T> outputs;
-
-	public MockOutput(Collection<T> outputs) {
-		this.outputs = outputs;
-	}
-
-	@Override
-	public void collect(StreamRecord<T> record) {
-		T copied = SerializationUtils.deserialize(SerializationUtils
-				.serialize((Serializable) record.getValue()));
-		outputs.add(copied);
-	}
-
-	@Override
-	public void emitWatermark(Watermark mark) {
-		throw new RuntimeException("THIS MUST BE IMPLEMENTED");
-	}
-
-	@Override
-	public void close() {
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
deleted file mode 100644
index bcb5691..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-public class NoOpIntMap implements MapFunction<Integer, Integer> {
-	private static final long serialVersionUID = 1L;
-
-	public Integer map(Integer value) throws Exception {
-		return value;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
deleted file mode 100644
index d398121..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-public final class NoOpSink<T> extends RichSinkFunction<T> {
-	public void invoke(T tuple) {
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
deleted file mode 100644
index 01f95bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * A test harness for testing a {@link OneInputStreamOperator}.
- *
- * <p>
- * This mock task provides the operator with a basic runtime context and allows pushing elements
- * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
- * and watermarks can be retrieved. You are free to modify these.
- */
-public class OneInputStreamOperatorTestHarness<IN, OUT> {
-
-	final OneInputStreamOperator<IN, OUT> operator;
-
-	final ConcurrentLinkedQueue<Object> outputList;
-
-	final StreamConfig config;
-	
-	final ExecutionConfig executionConfig;
-	
-	final Object checkpointLock;
-	
-	
-	public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
-		this.operator = operator;
-		this.outputList = new ConcurrentLinkedQueue<Object>();
-		this.config = new StreamConfig(new Configuration());
-		this.executionConfig = new ExecutionConfig();
-		this.checkpointLock = new Object();
-
-		Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
-		StreamTask<?, ?> mockTask = mock(StreamTask.class);
-		when(mockTask.getName()).thenReturn("Mock Task");
-		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
-		when(mockTask.getConfiguration()).thenReturn(config);
-		when(mockTask.getEnvironment()).thenReturn(env);
-		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-		
-		// ugly Java generic hacks
-		@SuppressWarnings("unchecked")
-		OngoingStubbing<StateBackend<?>> stubbing = 
-				(OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(mockTask.getStateBackend());
-		stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-
-		operator.setup(mockTask, config, new MockOutput());
-	}
-
-	public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
-		ClosureCleaner.clean(keySelector, false);
-		config.setStatePartitioner(keySelector);
-		config.setStateKeySerializer(keyType.createSerializer(executionConfig));
-	}
-	
-	/**
-	 * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
-	 * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
-	 * to extract only the StreamRecords.
-	 */
-	public ConcurrentLinkedQueue<Object> getOutput() {
-		return outputList;
-	}
-
-	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}
-	 */
-	public void open() throws Exception {
-		operator.open();
-	}
-
-	/**
-	 * Calls close on the operator.
-	 */
-	public void close() throws Exception {
-		operator.close();
-	}
-
-	public void processElement(StreamRecord<IN> element) throws Exception {
-		operator.setKeyContextElement(element);
-		operator.processElement(element);
-	}
-
-	public void processElements(Collection<StreamRecord<IN>> elements) throws Exception {
-		for (StreamRecord<IN> element: elements) {
-			operator.setKeyContextElement(element);
-			operator.processElement(element);
-		}
-	}
-
-	public void processWatermark(Watermark mark) throws Exception {
-		operator.processWatermark(mark);
-	}
-
-	private class MockOutput implements Output<StreamRecord<OUT>> {
-
-		private TypeSerializer<OUT> outputSerializer;
-
-		@Override
-		public void emitWatermark(Watermark mark) {
-			outputList.add(mark);
-		}
-
-		@Override
-		public void collect(StreamRecord<OUT> element) {
-			if (outputSerializer == null) {
-				outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
-			}
-			outputList.add(new StreamRecord<OUT>(outputSerializer.copy(element.getValue()),
-					element.getTimestamp()));
-		}
-
-		@Override
-		public void close() {
-			// ignore
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
deleted file mode 100644
index a46ff55..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
-public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
-	private List<T> received;
-
-	public void invoke(T tuple) {
-		received.add(tuple);
-	}
-
-	public void open(Configuration conf) {
-		received = new ArrayList<T>();
-	}
-
-	public void close() {
-		assertTrue(received.size() > 0);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
deleted file mode 100644
index 4ded0fa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Assert;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Test base for streaming programs relying on an open server socket to write to.
- */
-public abstract class SocketOutputTestBase extends StreamingProgramTestBase {
-
-	protected static final String HOST = "localhost";
-	protected static Integer port;
-	protected Set<String> dataReadFromSocket = new HashSet<String>();
-
-	@Override
-	protected void preSubmit() throws Exception {
-		port = NetUtils.getAvailablePort();
-		temporarySocket = createLocalSocket(port);
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		Set<String> expectedData = new HashSet<String>(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n")));
-		Assert.assertEquals(expectedData, dataReadFromSocket);
-		temporarySocket.close();
-	}
-
-	protected ServerSocket temporarySocket;
-
-	public ServerSocket createLocalSocket(int port) throws Exception {
-		ServerSocket serverSocket = new ServerSocket(port);
-		ServerThread st = new ServerThread(serverSocket);
-		st.start();
-		return serverSocket;
-	}
-
-	protected class ServerThread extends Thread {
-
-		private ServerSocket serverSocket;
-		private Thread t;
-
-		public ServerThread(ServerSocket serverSocket) {
-			this.serverSocket = serverSocket;
-			t = new Thread(this);
-		}
-
-		public void waitForAccept() throws Exception {
-			Socket socket = serverSocket.accept();
-			BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
-			DeserializationSchema<String> schema = new DummyStringSchema();
-			String rawData = in.readLine();
-			while (rawData != null){
-				String string = schema.deserialize(rawData.getBytes());
-				dataReadFromSocket.add(string);
-				rawData = in.readLine();
-			}
-			socket.close();
-		}
-
-		public void run() {
-			try {
-				waitForAccept();
-			} catch (Exception e) {
-				Assert.fail();
-				throw new RuntimeException(e);
-			}
-		}
-
-		@Override
-		public void start() {
-			t.start();
-		}
-	}
-
-	public static class DummyStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]>{
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean isEndOfStream(String nextElement) {
-		return nextElement.equals("q");
-	}
-
-		@Override
-		public byte[] serialize(String element) {
-		return element.getBytes();
-	}
-
-		@Override
-		public String deserialize(byte[] message) {
-		return new String(message);
-	}
-
-		@Override
-		public TypeInformation<String> getProducedType() {
-		return TypeExtractor.getForClass(String.class);
-	}
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
deleted file mode 100644
index d1bd64a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Assert;
-
-import java.io.PrintWriter;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-public abstract class SocketProgramITCaseBase extends StreamingProgramTestBase {
-
-	protected static final String HOST = "localhost";
-	protected static Integer port;
-	protected String resultPath;
-
-	private ServerSocket temporarySocket;
-
-	@Override
-	protected void preSubmit() throws Exception {
-		port = NetUtils.getAvailablePort();
-		temporarySocket = createSocket(HOST, port, WordCountData.TEXT);
-		resultPath = getTempDirPath("result");
-	}
-
-	@Override
-	protected void postSubmit() throws Exception {
-		compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
-		temporarySocket.close();
-	}
-
-	public ServerSocket createSocket(String host, int port, String contents) throws Exception {
-		ServerSocket serverSocket = new ServerSocket(port);
-		ServerThread st = new ServerThread(serverSocket, contents);
-		st.start();
-		return serverSocket;
-	}
-
-	private static class ServerThread extends Thread {
-
-		private ServerSocket serverSocket;
-		private String contents;
-		private Thread t;
-
-		public ServerThread(ServerSocket serverSocket, String contents) {
-			this.serverSocket = serverSocket;
-			this.contents = contents;
-			t = new Thread(this);
-		}
-
-		public void waitForAccept() throws Exception {
-			Socket socket = serverSocket.accept();
-			PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
-			writer.println(contents);
-			writer.close();
-			socket.close();
-		}
-
-		public void run() {
-			try {
-				waitForAccept();
-			} catch (Exception e) {
-				Assert.fail();
-			}
-		}
-
-		@Override
-		public void start() {
-			t.start();
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
deleted file mode 100644
index 2afdc40..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import static org.mockito.Mockito.*;
-
-public class SourceFunctionUtil<T> {
-
-	public static <T> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
-		List<T> outputs = new ArrayList<T>();
-		
-		if (sourceFunction instanceof RichFunction) {
-
-			AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
-			when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());
-			
-			RuntimeContext runtimeContext =  new StreamingRuntimeContext(
-					operator,
-					new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
-					new HashMap<String, Accumulator<?, ?>>());
-			
-			((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
-
-			((RichFunction) sourceFunction).open(new Configuration());
-		}
-		try {
-			final Output<StreamRecord<T>> collector = new MockOutput<T>(outputs);
-			final Object lockingObject = new Object();
-			SourceFunction.SourceContext<T> ctx;
-			if (sourceFunction instanceof EventTimeSourceFunction) {
-				ctx = new StreamSource.ManualWatermarkContext<T>(lockingObject, collector);
-			} else {
-				ctx = new StreamSource.NonWatermarkContext<T>(lockingObject, collector);
-			}
-			sourceFunction.run(ctx);
-		} catch (Exception e) {
-			throw new RuntimeException("Cannot invoke source.", e);
-		}
-		return outputs;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
deleted file mode 100644
index 4e02f2c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.test.util.TestBaseUtils;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-/**
- * Base class for streaming unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup and
- * shutdown of the Flink clusters (including actor systems, etc) usually dominates
- * the execution of the actual tests.
- *
- * To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the StreamExecutionEnvironment from
- * the context:
- *
- * <pre>
- *   {@literal @}Test
- *   public void someTest() {
- *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- *   {@literal @}Test
- *   public void anotherTest() {
- *       StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- *       // test code
- *       env.execute();
- *   }
- *
- * </pre>
- */
-public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
-
-	// ------------------------------------------------------------------------
-	//  The mini cluster that is shared across tests
-	// ------------------------------------------------------------------------
-
-	protected static final int DEFAULT_PARALLELISM = 4;
-
-	protected static ForkableFlinkMiniCluster cluster;
-	
-
-	// ------------------------------------------------------------------------
-	//  Cluster setup & teardown
-	// ------------------------------------------------------------------------
-
-	@BeforeClass
-	public static void setup() throws Exception {
-		cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false, false, true);
-		TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
-	}
-
-	@AfterClass
-	public static void teardown() throws Exception {
-		TestStreamEnvironment.unsetAsContext();
-		stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
deleted file mode 100644
index ce3aa86..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public abstract class StreamingProgramTestBase extends AbstractTestBase {
-
-	protected static final int DEFAULT_PARALLELISM = 4;
-
-	private int parallelism;
-	
-	
-	public StreamingProgramTestBase() {
-		super(new Configuration(), StreamingMode.STREAMING);
-		setParallelism(DEFAULT_PARALLELISM);
-	}
-
-
-	public void setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-		setTaskManagerNumSlots(parallelism);
-	}
-	
-	public int getParallelism() {
-		return parallelism;
-	}
-	
-
-	// --------------------------------------------------------------------------------------------
-	//  Methods to create the test program and for pre- and post- test work
-	// --------------------------------------------------------------------------------------------
-
-	protected abstract void testProgram() throws Exception;
-
-	protected void preSubmit() throws Exception {}
-	
-	protected void postSubmit() throws Exception {}
-	
-	// --------------------------------------------------------------------------------------------
-	//  Test entry point
-	// --------------------------------------------------------------------------------------------
-
-	@Test
-	public void testJob() throws Exception {
-		try {
-			// pre-submit
-			try {
-				preSubmit();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Pre-submit work caused an error: " + e.getMessage());
-			}
-
-			// prepare the test environment
-			startCluster();
-
-			TestStreamEnvironment.setAsContext(this.executor, getParallelism());
-
-			// call the test program
-			try {
-				testProgram();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Error while calling the test program: " + e.getMessage());
-			}
-			finally {
-				TestStreamEnvironment.unsetAsContext();
-			}
-
-			// post-submit
-			try {
-				postSubmit();
-			}
-			catch (Exception e) {
-				System.err.println(e.getMessage());
-				e.printStackTrace();
-				fail("Post-submit work caused an error: " + e.getMessage());
-			}
-		}
-		finally {
-			stopCluster();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
deleted file mode 100644
index 0c5cd8f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Assert;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-/**
- * Utils for working with the various test harnesses.
- */
-public class TestHarnessUtil {
-	/**
-	 * Extracts the StreamRecords from the given output list.
-	 */
-	@SuppressWarnings("unchecked")
-	public static <OUT> List<StreamRecord<OUT>> getStreamRecordsFromOutput(List<Object> output) {
-		List<StreamRecord<OUT>> resultElements = new LinkedList<StreamRecord<OUT>>();
-		for (Object e: output) {
-			if (e instanceof StreamRecord) {
-				resultElements.add((StreamRecord<OUT>) e);
-			}
-		}
-		return resultElements;
-	}
-
-	/**
-	 * Extracts the raw elements from the given output list.
-	 */
-	@SuppressWarnings("unchecked")
-	public static <OUT> List<OUT> getRawElementsFromOutput(Queue<Object> output) {
-		List<OUT> resultElements = new LinkedList<OUT>();
-		for (Object e: output) {
-			if (e instanceof StreamRecord) {
-				resultElements.add(((StreamRecord<OUT>) e).getValue());
-			}
-		}
-		return resultElements;
-	}
-
-	/**
-	 * Compare the two queues containing operator/task output by converting them to an array first.
-	 */
-	public static void assertOutputEquals(String message, Queue<Object> expected, Queue<Object> actual) {
-		Assert.assertArrayEquals(message,
-				expected.toArray(),
-				actual.toArray());
-
-	}
-
-	/**
-	 * Compare the two queues containing operator/task output by converting them to an array first.
-	 */
-	public static void assertOutputEqualsSorted(String message, Queue<Object> expected, Queue<Object> actual, Comparator<Object> comparator) {
-		Object[] sortedExpected = expected.toArray();
-		Object[] sortedActual = actual.toArray();
-
-		Arrays.sort(sortedExpected, comparator);
-		Arrays.sort(sortedActual, comparator);
-
-		Assert.assertArrayEquals(message, sortedExpected, sortedActual);
-
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
deleted file mode 100644
index 423d08e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TreeSet;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-public class TestListResultSink<T> extends RichSinkFunction<T> {
-
-	private static final long serialVersionUID = 1L;
-	private int resultListId;
-
-	public TestListResultSink() {
-		this.resultListId = TestListWrapper.getInstance().createList();
-	}
-
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-	}
-
-	@Override
-	public void invoke(T value) throws Exception {
-		synchronized (resultList()) {
-			resultList().add(value);
-		}
-	}
-
-	@Override
-	public void close() throws Exception {
-		super.close();
-	}
-
-	@SuppressWarnings("unchecked")
-	private List<T> resultList() {
-		synchronized (TestListWrapper.getInstance()) {
-			return (List<T>) TestListWrapper.getInstance().getList(resultListId);
-		}
-	}
-
-	public List<T> getResult() {
-		synchronized (resultList()) {
-			ArrayList<T> copiedList = new ArrayList<T>(resultList());
-			return copiedList;
-		}
-	}
-
-	public List<T> getSortedResult() {
-		synchronized (resultList()) {
-			TreeSet<T> treeSet = new TreeSet<T>(resultList());
-			ArrayList<T> sortedList = new ArrayList<T>(treeSet);
-			return sortedList;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
deleted file mode 100644
index 751f836..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class TestListWrapper {
-
-	private static TestListWrapper instance;
-
-	@SuppressWarnings("rawtypes")
-	private List<List<? extends Comparable>> lists;
-
-	@SuppressWarnings("rawtypes")
-	private TestListWrapper() {
-		lists = Collections.synchronizedList(new ArrayList<List<? extends Comparable>>());
-	}
-
-	public static TestListWrapper getInstance() {
-		if (instance == null) {
-			instance = new TestListWrapper();
-		}
-		return instance;
-	}
-
-	/**
-	 * Creates and stores a list, returns with the id.
-	 *
-	 * @return The ID of the list.
-	 */
-	@SuppressWarnings("rawtypes")
-	public int createList() {
-		lists.add(new ArrayList<Comparable>());
-		return lists.size() - 1;
-	}
-
-	public List<?> getList(int listId) {
-		@SuppressWarnings("rawtypes")
-		List<? extends Comparable> list = lists.get(listId);
-		if (list == null) {
-			throw new RuntimeException("No such list.");
-		}
-
-		return list;
-	}
-
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
deleted file mode 100644
index 8cd1e4a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-/**
- * A StreamExecutionEnvironment that executes its jobs on a test cluster.
- */
-public class TestStreamEnvironment extends StreamExecutionEnvironment {
-	
-	/** The mini cluster in which this environment executes its jobs */
-	private ForkableFlinkMiniCluster executor;
-	
-
-	public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
-		this.executor = Preconditions.checkNotNull(executor);
-		setParallelism(parallelism);
-	}
-	
-	@Override
-	public JobExecutionResult execute(String jobName) throws Exception {
-		final JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
-		return executor.submitJobAndWait(jobGraph, false);
-	}
-
-	// ------------------------------------------------------------------------
-
-	/**
-	 * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
-	 * the given cluster with the given default parallelism.
-	 * 
-	 * @param cluster The test cluster to run the test program on.
-	 * @param parallelism The default parallelism for the test programs.
-	 */
-	public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
-		
-		StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
-			@Override
-			public StreamExecutionEnvironment createExecutionEnvironment() {
-				return new TestStreamEnvironment(cluster, parallelism);
-			}
-		};
-
-		initializeContextEnvironment(factory);
-	}
-
-	/**
-	 * Resets the streaming context environment to null.
-	 */
-	public static void unsetAsContext() {
-		resetContextEnvironment();
-	} 
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
deleted file mode 100644
index c586db3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * A test harness for testing a {@link TwoInputStreamOperator}.
- *
- * <p>
- * This mock task provides the operator with a basic runtime context and allows pushing elements
- * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
- * and watermarks can be retrieved. you are free to modify these.
- */
-public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
-
-	TwoInputStreamOperator<IN1, IN2, OUT> operator;
-
-	final ConcurrentLinkedQueue<Object> outputList;
-
-	final ExecutionConfig executionConfig;
-
-	final Object checkpointLock;
-
-	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) {
-		this(operator, new StreamConfig(new Configuration()));
-	}
-		
-	public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, StreamConfig config) {
-		this.operator = operator;
-		this.outputList = new ConcurrentLinkedQueue<Object>();
-		this.executionConfig = new ExecutionConfig();
-		this.checkpointLock = new Object();
-
-		Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
-		StreamTask<?, ?> mockTask = mock(StreamTask.class);
-		when(mockTask.getName()).thenReturn("Mock Task");
-		when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
-		when(mockTask.getConfiguration()).thenReturn(config);
-		when(mockTask.getEnvironment()).thenReturn(env);
-		when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-
-		// ugly Java generic hacks
-		@SuppressWarnings("unchecked")
-		OngoingStubbing<StateBackend<?>> stubbing =
-				(OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(mockTask.getStateBackend());
-		stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-
-		operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput());
-	}
-
-	/**
-	 * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
-	 * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
-	 * to extract only the StreamRecords.
-	 */
-	public ConcurrentLinkedQueue<Object> getOutput() {
-		return outputList;
-	}
-
-
-	/**
-	 * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
-	 */
-	public void open() throws Exception {
-		operator.open();
-	}
-
-	/**
-	 * Calls close on the operator.
-	 */
-	public void close() throws Exception {
-		operator.close();
-	}
-
-	public void processElement1(StreamRecord<IN1> element) throws Exception {
-		operator.processElement1(element);
-	}
-
-	public void processElement2(StreamRecord<IN2> element) throws Exception {
-		operator.processElement2(element);
-	}
-
-	public void processWatermark1(Watermark mark) throws Exception {
-		operator.processWatermark1(mark);
-	}
-
-	public void processWatermark2(Watermark mark) throws Exception {
-		operator.processWatermark2(mark);
-	}
-
-	private class MockOutput implements Output<StreamRecord<OUT>> {
-
-		private TypeSerializer<OUT> outputSerializer;
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public void emitWatermark(Watermark mark) {
-			outputList.add(mark);
-		}
-
-		@Override
-		@SuppressWarnings("unchecked")
-		public void collect(StreamRecord<OUT> element) {
-			if (outputSerializer == null) {
-				outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
-			}
-			outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),
-					element.getTimestamp()));
-		}
-
-		@Override
-		public void close() {
-			// ignore
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
deleted file mode 100644
index 1c0f850..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for {@link TypeInformationSerializationSchema}, using {@link MyPOJO} as the element type.
- */
-public class TypeInformationSerializationSchemaTest {
-
-	/**
-	 * Serializes each sample value and checks that deserializing the resulting bytes
-	 * yields a value equal to the input.
-	 *
-	 * @throws Exception propagated to JUnit so the report carries the full stack trace
-	 */
-	@Test
-	public void testDeSerialization() throws Exception {
-		TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class);
-
-		TypeInformationSerializationSchema<MyPOJO> schema =
-				new TypeInformationSerializationSchema<>(info, new ExecutionConfig());
-
-		MyPOJO[] types = {
-				new MyPOJO(72, new Date(763784523L), new Date(88234L)),
-				new MyPOJO(-1, new Date(11111111111111L)),
-				new MyPOJO(42),
-				new MyPOJO(17, new Date(222763784523L))
-		};
-
-		for (MyPOJO val : types) {
-			byte[] serialized = schema.serialize(val);
-			MyPOJO deser = schema.deserialize(serialized);
-			assertEquals(val, deser);
-		}
-	}
-
-	/**
-	 * Checks that the schema object itself can be copied via
-	 * {@code CommonTestUtils.createCopySerializable}.
-	 *
-	 * @throws Exception propagated to JUnit so the report carries the full stack trace
-	 */
-	@Test
-	public void testSerializability() throws Exception {
-		TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class);
-		TypeInformationSerializationSchema<MyPOJO> schema =
-				new TypeInformationSerializationSchema<>(info, new ExecutionConfig());
-
-		// this needs to succeed
-		CommonTestUtils.createCopySerializable(schema);
-	}
-
-	// ------------------------------------------------------------------------
-	//  Test data types
-	// ------------------------------------------------------------------------
-
-	/** Test type with an int field and a list of dates. */
-	public static class MyPOJO {
-
-		public int aField;
-		public List<Date> aList;
-
-		/** Leaves both fields at their default values ({@code 0} and {@code null}). */
-		public MyPOJO() {}
-
-		/**
-		 * @param iVal  value for {@code aField}
-		 * @param dates zero or more dates, copied into a new list for {@code aList}
-		 */
-		public MyPOJO(int iVal, Date... dates) {
-			this.aField = iVal;
-			this.aList = new ArrayList<>(Arrays.asList(dates));
-		}
-
-		/** Hashes on {@code aField} only; objects that are equal share the same {@code aField}. */
-		@Override
-		public int hashCode() {
-			return aField;
-		}
-
-		/** Two instances are equal when {@code aField} matches and the lists are equal or both null. */
-		@Override
-		public boolean equals(Object obj) {
-			if (obj instanceof MyPOJO) {
-				MyPOJO that = (MyPOJO) obj;
-				return this.aField == that.aField && (this.aList == null ?
-						that.aList == null :
-						that.aList != null && this.aList.equals(that.aList));
-			}
-			return super.equals(obj);
-		}
-
-		@Override
-		public String toString() {
-			// Plain concatenation: the data must not be interpreted as a format string.
-			return "MyPOJO " + aField + " " + aList;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
deleted file mode 100644
index 63375a7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.keys;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-/**
- * Tests for the array key selectors returned by {@code KeySelectorUtil.getSelectorForArray}.
- */
-public class ArrayKeySelectorTest {
-
-	/**
-	 * Builds selectors over {@code String[]} for one position and for two positions,
-	 * and checks the tuples they return for two sample arrays.
-	 *
-	 * @throws Exception propagated to JUnit so the report carries the full stack trace
-	 */
-	@Test
-	public void testObjectArrays() throws Exception {
-		String[] array1 = { "a", "b", "c", "d", "e" };
-		String[] array2 = { "v", "w", "x", "y", "z" };
-
-		KeySelectorUtil.ArrayKeySelector<String[]> singleFieldSelector =
-				KeySelectorUtil.getSelectorForArray(new int[] {1}, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
-
-		assertEquals(new Tuple1<>("b"), singleFieldSelector.getKey(array1));
-		assertEquals(new Tuple1<>("w"), singleFieldSelector.getKey(array2));
-
-		// Positions are given as {3, 0}; the expected tuples list index 3 first, then index 0.
-		KeySelectorUtil.ArrayKeySelector<String[]> twoFieldsSelector =
-				KeySelectorUtil.getSelectorForArray(new int[] {3, 0}, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
-
-		assertEquals(new Tuple2<>("d", "a"), twoFieldsSelector.getKey(array1));
-		assertEquals(new Tuple2<>("y", "v"), twoFieldsSelector.getKey(array2));
-	}
-
-	/**
-	 * Same checks as {@link #testObjectArrays()}, on {@code int[]} inputs of different lengths.
-	 *
-	 * @throws Exception propagated to JUnit so the report carries the full stack trace
-	 */
-	@Test
-	public void testPrimitiveArrays() throws Exception {
-		int[] array1 = { 1, 2, 3, 4, 5 };
-		int[] array2 = { -5, -4, -3, -2, -1, 0 };
-
-		KeySelectorUtil.ArrayKeySelector<int[]> singleFieldSelector =
-				KeySelectorUtil.getSelectorForArray(new int[] {1}, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
-
-		assertEquals(new Tuple1<>(2), singleFieldSelector.getKey(array1));
-		assertEquals(new Tuple1<>(-4), singleFieldSelector.getKey(array2));
-
-		KeySelectorUtil.ArrayKeySelector<int[]> twoFieldsSelector =
-				KeySelectorUtil.getSelectorForArray(new int[] {3, 0}, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
-
-		assertEquals(new Tuple2<>(4, 1), twoFieldsSelector.getKey(array1));
-		assertEquals(new Tuple2<>(-2, -5), twoFieldsSelector.getKey(array2));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to OFF and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
deleted file mode 100644
index ed2bbcb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=OFF, console
-
-# Write log output to the console (System.err)
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target = System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
deleted file mode 100644
index 4f56748..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one
-  ~ or more contributor license agreements.  See the NOTICE file
-  ~ distributed with this work for additional information
-  ~ regarding copyright ownership.  The ASF licenses this file
-  ~ to you under the Apache License, Version 2.0 (the
-  ~ "License"); you may not use this file except in compliance
-  ~ with the License.  You may obtain a copy of the License at
-  ~
-  ~     http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-
-<configuration>
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder>
-            <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
-        </encoder>
-    </appender>
-
-    <root level="WARN">
-        <appender-ref ref="STDOUT"/>
-    </root>
-    <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
-</configuration>
\ No newline at end of file