You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/07/03 08:43:25 UTC

[1/2] flink git commit: [streaming] Minor streaming code cleanups

Repository: flink
Updated Branches:
  refs/heads/master 7c2bbb6a3 -> 9d8a34881


[streaming] Minor streaming code cleanups

Closes #873


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d8a3488
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d8a3488
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d8a3488

Branch: refs/heads/master
Commit: 9d8a348815b4b49940b48aad09dacecbd7a9564e
Parents: 6c21862
Author: mbalassi <mb...@apache.org>
Authored: Mon Jun 29 11:21:23 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Jul 3 08:42:54 2015 +0200

----------------------------------------------------------------------
 .../api/operators/windowing/GroupedActiveDiscretizer.java     | 7 -------
 .../api/operators/windowing/GroupedStreamDiscretizer.java     | 5 +----
 .../test/java/org/apache/flink/streaming/api/IterateTest.java | 1 -
 3 files changed, 1 insertion(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
index 190cb48..0cdafd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
@@ -52,8 +52,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 
 	@Override
 	public void processElement(IN element) throws Exception {
-
-//			last = copy(element);
 			last = element;
 			Object key = keySelector.getKey(element);
 
@@ -67,10 +65,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 
 				groupDiscretizer.processRealElement(element);
 			}
-
-
-
-
 	}
 
 	@Override
@@ -90,7 +84,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 			centralThread.interrupt();
 			centralThread.join();
 		} catch (InterruptedException e) {
-			e.printStackTrace();
 			LOG.info("GroupedActiveDiscretizer got interruped while joining with central thread: {}", e);
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
index e80b6ab..64e8b04 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
  * transformation. The user supplied eviction and trigger policies are applied
  * on a per group basis to create the {@link StreamWindow} that will be further
  * transformed in the next stages. </p> To allow pre-aggregations supply an
- * appropriate {@link WindowBuffer}
+ * appropriate {@link WindowBuffer}.
  */
 public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
 
@@ -69,7 +69,6 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
 	@Override
 	public void processElement(IN element) throws Exception {
 
-
 			Object key = keySelector.getKey(element);
 
 			StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
@@ -97,12 +96,10 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
 		StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
 				evictionPolicy.clone());
 
-//		groupDiscretizer.output = taskContext.getOutputCollector();
 		// TODO: this seems very hacky, maybe we can get around this
 		groupDiscretizer.setup(this.output, this.runtimeContext);
 		groupDiscretizer.open(this.parameters);
 
-
 		return groupDiscretizer;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index c660dbc..3021abb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -167,7 +167,6 @@ public class IterateTest {
 		it.closeWith(head.union(head.map(new NoOpMap()).shuffle()), true);
 		it2.closeWith(head2, false);
 
-		System.out.println(env.getExecutionPlan());
 		StreamGraph graph = env.getStreamGraph();
 
 		for (StreamLoop loop : graph.getStreamLoops()) {


[2/2] flink git commit: [FLINK-2285] [streaming] Removed duplicate call in close from GroupedActiveDiscretizer

Posted by mb...@apache.org.
[FLINK-2285] [streaming] Removed duplicate call in close from GroupedActiveDiscretizer


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c218622
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c218622
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c218622

Branch: refs/heads/master
Commit: 6c218622ac7ca3c61cac6ac332471b6590726f71
Parents: 7c2bbb6
Author: mbalassi <mb...@apache.org>
Authored: Mon Jun 29 11:17:21 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Jul 3 08:42:54 2015 +0200

----------------------------------------------------------------------
 .../api/operators/windowing/GroupedActiveDiscretizer.java         | 3 ---
 1 file changed, 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6c218622/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
index fd95110..190cb48 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
@@ -84,9 +84,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
 	@Override
 	public void close() throws Exception {
 		super.close();
-		for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
-			group.emitWindow();
-		}
 
 		try {
 			centralCheck.running = false;