You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/04/14 15:02:13 UTC
[1/2] flink git commit: [FLINK-1660] [streaming] [tests] Fix
MultiTriggerPolicyTest race
Repository: flink
Updated Branches:
refs/heads/master f50d75411 -> e5a3b95a2
[FLINK-1660] [streaming] [tests] Fix MultiTriggerPolicyTest race
Closes #590
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5a3b95a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5a3b95a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5a3b95a
Branch: refs/heads/master
Commit: e5a3b95a262a0498f9d6dc7d47495c95719c5632
Parents: 54d5662
Author: Ufuk Celebi <uc...@apache.org>
Authored: Mon Apr 13 12:16:16 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Apr 14 15:01:22 2015 +0200
----------------------------------------------------------------------
.../flink-streaming-core/pom.xml | 12 +++-
.../policy/MultiTriggerPolicyTest.java | 64 +++++++++++---------
2 files changed, 44 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e5a3b95a/flink-staging/flink-streaming/flink-streaming-core/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/pom.xml b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
index a60bd01..bc6c55c 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/pom.xml
+++ b/flink-staging/flink-streaming/flink-streaming-core/pom.xml
@@ -48,12 +48,18 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
-
- <dependency>
+
+ <dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.commons.json</artifactId>
<version>2.0.6</version>
- </dependency>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/flink/blob/e5a3b95a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
index 9964cd8..4448b59 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/windowing/policy/MultiTriggerPolicyTest.java
@@ -17,16 +17,22 @@
package org.apache.flink.streaming.api.windowing.policy;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import com.google.common.collect.Sets;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+import org.junit.Test;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.junit.Test;
+import static com.google.common.base.Preconditions.checkArgument;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
public class MultiTriggerPolicyTest {
@@ -135,7 +141,7 @@ public class MultiTriggerPolicyTest {
* correctly.
*/
@Test
- public void testActiveTriggerRunnables() {
+ public void testActiveTriggerRunnables() throws InterruptedException {
TriggerPolicy<Integer> firstPolicy = new ActiveTriggerWithRunnable(1);
TriggerPolicy<Integer> secondPolicy = new ActiveTriggerWithRunnable(2);
TriggerPolicy<Integer> thirdPolicy = new ActiveTriggerWithRunnable(3);
@@ -143,7 +149,7 @@ public class MultiTriggerPolicyTest {
ActiveTriggerPolicy<Integer> multiTrigger = new MultiTriggerPolicy<Integer>(firstPolicy,
secondPolicy, thirdPolicy);
- MyCallbackClass cb = new MyCallbackClass();
+ MyCallbackClass cb = new MyCallbackClass(3);
Runnable runnable = multiTrigger.createActiveTriggerRunnable(cb);
new Thread(runnable).start();
@@ -168,7 +174,7 @@ public class MultiTriggerPolicyTest {
@SuppressWarnings("serial")
private class ActiveTriggerWithRunnable implements ActiveTriggerPolicy<Integer> {
- int id;
+ private final int id;
public ActiveTriggerWithRunnable(int id) {
this.id = id;
@@ -203,37 +209,37 @@ public class MultiTriggerPolicyTest {
*/
private class MyCallbackClass implements ActiveTriggerCallback {
- List<Integer> received = new LinkedList<Integer>();
+ private final Set<Integer> received = Sets
+ .newSetFromMap(new ConcurrentHashMap<Integer, Boolean>());
+
+ private final CountDownLatch sync;
+
+ public MyCallbackClass(int numberOfExpectedElements) {
+ checkArgument(numberOfExpectedElements >= 0);
+ this.sync = new CountDownLatch(numberOfExpectedElements);
+ }
@Override
public void sendFakeElement(Object datapoint) {
received.add((Integer) datapoint);
+
+ sync.countDown();
}
- public boolean check(int timeout, int... ids) {
- int totalTime = 0;
+ public boolean check(int timeout, int... expectedIds) throws InterruptedException {
+ // Wait for all elements
+ sync.await(timeout, TimeUnit.MILLISECONDS);
- while (totalTime <= timeout) {
- boolean result = true;
- for (int id : ids) {
- if (!received.contains(id)) {
- result = false;
- }
- }
+ // Check received all expected ids
+ assertEquals(expectedIds.length, received.size());
- if (result) {
- return true;
- } else {
- try {
- Thread.sleep(1000);
- totalTime += 1000;
- } catch (InterruptedException e) {
- // ignore it here
- }
+ for (int id : expectedIds) {
+ if (!received.contains(id)) {
+ return false;
}
}
- return false;
- }
+ return true;
+ }
}
}
\ No newline at end of file
[2/2] flink git commit: [FLINK-1866] [streaming] StreamConfig key
bugfix
Posted by mb...@apache.org.
[FLINK-1866] [streaming] StreamConfig key bugfix
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54d5662f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54d5662f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54d5662f
Branch: refs/heads/master
Commit: 54d5662f353c19d96a4241cf9b7d7703ac2d01f8
Parents: f50d754
Author: Gyula Fora <gy...@apache.org>
Authored: Mon Apr 13 11:21:37 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Tue Apr 14 15:01:22 2015 +0200
----------------------------------------------------------------------
.../org/apache/flink/streaming/api/StreamConfig.java | 14 +++++++-------
1 file changed, 7 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/54d5662f/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index c1e9606..152d489 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -44,20 +44,20 @@ public class StreamConfig implements Serializable {
private static final String OUTPUT_NAME = "outputName_";
private static final String VERTEX_NAME = "vertexID";
private static final String OPERATOR_NAME = "operatorName";
- private static final String ITERATION_ID = "iteration-id";
+ private static final String ITERATION_ID = "iterationId";
private static final String OUTPUT_SELECTOR_WRAPPER = "outputSelectorWrapper";
- private static final String SERIALIZEDUDF = "serializedudf";
- private static final String USER_FUNCTION = "userfunction";
+ private static final String SERIALIZEDUDF = "serializedUDF";
+ private static final String USER_FUNCTION = "userFunction";
private static final String BUFFER_TIMEOUT = "bufferTimeout";
private static final String TYPE_SERIALIZER_IN_1 = "typeSerializer_in_1";
private static final String TYPE_SERIALIZER_IN_2 = "typeSerializer_in_2";
private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out_1";
private static final String TYPE_SERIALIZER_OUT_2 = "typeSerializer_out_2";
private static final String ITERATON_WAIT = "iterationWait";
- private static final String NONCHAINED_OUTPUTS = "NONCHAINED_OUTPUTS";
- private static final String EDGES_IN_ORDER = "rwOrder";
- private static final String OUT_STREAM_EDGES = "out stream edges";
- private static final String IN_STREAM_EDGES = "out stream edges";
+ private static final String NONCHAINED_OUTPUTS = "nonChainedOutputs";
+ private static final String EDGES_IN_ORDER = "edgesInOrder";
+ private static final String OUT_STREAM_EDGES = "outStreamEdges";
+ private static final String IN_STREAM_EDGES = "inStreamEdges";
// DEFAULT VALUES
private static final long DEFAULT_TIMEOUT = 100;