You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/10/23 17:37:31 UTC

[2/2] flink git commit: [hotfix] Improve handling of Window Trigger results

[hotfix] Improve handling of Window Trigger results

This adds a PURGE value to the TriggerResult enum, together with methods
isFire() and isPurge() that simplify the logic in processTriggerResult()
of WindowOperator and NonKeyedWindowOperator.

Also, the operator now keeps track of the current watermark and fires
immediately if a trigger registers an event-time callback for a
timestamp that lies in the past. For this, TriggerResult now has a
method merge() that combines two TriggerResults into one.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e6969377
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e6969377
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e6969377

Branch: refs/heads/master
Commit: e696937788c0fcba78dcdf820a5ebb70f8086710
Parents: e18cdd0
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Oct 23 11:31:13 2015 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Oct 23 15:49:50 2015 +0200

----------------------------------------------------------------------
 .../api/windowing/triggers/Trigger.java         |  45 ++++++-
 .../windowing/NonKeyedWindowOperator.java       | 107 +++++++++++-----
 .../operators/windowing/WindowOperator.java     | 127 +++++++++++++------
 .../flink/streaming/util/TestHarnessUtil.java   |  33 ++++-
 4 files changed, 236 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e6969377/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index 15ccb33..ee6a279 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -77,10 +77,51 @@ public interface Trigger<T, W extends Window> extends Serializable {
 	 * <p>
 	 * On {@code FIRE} the pane is evaluated and results are emitted. The contents of the window
 	 * are kept. {@code FIRE_AND_PURGE} acts like {@code FIRE} but the contents of the pane
-	 * are purged. On {@code CONTINUE} nothing happens, processing continues.
+	 * are purged. On {@code CONTINUE} nothing happens, processing continues. On {@code PURGE}
+	 * the contents of the window are discarded and no result is emitted for the window.
 	 */
 	enum TriggerResult {
-		CONTINUE, FIRE_AND_PURGE, FIRE
+		CONTINUE(false, false), FIRE_AND_PURGE(true, true), FIRE(true, false), PURGE(false, true);
+
+		private final boolean fire;
+		private final boolean purge;
+
+		TriggerResult(boolean fire, boolean purge) {
+			this.purge = purge;
+			this.fire = fire;
+		}
+
+		public boolean isFire() {
+			return fire;
+		}
+
+		public boolean isPurge() {
+			return purge;
+		}
+
+		/**
+		 * Merges two {@code TriggerResults}. This specifies what should happen if we have
+		 * two results from a Trigger, for example as a result from
+		 * {@link #onElement(Object, long, Window, TriggerContext)} and
+		 * {@link #onEventTime(long, Window, TriggerContext)}.
+		 *
+		 * <p>
+		 * For example, if one result says {@code CONTINUE} while the other says {@code FIRE},
+		 * then {@code FIRE} is the combined result.
+		 */
+		public static TriggerResult merge(TriggerResult a, TriggerResult b) {
+			if (a.purge || b.purge) {
+				if (a.fire || b.fire) {
+					return FIRE_AND_PURGE;
+				} else {
+					return PURGE;
+				}
+			} else if (a.fire || b.fire) {
+				return FIRE;
+			} else {
+				return CONTINUE;
+			}
+		}
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e6969377/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 2209d5e..d12a930 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -49,6 +49,7 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
@@ -122,6 +123,12 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
+	/**
+	 * To keep track of the current watermark so that we can immediately fire if a trigger
+	 * registers an event time callback for a timestamp that lies in the past.
+	 */
+	protected transient long currentWatermark = -1L;
+
 	// ------------------------------------------------------------------------
 	// State that needs to be checkpointed
 	// ------------------------------------------------------------------------
@@ -152,6 +159,11 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		currentWatermark = -1;
+	}
+
 	@Override
 	@SuppressWarnings("unchecked")
 	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
@@ -232,7 +244,7 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 				windows.put(window, context);
 			}
 			context.windowBuffer.storeElement(element);
-			Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
+			Trigger.TriggerResult triggerResult = context.onElement(element);
 			processTriggerResult(triggerResult, window);
 		}
 	}
@@ -249,53 +261,60 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	private void processTriggerResult(Trigger.TriggerResult triggerResult, W window) throws Exception {
-		switch (triggerResult) {
-			case FIRE: {
-				Context context = windows.get(window);
-				if (context == null) {
-					LOG.debug("Window {} already gone.", window);
-					return;
-				}
-
-
-				emitWindow(context);
-				break;
-			}
-
-			case FIRE_AND_PURGE: {
-				Context context = windows.remove(window);
-				if (context == null) {
-					LOG.debug("Window {} already gone.", window);
-					return;
-				}
+		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
+			// do nothing
+			return;
+		}
+		Context context;
 
-				emitWindow(context);
-				break;
-			}
+		if (triggerResult.isPurge()) {
+			context = windows.remove(window);
+		} else {
+			context = windows.get(window);
+		}
+		if (context == null) {
+			LOG.debug("Window {} already gone.", window);
+			return;
+		}
 
-			case CONTINUE:
-				// ingore
+		if (triggerResult.isFire()) {
+			emitWindow(context);
 		}
 	}
 
 	@Override
 	public final void processWatermark(Watermark mark) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
+		Set<Context> toTrigger = new HashSet<>();
 
+		// We cannot call the Trigger inside this loop because trigger methods might register
+		// new timers, which would cause concurrent modification errors on watermarkTimers.
 		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
 			if (triggers.getKey() <= mark.getTimestamp()) {
 				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
-					processTriggerResult(triggerResult, context.window);
+					toTrigger.add(context);
 				}
 				toRemove.add(triggers.getKey());
 			}
 		}
 
+		for (Context context: toTrigger) {
+			// Double-check the time: the trigger may have registered a new timer since this context
+			// was collected. In that case the old entry is left in watermarkTimers for performance
+			// reasons, so we have to check here whether the entry in the map still reflects the
+			// timer that is currently set in the Context.
+			if (context.watermarkTimer <= mark.getTimestamp()) {
+				Trigger.TriggerResult triggerResult = context.onEventTime(context.watermarkTimer);
+				processTriggerResult(triggerResult, context.window);
+			}
+		}
+
 		for (Long l: toRemove) {
 			watermarkTimers.remove(l);
 		}
 		output.emitWatermark(mark);
+
+		this.currentWatermark = mark.getTimestamp();
 	}
 
 	@Override
@@ -318,8 +337,14 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 	}
 
 	/**
-	 * A context object that is given to {@code Trigger} functions to allow them to register
-	 * timer/watermark callbacks.
+	 * The {@code Context} is responsible for keeping track of the state of one pane.
+	 *
+	 * <p>
+	 * A pane is the bucket of elements that were assigned to the same {@link Window}; this
+	 * operator tracks one pane per window, independent of any key. An element can
+	 * be in multiple panes if it was assigned to multiple windows by the
+	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
+	 * have their own instance of the {@code Trigger}.
 	 */
 	protected class Context implements Trigger.TriggerContext {
 		protected W window;
@@ -435,8 +460,20 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 			triggers.add(this);
 		}
 
+		public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
+			Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
+			if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
+				// fire now and don't wait for the next watermark update
+				Trigger.TriggerResult onEventTimeResult = onEventTime(watermarkTimer);
+				return Trigger.TriggerResult.merge(onElementResult, onEventTimeResult);
+			} else {
+				return onElementResult;
+			}
+		}
+
 		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
+				processingTimeTimer = -1;
 				return trigger.onProcessingTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
@@ -445,7 +482,17 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
-				return trigger.onEventTime(time, window, this);
+				watermarkTimer = -1;
+				Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
+
+				if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
+					// fire now and don't wait for the next watermark update
+					Trigger.TriggerResult secondTriggerResult = onEventTime(watermarkTimer);
+					return Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult);
+				} else {
+					return firstTriggerResult;
+				}
+
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e6969377/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index e8e001d..c39679f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -50,6 +50,7 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.io.ObjectInputStream;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.HashMap;
@@ -147,6 +148,12 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	 */
 	protected transient TimestampedCollector<OUT> timestampedCollector;
 
+	/**
+	 * To keep track of the current watermark so that we can immediately fire if a trigger
+	 * registers an event time callback for a timestamp that lies in the past.
+	 */
+	protected transient long currentWatermark = -1L;
+
 	// ------------------------------------------------------------------------
 	// State that needs to be checkpointed
 	// ------------------------------------------------------------------------
@@ -181,6 +188,11 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 		setChainingStrategy(ChainingStrategy.ALWAYS);
 	}
 
+	private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+		in.defaultReadObject();
+		currentWatermark = -1;
+	}
+
 	@Override
 	@SuppressWarnings("unchecked")
 	public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
@@ -276,8 +288,9 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 				context = new Context(key, window, windowBuffer);
 				keyWindows.put(window, context);
 			}
+
 			context.windowBuffer.storeElement(element);
-			Trigger.TriggerResult triggerResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, context);
+			Trigger.TriggerResult triggerResult = context.onElement(element);
 			processTriggerResult(triggerResult, key, window);
 		}
 	}
@@ -297,66 +310,68 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
-		switch (triggerResult) {
-			case FIRE: {
-				Map<W, Context> keyWindows = windows.get(key);
-				if (keyWindows == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-				Context context = keyWindows.get(window);
-				if (context == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-
-
-				emitWindow(context);
-				break;
-			}
-
-			case FIRE_AND_PURGE: {
-				Map<W, Context> keyWindows = windows.get(key);
-				if (keyWindows == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-				Context context = keyWindows.remove(window);
-				if (context == null) {
-					LOG.debug("Window {} for key {} already gone.", window, key);
-					return;
-				}
-				if (keyWindows.isEmpty()) {
-					windows.remove(key);
-				}
+		if (!triggerResult.isFire() && !triggerResult.isPurge()) {
+			// do nothing
+			return;
+		}
+		Context context;
+		Map<W, Context> keyWindows = windows.get(key);
+		if (keyWindows == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
 
-				emitWindow(context);
-				break;
+		if (triggerResult.isPurge()) {
+			context = keyWindows.remove(window);
+			if (keyWindows.isEmpty()) {
+				windows.remove(key);
 			}
+		} else {
+			context = keyWindows.get(window);
+		}
+		if (context == null) {
+			LOG.debug("Window {} for key {} already gone.", window, key);
+			return;
+		}
 
-			case CONTINUE:
-				// ingore
+		if (triggerResult.isFire()) {
+			emitWindow(context);
 		}
 	}
 
 	@Override
 	public final void processWatermark(Watermark mark) throws Exception {
 		Set<Long> toRemove = new HashSet<>();
+		Set<Context> toTrigger = new HashSet<>();
 
+		// We cannot call the Trigger inside this loop because trigger methods might register
+		// new timers, which would cause concurrent modification errors on watermarkTimers.
 		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
 			if (triggers.getKey() <= mark.getTimestamp()) {
 				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onEventTime(triggers.getKey());
-					processTriggerResult(triggerResult, context.key, context.window);
+					toTrigger.add(context);
 				}
 				toRemove.add(triggers.getKey());
 			}
 		}
 
+		for (Context context: toTrigger) {
+			// Double-check the time: the trigger may have registered a new timer since this context
+			// was collected. In that case the old entry is left in watermarkTimers for performance
+			// reasons, so we have to check here whether the entry in the map still reflects the
+			// timer that is currently set in the Context.
+			if (context.watermarkTimer <= mark.getTimestamp()) {
+				Trigger.TriggerResult triggerResult = context.onEventTime(context.watermarkTimer);
+				processTriggerResult(triggerResult, context.key, context.window);
+			}
+		}
+
 		for (Long l: toRemove) {
 			watermarkTimers.remove(l);
 		}
 		output.emitWatermark(mark);
+
+		this.currentWatermark = mark.getTimestamp();
 	}
 
 	@Override
@@ -380,8 +395,14 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 	}
 
 	/**
-	 * A context object that is given to {@code Trigger} functions to allow them to register
-	 * timer/watermark callbacks.
+	 * The {@code Context} is responsible for keeping track of the state of one pane.
+	 *
+	 * <p>
+	 * A pane is the bucket of elements that have the same key (assigned by the
+	 * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
+	 * be in multiple panes if it was assigned to multiple windows by the
+	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
+	 * have their own instance of the {@code Trigger}.
 	 */
 	protected class Context implements Trigger.TriggerContext {
 		protected K key;
@@ -508,8 +529,20 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 			triggers.add(this);
 		}
 
+		public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
+			Trigger.TriggerResult onElementResult = trigger.onElement(element.getValue(), element.getTimestamp(), window, this);
+			if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
+				// fire now and don't wait for the next watermark update
+				Trigger.TriggerResult onEventTimeResult = onEventTime(watermarkTimer);
+				return Trigger.TriggerResult.merge(onElementResult, onEventTimeResult);
+			} else {
+				return onElementResult;
+			}
+		}
+
 		public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
 			if (time == processingTimeTimer) {
+				processingTimeTimer = -1;
 				return trigger.onProcessingTime(time, window, this);
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
@@ -518,7 +551,17 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 		public Trigger.TriggerResult onEventTime(long time) throws Exception {
 			if (time == watermarkTimer) {
-				return trigger.onEventTime(time, window, this);
+				watermarkTimer = -1;
+				Trigger.TriggerResult firstTriggerResult = trigger.onEventTime(time, window, this);
+
+				if (watermarkTimer > 0 && watermarkTimer <= currentWatermark) {
+					// fire now and don't wait for the next watermark update
+					Trigger.TriggerResult secondTriggerResult = onEventTime(watermarkTimer);
+					return Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult);
+				} else {
+					return firstTriggerResult;
+				}
+
 			} else {
 				return Trigger.TriggerResult.CONTINUE;
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/e6969377/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
index 889ae37..9d1e674 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TestHarnessUtil.java
@@ -17,11 +17,14 @@
  */
 package org.apache.flink.streaming.util;
 
+import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.junit.Assert;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
@@ -72,8 +75,34 @@ public class TestHarnessUtil {
 	 * Compare the two queues containing operator/task output by converting them to an array first.
 	 */
 	public static void assertOutputEqualsSorted(String message, Queue<Object> expected, Queue<Object> actual, Comparator<Object> comparator) {
-		Object[] sortedExpected = expected.toArray();
-		Object[] sortedActual = actual.toArray();
+		// first, compare only watermarks, their position should be deterministic
+		Iterator<Object> exIt = expected.iterator();
+		Iterator<Object> actIt = actual.iterator();
+		while (exIt.hasNext()) {
+			Object nextEx = exIt.next();
+			Object nextAct = actIt.next();
+			if (nextEx instanceof Watermark) {
+				Assert.assertEquals(nextEx, nextAct);
+			}
+		}
+
+		List<Object> expectedRecords = new ArrayList<>();
+		List<Object> actualRecords = new ArrayList<>();
+
+		for (Object ex: expected) {
+			if (ex instanceof StreamRecord) {
+				expectedRecords.add(ex);
+			}
+		}
+
+		for (Object act: actual) {
+			if (act instanceof StreamRecord) {
+				actualRecords.add(act);
+			}
+		}
+
+		Object[] sortedExpected = expectedRecords.toArray();
+		Object[] sortedActual = actualRecords.toArray();
 
 		Arrays.sort(sortedExpected, comparator);
 		Arrays.sort(sortedActual, comparator);