You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gy...@apache.org on 2015/03/26 20:31:08 UTC

[1/2] flink git commit: [FLINK-1686] [streaming] add parallel iteration test

Repository: flink
Updated Branches:
  refs/heads/master 61a3a6485 -> 1f726e482


[FLINK-1686] [streaming] add parallel iteration test


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1f726e48
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1f726e48
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1f726e48

Branch: refs/heads/master
Commit: 1f726e4828dce374a9613ea3474baf6ffbf948d1
Parents: 0b48e52
Author: Paris Carbone <se...@gmail.com>
Authored: Thu Mar 26 17:57:50 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Mar 26 20:30:20 2015 +0100

----------------------------------------------------------------------
 .../api/streamvertex/StreamIterationTail.java   |  2 +-
 .../apache/flink/streaming/api/IterateTest.java | 26 ++++++++++++--------
 2 files changed, 17 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1f726e48/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
index 7b654be..ab09aff 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
@@ -49,7 +49,7 @@ public class StreamIterationTail<IN> extends StreamVertex<IN, IN> {
 			iterationId = configuration.getIterationId();
 			iterationWaitTime = configuration.getIterationWaitTime();
 			shouldWait = iterationWaitTime > 0;
-			BlockingQueueBroker.instance().get(iterationId.toString()+"-"
+			dataChannel = BlockingQueueBroker.instance().get(iterationId.toString()+"-"
 					+getEnvironment().getIndexInSubtaskGroup());
 		} catch (Exception e) {
 			throw new StreamVertexException(String.format(

http://git-wip-us.apache.org/repos/asf/flink/blob/1f726e48/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 3f0c48a..a64c4b1 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.api;
 
-import static org.junit.Assert.assertTrue;
-
 import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.IterativeDataStream;
@@ -28,19 +26,24 @@ import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
 
+import java.util.Collections;
+
+import static org.junit.Assert.assertTrue;
+
 public class IterateTest {
 
 	private static final long MEMORYSIZE = 32;
-	private static boolean iterated = false;
+	private static boolean iterated[];
 
-	public static final class IterationHead extends RichFlatMapFunction<Boolean, Boolean> {
+	public static final class IterationHead extends RichFlatMapFunction<Boolean,Boolean> {
 
 		private static final long serialVersionUID = 1L;
 
 		@Override
 		public void flatMap(Boolean value, Collector<Boolean> out) throws Exception {
+			int indx = getRuntimeContext().getIndexOfThisSubtask();
 			if (value) {
-				iterated = true;
+				iterated[indx] = true;
 			} else {
 				out.collect(value);
 			}
@@ -49,7 +52,7 @@ public class IterateTest {
 
 	}
 
-	public static final class IterationTail extends RichFlatMapFunction<Boolean, Boolean> {
+	public static final class IterationTail extends RichFlatMapFunction<Boolean,Boolean> {
 
 		private static final long serialVersionUID = 1L;
 
@@ -72,11 +75,12 @@ public class IterateTest {
 
 	@Test
 	public void test() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
-
+		int parallelism = 2;
+		StreamExecutionEnvironment env = new TestStreamEnvironment(parallelism, MEMORYSIZE);
+		iterated = new boolean[parallelism];
 		env.setBufferTimeout(10);
 
-		DataStream<Boolean> source = env.fromElements(false, false, false);
+		DataStream<Boolean> source = env.fromCollection(Collections.nCopies(parallelism, false));
 
 		IterativeDataStream<Boolean> iteration = source.iterate(3000);
 
@@ -87,7 +91,9 @@ public class IterateTest {
 
 		env.execute();
 
-		assertTrue(iterated);
+		for (boolean iter : iterated) {
+			assertTrue(iter);
+		}
 
 	}
 


[2/2] flink git commit: [FLINK-1686] [streaming] Use different slot identifier per iteration

Posted by gy...@apache.org.
[FLINK-1686] [streaming] Use different slot identifier per iteration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0b48e52e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0b48e52e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0b48e52e

Branch: refs/heads/master
Commit: 0b48e52e249ead857c499585f6d68f6f57c3f71d
Parents: 61a3a64
Author: Paris Carbone <se...@gmail.com>
Authored: Thu Mar 26 17:54:44 2015 +0100
Committer: Gyula Fora <gy...@apache.org>
Committed: Thu Mar 26 20:30:20 2015 +0100

----------------------------------------------------------------------
 .../flink/streaming/api/streamvertex/StreamIterationHead.java     | 3 ++-
 .../flink/streaming/api/streamvertex/StreamIterationTail.java     | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0b48e52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
index adbdde3..816c0d6 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationHead.java
@@ -59,7 +59,8 @@ public class StreamIterationHead<OUT> extends StreamVertex<OUT, OUT> {
 		shouldWait = iterationWaitTime > 0;
 
 		try {
-			BlockingQueueBroker.instance().handIn(iterationId.toString(), dataChannel);
+			BlockingQueueBroker.instance().handIn(iterationId.toString()+"-" 
+					+getEnvironment().getIndexInSubtaskGroup(), dataChannel);
 		} catch (Exception e) {
 			throw new RuntimeException(e);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/0b48e52e/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
index 53e8750..7b654be 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/StreamIterationTail.java
@@ -49,7 +49,8 @@ public class StreamIterationTail<IN> extends StreamVertex<IN, IN> {
 			iterationId = configuration.getIterationId();
 			iterationWaitTime = configuration.getIterationWaitTime();
 			shouldWait = iterationWaitTime > 0;
-			dataChannel = BlockingQueueBroker.instance().get(iterationId.toString());
+			BlockingQueueBroker.instance().get(iterationId.toString()+"-"
+					+getEnvironment().getIndexInSubtaskGroup());
 		} catch (Exception e) {
 			throw new StreamVertexException(String.format(
 					"Cannot register inputs of StreamIterationSink %s", iterationId), e);