Posted to commits@flink.apache.org by mb...@apache.org on 2015/06/06 13:58:19 UTC

[2/6] flink git commit: [FLINK-2136] [streaming] Added parallelism tests to DataStream

[FLINK-2136] [streaming] Added parallelism tests to DataStream


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03ae80d9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03ae80d9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03ae80d9

Branch: refs/heads/master
Commit: 03ae80d972184983b80d8c998c90b902f3bb75ae
Parents: 8958603
Author: Gábor Hermann <re...@gmail.com>
Authored: Thu Jun 4 15:26:10 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Sat Jun 6 13:56:54 2015 +0200

----------------------------------------------------------------------
 .../operators/AbstractUdfStreamOperator.java    |   4 +
 .../flink/streaming/api/DataStreamTest.java     | 164 +++++++++++++++++--
 .../api/StreamExecutionEnvironmentTest.java     |  99 +++++++++--
 .../api/scala/WindowedDataStream.scala          |   9 +-
 .../streaming/api/scala/DataStreamTest.scala    | 140 ++++++++++++++--
 5 files changed, 370 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/03ae80d9/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
index 852bfde..90b2b2f 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java
@@ -91,6 +91,10 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial
 		}
 	}
 
+	public F getUserFunction() {
+		return userFunction;
+	}
+
 	private static <T extends Serializable> void setStateOnFunction(Serializable state, Function function) {
 		@SuppressWarnings("unchecked")
 		T typedState = (T) state;

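The new getUserFunction() accessor exists so that test code can inspect the function wrapped by an operator. A minimal sketch of the pattern (assuming the 0.9-era classes shown in this diff; the helper class and method name are illustrative, not part of the commit):

    import org.apache.flink.api.common.functions.Function;
    import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;

    public final class UserFunctionAccess {
        // Pulls the wrapped user function out of an operator, e.g. to assert
        // in a test that a source was built from the expected SourceFunction.
        public static Function userFunctionOf(AbstractUdfStreamOperator<?, ?> operator) {
            return operator.getUserFunction();
        }
    }

The tests in StreamExecutionEnvironmentTest further down in this diff use exactly this cast-and-getter pattern in their getFunctionForDataSource helper.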
http://git-wip-us.apache.org/repos/asf/flink/blob/03ae80d9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
index 591aa27..c245f76 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/DataStreamTest.java
@@ -17,18 +17,29 @@
 
 package org.apache.flink.streaming.api;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
 import org.apache.flink.api.common.functions.FoldFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.ConnectedDataStream;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.GroupedDataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.WindowedDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.WindowMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 import org.apache.flink.streaming.api.graph.StreamEdge;
 import org.apache.flink.streaming.api.graph.StreamGraph;
 import org.apache.flink.streaming.api.windowing.helper.Count;
@@ -37,10 +48,6 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
 public class DataStreamTest {
 
 	private static final long MEMORYSIZE = 32;
@@ -48,6 +55,7 @@ public class DataStreamTest {
 
 	/**
 	 * Tests {@link SingleOutputStreamOperator#name(String)} functionality.
+	 *
 	 * @throws Exception
 	 */
 	@Test
@@ -106,7 +114,7 @@ public class DataStreamTest {
 	 */
 	@Test
 	@SuppressWarnings("unchecked")
-	public void testPartitioning(){
+	public void testPartitioning() {
 		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
 		StreamGraph graph = env.getStreamGraph();
 
@@ -116,7 +124,7 @@ public class DataStreamTest {
 
 		//Testing DataStream grouping
 		DataStream group1 = src1.groupBy(0);
-		DataStream group2 = src1.groupBy(1,0);
+		DataStream group2 = src1.groupBy(1, 0);
 		DataStream group3 = src1.groupBy("f0");
 		DataStream group4 = src1.groupBy(new FirstSelector());
 
@@ -147,10 +155,10 @@ public class DataStreamTest {
 		assertFalse(isGrouped(partition4));
 
 		//Testing ConnectedDataStream grouping
-		ConnectedDataStream connectedGroup1 = connected.groupBy(0,0);
+		ConnectedDataStream connectedGroup1 = connected.groupBy(0, 0);
 		Integer downStreamId1 = createDownStreamId(connectedGroup1);
 
-		ConnectedDataStream connectedGroup2 = connected.groupBy(new int[]{0},new int[]{0});
+		ConnectedDataStream connectedGroup2 = connected.groupBy(new int[]{0}, new int[]{0});
 		Integer downStreamId2 = createDownStreamId(connectedGroup2);
 
 		ConnectedDataStream connectedGroup3 = connected.groupBy("f0", "f0");
@@ -187,7 +195,7 @@ public class DataStreamTest {
 		ConnectedDataStream connectedPartition1 = connected.partitionByHash(0, 0);
 		Integer connectDownStreamId1 = createDownStreamId(connectedPartition1);
 
-		ConnectedDataStream connectedPartition2 = connected.partitionByHash(new int[]{0},new int[]{0});
+		ConnectedDataStream connectedPartition2 = connected.partitionByHash(new int[]{0}, new int[]{0});
 		Integer connectDownStreamId2 = createDownStreamId(connectedPartition2);
 
 		ConnectedDataStream connectedPartition3 = connected.partitionByHash("f0", "f0");
@@ -221,19 +229,119 @@ public class DataStreamTest {
 		assertFalse(isGrouped(connectedPartition5));
 	}
 
+	/**
+	 * Tests whether parallelism gets set.
+	 */
+	@Test
+	public void testParallelism() {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(10, MEMORYSIZE);
+		StreamGraph graph = env.getStreamGraph();
+
+		DataStreamSource<Tuple2<Long, Long>> src = env.fromElements(new Tuple2<Long, Long>(0L, 0L));
+
+		SingleOutputStreamOperator<Long, ?> map = src.map(new MapFunction<Tuple2<Long, Long>, Long>() {
+			@Override
+			public Long map(Tuple2<Long, Long> value) throws Exception {
+				return null;
+			}
+		});
+
+		DataStream<Long> windowed = map
+				.window(Count.of(10))
+				.foldWindow(0L, new FoldFunction<Long, Long>() {
+					@Override
+					public Long fold(Long accumulator, Long value) throws Exception {
+						return null;
+					}
+				}).flatten();
+
+		DataStreamSink<Long> sink = map.addSink(new SinkFunction<Long>() {
+			@Override
+			public void invoke(Long value) throws Exception {
+			}
+		});
+
+		assertEquals(1, graph.getStreamNode(src.getId()).getParallelism());
+		assertEquals(10, graph.getStreamNode(map.getId()).getParallelism());
+		assertEquals(10, graph.getStreamNode(windowed.getId()).getParallelism());
+		assertEquals(10, graph.getStreamNode(sink.getId()).getParallelism());
+
+		env.setParallelism(7);
+		assertEquals(1, graph.getStreamNode(src.getId()).getParallelism());
+		assertEquals(7, graph.getStreamNode(map.getId()).getParallelism());
+		assertEquals(7, graph.getStreamNode(windowed.getId()).getParallelism());
+		assertEquals(7, graph.getStreamNode(sink.getId()).getParallelism());
+
+		try {
+			src.setParallelism(3);
+			fail();
+		} catch (IllegalArgumentException success) {
+		}
+
+		DataStreamSource<Long> parallelSource = env.generateParallelSequence(0, 0);
+		assertEquals(7, graph.getStreamNode(parallelSource.getId()).getParallelism());
+
+		parallelSource.setParallelism(3);
+		assertEquals(3, graph.getStreamNode(parallelSource.getId()).getParallelism());
+
+		map.setParallelism(2);
+		assertEquals(2, graph.getStreamNode(map.getId()).getParallelism());
+
+		sink.setParallelism(4);
+		assertEquals(4, graph.getStreamNode(sink.getId()).getParallelism());
+	}
+
+	@Test
+	public void testTypeInfo() {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+
+		DataStream<Long> src1 = env.generateSequence(0, 0);
+		assertEquals(TypeExtractor.getForClass(Long.class), src1.getType());
+
+		DataStream<Tuple2<Integer, String>> map = src1.map(new MapFunction<Long, Tuple2<Integer, String>>() {
+			@Override
+			public Tuple2<Integer, String> map(Long value) throws Exception {
+				return null;
+			}
+		});
+
+		assertEquals(TypeExtractor.getForObject(new Tuple2<Integer, String>(0, "")), map.getType());
+
+		WindowedDataStream<String> window = map
+				.window(Count.of(5))
+				.mapWindow(new WindowMapFunction<Tuple2<Integer, String>, String>() {
+					@Override
+					public void mapWindow(Iterable<Tuple2<Integer, String>> values, Collector<String> out) throws Exception {
+					}
+				});
+
+		assertEquals(TypeExtractor.getForClass(String.class), window.getType());
+
+		DataStream<CustomPOJO> flatten = window
+				.foldWindow(new CustomPOJO(), new FoldFunction<String, CustomPOJO>() {
+					@Override
+					public CustomPOJO fold(CustomPOJO accumulator, String value) throws Exception {
+						return null;
+					}
+				})
+				.flatten();
+
+		assertEquals(TypeExtractor.getForClass(CustomPOJO.class), flatten.getType());
+	}
+
 	/////////////////////////////////////////////////////////////
 	// Utilities
 	/////////////////////////////////////////////////////////////
 
-	private static Integer createDownStreamId(DataStream dataStream){
+	private static Integer createDownStreamId(DataStream dataStream) {
 		return dataStream.print().getId();
 	}
 
-	private static boolean isGrouped(DataStream dataStream){
+	private static boolean isGrouped(DataStream dataStream) {
 		return dataStream instanceof GroupedDataStream;
 	}
 
-	private static Integer createDownStreamId(ConnectedDataStream dataStream){
+	private static Integer createDownStreamId(ConnectedDataStream dataStream) {
 		return dataStream.map(new CoMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>, Object>() {
 			@Override
 			public Object map1(Tuple2<Long, Long> value) {
@@ -247,18 +355,42 @@ public class DataStreamTest {
 		}).getId();
 	}
 
-	private static boolean isGrouped(ConnectedDataStream dataStream){
+	private static boolean isGrouped(ConnectedDataStream dataStream) {
 		return (dataStream.getFirst() instanceof GroupedDataStream && dataStream.getSecond() instanceof GroupedDataStream);
 	}
 
-	private static boolean isPartitioned(StreamEdge edge){
+	private static boolean isPartitioned(StreamEdge edge) {
 		return edge.getPartitioner() instanceof FieldsPartitioner;
 	}
 
-	private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long>{
+	private static class FirstSelector implements KeySelector<Tuple2<Long, Long>, Long> {
 		@Override
 		public Long getKey(Tuple2<Long, Long> value) throws Exception {
 			return value.f0;
 		}
 	}
+
+	public static class CustomPOJO {
+		private String s;
+		private int i;
+
+		public CustomPOJO() {
+		}
+
+		public void setS(String s) {
+			this.s = s;
+		}
+
+		public void setI(int i) {
+			this.i = i;
+		}
+
+		public String getS() {
+			return s;
+		}
+
+		public int getI() {
+			return i;
+		}
+	}
 }

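To spell out the contract the Java testParallelism above exercises: non-parallel sources such as fromElements and generateSequence are pinned to parallelism 1 and reject setParallelism with an IllegalArgumentException, while env.setParallelism(...) rebases every operator that has not been given an explicit degree. A minimal sketch, reusing only calls that appear in this diff:

    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.util.TestStreamEnvironment;

    public final class ParallelismRulesSketch {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = new TestStreamEnvironment(10, 32);

            // Non-parallel sources always run with parallelism 1 ...
            DataStreamSource<Long> seq = env.generateSequence(0, 0);
            try {
                seq.setParallelism(3); // rejected: the source is not parallel
            } catch (IllegalArgumentException expected) {
                // this is exactly what the test above asserts via fail()
            }

            // ... whereas parallel sources accept an explicit setting.
            DataStreamSource<Long> parallel = env.generateParallelSequence(0, 0);
            parallel.setParallelism(3);
        }
    }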
http://git-wip-us.apache.org/repos/asf/flink/blob/03ae80d9/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
index 5ca4883..2f5f30d 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/StreamExecutionEnvironmentTest.java
@@ -17,24 +17,31 @@
 
 package org.apache.flink.streaming.api;
 
-import org.apache.flink.api.common.functions.FoldFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.functions.co.CoFlatMapFunction;
-import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
+import org.apache.flink.streaming.api.functions.source.FromIteratorFunction;
+import org.apache.flink.streaming.api.functions.source.FromSplittableIteratorFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.apache.flink.util.Collector;
 import org.apache.flink.util.SplittableIterator;
 import org.junit.Test;
 
-import java.util.Iterator;
-
-import static org.junit.Assert.assertTrue;
-
 public class StreamExecutionEnvironmentTest {
 
 	private static final long MEMORYSIZE = 32;
@@ -72,12 +79,12 @@ public class StreamExecutionEnvironmentTest {
 		boolean seenExpectedException = false;
 
 		try {
-			DataStream<Long> dataStream1 = env.generateSequence(0,0).setParallelism(4);
+			DataStream<Long> dataStream1 = env.generateSequence(0, 0).setParallelism(4);
 		} catch (IllegalArgumentException e) {
 			seenExpectedException = true;
 		}
 
-		DataStream<Long> dataStream2 = env.generateParallelSequence(0,0).setParallelism(4);
+		DataStream<Long> dataStream2 = env.generateParallelSequence(0, 0).setParallelism(4);
 
 		String plan = env.getExecutionPlan();
 
@@ -88,6 +95,74 @@ public class StreamExecutionEnvironmentTest {
 				plan.contains("\"contents\":\"Parallel Sequence Source\",\"parallelism\":4"));
 	}
 
+	@Test
+	public void testSources() {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(PARALLELISM, MEMORYSIZE);
+
+		SourceFunction<Integer> srcFun = new SourceFunction<Integer>() {
+			@Override
+			public boolean reachedEnd() throws Exception {
+				return false;
+			}
+
+			@Override
+			public Integer next() throws Exception {
+				return null;
+			}
+		};
+		DataStreamSource<Integer> src1 = env.addSource(srcFun);
+		assertEquals(srcFun, getFunctionForDataSource(src1));
+
+		List<Long> list = Arrays.asList(0L, 1L, 2L);
+
+		DataStreamSource<Long> src2 = env.generateSequence(0, 2);
+		assertTrue(getFunctionForDataSource(src2) instanceof FromIteratorFunction);
+		checkIfSameElements(list, getFunctionForDataSource(src2));
+
+		DataStreamSource<Long> src3 = env.fromElements(0L, 1L, 2L);
+		assertTrue(getFunctionForDataSource(src3) instanceof FromElementsFunction);
+
+		DataStreamSource<Long> src4 = env.fromCollection(list);
+		assertTrue(getFunctionForDataSource(src4) instanceof FromElementsFunction);
+
+		DataStreamSource<Long> src5 = env.generateParallelSequence(0, 2);
+		assertTrue(getFunctionForDataSource(src5) instanceof FromSplittableIteratorFunction);
+	}
+
+	/////////////////////////////////////////////////////////////
+	// Utilities
+	/////////////////////////////////////////////////////////////
+
+	private static <T> void checkIfSameElements(Collection<T> collection, SourceFunction<T> sourceFunction) {
+		for (T elem : collection) {
+			try {
+				assertEquals(elem, sourceFunction.next());
+			} catch (Exception e) {
+				e.printStackTrace();
+				fail(e.getMessage());
+			}
+		}
+
+		try {
+			assertTrue(sourceFunction.reachedEnd());
+		} catch (Exception e) {
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	private static StreamOperator<?> getOperatorForDataStream(DataStream<?> dataStream) {
+		StreamExecutionEnvironment env = dataStream.getExecutionEnvironment();
+		StreamGraph streamGraph = env.getStreamGraph();
+		return streamGraph.getStreamNode(dataStream.getId()).getOperator();
+	}
+
+	private static <T> SourceFunction<T> getFunctionForDataSource(DataStreamSource<T> dataStreamSource) {
+		AbstractUdfStreamOperator<?, ?> operator =
+				(AbstractUdfStreamOperator<?, ?>) getOperatorForDataStream(dataStreamSource);
+		return (SourceFunction<T>) operator.getUserFunction();
+	}
+
 	public static class DummySplittableIterator extends SplittableIterator {
 		private static final long serialVersionUID = 1312752876092210499L;
 

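A note for readers used to later Flink versions: SourceFunction here is the pull-style interface of the 0.9 line, with reachedEnd() polled before each next(), as the anonymous source in testSources above shows. A bounded source sketch under that interface (illustrative only, not part of the commit):

    import org.apache.flink.streaming.api.functions.source.SourceFunction;

    // Emits 3, 2, 1 and then signals the end of the stream.
    public class CountdownSource implements SourceFunction<Integer> {
        private static final long serialVersionUID = 1L;

        private int remaining = 3;

        @Override
        public boolean reachedEnd() throws Exception {
            return remaining == 0;
        }

        @Override
        public Integer next() throws Exception {
            return remaining--;
        }
    }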
http://git-wip-us.apache.org/repos/asf/flink/blob/03ae80d9/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
index a3a416a..3173b8d 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/WindowedDataStream.scala
@@ -298,7 +298,7 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
     
   private def aggregate(aggregationType: AggregationType, field: String): 
   WindowedDataStream[T] = {
-    val position = fieldNames2Indices(javaStream.getType(), Array(field))(0)
+    val position = fieldNames2Indices(getType(), Array(field))(0)
     aggregate(aggregationType, position)
   }  
 
@@ -322,4 +322,11 @@ class WindowedDataStream[T](javaStream: JavaWStream[T]) {
             jStream.reduceWindow(reducer)).asInstanceOf[WindowedDataStream[T]]
   }
 
+  /**
+   * Gets the output type.
+   *
+   * @return The output type.
+   */
+  def getType(): TypeInformation[T] = javaStream.getType
+
 }

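The Scala getType() added above simply delegates to the wrapped Java stream, which is what lets both the Java and Scala testTypeInfo cases assert on it. Those assertions work because TypeInformation compares by value, not by identity. A minimal sketch of that equality (using nothing beyond the TypeExtractor calls already present in this diff):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    public final class TypeInfoEqualitySketch {
        public static void main(String[] args) {
            TypeInformation<String> a = TypeExtractor.getForClass(String.class);
            TypeInformation<String> b = TypeExtractor.getForClass(String.class);
            // equals() is structural, which is why
            // assertEquals(TypeExtractor.getForClass(...), stream.getType()) holds.
            System.out.println(a.equals(b)); // true
        }
    }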
http://git-wip-us.apache.org/repos/asf/flink/blob/03ae80d9/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
index 235adad..2f80967 100644
--- a/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
+++ b/flink-staging/flink-streaming/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/DataStreamTest.scala
@@ -18,19 +18,21 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.streaming.api.graph.{StreamEdge, StreamGraph}
 import org.apache.flink.streaming.api.windowing.helper.Count
 import org.apache.flink.streaming.runtime.partitioner.FieldsPartitioner
 import org.apache.flink.util.Collector
+import org.junit.Assert._
 import org.junit.Test
 
 class DataStreamTest {
 
-  private val parallelism = 2;
+  private val parallelism = 2
 
   @Test
   def testNaming(): Unit = {
-    val env = StreamExecutionEnvironment.createLocalEnvironment(parallelism);
+    val env = StreamExecutionEnvironment.createLocalEnvironment(parallelism)
 
     val source1 = env.generateSequence(0, 0).name("testSource1")
     assert("testSource1" == source1.getName)
@@ -106,7 +108,8 @@ class DataStreamTest {
     val connectedGroup3: ConnectedDataStream[_, _] = connected.groupBy("_1", "_1")
     val downStreamId3: Integer = createDownStreamId(connectedGroup3)
 
-    val connectedGroup4: ConnectedDataStream[_, _] = connected.groupBy(Array[String]("_1"), Array[String]("_1"))
+    val connectedGroup4: ConnectedDataStream[_, _] =
+      connected.groupBy(Array[String]("_1"), Array[String]("_1"))
     val downStreamId4: Integer = createDownStreamId(connectedGroup4)
 
     val connectedGroup5: ConnectedDataStream[_, _] = connected.groupBy(x => x._1, x => x._1)
@@ -131,34 +134,137 @@ class DataStreamTest {
     val connectedPartition1: ConnectedDataStream[_, _] = connected.partitionByHash(0, 0)
     val connectDownStreamId1: Integer = createDownStreamId(connectedPartition1)
 
-    val connectedPartition2: ConnectedDataStream[_, _] = connected.partitionByHash(Array[Int](0), Array[Int](0))
+    val connectedPartition2: ConnectedDataStream[_, _] =
+      connected.partitionByHash(Array[Int](0), Array[Int](0))
     val connectDownStreamId2: Integer = createDownStreamId(connectedPartition2)
 
     val connectedPartition3: ConnectedDataStream[_, _] = connected.partitionByHash("_1", "_1")
     val connectDownStreamId3: Integer = createDownStreamId(connectedPartition3)
 
-    val connectedPartition4: ConnectedDataStream[_, _] = connected.partitionByHash(Array[String]("_1"), Array[String]("_1"))
+    val connectedPartition4: ConnectedDataStream[_, _] =
+      connected.partitionByHash(Array[String]("_1"), Array[String]("_1"))
     val connectDownStreamId4: Integer = createDownStreamId(connectedPartition4)
 
-    val connectedPartition5: ConnectedDataStream[_, _] = connected.partitionByHash(x => x._1, x => x._1)
+    val connectedPartition5: ConnectedDataStream[_, _] =
+      connected.partitionByHash(x => x._1, x => x._1)
     val connectDownStreamId5: Integer = createDownStreamId(connectedPartition5)
 
-    assert(isPartitioned(graph.getStreamEdge(connectedPartition1.getFirst.getId, connectDownStreamId1)))
-    assert(isPartitioned(graph.getStreamEdge(connectedPartition1.getSecond.getId, connectDownStreamId1)))
+    assert(
+      isPartitioned(graph.getStreamEdge(connectedPartition1.getFirst.getId, connectDownStreamId1))
+    )
+    assert(
+      isPartitioned(graph.getStreamEdge(connectedPartition1.getSecond.getId, connectDownStreamId1))
+    )
+
+    assert(
+      isPartitioned(graph.getStreamEdge(connectedPartition2.getFirst.getId, connectDownStreamId2))
+    )
+    assert(
+      isPartitioned(graph.getStreamEdge(connectedPartition2.getSecond.getId, connectDownStreamId2))
+    )
+
+    assert(
+      isPartitioned(graph.getStreamEdge(connectedPartition3.getFirst.getId, connectDownStreamId3))
+    )
+    assert(
+      isPartitioned(graph.getStreamEdge(connectedPartition3.getSecond.getId, connectDownStreamId3))
+    )
+
+    assert(
+      isPartitioned(graph.getStreamEdge(connectedPartition4.getFirst.getId, connectDownStreamId4))
+    )
+    assert(
+      isPartitioned(graph.getStreamEdge(connectedPartition4.getSecond.getId, connectDownStreamId4))
+    )
+
+    assert(
+      isPartitioned(graph.getStreamEdge(connectedPartition5.getFirst.getId, connectDownStreamId5))
+    )
+    assert(
+      isPartitioned(graph.getStreamEdge(connectedPartition5.getSecond.getId, connectDownStreamId5))
+    )
+  }
 
-    assert(isPartitioned(graph.getStreamEdge(connectedPartition2.getFirst.getId, connectDownStreamId2)))
-    assert(isPartitioned(graph.getStreamEdge(connectedPartition2.getSecond.getId, connectDownStreamId2)))
+  /**
+   * Tests whether parallelism gets set.
+   */
+  @Test
+  def testParallelism {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(10)
+
+    val graph: StreamGraph = env.getStreamGraph
+
+    val src = env.fromElements(new Tuple2[Long, Long](0L, 0L))
+    val map = src.map(x => 0L)
+    val windowed: DataStream[Long] = map
+      .window(Count.of(10))
+      .foldWindow(0L, (x: Long, y: Long) => 0L)
+      .flatten
+    val sink = map.addSink(x => {})
+
+    assert(1 == graph.getStreamNode(src.getId).getParallelism)
+    assert(10 == graph.getStreamNode(map.getId).getParallelism)
+    assert(10 == graph.getStreamNode(windowed.getId).getParallelism)
+    assert(10 == graph.getStreamNode(sink.getId).getParallelism)
+
+    try {
+      src.setParallelism(3)
+      fail
+    }
+    catch {
+      case success: IllegalArgumentException => {
+      }
+    }
+
+    env.setParallelism(7)
+    assert(1 == graph.getStreamNode(src.getId).getParallelism)
+    assert(7 == graph.getStreamNode(map.getId).getParallelism)
+    assert(7 == graph.getStreamNode(windowed.getId).getParallelism)
+    assert(7 == graph.getStreamNode(sink.getId).getParallelism)
+
+    val parallelSource = env.generateParallelSequence(0, 0)
+
+    assert(7 == graph.getStreamNode(parallelSource.getId).getParallelism)
+
+    parallelSource.setParallelism(3)
+    assert(3 == graph.getStreamNode(parallelSource.getId).getParallelism)
+
+    map.setParallelism(2)
+    assert(2 == graph.getStreamNode(map.getId).getParallelism)
+
+    sink.setParallelism(4)
+    assert(4 == graph.getStreamNode(sink.getId).getParallelism)
+  }
 
-    assert(isPartitioned(graph.getStreamEdge(connectedPartition3.getFirst.getId, connectDownStreamId3)))
-    assert(isPartitioned(graph.getStreamEdge(connectedPartition3.getSecond.getId, connectDownStreamId3)))
+  @Test
+  def testTypeInfo {
+    val env: StreamExecutionEnvironment = StreamExecutionEnvironment
+      .createLocalEnvironment(parallelism)
+
+    val src1: DataStream[Long] = env.generateSequence(0, 0)
+    assertEquals(TypeExtractor.getForClass(classOf[Long]), src1.getType)
+
+    val map: DataStream[(Integer, String)] = src1.map(x => null)
+    assertEquals(classOf[scala.Tuple2[Integer, String]], map.getType.getTypeClass)
 
-    assert(isPartitioned(graph.getStreamEdge(connectedPartition4.getFirst.getId, connectDownStreamId4)))
-    assert(isPartitioned(graph.getStreamEdge(connectedPartition4.getSecond.getId, connectDownStreamId4)))
+    val window: WindowedDataStream[String] = map
+      .window(Count.of(5))
+      .mapWindow((x: Iterable[(Integer, String)], y: Collector[String]) => {})
+    assertEquals(TypeExtractor.getForClass(classOf[String]), window.getType)
 
-    assert(isPartitioned(graph.getStreamEdge(connectedPartition5.getFirst.getId, connectDownStreamId5)))
-    assert(isPartitioned(graph.getStreamEdge(connectedPartition5.getSecond.getId, connectDownStreamId5)))
+    val flatten: DataStream[Int] = window
+      .foldWindow(0,
+        (accumulator: Int, value: String) => 0
+      ).flatten
+    assertEquals(TypeExtractor.getForClass(classOf[Int]), flatten.getType)
+
+    // TODO check for custom case class
   }
 
+  /////////////////////////////////////////////////////////////
+  // Utilities
+  /////////////////////////////////////////////////////////////
+
   private def isPartitioned(edge: StreamEdge): Boolean = {
     return edge.getPartitioner.isInstanceOf[FieldsPartitioner[_]]
   }
@@ -168,7 +274,7 @@ class DataStreamTest {
   }
 
   private def createDownStreamId(dataStream: ConnectedDataStream[_, _]): Integer = {
-    return dataStream.map(x  => 0, x => 0).getId
+    return dataStream.map(x => 0, x => 0).getId
   }
 
 }