You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by se...@apache.org on 2014/08/29 21:04:01 UTC
[28/28] git commit: [streaming] Added MockCoInvokable,
replaced environments with it in CoFunction tests
[streaming] Added MockCoInvokable, replaced environments with it in CoFunction tests
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/4207c5fb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/4207c5fb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/4207c5fb
Branch: refs/heads/master
Commit: 4207c5fb578555f093be7ad00afb8cc7dbcee8f5
Parents: 82aa005
Author: ghermann <re...@gmail.com>
Authored: Tue Aug 26 15:36:25 2014 +0200
Committer: Stephan Ewen <se...@apache.org>
Committed: Fri Aug 29 21:01:57 2014 +0200
----------------------------------------------------------------------
.../operator/co/CoFlatMapInvokable.java | 12 ++
.../operator/co/CoGroupReduceInvokable.java | 12 ++
.../invokable/operator/co/CoMapInvokable.java | 12 ++
.../operator/co/CoReduceInvokable.java | 12 ++
.../flink/streaming/io/CoReaderIterator.java | 6 +-
.../flink/streaming/util/MockCollector.java | 39 -----
.../flink/streaming/util/MockInvokable.java | 104 ------------
.../apache/flink/streaming/util/MockSource.java | 36 ----
.../api/invokable/operator/CoFlatMapTest.java | 59 ++-----
.../invokable/operator/CoGroupReduceTest.java | 122 ++++----------
.../api/invokable/operator/CoMapTest.java | 71 ++------
.../flink/streaming/util/MockCoInvokable.java | 168 +++++++++++++++++++
.../flink/streaming/util/MockCollector.java | 39 +++++
.../flink/streaming/util/MockInvokable.java | 104 ++++++++++++
.../apache/flink/streaming/util/MockSource.java | 36 ++++
15 files changed, 466 insertions(+), 366 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
index 2d5262d..bde9368 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoFlatMapInvokable.java
@@ -55,4 +55,16 @@ public class CoFlatMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT
}
}
+ @Override
+ protected void coUSerFunction1() throws Exception {
+ // TODO Auto-generated stub: there is no body yet, so calling this method does nothing.
+ // The "USer" capitalization must stay as-is: @Override ties the name to the supertype's declaration.
+ }
+
+ @Override
+ protected void coUserFunction2() throws Exception {
+ // TODO Auto-generated stub: there is no body yet, so calling this method does nothing.
+ // NOTE(review): presumably the second-input counterpart of coUSerFunction1 — confirm against CoInvokable.
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
index fb1d5ec..e5f4059 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoGroupReduceInvokable.java
@@ -70,4 +70,16 @@ public class CoGroupReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2,
}
}
+ @Override
+ protected void coUSerFunction1() throws Exception {
+ // TODO Auto-generated stub: there is no body yet, so calling this method does nothing.
+ // The "USer" capitalization must stay as-is: @Override ties the name to the supertype's declaration.
+ }
+
+ @Override
+ protected void coUserFunction2() throws Exception {
+ // TODO Auto-generated stub: there is no body yet, so calling this method does nothing.
+ // NOTE(review): presumably the second-input counterpart of coUSerFunction1 — confirm against CoInvokable.
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
index 85f3ca6..6179d2e 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoMapInvokable.java
@@ -55,4 +55,16 @@ public class CoMapInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT> {
}
}
+ @Override
+ protected void coUSerFunction1() throws Exception {
+ // TODO Auto-generated stub: there is no body yet, so calling this method does nothing.
+ // The "USer" capitalization must stay as-is: @Override ties the name to the supertype's declaration.
+ }
+
+ @Override
+ protected void coUserFunction2() throws Exception {
+ // TODO Auto-generated stub: there is no body yet, so calling this method does nothing.
+ // NOTE(review): presumably the second-input counterpart of coUSerFunction1 — confirm against CoInvokable.
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
index c9a313f..4a4a280 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/co/CoReduceInvokable.java
@@ -55,4 +55,16 @@ public class CoReduceInvokable<IN1, IN2, OUT> extends CoInvokable<IN1, IN2, OUT>
}
}
+ @Override
+ protected void coUSerFunction1() throws Exception {
+ // TODO Auto-generated stub: there is no body yet, so calling this method does nothing.
+ // The "USer" capitalization must stay as-is: @Override ties the name to the supertype's declaration.
+ }
+
+ @Override
+ protected void coUserFunction2() throws Exception {
+ // TODO Auto-generated stub: there is no body yet, so calling this method does nothing.
+ // NOTE(review): presumably the second-input counterpart of coUSerFunction1 — confirm against CoInvokable.
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
index 729f75d..a6be8fc 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/io/CoReaderIterator.java
@@ -26,13 +26,13 @@ import org.apache.flink.runtime.plugable.DeserializationDelegate;
* A CoReaderIterator wraps a {@link CoRecordReader} producing records of two
* input types.
*/
-public final class CoReaderIterator<T1, T2> {
+public class CoReaderIterator<T1, T2> {
private final CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader; // the
// source
- private final DeserializationDelegate<T1> delegate1;
- private final DeserializationDelegate<T2> delegate2;
+ protected final DeserializationDelegate<T1> delegate1;
+ protected final DeserializationDelegate<T2> delegate2;
public CoReaderIterator(
CoRecordReader<DeserializationDelegate<T1>, DeserializationDelegate<T2>> reader,
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java
deleted file mode 100644
index 984fb6d..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockCollector.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.Collection;
-
-import org.apache.flink.util.Collector;
-
-public class MockCollector<T> implements Collector<T> {
- private Collection<T> outputs;
-
- public MockCollector(Collection<T> outputs) {
- this.outputs = outputs;
- }
-
- @Override
- public void collect(T record) {
- outputs.add(record);
- }
-
- @Override
- public void close() {
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java
deleted file mode 100644
index 1ea78e1..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockInvokable.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.List;
-
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
-import org.apache.flink.types.TypeInformation;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.MutableObjectIterator;
-
-public class MockInvokable<IN, OUT> {
- private Collection<IN> inputs;
- private List<OUT> outputs;
-
- private Collector<OUT> collector;
- private StreamRecordSerializer<IN> inDeserializer;
- private MutableObjectIterator<StreamRecord<IN>> iterator;
-
- public MockInvokable(Collection<IN> inputs) {
- this.inputs = inputs;
- if (inputs.isEmpty()) {
- throw new RuntimeException("Inputs must not be empty");
- }
-
- TypeInformation<IN> inTypeInfo = TypeExtractor.getForObject(inputs.iterator().next());
- inDeserializer = new StreamRecordSerializer<IN>(inTypeInfo);
-
- iterator = new MockInputIterator();
- outputs = new ArrayList<OUT>();
- collector = new MockCollector<OUT>(outputs);
- }
-
-
- private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> {
- Iterator<IN> listIterator;
-
- public MockInputIterator() {
- listIterator = inputs.iterator();
- }
-
- @Override
- public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException {
- if (listIterator.hasNext()) {
- reuse.setObject(listIterator.next());
- } else {
- reuse = null;
- }
- return reuse;
- }
- }
-
- public List<OUT> getOutputs() {
- return outputs;
- }
-
- public Collector<OUT> getCollector() {
- return collector;
- }
-
- public StreamRecordSerializer<IN> getInDeserializer() {
- return inDeserializer;
- }
-
- public MutableObjectIterator<StreamRecord<IN>> getIterator() {
- return iterator;
- }
-
- public static <IN, OUT> List<OUT> createAndExecute(UserTaskInvokable<IN, OUT> invokable, List<IN> inputs) {
- MockInvokable<IN, OUT> mock = new MockInvokable<IN, OUT>(inputs);
- invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer(), false);
- try {
- invokable.open(null);
- invokable.invoke();
- } catch (Exception e) {
- throw new RuntimeException("Cannot invoke invokable.", e);
- }
-
- return mock.getOutputs();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java
deleted file mode 100644
index 92b9e42..0000000
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/util/MockSource.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.util;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.flink.streaming.api.function.source.SourceFunction;
-
-public class MockSource<T> {
-
- public static <T> List<T> createAndExecute(SourceFunction<T> source) {
- List<T> outputs = new ArrayList<T>();
- try {
- source.invoke(new MockCollector<T>(outputs));
- } catch (Exception e) {
- throw new RuntimeException("Cannot invoke source.", e);
- }
- return outputs;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
index e713f88..b31b39c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoFlatMapTest.java
@@ -17,37 +17,27 @@
package org.apache.flink.streaming.api.invokable.operator;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.List;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoFlatMapInvokable;
import org.apache.flink.streaming.util.LogUtils;
+import org.apache.flink.streaming.util.MockCoInvokable;
import org.apache.flink.util.Collector;
import org.apache.log4j.Level;
-import org.junit.Assert;
import org.junit.Test;
public class CoFlatMapTest implements Serializable {
private static final long serialVersionUID = 1L;
- private static Set<String> result;
- private static Set<String> expected = new HashSet<String>();
-
- private final static class EmptySink implements SinkFunction<String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(String tuple) {
- }
- }
-
private final static class MyCoFlatMap implements CoFlatMapFunction<String, Integer, String> {
private static final long serialVersionUID = 1L;
@@ -55,37 +45,32 @@ public class CoFlatMapTest implements Serializable {
public void flatMap1(String value, Collector<String> coll) {
for (int i = 0; i < value.length(); i++) {
coll.collect(value.substring(i, i + 1));
- result.add(value.substring(i, i + 1));
}
}
@Override
public void flatMap2(Integer value, Collector<String> coll) {
coll.collect(value.toString());
- result.add(value.toString());
}
}
+ @Test
+ public void coFlatMapTest() {
+ CoFlatMapInvokable<String, Integer, String> invokable = new CoFlatMapInvokable<String, Integer, String>(
+ new MyCoFlatMap());
+ // Runs MyCoFlatMap through MockCoInvokable; expected order assumes the two inputs are fed alternately — TODO confirm.
+ List<String> expectedList = Arrays.asList("a", "b", "c", "1", "d", "e", "f", "2", "g", "h",
+ "e", "3", "4", "5");
+ List<String> actualList = MockCoInvokable.createAndExecute(invokable,
+ Arrays.asList("abc", "def", "ghe"), Arrays.asList(1, 2, 3, 4, 5));
+ // Lists are compared, so both the emitted values and their order must match.
+ assertEquals(expectedList, actualList);
+ }
+
@SuppressWarnings("unchecked")
@Test
public void multipleInputTest() {
LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
- expected.add("a");
- expected.add("b");
- expected.add("c");
- expected.add("d");
- expected.add("e");
- expected.add("f");
- expected.add("g");
- expected.add("h");
- expected.add("e");
- expected.add("1");
- expected.add("2");
- expected.add("3");
- expected.add("4");
- expected.add("5");
-
- result = new HashSet<String>();
LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
@@ -98,13 +83,5 @@ public class CoFlatMapTest implements Serializable {
} catch (RuntimeException e) {
// good
}
-
- @SuppressWarnings({ "unused" })
- DataStream<String> ds4 = env.fromElements("abc", "def", "ghe").connect(ds2)
- .flatMap(new MyCoFlatMap()).addSink(new EmptySink());
-
- env.executeTest(32);
-
- Assert.assertEquals(expected, result);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
index 1c91cc0..e1d10b8 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoGroupReduceTest.java
@@ -17,34 +17,20 @@
package org.apache.flink.streaming.api.invokable.operator;
-import java.util.ArrayList;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoReduceFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.streaming.util.LogUtils;
-import org.apache.log4j.Level;
-import org.junit.Assert;
+import org.apache.flink.streaming.api.invokable.operator.co.CoGroupReduceInvokable;
+import org.apache.flink.streaming.util.MockCoInvokable;
import org.junit.Test;
public class CoGroupReduceTest {
-
- private static List<String> result;
- private static List<String> expected = new ArrayList<String>();
-
- private final static class EmptySink implements SinkFunction<String> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(String tuple) {
- }
- }
-
+
private final static class MyCoReduceFunction implements
CoReduceFunction<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> {
private static final long serialVersionUID = 1L;
@@ -63,65 +49,18 @@ public class CoGroupReduceTest {
@Override
public String map1(Tuple3<String, String, String> value) {
- String mapResult = value.f1;
- result.add(mapResult);
- return mapResult;
+ return value.f1;
}
@Override
public String map2(Tuple2<Integer, Integer> value) {
- String mapResult = value.f1.toString();
- result.add(mapResult);
- return mapResult;
+ return value.f1.toString();
}
}
+ @SuppressWarnings("unchecked")
@Test
- public void multipleInputTest() {
- LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
- expected.add("word1word3");
- expected.add("word2");
- expected.add("3");
- expected.add("5");
- expected.add("7");
- Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
- Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
- Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
- Tuple2<Integer, Integer> int1 = new Tuple2<Integer, Integer>(2, 1);
- Tuple2<Integer, Integer> int2 = new Tuple2<Integer, Integer>(1, 2);
- Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
- Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
- Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-
- result = new ArrayList<String>();
-
- LocalStreamEnvironment env1 = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- @SuppressWarnings("unchecked")
- DataStream<Tuple2<Integer, Integer>> ds1 = env1.fromElements(int1, int3, int5);
- @SuppressWarnings("unchecked")
- DataStream<Tuple2<Integer, Integer>> ds2 = env1.fromElements(int2, int4).merge(ds1);
-
- @SuppressWarnings({ "unused", "unchecked" })
- DataStream<String> ds4 = env1.fromElements(word1, word2, word3).connect(ds2).groupBy(0, 0)
- .reduce(new MyCoReduceFunction()).addSink(new EmptySink());
-
- env1.executeTest(32);
-
- Assert.assertEquals(result.size(), 8);
- Assert.assertTrue(result.containsAll(expected));
- }
-
- @Test
- public void multipleInputTest2() {
- LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
- expected.clear();
- result.clear();
- expected.add("word2word3");
- expected.add("word1");
- expected.add("3");
- expected.add("5");
- expected.add("7");
+ public void coFlatMapTest() {
Tuple3<String, String, String> word1 = new Tuple3<String, String, String>("a", "word1", "b");
Tuple3<String, String, String> word2 = new Tuple3<String, String, String>("b", "word2", "a");
Tuple3<String, String, String> word3 = new Tuple3<String, String, String>("a", "word3", "a");
@@ -130,23 +69,28 @@ public class CoGroupReduceTest {
Tuple2<Integer, Integer> int3 = new Tuple2<Integer, Integer>(0, 3);
Tuple2<Integer, Integer> int4 = new Tuple2<Integer, Integer>(2, 4);
Tuple2<Integer, Integer> int5 = new Tuple2<Integer, Integer>(1, 5);
-
- result = new ArrayList<String>();
-
- LocalStreamEnvironment env2 = StreamExecutionEnvironment.createLocalEnvironment(1);
-
- @SuppressWarnings("unchecked")
- DataStream<Tuple2<Integer, Integer>> ds1 = env2.fromElements(int1, int3, int5);
- @SuppressWarnings("unchecked")
- DataStream<Tuple2<Integer, Integer>> ds2 = env2.fromElements(int2, int4).merge(ds1);
-
- @SuppressWarnings({ "unused", "unchecked" })
- DataStream<String> ds4 = env2.fromElements(word1, word2, word3).connect(ds2).groupBy(2, 0)
- .reduce(new MyCoReduceFunction()).addSink(new EmptySink());
-
- env2.executeTest(32);
-
- Assert.assertEquals(result.size(), 8);
- Assert.assertTrue(result.containsAll(expected));
+
+ CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String> invokable = new CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+ new MyCoReduceFunction(), 0, 0);
+
+ List<String> expected = Arrays.asList("word1", "1", "word2", "2", "word1word3", "3", "5",
+ "7");
+
+ List<String> actualList = MockCoInvokable.createAndExecute(invokable,
+ Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
+
+ assertEquals(expected, actualList);
+
+
+ invokable = new CoGroupReduceInvokable<Tuple3<String, String, String>, Tuple2<Integer, Integer>, String>(
+ new MyCoReduceFunction(), 2, 0);
+
+ expected = Arrays.asList("word1", "1", "word2", "2", "word2word3", "3", "5",
+ "7");
+
+ actualList = MockCoInvokable.createAndExecute(invokable,
+ Arrays.asList(word1, word2, word3), Arrays.asList(int1, int2, int3, int4, int5));
+
+ assertEquals(expected, actualList);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index e24362e..c47d086 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -17,78 +17,41 @@
package org.apache.flink.streaming.api.invokable.operator;
+import static org.junit.Assert.assertEquals;
+
import java.io.Serializable;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.Arrays;
+import java.util.List;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.function.co.CoMapFunction;
-import org.apache.flink.streaming.api.function.sink.SinkFunction;
-import org.apache.flink.util.LogUtils;
-
-import org.junit.Assert;
+import org.apache.flink.streaming.api.invokable.operator.co.CoMapInvokable;
+import org.apache.flink.streaming.util.MockCoInvokable;
import org.junit.Test;
public class CoMapTest implements Serializable {
private static final long serialVersionUID = 1L;
- private static Set<String> result;
- private static Set<String> expected = new HashSet<String>();
-
- private final static class EmptySink implements SinkFunction<Boolean> {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void invoke(Boolean tuple) {
- }
- }
-
- private final static class MyCoMap implements CoMapFunction<String, Integer, Boolean> {
+ private final static class MyCoMap implements CoMapFunction<Double, Integer, String> {
private static final long serialVersionUID = 1L;
@Override
- public Boolean map1(String value) {
- result.add(value);
- return true;
+ public String map1(Double value) {
+ return value.toString();
}
@Override
- public Boolean map2(Integer value) {
- result.add(value.toString());
- return false;
+ public String map2(Integer value) {
+ return value.toString();
}
}
@Test
- public void multipleInputTest() {
-
- LogUtils.initializeDefaultTestConsoleLogger();
-
- expected.add("a");
- expected.add("b");
- expected.add("c");
- expected.add("1");
- expected.add("2");
- expected.add("3");
- expected.add("4");
-
- result = new HashSet<String>();
-
- LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+ public void coMapTest() {
+ CoMapInvokable<Double, Integer, String> invokable = new CoMapInvokable<Double, Integer, String>(new MyCoMap());
- DataStream<Integer> ds1 = env.fromElements(1, 3);
- @SuppressWarnings("unchecked")
- DataStream<Integer> ds2 = env.fromElements(2, 4).merge(ds1);
-
- DataStream<String> ds3 = env.fromElements("a", "b");
-
- @SuppressWarnings({ "unused", "unchecked" })
- DataStream<Boolean> ds4 = env.fromElements("c").merge(ds3).connect(ds2).map(new MyCoMap())
- .addSink(new EmptySink());
-
- env.executeTest(32);
- Assert.assertEquals(expected, result);
+ List<String> expectedList = Arrays.asList("1.1", "1", "1.2", "2", "1.3", "3", "1.4", "1.5");
+ List<String> actualList = MockCoInvokable.createAndExecute(invokable, Arrays.asList(1.1, 1.2, 1.3, 1.4, 1.5), Arrays.asList(1, 2, 3));
+
+ assertEquals(expectedList, actualList);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
new file mode 100644
index 0000000..2215fcd
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCoInvokable.java
@@ -0,0 +1,168 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.operator.co.CoInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.streaming.io.CoReaderIterator;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.util.Collector;
+
+/**
+ * Test harness that runs a {@link CoInvokable} against two in-memory inputs.
+ * <p>
+ * Records are served through a mocked {@link CoReaderIterator}: while both
+ * inputs still have elements they are read alternately, starting with the
+ * first input; once one input is exhausted the remainder of the other one is
+ * served. Everything the invokable emits is gathered in a list that can be
+ * read back with {@link #getOutputs()}.
+ *
+ * @param <IN1> element type of the first input
+ * @param <IN2> element type of the second input
+ * @param <OUT> element type emitted by the invokable under test
+ */
+public class MockCoInvokable<IN1, IN2, OUT> {
+    private final Iterator<IN1> inputIterator1;
+    private final Iterator<IN2> inputIterator2;
+    private final List<OUT> outputs;
+
+    private final Collector<OUT> collector;
+    private final StreamRecordSerializer<IN1> inDeserializer1;
+    private final StreamRecordSerializer<IN2> inDeserializer2;
+    private final CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> mockIterator;
+
+    // Number of the input (1 or 2) that is read next while both inputs still have elements.
+    private int currentInput = 1;
+    // Most recently read record of each input; their payloads are copied into the caller's targets.
+    private StreamRecord<IN1> reuse1;
+    private StreamRecord<IN2> reuse2;
+
+    /**
+     * Creates the mock collector, serializers and reader iterator for the given inputs.
+     *
+     * @param input1 elements of the first input; must not be empty because its
+     *               first element is used to derive the type information
+     * @param input2 elements of the second input; must not be empty for the same reason
+     * @throws IllegalArgumentException if either input is empty
+     */
+    public MockCoInvokable(Collection<IN1> input1, Collection<IN2> input2) {
+
+        if (input1.isEmpty() || input2.isEmpty()) {
+            throw new IllegalArgumentException("Inputs must not be empty");
+        }
+
+        inputIterator1 = input1.iterator();
+        inputIterator2 = input2.iterator();
+
+        // Fresh iterators are used here so the iterators stored above are not advanced.
+        TypeInformation<IN1> inTypeInfo1 = TypeExtractor.getForObject(input1.iterator().next());
+        inDeserializer1 = new StreamRecordSerializer<IN1>(inTypeInfo1);
+        TypeInformation<IN2> inTypeInfo2 = TypeExtractor.getForObject(input2.iterator().next());
+        inDeserializer2 = new StreamRecordSerializer<IN2>(inTypeInfo2);
+
+        reuse1 = inDeserializer1.createInstance();
+        reuse2 = inDeserializer2.createInstance();
+
+        mockIterator = new MockCoReaderIterator(inDeserializer1, inDeserializer2);
+
+        outputs = new ArrayList<OUT>();
+        collector = new MockCollector<OUT>(outputs);
+    }
+
+    /**
+     * {@link CoReaderIterator} whose records come from the enclosing mock's
+     * in-memory inputs; it is constructed with a {@code null} reader.
+     */
+    private class MockCoReaderIterator extends
+            CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> {
+
+        public MockCoReaderIterator(
+                TypeSerializer<StreamRecord<IN1>> serializer1,
+                TypeSerializer<StreamRecord<IN2>> serializer2) {
+            super(null, serializer1, serializer2);
+        }
+
+        /**
+         * Reads the next element from one of the inputs and copies the current
+         * payload of both inputs into the given targets.
+         *
+         * @return 1 or 2 for the input the element was read from, or 0 when both
+         *         inputs are exhausted
+         */
+        @Override
+        public int next(StreamRecord<IN1> target1, StreamRecord<IN2> target2) throws IOException {
+            // delegate1/delegate2 are inherited from CoReaderIterator.
+            // NOTE(review): presumably its deserialization delegates - confirm.
+            this.delegate1.setInstance(target1);
+            this.delegate2.setInstance(target2);
+
+            int inputNumber = nextRecord();
+            // Both targets are refreshed regardless of which input was advanced.
+            target1.setObject(reuse1.getObject());
+            target2.setObject(reuse2.getObject());
+
+            return inputNumber;
+        }
+    }
+
+    /**
+     * Advances one of the inputs: alternates while both have elements,
+     * otherwise drains whichever input is left.
+     *
+     * @return the number of the input that was advanced, or 0 if none is left
+     */
+    private int nextRecord() {
+        boolean hasFirst = inputIterator1.hasNext();
+        boolean hasSecond = inputIterator2.hasNext();
+
+        if (hasFirst && hasSecond) {
+            // currentInput is only ever 1 or 2 (see next1() / next2()).
+            return currentInput == 1 ? next1() : next2();
+        }
+        if (hasFirst) {
+            return next1();
+        }
+        if (hasSecond) {
+            return next2();
+        }
+        return 0;
+    }
+
+    /** Reads the next element of the first input and schedules the second input. */
+    private int next1() {
+        reuse1 = inDeserializer1.createInstance();
+        reuse1.setObject(inputIterator1.next());
+        currentInput = 2;
+        return 1;
+    }
+
+    /** Reads the next element of the second input and schedules the first input. */
+    private int next2() {
+        reuse2 = inDeserializer2.createInstance();
+        reuse2.setObject(inputIterator2.next());
+        currentInput = 1;
+        return 2;
+    }
+
+    /** @return the list that receives every record passed to the mock collector */
+    public List<OUT> getOutputs() {
+        return outputs;
+    }
+
+    /** @return the collector that stores emitted records in {@link #getOutputs()} */
+    public Collector<OUT> getCollector() {
+        return collector;
+    }
+
+    /** @return the serializer built from the first element of the first input */
+    public StreamRecordSerializer<IN1> getInDeserializer1() {
+        return inDeserializer1;
+    }
+
+    /** @return the serializer built from the first element of the second input */
+    public StreamRecordSerializer<IN2> getInDeserializer2() {
+        return inDeserializer2;
+    }
+
+    /** @return the mocked reader iterator serving both inputs */
+    public CoReaderIterator<StreamRecord<IN1>, StreamRecord<IN2>> getIterator() {
+        return mockIterator;
+    }
+
+    /**
+     * Builds a mock for the two inputs, initializes the invokable with it,
+     * then opens and invokes the invokable.
+     *
+     * @param invokable the invokable under test
+     * @param input1 elements of the first input; must not be empty
+     * @param input2 elements of the second input; must not be empty
+     * @return the records emitted by the invokable, in emission order
+     * @throws RuntimeException wrapping any exception thrown by {@code open} or {@code invoke}
+     */
+    public static <IN1, IN2, OUT> List<OUT> createAndExecute(CoInvokable<IN1, IN2, OUT> invokable,
+            List<IN1> input1, List<IN2> input2) {
+        MockCoInvokable<IN1, IN2, OUT> mock = new MockCoInvokable<IN1, IN2, OUT>(input1, input2);
+        invokable.initialize(mock.getCollector(), mock.getIterator(), mock.getInDeserializer1(),
+                mock.getInDeserializer2(), false);
+
+        try {
+            invokable.open(null);
+            invokable.invoke();
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot invoke invokable.", e);
+        }
+
+        return mock.getOutputs();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
new file mode 100644
index 0000000..984fb6d
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockCollector.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.util.Collection;
+
+import org.apache.flink.util.Collector;
+
+/**
+ * {@link Collector} for tests: each collected record is added to the
+ * collection handed over at construction time.
+ *
+ * @param <T> type of the collected records
+ */
+public class MockCollector<T> implements Collector<T> {
+    private Collection<T> sink;
+
+    /**
+     * @param outputs collection that receives every collected record
+     */
+    public MockCollector(Collection<T> outputs) {
+        sink = outputs;
+    }
+
+    /** Adds the record to the backing collection. */
+    @Override
+    public void collect(T record) {
+        sink.add(record);
+    }
+
+    /** Does nothing. */
+    @Override
+    public void close() {
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
new file mode 100644
index 0000000..1ea78e1
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockInvokable.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.invokable.UserTaskInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.streamrecord.StreamRecordSerializer;
+import org.apache.flink.types.TypeInformation;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.MutableObjectIterator;
+
+/**
+ * Test harness that runs a {@link UserTaskInvokable} against an in-memory
+ * input and gathers everything it emits in a list.
+ *
+ * @param <IN> element type of the input
+ * @param <OUT> element type emitted by the invokable under test
+ */
+public class MockInvokable<IN, OUT> {
+    private Collection<IN> source;
+    private List<OUT> emitted;
+
+    private Collector<OUT> outputCollector;
+    private StreamRecordSerializer<IN> serializer;
+    private MutableObjectIterator<StreamRecord<IN>> inputIterator;
+
+    /**
+     * @param inputs elements to feed to the invokable; must not be empty because
+     *               the first element is used to derive the type information
+     */
+    public MockInvokable(Collection<IN> inputs) {
+        source = inputs;
+        if (inputs.isEmpty()) {
+            throw new RuntimeException("Inputs must not be empty");
+        }
+
+        IN firstElement = inputs.iterator().next();
+        TypeInformation<IN> typeInfo = TypeExtractor.getForObject(firstElement);
+        serializer = new StreamRecordSerializer<IN>(typeInfo);
+
+        inputIterator = new MockInputIterator();
+        emitted = new ArrayList<OUT>();
+        outputCollector = new MockCollector<OUT>(emitted);
+    }
+
+    /** Serves the elements of the enclosing mock's input one record at a time. */
+    private class MockInputIterator implements MutableObjectIterator<StreamRecord<IN>> {
+        Iterator<IN> remaining;
+
+        public MockInputIterator() {
+            remaining = source.iterator();
+        }
+
+        /**
+         * Stores the next input element in {@code reuse}.
+         *
+         * @return {@code reuse}, or {@code null} once the input is exhausted
+         */
+        @Override
+        public StreamRecord<IN> next(StreamRecord<IN> reuse) throws IOException {
+            if (!remaining.hasNext()) {
+                return null;
+            }
+            reuse.setObject(remaining.next());
+            return reuse;
+        }
+    }
+
+    /** @return the list that receives every record passed to the mock collector */
+    public List<OUT> getOutputs() {
+        return emitted;
+    }
+
+    /** @return the collector that stores emitted records in {@link #getOutputs()} */
+    public Collector<OUT> getCollector() {
+        return outputCollector;
+    }
+
+    /** @return the serializer built from the first input element */
+    public StreamRecordSerializer<IN> getInDeserializer() {
+        return serializer;
+    }
+
+    /** @return the iterator serving the input elements */
+    public MutableObjectIterator<StreamRecord<IN>> getIterator() {
+        return inputIterator;
+    }
+
+    /**
+     * Builds a mock for the input, initializes the invokable with it, then
+     * opens and invokes the invokable.
+     *
+     * @return the records emitted by the invokable
+     */
+    public static <IN, OUT> List<OUT> createAndExecute(UserTaskInvokable<IN, OUT> invokable, List<IN> inputs) {
+        MockInvokable<IN, OUT> harness = new MockInvokable<IN, OUT>(inputs);
+        invokable.initialize(harness.getCollector(), harness.getIterator(),
+                harness.getInDeserializer(), false);
+
+        try {
+            invokable.open(null);
+            invokable.invoke();
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot invoke invokable.", e);
+        }
+
+        return harness.getOutputs();
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/4207c5fb/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
new file mode 100644
index 0000000..92b9e42
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/MockSource.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.util;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+
+/**
+ * Test helper that runs a {@link SourceFunction} and returns what it emitted.
+ */
+public class MockSource<T> {
+
+    /**
+     * Invokes the source with a {@link MockCollector} and returns the collected records.
+     *
+     * @param source the source function under test
+     * @return the records the source passed to the collector
+     * @throws RuntimeException wrapping any exception thrown while invoking the source
+     */
+    public static <T> List<T> createAndExecute(SourceFunction<T> source) {
+        List<T> collected = new ArrayList<T>();
+        try {
+            MockCollector<T> collector = new MockCollector<T>(collected);
+            source.invoke(collector);
+        } catch (Exception e) {
+            throw new RuntimeException("Cannot invoke source.", e);
+        }
+        return collected;
+    }
+}