You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/18 19:59:53 UTC
[2/6] incubator-flink git commit: [streaming] StreamInvokable rework
for simpler logic and easier use
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
index 70b9d7e..969a06b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CounterInvokableTest.java
@@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
import java.util.Arrays;
import java.util.List;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
public class CounterInvokableTest {
@@ -32,7 +32,7 @@ public class CounterInvokableTest {
CounterInvokable<String> invokable = new CounterInvokable<String>();
List<Long> expected = Arrays.asList(1L, 2L, 3L);
- List<Long> actual = MockInvokable.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
+ List<Long> actual = MockContext.createAndExecute(invokable, Arrays.asList("one", "two", "three"));
assertEquals(expected, actual);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index e06e0ef..403dd17 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -24,7 +24,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
public class FilterTest implements Serializable {
@@ -44,7 +44,7 @@ public class FilterTest implements Serializable {
FilterInvokable<Integer> invokable = new FilterInvokable<Integer>(new MyFilter());
List<Integer> expected = Arrays.asList(2, 4, 6);
- List<Integer> actual = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
+ List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7));
assertEquals(expected, actual);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
index a89de50..7424e21 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.util.Collector;
import org.junit.Test;
@@ -47,7 +47,7 @@ public class FlatMapTest {
FlatMapInvokable<Integer, Integer> invokable = new FlatMapInvokable<Integer, Integer>(new MyFlatMap());
List<Integer> expected = Arrays.asList(2, 4, 4, 16, 6, 36, 8, 64);
- List<Integer> actual = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
+ List<Integer> actual = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8));
assertEquals(expected, actual);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
index 0b68207..ceaccf3 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedReduceInvokableTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.streaming.util.keys.ObjectKeySelector;
import org.junit.Test;
@@ -46,7 +46,7 @@ public class GroupedReduceInvokableTest {
new MyReducer(), new ObjectKeySelector<Integer>());
List<Integer> expected = Arrays.asList(1, 2, 2, 4, 3);
- List<Integer> actual = MockInvokable.createAndExecute(invokable1,
+ List<Integer> actual = MockContext.createAndExecute(invokable1,
Arrays.asList(1, 1, 2, 2, 3));
assertEquals(expected, actual);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
index 5ac5529..c3a48d5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokableTest.java
@@ -39,7 +39,7 @@ import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.apache.flink.streaming.util.keys.TupleKeySelector;
import org.junit.Test;
@@ -206,7 +206,7 @@ public class GroupedWindowInvokableTest {
GroupedWindowInvokable<Integer, Integer> invokable = new GroupedWindowInvokable<Integer, Integer>(
reduceFunction, keySelector, triggers, evictions, centralTriggers, null);
- List<Integer> result = MockInvokable.createAndExecute(invokable, inputs);
+ List<Integer> result = MockContext.createAndExecute(invokable, inputs);
List<Integer> actual = new LinkedList<Integer>();
for (Integer current : result) {
@@ -225,7 +225,7 @@ public class GroupedWindowInvokableTest {
invokable = new GroupedWindowInvokable<Integer, Integer>(
reduceFunction, keySelector, triggers, null, centralTriggers,centralEvictions);
- result = MockInvokable.createAndExecute(invokable, inputs);
+ result = MockContext.createAndExecute(invokable, inputs);
actual = new LinkedList<Integer>();
for (Integer current : result) {
actual.add(current);
@@ -282,7 +282,7 @@ public class GroupedWindowInvokableTest {
}, new TupleKeySelector<Tuple2<Integer, String>>(1), triggers, evictions,
centralTriggers, null);
- List<Tuple2<Integer, String>> result = MockInvokable.createAndExecute(invokable2, inputs2);
+ List<Tuple2<Integer, String>> result = MockContext.createAndExecute(invokable2, inputs2);
List<Tuple2<Integer, String>> actual2 = new LinkedList<Tuple2<Integer, String>>();
for (Tuple2<Integer, String> current : result) {
@@ -391,7 +391,7 @@ public class GroupedWindowInvokableTest {
distributedTriggers, evictions, triggers, null);
ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>();
- for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
+ for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) {
result.add(t);
}
@@ -411,7 +411,7 @@ public class GroupedWindowInvokableTest {
distributedTriggers, evictions, triggers, centralEvictions);
result = new ArrayList<Tuple2<Integer, String>>();
- for (Tuple2<Integer, String> t : MockInvokable.createAndExecute(invokable, inputs)) {
+ for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) {
result.add(t);
}
@@ -480,7 +480,7 @@ public class GroupedWindowInvokableTest {
}, distributedTriggers, evictions, triggers, null);
ArrayList<Integer> result = new ArrayList<Integer>();
- for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+ for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
result.add(t);
}
@@ -556,7 +556,7 @@ public class GroupedWindowInvokableTest {
}, distributedTriggers, evictions, triggers, null);
ArrayList<Integer> result = new ArrayList<Integer>();
- for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+ for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
result.add(t);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
index 7124ff8..5390ec9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
public class MapTest {
@@ -42,7 +42,7 @@ public class MapTest {
MapInvokable<Integer, String> invokable = new MapInvokable<Integer, String>(new Map());
List<String> expectedList = Arrays.asList("+2", "+3", "+4");
- List<String> actualList = MockInvokable.createAndExecute(invokable, Arrays.asList(1, 2, 3));
+ List<String> actualList = MockContext.createAndExecute(invokable, Arrays.asList(1, 2, 3));
assertEquals(expectedList, actualList);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
index 288d4ee..11c44cd 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/ProjectTest.java
@@ -28,7 +28,7 @@ import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.StreamProjection;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
public class ProjectTest implements Serializable {
@@ -62,6 +62,6 @@ public class ProjectTest implements Serializable {
expected.add(new Tuple3<Integer, Integer, String>(2, 2, "c"));
expected.add(new Tuple3<Integer, Integer, String>(7, 7, "a"));
- assertEquals(expected, MockInvokable.createAndExecute(invokable, input));
+ assertEquals(expected, MockContext.createAndExecute(invokable, input));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
index 68b8f8e..ae866e6 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/StreamReduceTest.java
@@ -23,7 +23,7 @@ import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
public class StreamReduceTest {
@@ -45,7 +45,7 @@ public class StreamReduceTest {
new MyReducer());
List<Integer> expected = Arrays.asList(1,2,4,7,10);
- List<Integer> actual = MockInvokable.createAndExecute(invokable1,
+ List<Integer> actual = MockContext.createAndExecute(invokable1,
Arrays.asList(1, 1, 2, 3, 3));
assertEquals(expected, actual);
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
index 612da84..421a999 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokableTest.java
@@ -31,7 +31,7 @@ import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.util.MockInvokable;
+import org.apache.flink.streaming.util.MockContext;
import org.junit.Test;
public class WindowInvokableTest {
@@ -98,7 +98,7 @@ public class WindowInvokableTest {
myReduceFunction, triggers, evictions);
ArrayList<Integer> result = new ArrayList<Integer>();
- for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+ for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
result.add(t);
}
@@ -148,7 +148,7 @@ public class WindowInvokableTest {
expected.add(24);
expected.add(19);
List<Integer> result = new ArrayList<Integer>();
- for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+ for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
result.add(t);
}
assertEquals(expected, result);
@@ -199,7 +199,7 @@ public class WindowInvokableTest {
expected2.add(-4);
result = new ArrayList<Integer>();
- for (Integer t : MockInvokable.createAndExecute(invokable2, inputs2)) {
+ for (Integer t : MockContext.createAndExecute(invokable2, inputs2)) {
result.add(t);
}
@@ -253,7 +253,7 @@ public class WindowInvokableTest {
myReduceFunction, triggers, evictions);
ArrayList<Integer> result = new ArrayList<Integer>();
- for (Integer t : MockInvokable.createAndExecute(invokable, inputs)) {
+ for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
result.add(t);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
new file mode 100644
index 0000000..ea94f98
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoContext.java
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class MockCoContext<IN1, IN2, OUT> implements StreamTaskContext<OUT> {
+ // Test stand-in for a two-input StreamTaskContext: serves two in-memory collections through a
+ // mock CoReaderIterator and hands out a MockCollector constructed over the outputs list.
+ private Iterator<IN1> inputIterator1;
+ private Iterator<IN2> inputIterator2;
+ private List<OUT> outputs;
+
+ private Collector<OUT> collector;
+ private StreamRecordSerializer<IN1> inDeserializer1;
+ private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator;
+ private StreamRecordSerializer<IN2> inDeserializer2;
+ // Builds the iterators, serializers, mock co-reader and collector; throws RuntimeException if either input is empty.
+ public MockCoContext(Collection<IN1> input1, Collection<IN2> input2) {
+ // The first element of each input is used below to derive its type information, hence the emptiness check.
+ if (input1.isEmpty() || input2.isEmpty()) {
+ throw new RuntimeException("Inputs must not be empty");
+ }
+
+ this.inputIterator1 = input1.iterator();
+ this.inputIterator2 = input2.iterator();
+ // Fresh iterators are requested here, so the stored inputIterator1/2 are not advanced by the type lookup.
+ TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next());
+ inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1);
+ TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next());
+ inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2);
+
+ mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2);
+
+ outputs = new ArrayList<OUT>();
+ collector = new MockCollector<OUT>(outputs);
+ }
+ // Input (1 or 2) that nextRecord() reads from next while both inputs still have elements.
+ private int currentInput = 1;
+ private StreamRecord<IN1> reuse1; // record most recently filled from input 1
+ private StreamRecord<IN2> reuse2; // record most recently filled from input 2
+ // CoReaderIterator stub whose next() is fed by the enclosing context's in-memory iterators.
+ private class MockCoReaderIterator extends
+ CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
+
+ public MockCoReaderIterator(TypeSerializer<StreamRecord<IN1>> serializer1,
+ TypeSerializer<StreamRecord<IN2>> serializer2) {
+ super(null, serializer1, serializer2); // first argument is null — presumably the real reader this mock replaces; TODO confirm
+ reuse1 = inDeserializer1.createInstance();
+ reuse2 = inDeserializer2.createInstance();
+ }
+ // Returns 1 or 2 for the input that supplied a record, or 0 once both inputs are exhausted.
+ @Override
+ public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException {
+ this.delegate1.setInstance(target1);
+ this.delegate2.setInstance(target2);
+ // Both targets are written on every call; the side that did not advance receives its previous object again.
+ int inputNumber = nextRecord();
+ target1.setObject(reuse1.getObject());
+ target2.setObject(reuse2.getObject());
+
+ return inputNumber;
+ }
+ }
+ // Alternates between the two inputs while both have elements, then drains whichever one remains.
+ private Integer nextRecord() {
+ if (inputIterator1.hasNext() && inputIterator2.hasNext()) {
+ switch (currentInput) {
+ case 1:
+ return next1();
+ case 2:
+ return next2();
+ default:
+ return 0; // not reached as long as currentInput is only ever assigned 1 or 2, as in this class
+ }
+ }
+
+ if (inputIterator1.hasNext()) {
+ return next1();
+ }
+
+ if (inputIterator2.hasNext()) {
+ return next2();
+ }
+
+ return 0;
+ }
+ // Stores the next element of input 1 in a newly created record and passes the turn to input 2.
+ private int next1() {
+ reuse1 = inDeserializer1.createInstance();
+ reuse1.setObject(inputIterator1.next());
+ currentInput = 2;
+ return 1;
+ }
+ // Stores the next element of input 2 in a newly created record and passes the turn to input 1.
+ private int next2() {
+ reuse2 = inDeserializer2.createInstance();
+ reuse2.setObject(inputIterator2.next());
+ currentInput = 1;
+ return 2;
+ }
+ // Plain accessors for the objects created in the constructor.
+ public List<OUT> getOutputs() {
+ return outputs;
+ }
+
+ public Collector<OUT> getCollector() {
+ return collector;
+ }
+
+ public StreamRecordSerializer<IN1> getInDeserializer1() {
+ return inDeserializer1;
+ }
+
+ public StreamRecordSerializer<IN2> getInDeserializer2() {
+ return inDeserializer2;
+ }
+
+ public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() {
+ return mockIterator;
+ }
+ // Creates a mock context over the two inputs, runs setup/open/invoke/close on the invokable and returns the context's outputs.
+ public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
+ List<IN1> input1, List<IN2> input2) {
+ MockCoContext<IN1, IN2, OUT> mockContext = new MockCoContext<IN1, IN2, OUT>(input1, input2);
+ invokable.setup(mockContext);
+ // Any exception from open/invoke/close is rethrown as a RuntimeException; close() is skipped if an earlier call throws.
+ try {
+ invokable.open(null);
+ invokable.invoke();
+ invokable.close();
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot invoke invokable.", e);
+ }
+
+ return mockContext.getOutputs();
+ }
+ // This mock supplies neither a StreamConfig nor a user-code class loader: both getters below return null.
+ @Override
+ public StreamConfig getConfig() {
+ return null;
+ }
+
+ @Override
+ public ClassLoader getUserCodeClassLoader() {
+ return null;
+ }
+ // NOTE(review): inputIterator1/2 are java.util.Iterator objects from Collection.iterator(); unless they also implement MutableObjectIterator these casts throw ClassCastException — confirm getInput is never called on this mock.
+ @SuppressWarnings("unchecked")
+ @Override
+ public <X> MutableObjectIterator<X> getInput(int index) {
+ switch (index) {
+ case 0:
+ return (MutableObjectIterator<X>) inputIterator1;
+ case 1:
+ return (MutableObjectIterator<X>) inputIterator2;
+ default:
+ throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+ }
+ }
+ // Returns the serializer for input index 0 or 1; any other index raises IllegalArgumentException.
+ @SuppressWarnings("unchecked")
+ @Override
+ public <X> StreamRecordSerializer<X> getInputSerializer(int index) {
+ switch (index) {
+ case 0:
+ return (StreamRecordSerializer<X>) inDeserializer1;
+ case 1:
+ return (StreamRecordSerializer<X>) inDeserializer2;
+ default:
+ throw new IllegalArgumentException("CoStreamVertex has only 2 inputs");
+ }
+ }
+ // Exposes the mock co-reader; the unchecked cast leaves it to the caller to request matching element types.
+ @SuppressWarnings("unchecked")
+ @Override
+ public <X, Y> CoReaderIterator<X, Y> getCoReader() {
+ return (CoReaderIterator<X, Y>) mockIterator;
+ }
+
+ @Override
+ public Collector<OUT> getOutputCollector() {
+ return collector;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
deleted file mode 100644
index 39d3ab4..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
+++ /dev/null
@@ -1,169 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.streaming.io.CoReaderIterator;
-import org.apache.flink.util.Collector;
-
-public class MockCoInvokable<IN1, IN2, OUT> {
- // private Collection<IN1> input1;
- // private Collection<IN2> input2;
- private Iterator<IN1> inputIterator1;
- private Iterator<IN2> inputIterator2;
- private List<OUT> outputs;
-
- private Collector<OUT> collector;
- private StreamRecordSerializer<IN1> inDeserializer1;
- private CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator;
- private StreamRecordSerializer<IN2> inDeserializer2;
-
- public MockCoInvokable(Collection<IN1> input1, Collection<IN2> input2) {
-
- if (input1.isEmpty() || input2.isEmpty()) {
- throw new RuntimeException("Inputs must not be empty");
- }
-
- this.inputIterator1 = input1.iterator();
- this.inputIterator2 = input2.iterator();
-
- TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next());
- inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1);
- TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next());
- inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2);
-
- mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2);
-
- outputs = new ArrayList<OUT>();
- collector = new MockCollector<OUT>(outputs);
- }
-
- private int currentInput = 1;
- private StreamRecord<IN1> reuse1;
- private StreamRecord<IN2> reuse2;
-
- private class MockCoReaderIterator extends
- CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
-
- public MockCoReaderIterator(
- TypeSerializer<StreamRecord<IN1>> serializer1,
- TypeSerializer<StreamRecord<IN2>> serializer2) {
- super(null, serializer1, serializer2);
- reuse1 = inDeserializer1.createInstance();
- reuse2 = inDeserializer2.createInstance();
- }
-
- @Override
- public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException {
- this.delegate1.setInstance(target1);
- this.delegate2.setInstance(target2);
-
- int inputNumber = nextRecord();
- target1.setObject(reuse1.getObject());
- target2.setObject(reuse2.getObject());
-
- return inputNumber;
- }
- }
-
- private Integer nextRecord() {
- if (inputIterator1.hasNext() && inputIterator2.hasNext()) {
- switch (currentInput) {
- case 1:
- return next1();
- case 2:
- return next2();
- default:
- return 0;
- }
- }
-
- if (inputIterator1.hasNext()) {
- return next1();
- }
-
- if (inputIterator2.hasNext()) {
- return next2();
- }
-
- return 0;
- }
-
- private int next1() {
- reuse1 = inDeserializer1.createInstance();
- reuse1.setObject(inputIterator1.next());
- currentInput = 2;
- return 1;
- }
-
- private int next2() {
- reuse2 = inDeserializer2.createInstance();
- reuse2.setObject(inputIterator2.next());
- currentInput = 1;
- return 2;
- }
-
- public List<OUT> getOutputs() {
- return outputs;
- }
-
- public Collector<OUT> getCollector() {
- return collector;
- }
-
- public StreamRecordSerializer<IN1> getInDeserializer1() {
- return inDeserializer1;
- }
-
- public StreamRecordSerializer<IN2> getInDeserializer2() {
- return inDeserializer2;
- }
-
- public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() {
- return mockIterator;
- }
-
- public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
- List<IN1> input1, List<IN2> input2) {
- MockCoInvokable<IN1, IN2, OUT> mock = new MockCoInvokable<IN1, IN2, OUT>(input1, input2);
- invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer1(),
- mock.getInDeserializer2(), false);
-
- try {
- invokable.open(null);
- invokable.invoke();
- invokable.close();
- } catch (Exception e) {
- throw new RuntimeException("Cannot invoke invokable.", e);
- }
-
- return mock.getOutputs();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
new file mode 100644
index 0000000..87bedb2
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockContext.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.StreamConfig;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.api.streamvertex.StreamTaskContext;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+public class MockContext<IN, OUT> implements StreamTaskContext<OUT> { // Test helper: a StreamTaskContext with a single input fed from an in-memory collection.
+ private Collection<IN> inputs; // source elements, served one at a time by MockInputIterator
+ private List<OUT> outputs; // list handed to MockCollector and returned by getOutputs()
+
+ private Collector<OUT> collector; // MockCollector built over `outputs`; presumably appends each collected element there - verify in MockCollector
+ private StreamRecordSerializer<IN> inDeserializer; // built from the type information of the first input element
+ private MutableObjectIterator<StreamRecord<IN>> iterator; // the single input exposed as index 0
+
+ public MockContext(Collection<IN> inputs) { // throws RuntimeException when `inputs` is empty
+ this.inputs = inputs;
+ if (inputs.isEmpty()) {
+ throw new RuntimeException("Inputs must not be empty"); // a first element is required below to extract the type
+ }
+
+ TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next()); // only the first element is inspected
+ inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
+
+ iterator = new MockInputIterator();
+ outputs = new ArrayList<OUT>();
+ collector = new MockCollector<OUT>(outputs);
+ }
+
+ private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> { // inner (non-static) class: reads the enclosing instance's `inputs`
+ Iterator<IN> listIterator;
+
+ public MockInputIterator() {
+ listIterator = inputs.iterator();
+ }
+
+ @Override
+ public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException { // returns `reuse` holding the next element, or null when none remain
+ if (listIterator.hasNext()) {
+ reuse.setObject(listIterator.next()); // the caller-supplied record is overwritten in place
+ } else {
+ reuse = null; // null return marks the end of the input
+ }
+ return reuse;
+ }
+ }
+
+ public List<OUT> getOutputs() { // returns the internal list itself, not a copy
+ return outputs;
+ }
+
+ public Collector<OUT> getCollector() {
+ return collector;
+ }
+
+ public StreamRecordSerializer<IN> getInDeserializer() {
+ return inDeserializer;
+ }
+
+ public MutableObjectIterator<StreamRecord<IN>> getIterator() {
+ return iterator;
+ }
+
+ public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, // runs `invokable` against a fresh MockContext over `inputs` and returns that context's output list
+ List<IN> inputs) {
+ MockContext<IN, OUT> mockContext = new MockContext<IN, OUT>(inputs);
+ invokable.setup(mockContext);
+ try {
+ invokable.open(null); // null is passed as open()'s only argument
+ invokable.invoke();
+ invokable.close(); // skipped when open() or invoke() throws
+ } catch (Exception e) {
+ throw new RuntimeException("Cannot invoke invokable.", e); // rethrown unchecked, original exception kept as the cause
+ }
+
+ return mockContext.getOutputs();
+ }
+
+ @Override
+ public StreamConfig getConfig() { // this mock supplies no StreamConfig
+ return null;
+ }
+
+ @Override
+ public ClassLoader getUserCodeClassLoader() { // this mock supplies no class loader
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <X> MutableObjectIterator<X> getInput(int index) { // only index 0 is valid; any other index throws IllegalArgumentException
+ if (index == 0) {
+ return (MutableObjectIterator<X>) iterator; // unchecked cast: X is chosen by the caller and not checked against StreamRecord<IN>
+ } else {
+ throw new IllegalArgumentException("There is only 1 input");
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public <X> StreamRecordSerializer<X> getInputSerializer(int index) { // only index 0 is valid; any other index throws IllegalArgumentException
+ if (index == 0) {
+ return (StreamRecordSerializer<X>) inDeserializer; // unchecked cast: X is chosen by the caller and not checked against IN
+ } else {
+ throw new IllegalArgumentException("There is only 1 input");
+ }
+ }
+
+ @Override
+ public Collector<OUT> getOutputCollector() { // same collector instance that getCollector() returns
+ return collector;
+ }
+
+ @Override
+ public <X, Y> CoReaderIterator<X, Y> getCoReader() { // always throws: this context has no co-reader
+ throw new IllegalArgumentException("CoReader not available");
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/88f38e49/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
deleted file mode 100644
index c06f53a..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class MockInvokable<IN, OUT> {
- private Collection<IN> inputs;
- private List<OUT> outputs;
-
- private Collector<OUT> collector;
- private StreamRecordSerializer<IN> inDeserializer;
- private MutableObjectIterator<StreamRecord<IN>> iterator;
-
- public MockInvokable(Collection<IN> inputs) {
- this.inputs = inputs;
- if (inputs.isEmpty()) {
- throw new RuntimeException("Inputs must not be empty");
- }
-
- TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next());
- inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
-
- iterator = new MockInputIterator();
- outputs = new ArrayList<OUT>();
- collector = new MockCollector<OUT>(outputs);
- }
-
-
- private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> {
- Iterator<IN> listIterator;
-
- public MockInputIterator() {
- listIterator = inputs.iterator();
- }
-
- @Override
- public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException {
- if (listIterator.hasNext()) {
- reuse.setObject(listIterator.next());
- } else {
- reuse = null;
- }
- return reuse;
- }
- }
-
- public List<OUT> getOutputs() {
- return outputs;
- }
-
- public Collector<OUT> getCollector() {
- return collector;
- }
-
- public StreamRecordSerializer<IN> getInDeserializer() {
- return inDeserializer;
- }
-
- public MutableObjectIterator<StreamRecord<IN>> getIterator() {
- return iterator;
- }
-
- public static <IN, OUT> List<OUT> createAndExecute(StreamInvokable<IN, OUT> invokable, List<IN> inputs) {
- MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
- invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
- try {
- invokable.open(null);
- invokable.invoke();
- invokable.close();
- } catch (Exception e) {
- throw new RuntimeException("Cannot invoke invokable.", e);
- }
-
- return mock.getOutputs();
- }
-
-}