You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/06 13:58:19 UTC
[2/6] flink git commit: [FLINK-2136] [streaming] Added parallelism tests to DataStream
[FLINK-2136] [streaming] Added parallelism tests to DataStream
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03ae80d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03ae80d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03ae80d9
Branch: refs/heads/master
Commit: 03ae80d972184983b80d8c998c90b902f3bb75ae
Parents: 8958603
Author: Gábor Hermann <re...@gmail.com>
Authored: Thu Jun 4 15:26:10 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sat Jun 6 13:56:54 2015 +0200
----------------------------------------------------------------------
.../operators/AbstractUdfStreamOperator.java | 4 +
.../flink/streaming/api/DataStreamTest.java | 164 +++++++++++++++++--
.../api/StreamExecutionEnvironmentTest.java | 99 +++++++++--
.../api/scala/WindowedDataStream.scala | 9 +-
.../streaming/api/scala/DataStreamTest.scala | 140 ++++++++++++++--
5 files changed, 370 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/03ae80d9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 852bfde..90b2b2f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -91,6 +91,10 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
}
}
+ public F getUserFunction() {
+ return userFunction;
+ }
+
private static <T extends Serializable> void setStateOnFunction(Serializable state, Function function) {
@SuppressWarnings("unchecked")
T typedState = (T) state;
http://git-wip-us.apache.org/repos/asf/flink/blob/03ae80d9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 591aa27..c245f76 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -17,18 +17,29 @@
package org.apache.flink.streaming.api;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.GroupedDataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.windowing.helper.Count;
@@ -37,10 +48,6 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
import org.apache.flink.util.Collector;
import org.junit.Test;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
public class DataStreamTest {
private static final long MEMORYSIZE = 32;
@@ -48,6 +55,7 @@ public class DataStreamTest {
/**
* Tests {@link SingleOutputStreamOperator#name(String)} functionality.
+ *
* @throws Exception
*/
@Test
@@ -106,7 +114,7 @@ public class DataStreamTest {
*/
@Test
@SuppressWarnings("unchecked")
- public void testPartitioning(){
+ public void testPartitioning() {
StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
StreamGraph graph = env.getStreamGraph();
@@ -116,7 +124,7 @@ public class DataStreamTest {
//Testing DataStream grouping
DataStream group1 = src1.groupBy(0);
- DataStream group2 = src1.groupBy(1,0);
+ DataStream group2 = src1.groupBy(1, 0);
DataStream group3 = src1.groupBy("f0");
DataStream group4 = src1.groupBy(new FirstSelector());
@@ -147,10 +155,10 @@ public class DataStreamTest {
assertFalse(isGrouped(partition4));
//Testing ConnectedDataStream grouping
- ConnectedDataStream connectedGroup1 = connected.groupBy(0,0);
+ ConnectedDataStream connectedGroup1 = connected.groupBy(0, 0);
Integer downStreamId1 = createDownStreamId(connectedGroup1);
- ConnectedDataStream connectedGroup2 = connected.groupBy(new int[]{0},new int[]{0});
+ ConnectedDataStream connectedGroup2 = connected.groupBy(new int[]{0}, new int[]{0});
Integer downStreamId2 = createDownStreamId(connectedGroup2);
ConnectedDataStream connectedGroup3 = connected.groupBy("f0", "f0");
@@ -187,7 +195,7 @@ public class DataStreamTest {
ConnectedDataStream connectedPartition1 = connected.partitionByHash(0, 0);
Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
- ConnectedDataStream connectedPartition2 = connected.partitionByHash(new int[]{0},new int[]{0});
+ ConnectedDataStream connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
ConnectedDataStream connectedPartition3 = connected.partitionByHash("f0", "f0");
@@ -221,19 +229,119 @@ public class DataStreamTest {
assertFalse(isGrouped(connectedPartition5));
}
+ /**
+ * Tests whether parallelism gets set.
+ */
+ @Test
+ public void testParallelism() {
+ StreamExecutionEnvironment env = new TestStreamEnvironment(10, MEMORYSIZE);
+ StreamGraph graph = env.getStreamGraph();
+
+ DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+ SingleOutputStreamOperator<Long, ?> map = src.map(new MapFunction<Tuple2<Long, Long>, Long>() {
+ @Override
+ public Long map(Tuple2<Long, Long> value) throws Exception {
+ return null;
+ }
+ });
+
+ DataStream<Long> windowed = map
+ .window(Count.of(10))
+ .foldWindow(0L, new FoldFunction<Long, Long>() {
+ @Override
+ public Long fold(Long accumulator, Long value) throws Exception {
+ return null;
+ }
+ }).flatten();
+
+ DataStreamSink<Long> sink = map.addSink(new SinkFunction<Long>() {
+ @Override
+ public void invoke(Long value) throws Exception {
+ }
+ });
+
+ assertEquals(1, graph.getStreamNode(src.getId()).getParallelism());
+ assertEquals(10, graph.getStreamNode(map.getId()).getParallelism());
+ assertEquals(10, graph.getStreamNode(windowed.getId()).getParallelism());
+ assertEquals(10, graph.getStreamNode(sink.getId()).getParallelism());
+
+ env.setParallelism(7);
+ assertEquals(1, graph.getStreamNode(src.getId()).getParallelism());
+ assertEquals(7, graph.getStreamNode(map.getId()).getParallelism());
+ assertEquals(7, graph.getStreamNode(windowed.getId()).getParallelism());
+ assertEquals(7, graph.getStreamNode(sink.getId()).getParallelism());
+
+ try {
+ src.setParallelism(3);
+ fail();
+ } catch (IllegalArgumentException success) {
+ }
+
+ DataStreamSource<Long> parallelSource = env.generateParallelSequence(0, 0);
+ assertEquals(7, graph.getStreamNode(parallelSource.getId()).getParallelism());
+
+ parallelSource.setParallelism(3);
+ assertEquals(3, graph.getStreamNode(parallelSource.getId()).getParallelism());
+
+ map.setParallelism(2);
+ assertEquals(2, graph.getStreamNode(map.getId()).getParallelism());
+
+ sink.setParallelism(4);
+ assertEquals(4, graph.getStreamNode(sink.getId()).getParallelism());
+ }
+
+ @Test
+ public void testTypeInfo() {
+ StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+
+ DataStream<Long> src1 = env.generateSequence(0, 0);
+ assertEquals(TypeExtractor.getForClass(Long.class), src1.getType());
+
+ DataStream<Tuple2<Integer, String>> map = src1.map(new MapFunction<Long, Tuple2<Integer, String>>() {
+ @Override
+ public Tuple2<Integer, String> map(Long value) throws Exception {
+ return null;
+ }
+ });
+
+ assertEquals(TypeExtractor.getForObject(new Tuple2<Integer, String>(0, "")), map.getType());
+
+ WindowedDataStream<String> window = map
+ .window(Count.of(5))
+ .mapWindow(new WindowMapFunction<Tuple2<Integer, String>, String>() {
+ @Override
+ public void mapWindow(Iterable<Tuple2<Integer, String>> values, Collector<String> out) throws Exception {
+ }
+ });
+
+ assertEquals(TypeExtractor.getForClass(String.class), window.getType());
+
+ DataStream<CustomPOJO> flatten = window
+ .foldWindow(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
+ @Override
+ public CustomPOJO fold(CustomPOJO accumulator, String value) throws Exception {
+ return null;
+ }
+ })
+ .flatten();
+
+ assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType());
+ }
+
/////////////////////////////////////////////////////////////
// Utilities
/////////////////////////////////////////////////////////////
- private static Integer createDownStreamId(DataStream dataStream){
+ private static Integer createDownStreamId(DataStream dataStream) {
return dataStream.print().getId();
}
- private static boolean isGrouped(DataStream dataStream){
+ private static boolean isGrouped(DataStream dataStream) {
return dataStream instanceof GroupedDataStream;
}
- private static Integer createDownStreamId(ConnectedDataStream dataStream){
+ private static Integer createDownStreamId(ConnectedDataStream dataStream) {
return dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
@Override
public Object map1(Tuple2<Long, Long> value) {
@@ -247,18 +355,42 @@ public class DataStreamTest {
}).getId();
}
- private static boolean isGrouped(ConnectedDataStream dataStream){
+ private static boolean isGrouped(ConnectedDataStream dataStream) {
return (dataStream.getFirst() instanceof GroupedDataStream && dataStream.getSecond() instanceof GroupedDataStream);
}
- private static boolean isPartitioned(StreamEdge edge){
+ private static boolean isPartitioned(StreamEdge edge) {
return edge.getPartitioner() instanceof FieldsPartitioner;
}
- private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long>{
+ private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> {
@Override
public Long getKey(Tuple2<Long, Long> value) throws Exception {
return value.f0;
}
}
+
+ public static class CustomPOJO {
+ private String s;
+ private int i;
+
+ public CustomPOJO() {
+ }
+
+ public void setS(String s) {
+ this.s = s;
+ }
+
+ public void setI(int i) {
+ this.i = i;
+ }
+
+ public String getS() {
+ return s;
+ }
+
+ public int getI() {
+ return i;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/03ae80d9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 5ca4883..2f5f30d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -17,24 +17,31 @@
package org.apache.flink.streaming.api;
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
+import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
import org.apache.flink.util.SplittableIterator;
import org.junit.Test;
-import java.util.Iterator;
-
-import static org.junit.Assert.assertTrue;
-
public class StreamExecutionEnvironmentTest {
private static final long MEMORYSIZE = 32;
@@ -72,12 +79,12 @@ public class StreamExecutionEnvironmentTest {
boolean seenExpectedException = false;
try {
- DataStream<Long> dataStream1 = env.generateSequence(0,0).setParallelism(4);
+ DataStream<Long> dataStream1 = env.generateSequence(0, 0).setParallelism(4);
} catch (IllegalArgumentException e) {
seenExpectedException = true;
}
- DataStream<Long> dataStream2 = env.generateParallelSequence(0,0).setParallelism(4);
+ DataStream<Long> dataStream2 = env.generateParallelSequence(0, 0).setParallelism(4);
String plan = env.getExecutionPlan();
@@ -88,6 +95,74 @@ public class StreamExecutionEnvironmentTest {
plan.contains("\"contents\":\"Parallel Sequence Source\",\"parallelism\":4"));
}
+ @Test
+ public void testSources() {
+ StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+
+ SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+ @Override
+ public boolean reachedEnd() throws Exception {
+ return false;
+ }
+
+ @Override
+ public Integer next() throws Exception {
+ return null;
+ }
+ };
+ DataStreamSource<Integer> src1 = env.addSource(srcFun);
+ assertEquals(srcFun, getFunctionForDataSource(src1));
+
+ List<Long> list = Arrays.asList(0L, 1L, 2L);
+
+ DataStreamSource<Long> src2 = env.generateSequence(0, 2);
+ assertTrue(getFunctionForDataSource(src2) instanceof FromIteratorFunction);
+ checkIfSameElements(list, getFunctionForDataSource(src2));
+
+ DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
+ assertTrue(getFunctionForDataSource(src3) instanceof FromElementsFunction);
+
+ DataStreamSource<Long> src4 = env.fromCollection(list);
+ assertTrue(getFunctionForDataSource(src4) instanceof FromElementsFunction);
+
+ DataStreamSource<Long> src5 = env.generateParallelSequence(0, 2);
+ assertTrue(getFunctionForDataSource(src5) instanceof FromSplittableIteratorFunction);
+ }
+
+ /////////////////////////////////////////////////////////////
+ // Utilities
+ /////////////////////////////////////////////////////////////
+
+ private static <T> void checkIfSameElements(Collection<T> collection, SourceFunction<T> sourceFunction) {
+ for (T elem : collection) {
+ try {
+ assertEquals(elem, sourceFunction.next());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ try {
+ assertTrue(sourceFunction.reachedEnd());
+ } catch (Exception e) {
+ e.printStackTrace();
+ fail(e.getMessage());
+ }
+ }
+
+ private static StreamOperator<?> getOperatorForDataStream(DataStream<?> dataStream) {
+ StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
+ StreamGraph streamGraph = env.getStreamGraph();
+ return streamGraph.getStreamNode(dataStream.getId()).getOperator();
+ }
+
+ private static <T> SourceFunction<T> getFunctionForDataSource(DataStreamSource<T> dataStreamSource) {
+ AbstractUdfStreamOperator<?, ?> operator =
+ (AbstractUdfStreamOperator<?, ?>) getOperatorForDataStream(dataStreamSource);
+ return (SourceFunction<T>) operator.getUserFunction();
+ }
+
public static class DummySplittableIterator extends SplittableIterator {
private static final long serialVersionUID = 1312752876092210499L;
http://git-wip-us.apache.org/repos/asf/flink/blob/03ae80d9/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index a3a416a..3173b8d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -298,7 +298,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
private def aggregate(aggregationType: AggregationType, field: String):
WindowedDataStream[T] = {
- val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+ val position = fieldNames2Indices(getType(), Array(field))(0)
aggregate(aggregationType, position)
}
@@ -322,4 +322,11 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
jStream.reduceWindow(reducer)).asInstanceOf[WindowedDataStream[T]]
}
+ /**
+ * Gets the output type.
+ *
+ * @return The output type.
+ */
+ def getType(): TypeInformation[T] = javaStream.getType
+
}
http://git-wip-us.apache.org/repos/asf/flink/blob/03ae80d9/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 235adad..2f80967 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -18,19 +18,21 @@
package org.apache.flink.streaming.api.scala
+import org.apache.flink.api.java.typeutils.TypeExtractor
import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
import org.apache.flink.streaming.api.windowing.helper.Count
import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner
import org.apache.flink.util.Collector
+import org.junit.Assert._
import org.junit.Test
class DataStreamTest {
- private val parallelism = 2;
+ private val parallelism = 2
@Test
def testNaming(): Unit = {
- val env = StreamExecutionEnvironment.createLocalEnvironment(parallelism);
+ val env = StreamExecutionEnvironment.createLocalEnvironment(parallelism)
val source1 = env.generateSequence(0, 0).name("testSource1")
assert("testSource1" == source1.getName)
@@ -106,7 +108,8 @@ class DataStreamTest {
val connectedGroup3: ConnectedDataStream[_, _] = connected.groupBy("_1", "_1")
val downStreamId3: Integer = createDownStreamId(connectedGroup3)
- val connectedGroup4: ConnectedDataStream[_, _] = connected.groupBy(Array[String]("_1"), Array[String]("_1"))
+ val connectedGroup4: ConnectedDataStream[_, _] =
+ connected.groupBy(Array[String]("_1"), Array[String]("_1"))
val downStreamId4: Integer = createDownStreamId(connectedGroup4)
val connectedGroup5: ConnectedDataStream[_, _] = connected.groupBy(x => x._1, x => x._1)
@@ -131,34 +134,137 @@ class DataStreamTest {
val connectedPartition1: ConnectedDataStream[_, _] = connected.partitionByHash(0, 0)
val connectDownStreamId1: Integer = createDownStreamId(connectedPartition1)
- val connectedPartition2: ConnectedDataStream[_, _] = connected.partitionByHash(Array[Int](0), Array[Int](0))
+ val connectedPartition2: ConnectedDataStream[_, _] =
+ connected.partitionByHash(Array[Int](0), Array[Int](0))
val connectDownStreamId2: Integer = createDownStreamId(connectedPartition2)
val connectedPartition3: ConnectedDataStream[_, _] = connected.partitionByHash("_1", "_1")
val connectDownStreamId3: Integer = createDownStreamId(connectedPartition3)
- val connectedPartition4: ConnectedDataStream[_, _] = connected.partitionByHash(Array[String]("_1"), Array[String]("_1"))
+ val connectedPartition4: ConnectedDataStream[_, _] =
+ connected.partitionByHash(Array[String]("_1"), Array[String]("_1"))
val connectDownStreamId4: Integer = createDownStreamId(connectedPartition4)
- val connectedPartition5: ConnectedDataStream[_, _] = connected.partitionByHash(x => x._1, x => x._1)
+ val connectedPartition5: ConnectedDataStream[_, _] =
+ connected.partitionByHash(x => x._1, x => x._1)
val connectDownStreamId5: Integer = createDownStreamId(connectedPartition5)
- assert(isPartitioned(graph.getStreamEdge(connectedPartition1.getFirst.getId, connectDownStreamId1)))
- assert(isPartitioned(graph.getStreamEdge(connectedPartition1.getSecond.getId, connectDownStreamId1)))
+ assert(
+ isPartitioned(graph.getStreamEdge(connectedPartition1.getFirst.getId, connectDownStreamId1))
+ )
+ assert(
+ isPartitioned(graph.getStreamEdge(connectedPartition1.getSecond.getId, connectDownStreamId1))
+ )
+
+ assert(
+ isPartitioned(graph.getStreamEdge(connectedPartition2.getFirst.getId, connectDownStreamId2))
+ )
+ assert(
+ isPartitioned(graph.getStreamEdge(connectedPartition2.getSecond.getId, connectDownStreamId2))
+ )
+
+ assert(
+ isPartitioned(graph.getStreamEdge(connectedPartition3.getFirst.getId, connectDownStreamId3))
+ )
+ assert(
+ isPartitioned(graph.getStreamEdge(connectedPartition3.getSecond.getId, connectDownStreamId3))
+ )
+
+ assert(
+ isPartitioned(graph.getStreamEdge(connectedPartition4.getFirst.getId, connectDownStreamId4))
+ )
+ assert(
+ isPartitioned(graph.getStreamEdge(connectedPartition4.getSecond.getId, connectDownStreamId4))
+ )
+
+ assert(
+ isPartitioned(graph.getStreamEdge(connectedPartition5.getFirst.getId, connectDownStreamId5))
+ )
+ assert(
+ isPartitioned(graph.getStreamEdge(connectedPartition5.getSecond.getId, connectDownStreamId5))
+ )
+ }
- assert(isPartitioned(graph.getStreamEdge(connectedPartition2.getFirst.getId, connectDownStreamId2)))
- assert(isPartitioned(graph.getStreamEdge(connectedPartition2.getSecond.getId, connectDownStreamId2)))
+ /**
+ * Tests whether parallelism gets set.
+ */
+ @Test
+ def testParallelism {
+ val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10)
+
+ val graph: StreamGraph = env.getStreamGraph
+
+ val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
+ val map = src.map(x => 0L)
+ val windowed: DataStream[Long] = map
+ .window(Count.of(10))
+ .foldWindow(0L, (x: Long, y: Long) => 0L)
+ .flatten
+ val sink = map.addSink(x => {})
+
+ assert(1 == graph.getStreamNode(src.getId).getParallelism)
+ assert(10 == graph.getStreamNode(map.getId).getParallelism)
+ assert(10 == graph.getStreamNode(windowed.getId).getParallelism)
+ assert(10 == graph.getStreamNode(sink.getId).getParallelism)
+
+ try {
+ src.setParallelism(3)
+ fail
+ }
+ catch {
+ case success: IllegalArgumentException => {
+ }
+ }
+
+ env.setParallelism(7)
+ assert(1 == graph.getStreamNode(src.getId).getParallelism)
+ assert(7 == graph.getStreamNode(map.getId).getParallelism)
+ assert(7 == graph.getStreamNode(windowed.getId).getParallelism)
+ assert(7 == graph.getStreamNode(sink.getId).getParallelism)
+
+ val parallelSource = env.generateParallelSequence(0, 0)
+
+ assert(7 == graph.getStreamNode(parallelSource.getId).getParallelism)
+
+ parallelSource.setParallelism(3)
+ assert(3 == graph.getStreamNode(parallelSource.getId).getParallelism)
+
+ map.setParallelism(2)
+ assert(2 == graph.getStreamNode(map.getId).getParallelism)
+
+ sink.setParallelism(4)
+ assert(4 == graph.getStreamNode(sink.getId).getParallelism)
+ }
- assert(isPartitioned(graph.getStreamEdge(connectedPartition3.getFirst.getId, connectDownStreamId3)))
- assert(isPartitioned(graph.getStreamEdge(connectedPartition3.getSecond.getId, connectDownStreamId3)))
+ @Test
+ def testTypeInfo {
+ val env: StreamExecutionEnvironment = StreamExecutionEnvironment
+ .createLocalEnvironment(parallelism)
+
+ val src1: DataStream[Long] = env.generateSequence(0, 0)
+ assertEquals(TypeExtractor.getForClass(classOf[Long]), src1.getType)
+
+ val map: DataStream[(Integer, String)] = src1.map(x => null)
+ assertEquals(classOf[scala.Tuple2[Integer, String]], map.getType.getTypeClass)
- assert(isPartitioned(graph.getStreamEdge(connectedPartition4.getFirst.getId, connectDownStreamId4)))
- assert(isPartitioned(graph.getStreamEdge(connectedPartition4.getSecond.getId, connectDownStreamId4)))
+ val window: WindowedDataStream[String] = map
+ .window(Count.of(5))
+ .mapWindow((x: Iterable[(Integer, String)], y: Collector[String]) => {})
+ assertEquals(TypeExtractor.getForClass(classOf[String]), window.getType)
- assert(isPartitioned(graph.getStreamEdge(connectedPartition5.getFirst.getId, connectDownStreamId5)))
- assert(isPartitioned(graph.getStreamEdge(connectedPartition5.getSecond.getId, connectDownStreamId5)))
+ val flatten: DataStream[Int] = window
+ .foldWindow(0,
+ (accumulator: Int, value: String) => 0
+ ).flatten
+ assertEquals(TypeExtractor.getForClass(classOf[Int]), flatten.getType)
+
+ // TODO check for custom case class
}
+ /////////////////////////////////////////////////////////////
+ // Utilities
+ /////////////////////////////////////////////////////////////
+
private def isPartitioned(edge: StreamEdge): Boolean = {
return edge.getPartitioner.isInstanceOf[FieldsPartitioner[_]]
}
@@ -168,7 +274,7 @@ class DataStreamTest {
}
private def createDownStreamId(dataStream: ConnectedDataStream[_, _]): Integer = {
- return dataStream.map(x => 0, x => 0).getId
+ return dataStream.map(x => 0, x => 0).getId
}
}