You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/06/01 14:37:28 UTC
flink git commit: [streaming] Test added for iteration sink-source
CoLocation + non-chained windowing
Repository: flink
Updated Branches:
refs/heads/master 2f347a039 -> ac43a69c1
[streaming] Test added for iteration sink-source CoLocation + non-chained windowing
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ac43a69c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ac43a69c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ac43a69c
Branch: refs/heads/master
Commit: ac43a69c1a5b285d9519ce540a14dcbef946d63d
Parents: 2f347a0
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Jun 1 13:01:04 2015 +0200
Committer: Gyula Fora <gy...@apache.org>
Committed: Mon Jun 1 14:37:13 2015 +0200
----------------------------------------------------------------------
.../api/graph/StreamingJobGraphGenerator.java | 3 +-
.../apache/flink/streaming/api/IterateTest.java | 52 ++++++++++++++++++++
.../windowing/WindowIntegrationTest.java | 1 +
3 files changed, 55 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ac43a69c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index bc1c984..7eb2028 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -360,9 +360,10 @@ public class StreamingJobGraphGenerator {
CoLocationGroup ccg = new CoLocationGroup();
AbstractJobVertex tail = jobVertices.get(loop.getSink().getID());
AbstractJobVertex head = jobVertices.get(loop.getSource().getID());
-
ccg.addVertex(head);
ccg.addVertex(tail);
+ tail.updateCoLocationGroup(ccg);
+ head.updateCoLocationGroup(ccg);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ac43a69c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index cc59a31..06013e3 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -17,12 +17,16 @@
package org.apache.flink.streaming.api;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.Collections;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.runtime.jobgraph.AbstractJobVertex;
+import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.IterativeDataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -74,6 +78,17 @@ public class IterateTest {
public void invoke(Boolean tuple) {
}
}
+
+ /**
+  * Identity {@link MapFunction} over {@code Boolean} values: every input is
+  * returned unchanged. Serves as a placeholder operator when only the
+  * topology of a job matters.
+  */
+ public static final class NoOpMap implements MapFunction<Boolean, Boolean> {
+
+     private static final long serialVersionUID = 1L;
+
+     /**
+      * @param value the incoming element
+      * @return the same {@code value} reference that was passed in
+      */
+     @Override
+     public Boolean map(Boolean value) throws Exception {
+         return value;
+     }
+ }
public StreamExecutionEnvironment constructIterativeJob(StreamExecutionEnvironment env){
env.setBufferTimeout(10);
@@ -88,6 +103,43 @@ public class IterateTest {
iteration.closeWith(increment).addSink(new MySink());
return env;
}
+
+ /**
+  * Builds (without executing) an iterative job whose head and tail operators
+  * use different parallelism, then inspects the generated {@link JobGraph}:
+  * the iteration source and sink vertices must share a co-location group and
+  * have equal parallelism, while the user operators keep the parallelism
+  * that was configured on them.
+  */
+ @Test
+ public void testColocation() throws Exception {
+     StreamExecutionEnvironment env = new TestStreamEnvironment(4, MEMORYSIZE);
+
+     IterativeDataStream<Boolean> it = env.fromElements(true).distribute().map(new NoOpMap())
+             .iterate();
+
+     DataStream<Boolean> head = it.map(new NoOpMap()).setParallelism(2).name("HeadOperator");
+
+     it.closeWith(head.map(new NoOpMap()).setParallelism(3).name("TailOperator")).print();
+
+     JobGraph graph = env.getStreamGraph().getJobGraph();
+
+     AbstractJobVertex itSource = null;
+     AbstractJobVertex itSink = null;
+     AbstractJobVertex headOp = null;
+     AbstractJobVertex tailOp = null;
+
+     // Locate the four vertices of interest by (a fragment of) their name.
+     for (AbstractJobVertex vertex : graph.getVertices()) {
+         String vertexName = vertex.getName();
+         if (vertexName.contains("IterationHead")) {
+             itSource = vertex;
+         } else if (vertexName.contains("IterationTail")) {
+             itSink = vertex;
+         } else if (vertexName.contains("HeadOperator")) {
+             headOp = vertex;
+         } else if (vertexName.contains("TailOperator")) {
+             tailOp = vertex;
+         }
+     }
+
+     // Fail with a readable message instead of a NullPointerException when a
+     // vertex is missing from the generated graph.
+     assertTrue("No IterationHead vertex in the job graph", itSource != null);
+     assertTrue("No IterationTail vertex in the job graph", itSink != null);
+     assertTrue("No HeadOperator vertex in the job graph", headOp != null);
+     assertTrue("No TailOperator vertex in the job graph", tailOp != null);
+
+     assertTrue("Iteration source has no co-location group",
+             itSource.getCoLocationGroup() != null);
+     assertEquals(itSource.getCoLocationGroup(), itSink.getCoLocationGroup());
+     // JUnit convention: expected value first, actual value second.
+     assertEquals(2, headOp.getParallelism());
+     assertEquals(3, tailOp.getParallelism());
+     assertEquals(itSource.getParallelism(), itSink.getParallelism());
+ }
@Test
public void test() throws Exception {
http://git-wip-us.apache.org/repos/asf/flink/blob/ac43a69c/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
index cdf39fe..d5b2a4c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/operators/windowing/WindowIntegrationTest.java
@@ -109,6 +109,7 @@ public class WindowIntegrationTest implements Serializable {
};
StreamExecutionEnvironment env = new TestStreamEnvironment(2, MEMORYSIZE);
+ env.disableOperatorChaining();
DataStream<Integer> source = env.fromCollection(inputs);