You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/01 14:37:28 UTC

flink git commit: [streaming] Test added for iteration sink-source CoLocation + non-chained windowing

Repository: flink
Updated Branches:
  refs/heads/master 2f347a039 -> ac43a69c1


[streaming] Test added for iteration sink-source CoLocation + non-chained windowing


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac43a69c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac43a69c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac43a69c

Branch: refs/heads/master
Commit: ac43a69c1a5b285d9519ce540a14dcbef946d63d
Parents: 2f347a0
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Jun 1 13:01:04 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Mon Jun 1 14:37:13 2015 +0200

----------------------------------------------------------------------
 .../api/graph/StreamingJobGraphGenerator.java   |  3 +-
 .../apache/flink/streaming/api/IterateTest.java | 52 ++++++++++++++++++++
 .../windowing/WindowIntegrationTest.java        |  1 +
 3 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ac43a69c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index bc1c984..7eb2028 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -360,9 +360,10 @@ public class StreamingJobGraphGenerator {
 			CoLocationGroup ccg = new CoLocationGroup();
 			AbstractJobVertex tail = jobVertices.get(loop.getSink().getID());
 			AbstractJobVertex head = jobVertices.get(loop.getSource().getID());
-
 			ccg.addVertex(head);
 			ccg.addVertex(tail);
+			tail.updateCoLocationGroup(ccg);
+			head.updateCoLocationGroup(ccg);
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/flink/blob/ac43a69c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index cc59a31..06013e3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -17,12 +17,16 @@
 
 package org.apache.flink.streaming.api;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.util.Collections;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -74,6 +78,17 @@ public class IterateTest {
 		public void invoke(Boolean tuple) {
 		}
 	}
+	
+	public static final class NoOpMap implements MapFunction<Boolean, Boolean>{
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Boolean map(Boolean value) throws Exception {
+			return value;
+		}
+		
+	}
 
 	public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env){
 		env.setBufferTimeout(10);
@@ -88,6 +103,43 @@ public class IterateTest {
 		iteration.closeWith(increment).addSink(new MySink());
 		return env;
 	}
+	
+	@Test
+	public void testColocation() throws Exception {
+		StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
+
+		IterativeDataStream<Boolean> it = env.fromElements(true).distribute().map(new NoOpMap())
+				.iterate();
+
+		DataStream<Boolean> head = it.map(new NoOpMap()).setParallelism(2).name("HeadOperator");
+
+		it.closeWith(head.map(new NoOpMap()).setParallelism(3).name("TailOperator")).print();
+
+		JobGraph graph = env.getStreamGraph().getJobGraph();
+
+		AbstractJobVertex itSource = null;
+		AbstractJobVertex itSink = null;
+		AbstractJobVertex headOp = null;
+		AbstractJobVertex tailOp = null;
+
+		for (AbstractJobVertex vertex : graph.getVertices()) {
+			if (vertex.getName().contains("IterationHead")) {
+				itSource = vertex;
+			} else if (vertex.getName().contains("IterationTail")) {
+				itSink = vertex;
+			} else if (vertex.getName().contains("HeadOperator")) {
+				headOp = vertex;
+			} else if (vertex.getName().contains("TailOp")) {
+				tailOp = vertex;
+			}
+		}
+
+		assertTrue(itSource.getCoLocationGroup() != null);
+		assertEquals(itSource.getCoLocationGroup(), itSink.getCoLocationGroup());
+		assertEquals(headOp.getParallelism(), 2);
+		assertEquals(tailOp.getParallelism(), 3);
+		assertEquals(itSource.getParallelism(), itSink.getParallelism());
+	}
 
 	@Test
 	public void test() throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/ac43a69c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
index cdf39fe..d5b2a4c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
@@ -109,6 +109,7 @@ public class WindowIntegrationTest implements Serializable {
 		};
 
 		StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+		env.disableOperatorChaining();
 
 		DataStream<Integer> source = env.fromCollection(inputs);