You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/14 15:02:13 UTC

[1/2] flink git commit: [FLINK-1660] [streaming] [tests] Fix MultiTriggerPolicyTest race

Repository: flink
Updated Branches:
  refs/heads/master f50d75411 -> e5a3b95a2


[FLINK-1660] [streaming] [tests] Fix MultiTriggerPolicyTest race

Closes #590


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5a3b95a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5a3b95a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5a3b95a

Branch: refs/heads/master
Commit: e5a3b95a262a0498f9d6dc7d47495c95719c5632
Parents: 54d5662
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Apr 13 12:16:16 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Apr 14 15:01:22 2015 +0200

----------------------------------------------------------------------
 .../flink-streaming-core/pom.xml                | 12 +++-
 .../policy/MultiTriggerPolicyTest.java          | 64 +++++++++++---------
 2 files changed, 44 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e5a3b95a/flink-staging/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/pom.xml b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
index a60bd01..bc6c55c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
@@ -48,12 +48,18 @@ under the License.
 			<version>${project.version}</version>
 			<scope>test</scope>
 		</dependency>
-        
-	        <dependency>
+
+		<dependency>
 			<groupId>org.apache.sling</groupId>
 			<artifactId>org.apache.sling.commons.json</artifactId>
 			<version>2.0.6</version>
-        	</dependency>
+		</dependency>
+
+		<dependency>
+			<groupId>com.google.guava</groupId>
+			<artifactId>guava</artifactId>
+			<version>${guava.version}</version>
+		</dependency>
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/flink/blob/e5a3b95a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
index 9964cd8..4448b59 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
@@ -17,16 +17,22 @@
 
 package org.apache.flink.streaming.api.windowing.policy;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.junit.Test;
 
 import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 public class MultiTriggerPolicyTest {
 
@@ -135,7 +141,7 @@ public class MultiTriggerPolicyTest {
 	 * correctly.
 	 */
 	@Test
-	public void testActiveTriggerRunnables() {
+	public void testActiveTriggerRunnables() throws InterruptedException {
 		TriggerPolicy<Integer> firstPolicy = new ActiveTriggerWithRunnable(1);
 		TriggerPolicy<Integer> secondPolicy = new ActiveTriggerWithRunnable(2);
 		TriggerPolicy<Integer> thirdPolicy = new ActiveTriggerWithRunnable(3);
@@ -143,7 +149,7 @@ public class MultiTriggerPolicyTest {
 		ActiveTriggerPolicy<Integer> multiTrigger = new MultiTriggerPolicy<Integer>(firstPolicy,
 				secondPolicy, thirdPolicy);
 
-		MyCallbackClass cb = new MyCallbackClass();
+		MyCallbackClass cb = new MyCallbackClass(3);
 		Runnable runnable = multiTrigger.createActiveTriggerRunnable(cb);
 		new Thread(runnable).start();
 
@@ -168,7 +174,7 @@ public class MultiTriggerPolicyTest {
 	@SuppressWarnings("serial")
 	private class ActiveTriggerWithRunnable implements ActiveTriggerPolicy<Integer> {
 
-		int id;
+		private final int id;
 
 		public ActiveTriggerWithRunnable(int id) {
 			this.id = id;
@@ -203,37 +209,37 @@ public class MultiTriggerPolicyTest {
 	 */
 	private class MyCallbackClass implements ActiveTriggerCallback {
 
-		List<Integer> received = new LinkedList<Integer>();
+		private final Set<Integer> received = Sets
+				.newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
+
+		private final CountDownLatch sync;
+
+		public MyCallbackClass(int numberOfExpectedElements) {
+			checkArgument(numberOfExpectedElements >= 0);
+			this.sync = new CountDownLatch(numberOfExpectedElements);
+		}
 
 		@Override
 		public void sendFakeElement(Object datapoint) {
 			received.add((Integer) datapoint);
+
+			sync.countDown();
 		}
 
-		public boolean check(int timeout, int... ids) {
-			int totalTime = 0;
+		public boolean check(int timeout, int... expectedIds) throws InterruptedException {
+			// Wait for all elements
+			sync.await(timeout, TimeUnit.MILLISECONDS);
 
-			while (totalTime <= timeout) {
-				boolean result = true;
-				for (int id : ids) {
-					if (!received.contains(id)) {
-						result = false;
-					}
-				}
+			// Check received all expected ids
+			assertEquals(expectedIds.length, received.size());
 
-				if (result) {
-					return true;
-				} else {
-					try {
-						Thread.sleep(1000);
-						totalTime += 1000;
-					} catch (InterruptedException e) {
-						// ignore it here
-					}
+			for (int id : expectedIds) {
+				if (!received.contains(id)) {
+					return false;
 				}
 			}
-			return false;
-		}
 
+			return true;
+		}
 	}
 }
\ No newline at end of file


[2/2] flink git commit: [FLINK-1866] [streaming] StreamConfig key bugfix

Posted by mb...@apache.org.
[FLINK-1866] [streaming] StreamConfig key bugfix


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54d5662f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54d5662f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54d5662f

Branch: refs/heads/master
Commit: 54d5662f353c19d96a4241cf9b7d7703ac2d01f8
Parents: f50d754
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Apr 13 11:21:37 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Apr 14 15:01:22 2015 +0200

----------------------------------------------------------------------
 .../org/apache/flink/streaming/api/StreamConfig.java  | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/54d5662f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index c1e9606..152d489 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -44,20 +44,20 @@ public class StreamConfig implements Serializable {
 	private static final String OUTPUT_NAME = "outputName_";
 	private static final String VERTEX_NAME = "vertexID";
 	private static final String OPERATOR_NAME = "operatorName";
-	private static final String ITERATION_ID = "iteration-id";
+	private static final String ITERATION_ID = "iterationId";
 	private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
-	private static final String SERIALIZEDUDF = "serializedudf";
-	private static final String USER_FUNCTION = "userfunction";
+	private static final String SERIALIZEDUDF = "serializedUDF";
+	private static final String USER_FUNCTION = "userFunction";
 	private static final String BUFFER_TIMEOUT = "bufferTimeout";
 	private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
 	private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
 	private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
 	private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
 	private static final String ITERATON_WAIT = "iterationWait";
-	private static final String NONCHAINED_OUTPUTS = "NONCHAINED_OUTPUTS";
-	private static final String EDGES_IN_ORDER = "rwOrder";
-	private static final String OUT_STREAM_EDGES = "out stream edges";
-	private static final String IN_STREAM_EDGES = "out stream edges";
+	private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
+	private static final String EDGES_IN_ORDER = "edgesInOrder";
+	private static final String OUT_STREAM_EDGES = "outStreamEdges";
+	private static final String IN_STREAM_EDGES = "inStreamEdges";
 
 	// DEFAULT VALUES
 	private static final long DEFAULT_TIMEOUT = 100;