You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/21 11:03:35 UTC
[19/51] [partial] flink git commit: [FLINK-2877] Move Streaming API
out of Staging package
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
deleted file mode 100644
index 9820ef8..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/EvenOddOutputSelector.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.api.collector.selector.OutputSelector;
-
-import java.util.Arrays;
-
-public class EvenOddOutputSelector implements OutputSelector<Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Iterable<String> select(Integer value) {
- return value % 2 == 0 ? Arrays.asList("even") : Arrays.asList("odd");
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldAccessorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldAccessorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldAccessorTest.java
deleted file mode 100644
index d35089a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/FieldAccessorTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-// This only tests a fraction of FieldAccessor. The other parts are tested indirectly by AggregationFunctionTest.
-public class FieldAccessorTest {
-
- @Test
- @SuppressWarnings("unchecked")
- public void arrayFieldAccessorTest() {
- int[] a = new int[]{3,5};
- FieldAccessor<int[], Integer> fieldAccessor =
- (FieldAccessor<int[], Integer>) (Object)
- FieldAccessor.create(1, PrimitiveArrayTypeInfo.getInfoFor(a.getClass()), null);
-
- assertEquals(Integer.class, fieldAccessor.getFieldType().getTypeClass());
-
- assertEquals((Integer)a[1], fieldAccessor.get(a));
-
- a = fieldAccessor.set(a, 6);
- assertEquals((Integer)a[1], fieldAccessor.get(a));
-
-
-
- Integer[] b = new Integer[]{3,5};
- FieldAccessor<Integer[], Integer> fieldAccessor2 =
- (FieldAccessor<Integer[], Integer>) (Object)
- FieldAccessor.create(1, BasicArrayTypeInfo.getInfoFor(b.getClass()), null);
-
- assertEquals(Integer.class, fieldAccessor2.getFieldType().getTypeClass());
-
- assertEquals((Integer)b[1], fieldAccessor2.get(b));
-
- b = fieldAccessor2.set(b, 6);
- assertEquals((Integer)b[1], fieldAccessor2.get(b));
- }
-
- @Test
- @SuppressWarnings("unchecked")
- public void tupleFieldAccessorOutOfBoundsTest() {
- try {
- FieldAccessor<Tuple2<Integer, Integer>, Integer> fieldAccessor =
- (FieldAccessor<Tuple2<Integer, Integer>, Integer>) (Object)
- FieldAccessor.create(2, TupleTypeInfo.getBasicTupleTypeInfo(Integer.class, Integer.class),
- null);
- fail();
- } catch (IndexOutOfBoundsException e) {
- // Nothing to do here
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
deleted file mode 100644
index 0c708c6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.concurrent.Callable;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.runtime.operators.Triggerable;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.mockito.stubbing.OngoingStubbing;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class MockContext<IN, OUT> {
-
- private List<OUT> outputs;
-
- private MockOutput<OUT> output;
-
- public MockContext(Collection<IN> inputs) {
- if (inputs.isEmpty()) {
- throw new RuntimeException("Inputs must not be empty");
- }
-
- outputs = new ArrayList<OUT>();
- output = new MockOutput<OUT>(outputs);
- }
-
- public List<OUT> getOutputs() {
- return outputs;
- }
-
- public Output<StreamRecord<OUT>> getOutput() {
- return output;
- }
-
- public static <IN, OUT> List<OUT> createAndExecute(OneInputStreamOperator<IN, OUT> operator, List<IN> inputs) {
- return createAndExecuteForKeyedStream(operator, inputs, null, null);
- }
-
- public static <IN, OUT, KEY> List<OUT> createAndExecuteForKeyedStream(
- OneInputStreamOperator<IN, OUT> operator, List<IN> inputs,
- KeySelector<IN, KEY> keySelector, TypeInformation<KEY> keyType) {
-
- MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
-
- StreamConfig config = new StreamConfig(new Configuration());
- if (keySelector != null && keyType != null) {
- config.setStateKeySerializer(keyType.createSerializer(new ExecutionConfig()));
- config.setStatePartitioner(keySelector);
- }
-
- final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
- final Object lock = new Object();
- final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);
-
- operator.setup(mockTask, config, mockContext.output);
- try {
- operator.open();
-
- StreamRecord<IN> record = new StreamRecord<IN>(null);
- for (IN in: inputs) {
- record = record.replace(in);
- synchronized (lock) {
- operator.setKeyContextElement(record);
- operator.processElement(record);
- }
- }
-
- operator.close();
- } catch (Exception e) {
- throw new RuntimeException("Cannot invoke operator.", e);
- } finally {
- timerService.shutdownNow();
- }
-
- return mockContext.getOutputs();
- }
-
- private static StreamTask<?, ?> createMockTaskWithTimer(
- final ScheduledExecutorService timerService, final Object lock)
- {
- StreamTask<?, ?> task = mock(StreamTask.class);
- when(task.getAccumulatorMap()).thenReturn(new HashMap<String, Accumulator<?, ?>>());
- when(task.getName()).thenReturn("Test task name");
- when(task.getExecutionConfig()).thenReturn(new ExecutionConfig());
- when(task.getEnvironment()).thenReturn(new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024));
- when(task.getCheckpointLock()).thenReturn(lock);
-
- doAnswer(new Answer<Void>() {
- @Override
- public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
- final Long timestamp = (Long) invocationOnMock.getArguments()[0];
- final Triggerable target = (Triggerable) invocationOnMock.getArguments()[1];
- timerService.schedule(
- new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- synchronized (lock) {
- target.trigger(timestamp);
- }
- return null;
- }
- },
- timestamp - System.currentTimeMillis(),
- TimeUnit.MILLISECONDS);
- return null;
- }
- }).when(task).registerTimer(anyLong(), any(Triggerable.class));
-
- // ugly Java generic hacks to get the generic state backend into the mock
- @SuppressWarnings("unchecked")
- OngoingStubbing<StateBackend<?>> stubbing =
- (OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(task.getStateBackend());
- stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-
- return task;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
deleted file mode 100644
index 5371ba0..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockOutput.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-import org.apache.commons.lang3.SerializationUtils;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-
-public class MockOutput<T> implements Output<StreamRecord<T>> {
- private Collection<T> outputs;
-
- public MockOutput(Collection<T> outputs) {
- this.outputs = outputs;
- }
-
- @Override
- public void collect(StreamRecord<T> record) {
- T copied = SerializationUtils.deserialize(SerializationUtils
- .serialize((Serializable) record.getValue()));
- outputs.add(copied);
- }
-
- @Override
- public void emitWatermark(Watermark mark) {
- throw new RuntimeException("THIS MUST BE IMPLEMENTED");
- }
-
- @Override
- public void close() {
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
deleted file mode 100644
index bcb5691..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpIntMap.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.functions.MapFunction;
-
-public class NoOpIntMap implements MapFunction<Integer, Integer> {
- private static final long serialVersionUID = 1L;
-
- public Integer map(Integer value) throws Exception {
- return value;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
deleted file mode 100644
index d398121..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/NoOpSink.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-public final class NoOpSink<T> extends RichSinkFunction<T> {
- public void invoke(T tuple) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
deleted file mode 100644
index 01f95bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/OneInputStreamOperatorTestHarness.java
+++ /dev/null
@@ -1,159 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.ClosureCleaner;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.util.Collection;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * A test harness for testing a {@link OneInputStreamOperator}.
- *
- * <p>
- * This mock task provides the operator with a basic runtime context and allows pushing elements
- * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
- * and watermarks can be retrieved. You are free to modify these.
- */
-public class OneInputStreamOperatorTestHarness<IN, OUT> {
-
- final OneInputStreamOperator<IN, OUT> operator;
-
- final ConcurrentLinkedQueue<Object> outputList;
-
- final StreamConfig config;
-
- final ExecutionConfig executionConfig;
-
- final Object checkpointLock;
-
-
- public OneInputStreamOperatorTestHarness(OneInputStreamOperator<IN, OUT> operator) {
- this.operator = operator;
- this.outputList = new ConcurrentLinkedQueue<Object>();
- this.config = new StreamConfig(new Configuration());
- this.executionConfig = new ExecutionConfig();
- this.checkpointLock = new Object();
-
- Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
- StreamTask<?, ?> mockTask = mock(StreamTask.class);
- when(mockTask.getName()).thenReturn("Mock Task");
- when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
- when(mockTask.getConfiguration()).thenReturn(config);
- when(mockTask.getEnvironment()).thenReturn(env);
- when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-
- // ugly Java generic hacks
- @SuppressWarnings("unchecked")
- OngoingStubbing<StateBackend<?>> stubbing =
- (OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(mockTask.getStateBackend());
- stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-
- operator.setup(mockTask, config, new MockOutput());
- }
-
- public <K> void configureForKeyedStream(KeySelector<IN, K> keySelector, TypeInformation<K> keyType) {
- ClosureCleaner.clean(keySelector, false);
- config.setStatePartitioner(keySelector);
- config.setStateKeySerializer(keyType.createSerializer(executionConfig));
- }
-
- /**
- * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
- * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
- * to extract only the StreamRecords.
- */
- public ConcurrentLinkedQueue<Object> getOutput() {
- return outputList;
- }
-
- /**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}
- */
- public void open() throws Exception {
- operator.open();
- }
-
- /**
- * Calls close on the operator.
- */
- public void close() throws Exception {
- operator.close();
- }
-
- public void processElement(StreamRecord<IN> element) throws Exception {
- operator.setKeyContextElement(element);
- operator.processElement(element);
- }
-
- public void processElements(Collection<StreamRecord<IN>> elements) throws Exception {
- for (StreamRecord<IN> element: elements) {
- operator.setKeyContextElement(element);
- operator.processElement(element);
- }
- }
-
- public void processWatermark(Watermark mark) throws Exception {
- operator.processWatermark(mark);
- }
-
- private class MockOutput implements Output<StreamRecord<OUT>> {
-
- private TypeSerializer<OUT> outputSerializer;
-
- @Override
- public void emitWatermark(Watermark mark) {
- outputList.add(mark);
- }
-
- @Override
- public void collect(StreamRecord<OUT> element) {
- if (outputSerializer == null) {
- outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
- }
- outputList.add(new StreamRecord<OUT>(outputSerializer.copy(element.getValue()),
- element.getTimestamp()));
- }
-
- @Override
- public void close() {
- // ignore
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
deleted file mode 100644
index a46ff55..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/ReceiveCheckNoOpSink.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.junit.Assert.assertTrue;
-
-public final class ReceiveCheckNoOpSink<T> extends RichSinkFunction<T> {
- private List<T> received;
-
- public void invoke(T tuple) {
- received.add(tuple);
- }
-
- public void open(Configuration conf) {
- received = new ArrayList<T>();
- }
-
- public void close() {
- assertTrue(received.size() > 0);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
deleted file mode 100644
index 4ded0fa..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketOutputTestBase.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.util.serialization.DeserializationSchema;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Assert;
-
-import java.io.BufferedReader;
-import java.io.InputStreamReader;
-import java.net.ServerSocket;
-import java.net.Socket;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-
-/**
- * Test base for streaming programs relying on an open server socket to write to.
- */
-public abstract class SocketOutputTestBase extends StreamingProgramTestBase {
-
- protected static final String HOST = "localhost";
- protected static Integer port;
- protected Set<String> dataReadFromSocket = new HashSet<String>();
-
- @Override
- protected void preSubmit() throws Exception {
- port = NetUtils.getAvailablePort();
- temporarySocket = createLocalSocket(port);
- }
-
- @Override
- protected void postSubmit() throws Exception {
- Set<String> expectedData = new HashSet<String>(Arrays.asList(WordCountData.STREAMING_COUNTS_AS_TUPLES.split("\n")));
- Assert.assertEquals(expectedData, dataReadFromSocket);
- temporarySocket.close();
- }
-
- protected ServerSocket temporarySocket;
-
- public ServerSocket createLocalSocket(int port) throws Exception {
- ServerSocket serverSocket = new ServerSocket(port);
- ServerThread st = new ServerThread(serverSocket);
- st.start();
- return serverSocket;
- }
-
- protected class ServerThread extends Thread {
-
- private ServerSocket serverSocket;
- private Thread t;
-
- public ServerThread(ServerSocket serverSocket) {
- this.serverSocket = serverSocket;
- t = new Thread(this);
- }
-
- public void waitForAccept() throws Exception {
- Socket socket = serverSocket.accept();
- BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
- DeserializationSchema<String> schema = new DummyStringSchema();
- String rawData = in.readLine();
- while (rawData != null){
- String string = schema.deserialize(rawData.getBytes());
- dataReadFromSocket.add(string);
- rawData = in.readLine();
- }
- socket.close();
- }
-
- public void run() {
- try {
- waitForAccept();
- } catch (Exception e) {
- Assert.fail();
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public void start() {
- t.start();
- }
- }
-
- public static class DummyStringSchema implements DeserializationSchema<String>, SerializationSchema<String, byte[]>{
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- return nextElement.equals("q");
- }
-
- @Override
- public byte[] serialize(String element) {
- return element.getBytes();
- }
-
- @Override
- public String deserialize(byte[] message) {
- return new String(message);
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return TypeExtractor.getForClass(String.class);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
deleted file mode 100644
index d1bd64a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SocketProgramITCaseBase.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.test.testdata.WordCountData;
-import org.apache.flink.util.NetUtils;
-
-import org.junit.Assert;
-
-import java.io.PrintWriter;
-import java.net.ServerSocket;
-import java.net.Socket;
-
-public abstract class SocketProgramITCaseBase extends StreamingProgramTestBase {
-
- protected static final String HOST = "localhost";
- protected static Integer port;
- protected String resultPath;
-
- private ServerSocket temporarySocket;
-
- @Override
- protected void preSubmit() throws Exception {
- port = NetUtils.getAvailablePort();
- temporarySocket = createSocket(HOST, port, WordCountData.TEXT);
- resultPath = getTempDirPath("result");
- }
-
- @Override
- protected void postSubmit() throws Exception {
- compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath);
- temporarySocket.close();
- }
-
- public ServerSocket createSocket(String host, int port, String contents) throws Exception {
- ServerSocket serverSocket = new ServerSocket(port);
- ServerThread st = new ServerThread(serverSocket, contents);
- st.start();
- return serverSocket;
- }
-
- private static class ServerThread extends Thread {
-
- private ServerSocket serverSocket;
- private String contents;
- private Thread t;
-
- public ServerThread(ServerSocket serverSocket, String contents) {
- this.serverSocket = serverSocket;
- this.contents = contents;
- t = new Thread(this);
- }
-
- public void waitForAccept() throws Exception {
- Socket socket = serverSocket.accept();
- PrintWriter writer = new PrintWriter(socket.getOutputStream(), true);
- writer.println(contents);
- writer.close();
- socket.close();
- }
-
- public void run() {
- try {
- waitForAccept();
- } catch (Exception e) {
- Assert.fail();
- }
- }
-
- @Override
- public void start() {
- t.start();
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
deleted file mode 100644
index 2afdc40..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/SourceFunctionUtil.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.accumulators.Accumulator;
-import org.apache.flink.api.common.functions.RichFunction;
-import org.apache.flink.api.common.functions.RuntimeContext;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.functions.source.EventTimeSourceFunction;
-import org.apache.flink.streaming.api.functions.source.SourceFunction;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
-
-import static org.mockito.Mockito.*;
-
-public class SourceFunctionUtil<T> {
-
- public static <T> List<T> runSourceFunction(SourceFunction<T> sourceFunction) throws Exception {
- List<T> outputs = new ArrayList<T>();
-
- if (sourceFunction instanceof RichFunction) {
-
- AbstractStreamOperator<?> operator = mock(AbstractStreamOperator.class);
- when(operator.getExecutionConfig()).thenReturn(new ExecutionConfig());
-
- RuntimeContext runtimeContext = new StreamingRuntimeContext(
- operator,
- new MockEnvironment("MockTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024),
- new HashMap<String, Accumulator<?, ?>>());
-
- ((RichFunction) sourceFunction).setRuntimeContext(runtimeContext);
-
- ((RichFunction) sourceFunction).open(new Configuration());
- }
- try {
- final Output<StreamRecord<T>> collector = new MockOutput<T>(outputs);
- final Object lockingObject = new Object();
- SourceFunction.SourceContext<T> ctx;
- if (sourceFunction instanceof EventTimeSourceFunction) {
- ctx = new StreamSource.ManualWatermarkContext<T>(lockingObject, collector);
- } else {
- ctx = new StreamSource.NonWatermarkContext<T>(lockingObject, collector);
- }
- sourceFunction.run(ctx);
- } catch (Exception e) {
- throw new RuntimeException("Cannot invoke source.", e);
- }
- return outputs;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
deleted file mode 100644
index 4e02f2c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingMultipleProgramsTestBase.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.apache.flink.test.util.TestBaseUtils;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-
-/**
- * Base class for streaming unit tests that run multiple tests and want to reuse the same
- * Flink cluster. This saves a significant amount of time, since the startup and
- * shutdown of the Flink clusters (including actor systems, etc) usually dominates
- * the execution of the actual tests.
- *
- * To write a unit test against this test base, simply extend it and add
- * one or more regular test methods and retrieve the StreamExecutionEnvironment from
- * the context:
- *
- * <pre>
- * {@literal @}Test
- * public void someTest() {
- * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * // test code
- * env.execute();
- * }
- *
- * {@literal @}Test
- * public void anotherTest() {
- * StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
- * // test code
- * env.execute();
- * }
- *
- * </pre>
- */
-public class StreamingMultipleProgramsTestBase extends TestBaseUtils {
-
- // ------------------------------------------------------------------------
- // The mini cluster that is shared across tests
- // ------------------------------------------------------------------------
-
- protected static final int DEFAULT_PARALLELISM = 4;
-
- protected static ForkableFlinkMiniCluster cluster;
-
-
- // ------------------------------------------------------------------------
- // Cluster setup & teardown
- // ------------------------------------------------------------------------
-
- @BeforeClass
- public static void setup() throws Exception {
- cluster = TestBaseUtils.startCluster(1, DEFAULT_PARALLELISM, StreamingMode.STREAMING, false, false, true);
- TestStreamEnvironment.setAsContext(cluster, DEFAULT_PARALLELISM);
- }
-
- @AfterClass
- public static void teardown() throws Exception {
- TestStreamEnvironment.unsetAsContext();
- stopCluster(cluster, TestBaseUtils.DEFAULT_TIMEOUT);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
deleted file mode 100644
index ce3aa86..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/StreamingProgramTestBase.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.StreamingMode;
-import org.apache.flink.test.util.AbstractTestBase;
-
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public abstract class StreamingProgramTestBase extends AbstractTestBase {
-
- protected static final int DEFAULT_PARALLELISM = 4;
-
- private int parallelism;
-
-
- public StreamingProgramTestBase() {
- super(new Configuration(), StreamingMode.STREAMING);
- setParallelism(DEFAULT_PARALLELISM);
- }
-
-
- public void setParallelism(int parallelism) {
- this.parallelism = parallelism;
- setTaskManagerNumSlots(parallelism);
- }
-
- public int getParallelism() {
- return parallelism;
- }
-
-
- // --------------------------------------------------------------------------------------------
- // Methods to create the test program and for pre- and post- test work
- // --------------------------------------------------------------------------------------------
-
- protected abstract void testProgram() throws Exception;
-
- protected void preSubmit() throws Exception {}
-
- protected void postSubmit() throws Exception {}
-
- // --------------------------------------------------------------------------------------------
- // Test entry point
- // --------------------------------------------------------------------------------------------
-
- @Test
- public void testJob() throws Exception {
- try {
- // pre-submit
- try {
- preSubmit();
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Pre-submit work caused an error: " + e.getMessage());
- }
-
- // prepare the test environment
- startCluster();
-
- TestStreamEnvironment.setAsContext(this.executor, getParallelism());
-
- // call the test program
- try {
- testProgram();
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Error while calling the test program: " + e.getMessage());
- }
- finally {
- TestStreamEnvironment.unsetAsContext();
- }
-
- // post-submit
- try {
- postSubmit();
- }
- catch (Exception e) {
- System.err.println(e.getMessage());
- e.printStackTrace();
- fail("Post-submit work caused an error: " + e.getMessage());
- }
- }
- finally {
- stopCluster();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
deleted file mode 100644
index 0c5cd8f..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.junit.Assert;
-
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-/**
- * Utils for working with the various test harnesses.
- */
-public class TestHarnessUtil {
- /**
- * Extracts the StreamRecords from the given output list.
- */
- @SuppressWarnings("unchecked")
- public static <OUT> List<StreamRecord<OUT>> getStreamRecordsFromOutput(List<Object> output) {
- List<StreamRecord<OUT>> resultElements = new LinkedList<StreamRecord<OUT>>();
- for (Object e: output) {
- if (e instanceof StreamRecord) {
- resultElements.add((StreamRecord<OUT>) e);
- }
- }
- return resultElements;
- }
-
- /**
- * Extracts the raw elements from the given output list.
- */
- @SuppressWarnings("unchecked")
- public static <OUT> List<OUT> getRawElementsFromOutput(Queue<Object> output) {
- List<OUT> resultElements = new LinkedList<OUT>();
- for (Object e: output) {
- if (e instanceof StreamRecord) {
- resultElements.add(((StreamRecord<OUT>) e).getValue());
- }
- }
- return resultElements;
- }
-
- /**
- * Compare the two queues containing operator/task output by converting them to an array first.
- */
- public static void assertOutputEquals(String message, Queue<Object> expected, Queue<Object> actual) {
- Assert.assertArrayEquals(message,
- expected.toArray(),
- actual.toArray());
-
- }
-
- /**
- * Compare the two queues containing operator/task output by converting them to an array first.
- */
- public static void assertOutputEqualsSorted(String message, Queue<Object> expected, Queue<Object> actual, Comparator<Object> comparator) {
- Object[] sortedExpected = expected.toArray();
- Object[] sortedActual = actual.toArray();
-
- Arrays.sort(sortedExpected, comparator);
- Arrays.sort(sortedActual, comparator);
-
- Assert.assertArrayEquals(message, sortedExpected, sortedActual);
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
deleted file mode 100644
index 423d08e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListResultSink.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.TreeSet;
-
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
-
-public class TestListResultSink<T> extends RichSinkFunction<T> {
-
- private static final long serialVersionUID = 1L;
- private int resultListId;
-
- public TestListResultSink() {
- this.resultListId = TestListWrapper.getInstance().createList();
- }
-
- @Override
- public void open(Configuration parameters) throws Exception {
- super.open(parameters);
- }
-
- @Override
- public void invoke(T value) throws Exception {
- synchronized (resultList()) {
- resultList().add(value);
- }
- }
-
- @Override
- public void close() throws Exception {
- super.close();
- }
-
- @SuppressWarnings("unchecked")
- private List<T> resultList() {
- synchronized (TestListWrapper.getInstance()) {
- return (List<T>) TestListWrapper.getInstance().getList(resultListId);
- }
- }
-
- public List<T> getResult() {
- synchronized (resultList()) {
- ArrayList<T> copiedList = new ArrayList<T>(resultList());
- return copiedList;
- }
- }
-
- public List<T> getSortedResult() {
- synchronized (resultList()) {
- TreeSet<T> treeSet = new TreeSet<T>(resultList());
- ArrayList<T> sortedList = new ArrayList<T>(treeSet);
- return sortedList;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
deleted file mode 100644
index 751f836..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestListWrapper.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-public class TestListWrapper {
-
- private static TestListWrapper instance;
-
- @SuppressWarnings("rawtypes")
- private List<List<? extends Comparable>> lists;
-
- @SuppressWarnings("rawtypes")
- private TestListWrapper() {
- lists = Collections.synchronizedList(new ArrayList<List<? extends Comparable>>());
- }
-
- public static TestListWrapper getInstance() {
- if (instance == null) {
- instance = new TestListWrapper();
- }
- return instance;
- }
-
- /**
- * Creates and stores a list, returns with the id.
- *
- * @return The ID of the list.
- */
- @SuppressWarnings("rawtypes")
- public int createList() {
- lists.add(new ArrayList<Comparable>());
- return lists.size() - 1;
- }
-
- public List<?> getList(int listId) {
- @SuppressWarnings("rawtypes")
- List<? extends Comparable> list = lists.get(listId);
- if (list == null) {
- throw new RuntimeException("No such list.");
- }
-
- return list;
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
deleted file mode 100644
index 8cd1e4a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import com.google.common.base.Preconditions;
-import org.apache.flink.api.common.JobExecutionResult;
-import org.apache.flink.runtime.jobgraph.JobGraph;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironmentFactory;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-
-/**
- * A StreamExecutionEnvironment that executes its jobs on a test cluster.
- */
-public class TestStreamEnvironment extends StreamExecutionEnvironment {
-
- /** The mini cluster in which this environment executes its jobs */
- private ForkableFlinkMiniCluster executor;
-
-
- public TestStreamEnvironment(ForkableFlinkMiniCluster executor, int parallelism) {
- this.executor = Preconditions.checkNotNull(executor);
- setParallelism(parallelism);
- }
-
- @Override
- public JobExecutionResult execute(String jobName) throws Exception {
- final JobGraph jobGraph = getStreamGraph().getJobGraph(jobName);
- return executor.submitJobAndWait(jobGraph, false);
- }
-
- // ------------------------------------------------------------------------
-
- /**
- * Sets the streaming context environment to a TestStreamEnvironment that runs its programs on
- * the given cluster with the given default parallelism.
- *
- * @param cluster The test cluster to run the test program on.
- * @param parallelism The default parallelism for the test programs.
- */
- public static void setAsContext(final ForkableFlinkMiniCluster cluster, final int parallelism) {
-
- StreamExecutionEnvironmentFactory factory = new StreamExecutionEnvironmentFactory() {
- @Override
- public StreamExecutionEnvironment createExecutionEnvironment() {
- return new TestStreamEnvironment(cluster, parallelism);
- }
- };
-
- initializeContextEnvironment(factory);
- }
-
- /**
- * Resets the streaming context environment to null.
- */
- public static void unsetAsContext() {
- resetContextEnvironment();
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
deleted file mode 100644
index c586db3..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TwoInputStreamOperatorTestHarness.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.execution.Environment;
-import org.apache.flink.runtime.operators.testutils.MockEnvironment;
-import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
-import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.runtime.state.memory.MemoryStateBackend;
-import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.mockito.stubbing.OngoingStubbing;
-
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * A test harness for testing a {@link TwoInputStreamOperator}.
- *
- * <p>
- * This mock task provides the operator with a basic runtime context and allows pushing elements
- * and watermarks into the operator. {@link java.util.Deque}s containing the emitted elements
- * and watermarks can be retrieved. you are free to modify these.
- */
-public class TwoInputStreamOperatorTestHarness<IN1, IN2, OUT> {
-
- TwoInputStreamOperator<IN1, IN2, OUT> operator;
-
- final ConcurrentLinkedQueue<Object> outputList;
-
- final ExecutionConfig executionConfig;
-
- final Object checkpointLock;
-
- public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator) {
- this(operator, new StreamConfig(new Configuration()));
- }
-
- public TwoInputStreamOperatorTestHarness(TwoInputStreamOperator<IN1, IN2, OUT> operator, StreamConfig config) {
- this.operator = operator;
- this.outputList = new ConcurrentLinkedQueue<Object>();
- this.executionConfig = new ExecutionConfig();
- this.checkpointLock = new Object();
-
- Environment env = new MockEnvironment("MockTwoInputTask", 3 * 1024 * 1024, new MockInputSplitProvider(), 1024);
- StreamTask<?, ?> mockTask = mock(StreamTask.class);
- when(mockTask.getName()).thenReturn("Mock Task");
- when(mockTask.getCheckpointLock()).thenReturn(checkpointLock);
- when(mockTask.getConfiguration()).thenReturn(config);
- when(mockTask.getEnvironment()).thenReturn(env);
- when(mockTask.getExecutionConfig()).thenReturn(executionConfig);
-
- // ugly Java generic hacks
- @SuppressWarnings("unchecked")
- OngoingStubbing<StateBackend<?>> stubbing =
- (OngoingStubbing<StateBackend<?>>) (OngoingStubbing<?>) when(mockTask.getStateBackend());
- stubbing.thenReturn(MemoryStateBackend.defaultInstance());
-
- operator.setup(mockTask, new StreamConfig(new Configuration()), new MockOutput());
- }
-
- /**
- * Get all the output from the task. This contains StreamRecords and Events interleaved. Use
- * {@link org.apache.flink.streaming.util.TestHarnessUtil#getStreamRecordsFromOutput(java.util.List)}
- * to extract only the StreamRecords.
- */
- public ConcurrentLinkedQueue<Object> getOutput() {
- return outputList;
- }
-
-
- /**
- * Calls {@link org.apache.flink.streaming.api.operators.StreamOperator#open()}.
- */
- public void open() throws Exception {
- operator.open();
- }
-
- /**
- * Calls close on the operator.
- */
- public void close() throws Exception {
- operator.close();
- }
-
- public void processElement1(StreamRecord<IN1> element) throws Exception {
- operator.processElement1(element);
- }
-
- public void processElement2(StreamRecord<IN2> element) throws Exception {
- operator.processElement2(element);
- }
-
- public void processWatermark1(Watermark mark) throws Exception {
- operator.processWatermark1(mark);
- }
-
- public void processWatermark2(Watermark mark) throws Exception {
- operator.processWatermark2(mark);
- }
-
- private class MockOutput implements Output<StreamRecord<OUT>> {
-
- private TypeSerializer<OUT> outputSerializer;
-
- @Override
- @SuppressWarnings("unchecked")
- public void emitWatermark(Watermark mark) {
- outputList.add(mark);
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void collect(StreamRecord<OUT> element) {
- if (outputSerializer == null) {
- outputSerializer = TypeExtractor.getForObject(element.getValue()).createSerializer(executionConfig);
- }
- outputList.add(new StreamRecord<>(outputSerializer.copy(element.getValue()),
- element.getTimestamp()));
- }
-
- @Override
- public void close() {
- // ignore
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
deleted file mode 100644
index 1c0f850..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ /dev/null
@@ -1,118 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
-
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Date;
-import java.util.List;
-
-import static org.junit.Assert.*;
-
-public class TypeInformationSerializationSchemaTest {
-
- @Test
- public void testDeSerialization() {
- try {
- TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class);
-
- TypeInformationSerializationSchema<MyPOJO> schema =
- new TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig());
-
- MyPOJO[] types = {
- new MyPOJO(72, new Date(763784523L), new Date(88234L)),
- new MyPOJO(-1, new Date(11111111111111L)),
- new MyPOJO(42),
- new MyPOJO(17, new Date(222763784523L))
- };
-
- for (MyPOJO val : types) {
- byte[] serialized = schema.serialize(val);
- MyPOJO deser = schema.deserialize(serialized);
- assertEquals(val, deser);
- }
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testSerializability() {
- try {
- TypeInformation<MyPOJO> info = TypeExtractor.getForClass(MyPOJO.class);
- TypeInformationSerializationSchema<MyPOJO> schema =
- new TypeInformationSerializationSchema<MyPOJO>(info, new ExecutionConfig());
-
- // this needs to succeed
- CommonTestUtils.createCopySerializable(schema);
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- // ------------------------------------------------------------------------
- // Test data types
- // ------------------------------------------------------------------------
-
- public static class MyPOJO {
-
- public int aField;
- public List<Date> aList;
-
- public MyPOJO() {}
-
- public MyPOJO(int iVal, Date... dates) {
- this.aField = iVal;
- this.aList = new ArrayList<>(Arrays.asList(dates));
- }
-
- @Override
- public int hashCode() {
- return aField;
- }
-
- @Override
- public boolean equals(Object obj) {
- if (obj instanceof MyPOJO) {
- MyPOJO that = (MyPOJO) obj;
- return this.aField == that.aField && (this.aList == null ?
- that.aList == null :
- that.aList != null && this.aList.equals(that.aList));
- }
- return super.equals(obj);
- }
-
- @Override
- public String toString() {
- return String.format("MyPOJO " + aField + " " + aList);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
deleted file mode 100644
index 63375a7..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/keys/ArrayKeySelectorTest.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util.keys;
-
-import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
-import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.junit.Test;
-
-import static org.junit.Assert.*;
-
-public class ArrayKeySelectorTest {
-
- @Test
- public void testObjectArrays() {
- try {
- String[] array1 = { "a", "b", "c", "d", "e" };
- String[] array2 = { "v", "w", "x", "y", "z" };
-
- KeySelectorUtil.ArrayKeySelector<String[]> singleFieldSelector =
- KeySelectorUtil.getSelectorForArray(new int[] {1}, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
-
- assertEquals(new Tuple1<>("b"), singleFieldSelector.getKey(array1));
- assertEquals(new Tuple1<>("w"), singleFieldSelector.getKey(array2));
-
- KeySelectorUtil.ArrayKeySelector<String[]> twoFieldsSelector =
- KeySelectorUtil.getSelectorForArray(new int[] {3, 0}, BasicArrayTypeInfo.STRING_ARRAY_TYPE_INFO);
-
- assertEquals(new Tuple2<>("d", "a"), twoFieldsSelector.getKey(array1));
- assertEquals(new Tuple2<>("y", "v"), twoFieldsSelector.getKey(array2));
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-
- @Test
- public void testPrimitiveArrays() {
- try {
- int[] array1 = { 1, 2, 3, 4, 5 };
- int[] array2 = { -5, -4, -3, -2, -1, 0 };
-
- KeySelectorUtil.ArrayKeySelector<int[]> singleFieldSelector =
- KeySelectorUtil.getSelectorForArray(new int[] {1}, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
-
- assertEquals(new Tuple1<>(2), singleFieldSelector.getKey(array1));
- assertEquals(new Tuple1<>(-4), singleFieldSelector.getKey(array2));
-
- KeySelectorUtil.ArrayKeySelector<int[]> twoFieldsSelector =
- KeySelectorUtil.getSelectorForArray(new int[] {3, 0}, PrimitiveArrayTypeInfo.INT_PRIMITIVE_ARRAY_TYPE_INFO);
-
- assertEquals(new Tuple2<>(4, 1), twoFieldsSelector.getKey(array1));
- assertEquals(new Tuple2<>(-2, -5), twoFieldsSelector.getKey(array2));
-
- }
- catch (Exception e) {
- e.printStackTrace();
- fail(e.getMessage());
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
deleted file mode 100644
index 0b686e5..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# Set root logger level to DEBUG and its only appender to A1.
-log4j.rootLogger=OFF, A1
-
-# A1 is set to be a ConsoleAppender.
-log4j.appender.A1=org.apache.log4j.ConsoleAppender
-
-# A1 uses PatternLayout.
-log4j.appender.A1.layout=org.apache.log4j.PatternLayout
-log4j.appender.A1.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
deleted file mode 100644
index ed2bbcb..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,27 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-# This file ensures that tests executed from the IDE show log output
-
-log4j.rootLogger=OFF, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target = System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/06f6ac5d/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
deleted file mode 100644
index 4f56748..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/logback-test.xml
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- ~ Licensed to the Apache Software Foundation (ASF) under one
- ~ or more contributor license agreements. See the NOTICE file
- ~ distributed with this work for additional information
- ~ regarding copyright ownership. The ASF licenses this file
- ~ to you under the Apache License, Version 2.0 (the
- ~ "License"); you may not use this file except in compliance
- ~ with the License. You may obtain a copy of the License at
- ~
- ~ http://www.apache.org/licenses/LICENSE-2.0
- ~
- ~ Unless required by applicable law or agreed to in writing, software
- ~ distributed under the License is distributed on an "AS IS" BASIS,
- ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- ~ See the License for the specific language governing permissions and
- ~ limitations under the License.
- -->
-
-<configuration>
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder>
- <pattern>%d{HH:mm:ss.SSS} [%thread] %-5level %logger{60} %X{sourceThread} - %msg%n</pattern>
- </encoder>
- </appender>
-
- <root level="WARN">
- <appender-ref ref="STDOUT"/>
- </root>
- <logger name="org.apache.flink.runtime.client.JobClient" level="OFF"/>
-</configuration>
\ No newline at end of file