You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2015/07/03 08:43:25 UTC
[1/2] flink git commit: [streaming] Minor streaming code cleanups
Repository: flink
Updated Branches:
refs/heads/master 7c2bbb6a3 -> 9d8a34881
[streaming] Minor streaming code cleanups
Closes #873
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9d8a3488
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9d8a3488
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9d8a3488
Branch: refs/heads/master
Commit: 9d8a348815b4b49940b48aad09dacecbd7a9564e
Parents: 6c21862
Author: mbalassi <mb...@apache.org>
Authored: Mon Jun 29 11:21:23 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Jul 3 08:42:54 2015 +0200
----------------------------------------------------------------------
.../api/operators/windowing/GroupedActiveDiscretizer.java | 7 -------
.../api/operators/windowing/GroupedStreamDiscretizer.java | 5 +----
.../test/java/org/apache/flink/streaming/api/IterateTest.java | 1 -
3 files changed, 1 insertion(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
index 190cb48..0cdafd9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
@@ -52,8 +52,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
@Override
public void processElement(IN element) throws Exception {
-
-// last = copy(element);
last = element;
Object key = keySelector.getKey(element);
@@ -67,10 +65,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
groupDiscretizer.processRealElement(element);
}
-
-
-
-
}
@Override
@@ -90,7 +84,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
centralThread.interrupt();
centralThread.join();
} catch (InterruptedException e) {
- e.printStackTrace();
LOG.info("GroupedActiveDiscretizer got interruped while joining with central thread: {}", e);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
index e80b6ab..64e8b04 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedStreamDiscretizer.java
@@ -32,7 +32,7 @@ import org.apache.flink.streaming.api.windowing.windowbuffer.WindowBuffer;
* transformation. The user supplied eviction and trigger policies are applied
* on a per group basis to create the {@link StreamWindow} that will be further
* transformed in the next stages. </p> To allow pre-aggregations supply an
- * appropriate {@link WindowBuffer}
+ * appropriate {@link WindowBuffer}.
*/
public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
@@ -69,7 +69,6 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
@Override
public void processElement(IN element) throws Exception {
-
Object key = keySelector.getKey(element);
StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
@@ -97,12 +96,10 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN> {
StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
evictionPolicy.clone());
-// groupDiscretizer.output = taskContext.getOutputCollector();
// TODO: this seems very hacky, maybe we can get around this
groupDiscretizer.setup(this.output, this.runtimeContext);
groupDiscretizer.open(this.parameters);
-
return groupDiscretizer;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9d8a3488/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index c660dbc..3021abb 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -167,7 +167,6 @@ public class IterateTest {
it.closeWith(head.union(head.map(new NoOpMap()).shuffle()), true);
it2.closeWith(head2, false);
- System.out.println(env.getExecutionPlan());
StreamGraph graph = env.getStreamGraph();
for (StreamLoop loop : graph.getStreamLoops()) {
[2/2] flink git commit: [FLINK-2285] [streaming] Removed duplicate call in close from GroupedActiveDiscretizer
Posted by mb...@apache.org.
[FLINK-2285] [streaming] Removed duplicate call in close from GroupedActiveDiscretizer
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6c218622
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6c218622
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6c218622
Branch: refs/heads/master
Commit: 6c218622ac7ca3c61cac6ac332471b6590726f71
Parents: 7c2bbb6
Author: mbalassi <mb...@apache.org>
Authored: Mon Jun 29 11:17:21 2015 +0200
Committer: mbalassi <mb...@apache.org>
Committed: Fri Jul 3 08:42:54 2015 +0200
----------------------------------------------------------------------
.../api/operators/windowing/GroupedActiveDiscretizer.java | 3 ---
1 file changed, 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6c218622/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
index fd95110..190cb48 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/windowing/GroupedActiveDiscretizer.java
@@ -84,9 +84,6 @@ public class GroupedActiveDiscretizer<IN> extends GroupedStreamDiscretizer<IN> {
@Override
public void close() throws Exception {
super.close();
- for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
- group.emitWindow();
- }
try {
centralCheck.running = false;