You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:37 UTC
[32/34] incubator-flink git commit: [streaming] Changed
GroupedWindowInvokable to always delete groups in case they have an empty
element buffer
[streaming] Changed GroupedWindowInvokable to always delete groups in case they have an empty element buffer
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/783aa4a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/783aa4a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/783aa4a8
Branch: refs/heads/master
Commit: 783aa4a86b6815a8cf2929d79334bebbe4fc4eb6
Parents: 4e046a9
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Wed Dec 3 12:30:44 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:47:18 2014 +0100
----------------------------------------------------------------------
.../operator/GroupedWindowInvokable.java | 33 +++++++++++++++++++-
.../api/invokable/operator/WindowInvokable.java | 11 +++++++
2 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/783aa4a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index 905b45f..ae16be0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.api.invokable.operator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.NoSuchElementException;
@@ -252,6 +253,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// process in groups
for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(in, trigger);
+ checkForEmptyGroupBuffer(group);
}
}
}
@@ -267,6 +269,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// only add the element to its group
groupInvokable.processRealElement(reuse.getObject());
+ checkForEmptyGroupBuffer(groupInvokable);
// If central eviction is used, handle it here
if (!centralEvictionPolicies.isEmpty()) {
@@ -286,6 +289,9 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// policies
group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
}
+
+ //remove group in case it has an empty buffer
+ //checkForEmptyGroupBuffer(group);
}
// If central eviction is used, handle it here
@@ -450,8 +456,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
* buffer.
*/
private void evictElements(int numToEvict) {
+ HashSet<WindowInvokable<IN, OUT>> usedGroups=new HashSet<WindowInvokable<IN,OUT>>();
for (; numToEvict > 0; numToEvict--) {
- deleteOrderForCentralEviction.getFirst().evictFirst();
+ WindowInvokable<IN, OUT> currentGroup=deleteOrderForCentralEviction.getFirst();
+ //Do the eviction
+ currentGroup.evictFirst();
+ //Remember groups which possibly have an empty buffer after the eviction
+ usedGroups.add(currentGroup);
try {
deleteOrderForCentralEviction.removeFirst();
} catch (NoSuchElementException e) {
@@ -460,6 +471,25 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
}
}
+
+ //Remove groups with empty buffer
+ for (WindowInvokable<IN, OUT> group:usedGroups){
+ checkForEmptyGroupBuffer(group);
+ }
+ }
+
+ /**
+ * Checks if the element buffer of a given windowing group is empty. If so,
+ * the group will be deleted.
+ *
+ * @param group
+ * The windowing group to be checked and and removed in case its
+ * buffer is empty.
+ */
+ private void checkForEmptyGroupBuffer(WindowInvokable<IN, OUT> group) {
+ if (group.isBufferEmpty()) {
+ windowingGroups.remove(group);
+ }
}
/**
@@ -486,6 +516,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
// handle element in groups
for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
group.processFakeElement(datapoint, policy);
+ checkForEmptyGroupBuffer(group);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/783aa4a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index bbc1277..0e740bb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -357,6 +357,17 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
}
/**
+ * This method returns whether the element buffer is empty or not. It is
+ * used to figure out if a group can be deleted or not when
+ * {@link GroupedWindowInvokable} is used.
+ *
+ * @return true in case the buffer is empty otherwise false.
+ */
+ protected boolean isBufferEmpty(){
+ return buffer.isEmpty();
+ }
+
+ /**
* This method does the final reduce at the end of the stream and emits the
* result.
*