You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/04/15 11:38:48 UTC
[07/19] flink git commit: [streaming] Major internal renaming and
restructure
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java
deleted file mode 100644
index aa2a1bc..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/NullableCircularBuffer.java
+++ /dev/null
@@ -1,362 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
-import java.util.AbstractCollection;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-
-import org.apache.commons.collections.BoundedCollection;
-import org.apache.commons.collections.Buffer;
-import org.apache.commons.collections.BufferUnderflowException;
-
-@SuppressWarnings("rawtypes")
-public class NullableCircularBuffer extends AbstractCollection implements Buffer,
- BoundedCollection, Serializable {
-
- /** Serialization version */
- private static final long serialVersionUID = 5603722811189451017L;
-
- /** Underlying storage array */
- private transient Object[] elements;
-
- /** Array index of first (oldest) buffer element */
- private transient int start = 0;
-
- /**
- * Index mod maxElements of the array position following the last buffer
- * element. Buffer elements start at elements[start] and "wrap around"
- * elements[maxElements-1], ending at elements[decrement(end)]. For example,
- * elements = {c,a,b}, start=1, end=1 corresponds to the buffer [a,b,c].
- */
- private transient int end = 0;
-
- /** Flag to indicate if the buffer is currently full. */
- private transient boolean full = false;
-
- /** Capacity of the buffer */
- private final int maxElements;
-
- /**
- * Constructs a new <code>BoundedFifoBuffer</code> big enough to hold 32
- * elements.
- */
- public NullableCircularBuffer() {
- this(32);
- }
-
- /**
- * Constructs a new <code>BoundedFifoBuffer</code> big enough to hold the
- * specified number of elements.
- *
- * @param size
- * the maximum number of elements for this fifo
- * @throws IllegalArgumentException
- * if the size is less than 1
- */
- public NullableCircularBuffer(int size) {
- if (size <= 0) {
- throw new IllegalArgumentException("The size must be greater than 0");
- }
- elements = new Object[size];
- maxElements = elements.length;
- }
-
- /**
- * Constructs a new <code>BoundedFifoBuffer</code> big enough to hold all of
- * the elements in the specified collection. That collection's elements will
- * also be added to the buffer.
- *
- * @param coll
- * the collection whose elements to add, may not be null
- * @throws NullPointerException
- * if the collection is null
- */
- @SuppressWarnings("unchecked")
- public NullableCircularBuffer(Collection coll) {
- this(coll.size());
- addAll(coll);
- }
-
- // -----------------------------------------------------------------------
- /**
- * Write the buffer out using a custom routine.
- *
- * @param out
- * the output stream
- * @throws IOException
- */
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- out.writeInt(size());
- for (Iterator it = iterator(); it.hasNext();) {
- out.writeObject(it.next());
- }
- }
-
- /**
- * Read the buffer in using a custom routine.
- *
- * @param in
- * the input stream
- * @throws IOException
- * @throws ClassNotFoundException
- */
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- elements = new Object[maxElements];
- int size = in.readInt();
- for (int i = 0; i < size; i++) {
- elements[i] = in.readObject();
- }
- start = 0;
- full = (size == maxElements);
- if (full) {
- end = 0;
- } else {
- end = size;
- }
- }
-
- // -----------------------------------------------------------------------
- /**
- * Returns the number of elements stored in the buffer.
- *
- * @return this buffer's size
- */
- public int size() {
- int size = 0;
-
- if (end < start) {
- size = maxElements - start + end;
- } else if (end == start) {
- size = (full ? maxElements : 0);
- } else {
- size = end - start;
- }
-
- return size;
- }
-
- /**
- * Returns true if this buffer is empty; false otherwise.
- *
- * @return true if this buffer is empty
- */
- public boolean isEmpty() {
- return size() == 0;
- }
-
- /**
- * Returns true if this collection is full and no new elements can be added.
- *
- * @return <code>true</code> if the collection is full
- */
- public boolean isFull() {
- return size() == maxElements;
- }
-
- /**
- * Gets the maximum size of the collection (the bound).
- *
- * @return the maximum number of elements the collection can hold
- */
- public int maxSize() {
- return maxElements;
- }
-
- /**
- * Clears this buffer.
- */
- public void clear() {
- full = false;
- start = 0;
- end = 0;
- Arrays.fill(elements, null);
- }
-
- /**
- * Adds the given element to this buffer.
- *
- * @param element
- * the element to add
- * @return true, always
- */
- public boolean add(Object element) {
-
- if (isFull()) {
- remove();
- }
-
- elements[end++] = element;
-
- if (end >= maxElements) {
- end = 0;
- }
-
- if (end == start) {
- full = true;
- }
-
- return true;
- }
-
- /**
- * Returns the least recently inserted element in this buffer.
- *
- * @return the least recently inserted element
- * @throws BufferUnderflowException
- * if the buffer is empty
- */
- public Object get() {
- if (isEmpty()) {
- throw new BufferUnderflowException("The buffer is already empty");
- }
-
- return elements[start];
- }
-
- /**
- * Removes the least recently inserted element from this buffer.
- *
- * @return the least recently inserted element
- * @throws BufferUnderflowException
- * if the buffer is empty
- */
- public Object remove() {
- if (isEmpty()) {
- throw new BufferUnderflowException("The buffer is already empty");
- }
-
- Object element = elements[start];
-
- elements[start++] = null;
-
- if (start >= maxElements) {
- start = 0;
- }
-
- full = false;
-
- return element;
- }
-
- /**
- * Increments the internal index.
- *
- * @param index
- * the index to increment
- * @return the updated index
- */
- private int increment(int index) {
- index++;
- if (index >= maxElements) {
- index = 0;
- }
- return index;
- }
-
- /**
- * Decrements the internal index.
- *
- * @param index
- * the index to decrement
- * @return the updated index
- */
- private int decrement(int index) {
- index--;
- if (index < 0) {
- index = maxElements - 1;
- }
- return index;
- }
-
- /**
- * Returns an iterator over this buffer's elements.
- *
- * @return an iterator over this buffer's elements
- */
- public Iterator iterator() {
- return new Iterator() {
-
- private int index = start;
- private int lastReturnedIndex = -1;
- private boolean isFirst = full;
-
- public boolean hasNext() {
- return isFirst || (index != end);
-
- }
-
- public Object next() {
- if (!hasNext()) {
- throw new NoSuchElementException();
- }
- isFirst = false;
- lastReturnedIndex = index;
- index = increment(index);
- return elements[lastReturnedIndex];
- }
-
- public void remove() {
- if (lastReturnedIndex == -1) {
- throw new IllegalStateException();
- }
-
- // First element can be removed quickly
- if (lastReturnedIndex == start) {
- NullableCircularBuffer.this.remove();
- lastReturnedIndex = -1;
- return;
- }
-
- int pos = lastReturnedIndex + 1;
- if (start < lastReturnedIndex && pos < end) {
- // shift in one part
- System.arraycopy(elements, pos, elements, lastReturnedIndex, end - pos);
- } else {
- // Other elements require us to shift the subsequent
- // elements
- while (pos != end) {
- if (pos >= maxElements) {
- elements[pos - 1] = elements[0];
- pos = 0;
- } else {
- elements[decrement(pos)] = elements[pos];
- pos = increment(pos);
- }
- }
- }
-
- lastReturnedIndex = -1;
- end = decrement(end);
- elements[end] = null;
- full = false;
- index = decrement(index);
- }
-
- };
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
deleted file mode 100644
index 1c67c9e..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/state/PartitionableState.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.state;
-
-import org.apache.flink.runtime.state.OperatorState;
-
-/**
- * Base class for representing operator states that can be repartitioned for
- * state state and load balancing.
- *
- * @param <T>
- * The type of the operator state.
- */
-public abstract class PartitionableState<T> extends OperatorState<T> {
-
- private static final long serialVersionUID = 1L;
-
- PartitionableState(T initialState) {
- super(initialState);
- }
-
- /**
- * Repartitions(divides) the current state into the given number of new
- * partitions. The created partitions will be used to redistribute then
- * rebuild the state among the parallel instances of the operator. The
- * implementation should reflect the partitioning of the input values to
- * maintain correct operator behavior.
- *
- * </br> </br> It is also assumed that if we would {@link #reBuild} the
- * repartitioned state we would basically get the same as before.
- *
- *
- * @param numberOfPartitions
- * The desired number of partitions. The method must return an
- * array of that size.
- * @return The array containing the state part for each partition.
- */
- public abstract OperatorState<T>[] repartition(int numberOfPartitions);
-
- /**
- * Rebuilds the current state partition from the given parts. Used for
- * building the state after a re-balance phase.
- *
- * @param parts
- * The state parts that will be used to rebuild the current
- * partition.
- * @return The rebuilt operator state.
- */
- public abstract OperatorState<T> reBuild(OperatorState<T>... parts);
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
index 54a7692..f489334 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java
@@ -25,11 +25,11 @@ import org.apache.flink.api.java.operators.Keys;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
-import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
-import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
-import org.apache.flink.streaming.api.invokable.operator.GroupedReduceInvokable;
-import org.apache.flink.streaming.api.invokable.operator.StreamReduceInvokable;
+import org.apache.flink.streaming.api.functions.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.functions.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.operators.StreamGroupedReduce;
+import org.apache.flink.streaming.api.operators.StreamReduce;
import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
import org.junit.Test;
@@ -107,13 +107,13 @@ public class AggregationFunctionTest {
ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2,
AggregationType.MAX);
List<Tuple2<Integer, Integer>> sumList = MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(sumFunction), getInputList());
+ new StreamReduce<Tuple2<Integer, Integer>>(sumFunction), getInputList());
List<Tuple2<Integer, Integer>> minList = MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(minFunction), getInputList());
+ new StreamReduce<Tuple2<Integer, Integer>>(minFunction), getInputList());
List<Tuple2<Integer, Integer>> maxList = MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxFunction), getInputList());
+ new StreamReduce<Tuple2<Integer, Integer>>(maxFunction), getInputList());
TypeInformation<Tuple2<Integer, Integer>> typeInfo = TypeExtractor
.getForObject(new Tuple2<Integer, Integer>(1, 1));
@@ -123,15 +123,15 @@ public class AggregationFunctionTest {
typeInfo, new ExecutionConfig());
List<Tuple2<Integer, Integer>> groupedSumList = MockContext.createAndExecute(
- new GroupedReduceInvokable<Tuple2<Integer, Integer>>(sumFunction, keySelector),
+ new StreamGroupedReduce<Tuple2<Integer, Integer>>(sumFunction, keySelector),
getInputList());
List<Tuple2<Integer, Integer>> groupedMinList = MockContext.createAndExecute(
- new GroupedReduceInvokable<Tuple2<Integer, Integer>>(minFunction, keySelector),
+ new StreamGroupedReduce<Tuple2<Integer, Integer>>(minFunction, keySelector),
getInputList());
List<Tuple2<Integer, Integer>> groupedMaxList = MockContext.createAndExecute(
- new GroupedReduceInvokable<Tuple2<Integer, Integer>>(maxFunction, keySelector),
+ new StreamGroupedReduce<Tuple2<Integer, Integer>>(maxFunction, keySelector),
getInputList());
assertEquals(expectedSumList, sumList);
@@ -141,11 +141,11 @@ public class AggregationFunctionTest {
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
assertEquals(expectedSumList0, MockContext.createAndExecute(
- new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
+ new StreamReduce<Integer>(sumFunction0), simpleInput));
assertEquals(expectedMinList0, MockContext.createAndExecute(
- new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
+ new StreamReduce<Integer>(minFunction0), simpleInput));
assertEquals(expectedMaxList0, MockContext.createAndExecute(
- new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
+ new StreamReduce<Integer>(maxFunction0), simpleInput));
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
try {
@@ -229,11 +229,11 @@ public class AggregationFunctionTest {
ReduceFunction<Integer> maxFunction0 = ComparableAggregator.getAggregator(0, type2, AggregationType.MAX);
List<MyPojo> sumList = MockContext.createAndExecute(
- new StreamReduceInvokable<MyPojo>(sumFunction), getInputPojoList());
+ new StreamReduce<MyPojo>(sumFunction), getInputPojoList());
List<MyPojo> minList = MockContext.createAndExecute(
- new StreamReduceInvokable<MyPojo>(minFunction), getInputPojoList());
+ new StreamReduce<MyPojo>(minFunction), getInputPojoList());
List<MyPojo> maxList = MockContext.createAndExecute(
- new StreamReduceInvokable<MyPojo>(maxFunction), getInputPojoList());
+ new StreamReduce<MyPojo>(maxFunction), getInputPojoList());
TypeInformation<MyPojo> typeInfo = TypeExtractor.getForObject(new MyPojo(1, 1));
KeySelector<MyPojo, ?> keySelector = KeySelectorUtil.getSelectorForKeys(
@@ -241,13 +241,13 @@ public class AggregationFunctionTest {
typeInfo, config);
List<MyPojo> groupedSumList = MockContext.createAndExecute(
- new GroupedReduceInvokable<MyPojo>(sumFunction, keySelector),
+ new StreamGroupedReduce<MyPojo>(sumFunction, keySelector),
getInputPojoList());
List<MyPojo> groupedMinList = MockContext.createAndExecute(
- new GroupedReduceInvokable<MyPojo>(minFunction, keySelector),
+ new StreamGroupedReduce<MyPojo>(minFunction, keySelector),
getInputPojoList());
List<MyPojo> groupedMaxList = MockContext.createAndExecute(
- new GroupedReduceInvokable<MyPojo>(maxFunction, keySelector),
+ new StreamGroupedReduce<MyPojo>(maxFunction, keySelector),
getInputPojoList());
assertEquals(expectedSumList, sumList);
@@ -257,11 +257,11 @@ public class AggregationFunctionTest {
assertEquals(expectedGroupMinList, groupedMinList);
assertEquals(expectedGroupMaxList, groupedMaxList);
assertEquals(expectedSumList0, MockContext.createAndExecute(
- new StreamReduceInvokable<Integer>(sumFunction0), simpleInput));
+ new StreamReduce<Integer>(sumFunction0), simpleInput));
assertEquals(expectedMinList0, MockContext.createAndExecute(
- new StreamReduceInvokable<Integer>(minFunction0), simpleInput));
+ new StreamReduce<Integer>(minFunction0), simpleInput));
assertEquals(expectedMaxList0, MockContext.createAndExecute(
- new StreamReduceInvokable<Integer>(maxFunction0), simpleInput));
+ new StreamReduce<Integer>(maxFunction0), simpleInput));
}
@Test
@@ -324,16 +324,16 @@ public class AggregationFunctionTest {
minByLastExpected.add(new Tuple2<Integer, Integer>(0, 6));
assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionFirst),
+ new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionFirst),
getInputList()));
assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(maxByFunctionLast),
+ new StreamReduce<Tuple2<Integer, Integer>>(maxByFunctionLast),
getInputList()));
assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionLast),
+ new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionLast),
getInputList()));
assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<Tuple2<Integer, Integer>>(minByFunctionFirst),
+ new StreamReduce<Tuple2<Integer, Integer>>(minByFunctionFirst),
getInputList()));
}
@@ -398,16 +398,16 @@ public class AggregationFunctionTest {
minByLastExpected.add(new MyPojo(0, 6));
assertEquals(maxByFirstExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<MyPojo>(maxByFunctionFirst),
+ new StreamReduce<MyPojo>(maxByFunctionFirst),
getInputPojoList()));
assertEquals(maxByLastExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<MyPojo>(maxByFunctionLast),
+ new StreamReduce<MyPojo>(maxByFunctionLast),
getInputPojoList()));
assertEquals(minByLastExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<MyPojo>(minByFunctionLast),
+ new StreamReduce<MyPojo>(minByFunctionLast),
getInputPojoList()));
assertEquals(minByFirstExpected, MockContext.createAndExecute(
- new StreamReduceInvokable<MyPojo>(minByFunctionFirst),
+ new StreamReduce<MyPojo>(minByFunctionFirst),
getInputPojoList()));
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
index f527de4..4228314 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/CoStreamTest.java
@@ -30,8 +30,8 @@ import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
@@ -89,7 +89,7 @@ public class CoStreamTest {
public boolean filter(Tuple2<Integer, Integer> value) throws Exception {
return true;
}
- }).setChainingStrategy(StreamInvokable.ChainingStrategy.NEVER).groupBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
+ }).setChainingStrategy(StreamOperator.ChainingStrategy.NEVER).groupBy(new KeySelector<Tuple2<Integer, Integer>, Integer>() {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 31bd147..39c3f31 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -21,7 +21,7 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
index 792c5d2..e72f2d9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/SourceTest.java
@@ -27,8 +27,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import org.apache.flink.streaming.api.function.source.FromElementsFunction;
-import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.SocketTextStreamFunction;
import org.apache.flink.streaming.util.MockCollector;
import org.apache.flink.streaming.util.MockSource;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
index 9a19dde..f163b9e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/TypeFillTest.java
@@ -28,10 +28,10 @@ import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
+import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.co.CoReduceFunction;
+import org.apache.flink.streaming.api.functions.co.CoWindowFunction;
import org.apache.flink.util.Collector;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index bc7fe73..fc3e36f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -29,7 +29,7 @@ import java.util.Map;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.SplitDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.util.TestListResultSink;
import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 49b3bf8..32da578 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -21,8 +21,8 @@ import static org.junit.Assert.assertArrayEquals;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamvertex.MockRecordWriter;
+import org.apache.flink.streaming.api.streamtask.MockRecordWriter;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.MockRecordWriterFactory;
import org.apache.flink.util.Collector;
import org.junit.Test;
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
deleted file mode 100644
index 969a06b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class CounterInvokableTest {
-
- @Test
- public void counterTest() {
- CounterInvokable<String> invokable = new CounterInvokable<String>();
-
- List<Long> expected = Arrays.asList(1L, 2L, 3L);
- List<Long> actual = MockContext.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
-
- assertEquals(expected, actual);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
deleted file mode 100644
index 403dd17..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class FilterTest implements Serializable {
- private static final long serialVersionUID = 1L;
-
- static class MyFilter implements FilterFunction<Integer> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public boolean filter(Integer value) throws Exception {
- return value % 2 == 0;
- }
- }
-
- @Test
- public void test() {
- FilterInvokable<Integer> invokable = new FilterInvokable<Integer>(new MyFilter());
-
- List<Integer> expected = Arrays.asList(2, 4, 6);
- List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
-
- assertEquals(expected, actual);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
deleted file mode 100644
index 7424e21..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.util.MockContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class FlatMapTest {
-
- public static final class MyFlatMap implements FlatMapFunction<Integer, Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap(Integer value, Collector<Integer> out) throws Exception {
- if (value % 2 == 0) {
- out.collect(value);
- out.collect(value * value);
- }
- }
- }
-
- @Test
- public void flatMapTest() {
- FlatMapInvokable<Integer, Integer> invokable = new FlatMapInvokable<Integer, Integer>(new MyFlatMap());
-
- List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64);
- List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
-
- assertEquals(expected, actual);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
deleted file mode 100644
index 01375d2..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedFoldInvokableTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.util.MockContext;
-
-import org.junit.Test;
-
-public class GroupedFoldInvokableTest {
-
- private static class MyFolder implements FoldFunction<Integer, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String fold(String accumulator, Integer value) throws Exception {
- return accumulator + value.toString();
- }
-
- }
-
- @Test
- public void test() {
- TypeInformation<String> outType = TypeExtractor.getForObject("A string");
-
- GroupedFoldInvokable<Integer, String> invokable1 = new GroupedFoldInvokable<Integer, String>(
- new MyFolder(), new KeySelector<Integer, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Integer value) throws Exception {
- return value.toString();
- }
- }, "100", outType);
-
- List<String> expected = Arrays.asList("1001","10011", "1002", "10022", "1003");
- List<String> actual = MockContext.createAndExecute(invokable1,
- Arrays.asList(1, 1, 2, 2, 3));
-
- assertEquals(expected, actual);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
deleted file mode 100644
index ce47c67..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class GroupedReduceInvokableTest {
-
- private static class MyReducer implements ReduceFunction<Integer> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1 + value2;
- }
-
- }
-
- @Test
- public void test() {
- GroupedReduceInvokable<Integer> invokable1 = new GroupedReduceInvokable<Integer>(
- new MyReducer(), new KeySelector<Integer, Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Integer value) throws Exception {
- return value;
- }
- });
-
- List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
- List<Integer> actual = MockContext.createAndExecute(invokable1,
- Arrays.asList(1, 1, 2, 2, 3));
-
- assertEquals(expected, actual);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
deleted file mode 100644
index 5390ec9..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class MapTest {
-
- private static class Map implements MapFunction<Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public String map(Integer value) throws Exception {
- return "+" + (value + 1);
- }
- }
-
- @Test
- public void mapInvokableTest() {
- MapInvokable<Integer, String> invokable = new MapInvokable<Integer, String>(new Map());
-
- List<String> expectedList = Arrays.asList("+2", "+3", "+4");
- List<String> actualList = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3));
-
- assertEquals(expectedList, actualList);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
deleted file mode 100644
index 11c44cd..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple5;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.datastream.StreamProjection;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class ProjectTest implements Serializable {
- private static final long serialVersionUID = 1L;
-
- @Test
- public void test() {
-
- TypeInformation<Tuple5<Integer, String, Integer, String, Integer>> inType = TypeExtractor
- .getForObject(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b",
- 4));
-
- int[] fields = new int[] { 4, 4, 3 };
- Class<?>[] classes = new Class<?>[] { Integer.class, Integer.class, String.class };
-
- @SuppressWarnings("unchecked")
- ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>> invokable = new ProjectInvokable<Tuple5<Integer, String, Integer, String, Integer>, Tuple3<Integer, Integer, String>>(
- fields,
- (TypeInformation<Tuple3<Integer, Integer, String>>) StreamProjection
- .extractFieldTypes(fields, classes, inType));
-
- List<Tuple5<Integer, String, Integer, String, Integer>> input = new ArrayList<Tuple5<Integer, String, Integer, String, Integer>>();
- input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "b", 4));
- input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "s", 3, "c", 2));
- input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "c", 2));
- input.add(new Tuple5<Integer, String, Integer, String, Integer>(2, "a", 3, "a", 7));
-
- List<Tuple3<Integer, Integer, String>> expected = new ArrayList<Tuple3<Integer, Integer, String>>();
- expected.add(new Tuple3<Integer, Integer, String>(4, 4, "b"));
- expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
- expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
- expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a"));
-
- assertEquals(expected, MockContext.createAndExecute(invokable, input));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
deleted file mode 100644
index 90a133b..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamFoldTest.java
+++ /dev/null
@@ -1,56 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class StreamFoldTest {
-
- private static class MyFolder implements FoldFunction<Integer, String> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String fold(String accumulator, Integer value) throws Exception {
- return accumulator + value.toString();
- }
- }
-
- @Test
- public void test() {
- TypeInformation<String> outType = TypeExtractor.getForObject("A string");
- StreamFoldInvokable<Integer, String> invokable1 = new StreamFoldInvokable<Integer, String>(
- new MyFolder(), "", outType);
-
- List<String> expected = Arrays.asList("1","11","112","1123","11233");
- List<String> actual = MockContext.createAndExecute(invokable1,
- Arrays.asList(1, 1, 2, 3, 3));
-
- assertEquals(expected, actual);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
deleted file mode 100644
index ae866e6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class StreamReduceTest {
-
- private static class MyReducer implements ReduceFunction<Integer>{
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer reduce(Integer value1, Integer value2) throws Exception {
- return value1+value2;
- }
-
- }
-
- @Test
- public void test() {
- StreamReduceInvokable<Integer> invokable1 = new StreamReduceInvokable<Integer>(
- new MyReducer());
-
- List<Integer> expected = Arrays.asList(1,2,4,7,10);
- List<Integer> actual = MockContext.createAndExecute(invokable1,
- Arrays.asList(1, 1, 2, 3, 3));
-
- assertEquals(expected, actual);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java
deleted file mode 100644
index 195e67c..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapTest.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoFlatMapTest implements Serializable {
- private static final long serialVersionUID = 1L;
-
- private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void flatMap1(String value, Collector<String> coll) {
- for (int i = 0; i < value.length(); i++) {
- coll.collect(value.substring(i, i + 1));
- }
- }
-
- @Override
- public void flatMap2(Integer value, Collector<String> coll) {
- coll.collect(value.toString());
- }
- }
-
- @Test
- public void coFlatMapTest() {
- CoFlatMapInvokable<String, Integer, String> invokable = new CoFlatMapInvokable<String, Integer, String>(
- new MyCoFlatMap());
-
- List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h",
- "e", "3", "4", "5");
- List<String> actualList = MockCoContext.createAndExecute(invokable,
- Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5));
-
- assertEquals(expectedList, actualList);
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void multipleInputTest() {
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- DataStream<Integer> ds1 = env.fromElements(1, 3, 5);
- DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
-
- try {
- ds1.forward().merge(ds2);
- fail();
- } catch (RuntimeException e) {
- // expected
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java
deleted file mode 100644
index a531884..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupedReduceTest.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoGroupedReduceInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-public class CoGroupedReduceTest {
-
- private final static class MyCoReduceFunction implements
- CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public Tuple3<String, String, String> reduce1(Tuple3<String, String, String> value1,
- Tuple3<String, String, String> value2) {
- return new Tuple3<String, String, String>(value1.f0, value1.f1 + value2.f1, value1.f2);
- }
-
- @Override
- public Tuple2<Integer, Integer> reduce2(Tuple2<Integer, Integer> value1,
- Tuple2<Integer, Integer> value2) {
- return new Tuple2<Integer, Integer>(value1.f0, value1.f1 + value2.f1);
- }
-
- @Override
- public String map1(Tuple3<String, String, String> value) {
- return value.f1;
- }
-
- @Override
- public String map2(Tuple2<Integer, Integer> value) {
- return value.f1.toString();
- }
- }
-
- @SuppressWarnings("unchecked")
- @Test
- public void coGroupedReduceTest() {
- Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
- Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
- Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
- Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
- Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
- Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
- Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
- Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-
- KeySelector<Tuple3<String, String, String>, ?> keySelector0 = new KeySelector<Tuple3<String, String, String>, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Tuple3<String, String, String> value) throws Exception {
- return value.f0;
- }
- };
-
- KeySelector<Tuple2<Integer, Integer>, ?> keySelector1 = new KeySelector<Tuple2<Integer, Integer>, Integer>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public Integer getKey(Tuple2<Integer, Integer> value) throws Exception {
- return value.f0;
- }
- };
-
- KeySelector<Tuple3<String, String, String>, ?> keySelector2 = new KeySelector<Tuple3<String, String, String>, String>() {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public String getKey(Tuple3<String, String, String> value) throws Exception {
- return value.f2;
- }
- };
-
- CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
- new MyCoReduceFunction(), keySelector0, keySelector1);
-
- List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
- "7");
-
- List<String> actualList = MockCoContext.createAndExecute(invokable,
- Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
-
- assertEquals(expected, actualList);
-
- invokable = new CoGroupedReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
- new MyCoReduceFunction(), keySelector2, keySelector1);
-
- expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5", "7");
-
- actualList = MockCoContext.createAndExecute(invokable, Arrays.asList(word1, word2, word3),
- Arrays.asList(int1, int2, int3, int4, int5));
-
- assertEquals(expected, actualList);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java
deleted file mode 100644
index 6b84440..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-/**
- * Test for {@link CoMapInvokable}: sends a list of doubles and a list of
- * integers through a co-map and compares the emitted strings, in order, with
- * a fixed expectation.
- */
-public class CoMapTest implements Serializable {
-	private static final long serialVersionUID = 1L;
-
-	/** Co-map that renders elements of either input in their string form. */
-	private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String map1(Double value) {
-			return Double.toString(value);
-		}
-
-		@Override
-		public String map2(Integer value) {
-			return Integer.toString(value);
-		}
-	}
-
-	@Test
-	public void coMapTest() {
-		List<Double> firstInput = Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5);
-		List<Integer> secondInput = Arrays.asList(1, 2, 3);
-
-		CoMapInvokable<Double, Integer, String> coMap = new CoMapInvokable<Double, Integer, String>(
-				new MyCoMap());
-		List<String> emitted = MockCoContext.createAndExecute(coMap, firstInput, secondInput);
-
-		// The expected sequence alternates between the two inputs until the
-		// shorter (integer) input is used up.
-		List<String> wanted = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
-		assertEquals(wanted, emitted);
-	}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java
deleted file mode 100644
index 7f23fba..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoStreamReduceTest.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoReduceInvokable;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.junit.Test;
-
-/**
- * Test for {@link CoReduceInvokable} using a reducer that multiplies the
- * integers of the first input and concatenates the strings of the second.
- */
-public class CoStreamReduceTest {
-
-	/**
-	 * Reducer under test: products on input one, string concatenation on input
-	 * two; both sides are mapped to {@code Integer} for output.
-	 */
-	public static class MyCoReduceFunction implements
-			CoReduceFunction<Integer, String, Integer> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Integer reduce1(Integer value1, Integer value2) {
-			final int product = value1 * value2;
-			return product;
-		}
-
-		@Override
-		public String reduce2(String value1, String value2) {
-			final String joined = value1 + value2;
-			return joined;
-		}
-
-		@Override
-		public Integer map1(Integer value) {
-			return value;
-		}
-
-		@Override
-		public Integer map2(String value) {
-			return Integer.valueOf(value);
-		}
-
-	}
-
-	@Test
-	public void coStreamReduceTest() {
-		List<Integer> numbers = Arrays.asList(1, 2, 3, 4);
-		List<String> digits = Arrays.asList("9", "9", "8");
-
-		CoReduceInvokable<Integer, String, Integer> reducer = new CoReduceInvokable<Integer, String, Integer>(
-				new MyCoReduceFunction());
-		List<Integer> emitted = MockCoContext.createAndExecute(reducer, numbers, digits);
-
-		// Running products 1, 2, 6, 24 alternate with the running
-		// concatenations "9", "99", "998" parsed as integers.
-		List<Integer> wanted = Arrays.asList(1, 9, 2, 99, 6, 998, 24);
-		assertEquals(wanted, emitted);
-	}
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java
deleted file mode 100644
index a4f6120..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/CoWindowTest.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.function.co.CoWindowFunction;
-import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.util.MockCoContext;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class CoWindowTest {
-
-	/**
-	 * Co-window function that emits the number of elements in the first
-	 * window followed by the number of elements in the second window.
-	 */
-	public static final class MyCoGroup1 implements CoWindowFunction<Integer, Integer, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		/**
-		 * @param first elements of the first input in the current window
-		 * @param second elements of the second input in the current window
-		 * @param out receives {@code first.size()} and then {@code second.size()}
-		 */
-		@Override
-		public void coWindow(List<Integer> first, List<Integer> second, Collector<Integer> out)
-				throws Exception {
-			// List.size() yields the same counts as iterating, without the
-			// unused loop variables that needed a warning suppression.
-			out.collect(first.size());
-			out.collect(second.size());
-		}
-
-	}
-
-	/**
-	 * Co-window function that emits the {@code f1} field of every element of
-	 * the second window whose {@code f1} also occurs in the first window.
-	 */
-	public static final class MyCoGroup2 implements
-			CoWindowFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public void coWindow(List<Tuple2<Integer, Integer>> first,
-				List<Tuple2<Integer, Integer>> second, Collector<Integer> out) throws Exception {
-
-			// Collect the f1 values seen on the first input.
-			Set<Integer> seenInFirst = new HashSet<Integer>();
-			for (Tuple2<Integer, Integer> left : first) {
-				seenInFirst.add(left.f1);
-			}
-
-			// Forward matching f1 values of the second input, in input order.
-			for (Tuple2<Integer, Integer> right : second) {
-				Integer candidate = right.f1;
-				if (!seenInFirst.contains(candidate)) {
-					continue;
-				}
-				out.collect(candidate);
-			}
-
-		}
-
-	}
-
-	/** Timestamp extractor that uses the integer element itself as its timestamp. */
-	private static final class MyTS1 implements Timestamp<Integer> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public long getTimestamp(Integer value) {
-			return value.longValue();
-		}
-
-	}
-
- private static final class MyTS2 implements Timestamp<Tuple2<Integer, Integer>> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public long getTimestamp(Tuple2<Integer, Integer> value) {
- return value.f0;
- }
-
- }
-
- @Test
- public void coWindowGroupReduceTest2() throws Exception {
- // Runs two CoWindowInvokable configurations through MockCoContext.createAndExecute and checks the emitted values.
- CoWindowInvokable<Integer, Integer, Integer> invokable1 = new CoWindowInvokable<Integer, Integer, Integer>(
- new MyCoGroup1(), 2, 1, new TimestampWrapper<Integer>(new MyTS1(), 1),
- new TimestampWrapper<Integer>(new MyTS1(), 1));
-
- // Window size 2, slide 1; MyTS1 uses each element as its own timestamp
- // window ranges: 1,2|2,3|3,4|4,5
-
- List<Integer> input11 = new ArrayList<Integer>();
- input11.add(1);
- input11.add(1);
- input11.add(2);
- input11.add(3);
- input11.add(3);
-
- List<Integer> input12 = new ArrayList<Integer>();
- input12.add(1);
- input12.add(2);
- input12.add(3);
- input12.add(3);
- input12.add(5);
-
- // Windows as (first input)(second input): (1,1,2)(1,2)|(2,3,3)(2,3,3)|(3,3)(3,3)|()(5)
- // expected output (first count, second count per window): 3,2|3,3|2,2|0,1
-
- List<Integer> expected1 = new ArrayList<Integer>();
- expected1.add(3);
- expected1.add(2);
- expected1.add(3);
- expected1.add(3);
- expected1.add(2);
- expected1.add(2);
- expected1.add(0);
- expected1.add(1);
-
- List<Integer> actual1 = MockCoContext.createAndExecute(invokable1, input11, input12);
- assertEquals(expected1, actual1);
-
- CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer> invokable2 = new CoWindowInvokable<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>, Integer>(
- new MyCoGroup2(), 2, 3, new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(),
- 1), new TimestampWrapper<Tuple2<Integer, Integer>>(new MyTS2(), 1));
-
- // Window size 2, slide 3; MyTS2 uses the tuple field f0 as timestamp
- // window ranges: 1,2|4,5|7,8|
-
- List<Tuple2<Integer, Integer>> input21 = new ArrayList<Tuple2<Integer, Integer>>();
- input21.add(new Tuple2<Integer, Integer>(1, 1));
- input21.add(new Tuple2<Integer, Integer>(1, 2));
- input21.add(new Tuple2<Integer, Integer>(2, 3));
- input21.add(new Tuple2<Integer, Integer>(3, 4));
- input21.add(new Tuple2<Integer, Integer>(3, 5));
- input21.add(new Tuple2<Integer, Integer>(4, 6));
- input21.add(new Tuple2<Integer, Integer>(4, 7));
- input21.add(new Tuple2<Integer, Integer>(5, 8));
-
- List<Tuple2<Integer, Integer>> input22 = new ArrayList<Tuple2<Integer, Integer>>();
- input22.add(new Tuple2<Integer, Integer>(1, 1));
- input22.add(new Tuple2<Integer, Integer>(2, 0));
- input22.add(new Tuple2<Integer, Integer>(2, 2));
- input22.add(new Tuple2<Integer, Integer>(3, 9));
- input22.add(new Tuple2<Integer, Integer>(3, 4));
- input22.add(new Tuple2<Integer, Integer>(4, 10));
- input22.add(new Tuple2<Integer, Integer>(5, 8));
- input22.add(new Tuple2<Integer, Integer>(5, 7));
- // Expected: f1 values of the second input that also occur as f1 in the first input of the same window (1,2 | 8,7)
- List<Integer> expected2 = new ArrayList<Integer>();
- expected2.add(1);
- expected2.add(2);
- expected2.add(8);
- expected2.add(7);
-
- List<Integer> actual2 = MockCoContext.createAndExecute(invokable2, input21, input22);
- assertEquals(expected2, actual2);
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/4754a97b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java
deleted file mode 100644
index b58245a..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/co/SelfConnectionTest.java
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.co;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.util.TestListResultSink;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
-import org.junit.Test;
-
-public class SelfConnectionTest implements Serializable {
-
- private static final long serialVersionUID = 1L;
- // Passed as the second constructor argument of every TestStreamEnvironment created in this class.
- private final int MEMORY_SIZE = 32;
- // Expected sink contents; each test assigns a fresh list to this static field before comparing.
- private static List<String> expected;
-
-	/**
-	 * Joins a stream with itself over a window of length 50L, keyed by the
-	 * element value, and expects every element to be paired with itself.
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	@Test
-	public void sameDataStreamTest() {
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		// Each integer serves as its own timestamp.
-		Timestamp<Integer> timeStamp = new Timestamp<Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public long getTimestamp(Integer value) {
-				return value;
-			}
-
-		};
-
-		// Identity key: elements join only with equal elements.
-		KeySelector keySelector = new KeySelector<Integer, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) throws Exception {
-				return value;
-			}
-		};
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5);
-
-		@SuppressWarnings("unused")
-		DataStream<Tuple2<Integer, Integer>> dataStream =
-				src.join(src).onWindow(50L, timeStamp, timeStamp).where(keySelector).equalTo(keySelector)
-					.map(new MapFunction<Tuple2<Integer, Integer>, String>() {
-
-						private static final long serialVersionUID = 1L;
-
-						@Override
-						public String map(Tuple2<Integer, Integer> value) throws Exception {
-							return value.toString();
-						}
-					})
-					.addSink(resultSink);
-
-
-		try {
-			env.execute();
-
-			expected = new ArrayList<String>();
-
-			expected.addAll(Arrays.asList("(1,1)", "(3,3)", "(5,5)"));
-
-			List<String> result = resultSink.getResult();
-
-			// Sink order is not asserted; compare sorted lists.
-			Collections.sort(expected);
-			Collections.sort(result);
-
-			assertEquals(expected, result);
-		} catch (Exception e) {
-			// Print first: fail() throws, so nothing placed after it would run.
-			e.printStackTrace();
-			fail("Job execution failed: " + e);
-		}
-	}
-
-	/**
-	 * We connect two different data streams in a chain to a CoMap: the source
-	 * itself and a map of that source whose chaining strategy is ALWAYS. Both
-	 * inputs are expected to reach the sink.
-	 */
-	@Test
-	public void differentDataStreamSameChain() {
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORY_SIZE);
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5);
-
-		DataStream<String> stringMap = src.map(new MapFunction<Integer, String>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String map(Integer value) throws Exception {
-				return "x " + value;
-			}
-		}).setChainingStrategy(StreamInvokable.ChainingStrategy.ALWAYS);
-
-		stringMap.connect(src).map(new CoMapFunction<String, Integer, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String map1(String value) {
-				return value;
-			}
-
-			@Override
-			public String map2(Integer value) {
-				return String.valueOf(value + 1);
-			}
-		}).addSink(resultSink);
-
-		try {
-			env.execute();
-		} catch (Exception e) {
-			// Fail with the cause instead of continuing to a misleading
-			// list-mismatch assertion on an incomplete result.
-			e.printStackTrace();
-			fail("Job execution failed: " + e);
-		}
-
-		expected = new ArrayList<String>();
-
-		expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"));
-
-		List<String> result = resultSink.getResult();
-
-		// Sink order is not asserted; compare sorted lists.
-		Collections.sort(expected);
-		Collections.sort(result);
-
-		assertEquals(expected, result);
-	}
-
-	/**
-	 * We connect two different data streams in different chains to a CoMap.
-	 * (This is not actually self-connect.) Both streams are derived from the
-	 * same source, whose chaining strategy is NEVER, and are grouped before
-	 * being connected.
-	 */
-	@Test
-	public void differentDataStreamDifferentChain() {
-
-		TestListResultSink<String> resultSink = new TestListResultSink<String>();
-
-		StreamExecutionEnvironment env = new TestStreamEnvironment(3, MEMORY_SIZE);
-
-		DataStream<Integer> src = env.fromElements(1, 3, 5).setChainingStrategy(StreamInvokable.ChainingStrategy.NEVER);
-
-		// First branch: "x <value>" strings grouped by string length.
-		DataStream<String> stringMap = src.flatMap(new FlatMapFunction<Integer, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public void flatMap(Integer value, Collector<String> out) throws Exception {
-				out.collect("x " + value);
-			}
-		}).groupBy(new KeySelector<String, Integer>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(String value) throws Exception {
-				return value.length();
-			}
-		});
-
-		// Second branch: value + 1 as Long, grouped by the value itself.
-		DataStream<Long> longMap = src.map(new MapFunction<Integer, Long>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Long map(Integer value) throws Exception {
-				return Long.valueOf(value + 1);
-			}
-		}).groupBy(new KeySelector<Long, Long>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Long getKey(Long value) throws Exception {
-				return value;
-			}
-		});
-
-
-		stringMap.connect(longMap).map(new CoMapFunction<String, Long, String>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public String map1(String value) {
-				return value;
-			}
-
-			@Override
-			public String map2(Long value) {
-				return value.toString();
-			}
-		}).addSink(resultSink);
-
-		try {
-			env.execute();
-		} catch (Exception e) {
-			// Fail with the cause instead of continuing to a misleading
-			// list-mismatch assertion on an incomplete result.
-			e.printStackTrace();
-			fail("Job execution failed: " + e);
-		}
-
-		expected = new ArrayList<String>();
-
-		expected.addAll(Arrays.asList("x 1", "x 3", "x 5", "2", "4", "6"));
-
-		List<String> result = resultSink.getResult();
-
-		// Sink order is not asserted; compare sorted lists.
-		Collections.sort(expected);
-		Collections.sort(result);
-
-		assertEquals(expected, result);
-	}
-}