You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by mb...@apache.org on 2014/12/05 18:26:37 UTC

[32/34] incubator-flink git commit: [streaming] Changed GroupedWindowInvokable to always delete groups in case they have an empty element buffer

[streaming] Changed GroupedWindowInvokable to always delete groups in case they have an empty element buffer


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/783aa4a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/783aa4a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/783aa4a8

Branch: refs/heads/master
Commit: 783aa4a86b6815a8cf2929d79334bebbe4fc4eb6
Parents: 4e046a9
Author: Jonas Traub (powibol) <jo...@s-traub.com>
Authored: Wed Dec 3 12:30:44 2014 +0100
Committer: mbalassi <mb...@apache.org>
Committed: Fri Dec 5 16:47:18 2014 +0100

----------------------------------------------------------------------
 .../operator/GroupedWindowInvokable.java        | 33 +++++++++++++++++++-
 .../api/invokable/operator/WindowInvokable.java | 11 +++++++
 2 files changed, 43 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/783aa4a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
index 905b45f..ae16be0 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/GroupedWindowInvokable.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.api.invokable.operator;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -252,6 +253,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 					// process in groups
 					for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
 						group.processFakeElement(in, trigger);
+						checkForEmptyGroupBuffer(group);
 					}
 				}
 			}
@@ -267,6 +269,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 
 				// only add the element to its group
 				groupInvokable.processRealElement(reuse.getObject());
+				checkForEmptyGroupBuffer(groupInvokable);
 
 				// If central eviction is used, handle it here
 				if (!centralEvictionPolicies.isEmpty()) {
@@ -286,6 +289,9 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 						// policies
 						group.externalTriggerFakeElement(reuse.getObject(), currentTriggerPolicies);
 					}
+					
+					//remove group in case it has an empty buffer
+					//checkForEmptyGroupBuffer(group);
 				}
 
 				// If central eviction is used, handle it here
@@ -450,8 +456,13 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 	 *            buffer.
 	 */
 	private void evictElements(int numToEvict) {
+		HashSet<WindowInvokable<IN, OUT>> usedGroups=new HashSet<WindowInvokable<IN,OUT>>();
 		for (; numToEvict > 0; numToEvict--) {
-			deleteOrderForCentralEviction.getFirst().evictFirst();
+			WindowInvokable<IN, OUT> currentGroup=deleteOrderForCentralEviction.getFirst();
+			//Do the eviction
+			currentGroup.evictFirst();
+			//Remember groups which possibly have an empty buffer after the eviction
+			usedGroups.add(currentGroup);
 			try {
 				deleteOrderForCentralEviction.removeFirst();
 			} catch (NoSuchElementException e) {
@@ -460,6 +471,25 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 			}
 
 		}
+		
+		//Remove groups with empty buffer
+		for (WindowInvokable<IN, OUT> group:usedGroups){
+			checkForEmptyGroupBuffer(group);
+		}
+	}
+	
+	/**
+	 * Checks if the element buffer of a given windowing group is empty. If so,
+	 * the group will be deleted.
+	 * 
+	 * @param group
+	 *            The windowing group to be checked and and removed in case its
+	 *            buffer is empty.
+	 */
+	private void checkForEmptyGroupBuffer(WindowInvokable<IN, OUT> group) {
+		if (group.isBufferEmpty()) {
+			windowingGroups.remove(group);
+		}
 	}
 
 	/**
@@ -486,6 +516,7 @@ public class GroupedWindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT> {
 			// handle element in groups
 			for (WindowInvokable<IN, OUT> group : windowingGroups.values()) {
 				group.processFakeElement(datapoint, policy);
+				checkForEmptyGroupBuffer(group);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/783aa4a8/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index bbc1277..0e740bb 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -357,6 +357,17 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 	}
 
 	/**
+	 * This method returns whether the element buffer is empty or not. It is
+	 * used to figure out if a group can be deleted or not when
+	 * {@link GroupedWindowInvokable} is used.
+	 * 
+	 * @return true in case the buffer is empty otherwise false.
+	 */
+	protected boolean isBufferEmpty(){
+		return buffer.isEmpty();
+	}
+	
+	/**
 	 * This method does the final reduce at the end of the stream and emits the
 	 * result.
 	 *