You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2015/11/19 12:25:32 UTC

flink git commit: [hotfix] Fix concurrent processing-time Trigger in WindowOperator

Repository: flink
Updated Branches:
  refs/heads/master 8b086eb91 -> 891a1b966


[hotfix] Fix concurrent processing-time Trigger in WindowOperator

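This fixes a problem that occurs when a Trigger registers a new
processing-time timer in its onProcessingTime() method. The problem is
that onProcessingTime() was called while iterating over the map of
registered processing-time timers. If onProcessingTime() registers a new
processing-time timer, that map is modified during iteration, which leads
to a ConcurrentModificationException. The fix first removes all due timer
entries from the map and only then invokes the Trigger for each collected
Context. processWatermark() is restructured in the same way.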


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891a1b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891a1b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891a1b96

Branch: refs/heads/master
Commit: 891a1b966a28441f26b8f68b8ee68ef43e79a938
Parents: 8b086eb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 6 19:12:53 2015 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 19 12:03:23 2015 +0100

----------------------------------------------------------------------
 .../windowing/NonKeyedWindowOperator.java       | 73 +++++++++++---------
 .../operators/windowing/WindowOperator.java     | 72 ++++++++++---------
 2 files changed, 81 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/891a1b96/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 7daaca4..5e4dea7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -48,9 +48,12 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -281,34 +284,31 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 	@Override
 	public final void processWatermark(Watermark mark) throws Exception {
-		Set<Long> toRemove = new HashSet<>();
-		Set<Context> toTrigger = new HashSet<>();
+		List<Set<Context>> toTrigger = new ArrayList<>();
 
-		// we cannot call the Trigger in here because trigger methods might register new triggers.
-		// that would lead to concurrent modification errors.
-		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
+		Iterator<Map.Entry<Long, Set<Context>>> it = watermarkTimers.entrySet().iterator();
+
+		while (it.hasNext()) {
+			Map.Entry<Long, Set<Context>> triggers = it.next();
 			if (triggers.getKey() <= mark.getTimestamp()) {
-				for (Context context: triggers.getValue()) {
-					toTrigger.add(context);
-				}
-				toRemove.add(triggers.getKey());
+				toTrigger.add(triggers.getValue());
+				it.remove();
 			}
 		}
 
-		for (Context context: toTrigger) {
-			// double check the time. it can happen that the trigger registers a new timer,
-			// in that case the entry is left in the watermarkTimers set for performance reasons.
-			// We have to check here whether the entry in the set still reflects the
-			// currently set timer in the Context.
-			if (context.watermarkTimer <= mark.getTimestamp()) {
-				Trigger.TriggerResult triggerResult = context.onEventTime(context.watermarkTimer);
-				processTriggerResult(triggerResult, context.window);
+		for (Set<Context> ctxs: toTrigger) {
+			for (Context ctx: ctxs) {
+				// double check the time. it can happen that the trigger registers a new timer,
+				// in that case the entry is left in the watermarkTimers set for performance reasons.
+				// We have to check here whether the entry in the set still reflects the
+				// currently set timer in the Context.
+				if (ctx.watermarkTimer <= mark.getTimestamp()) {
+					Trigger.TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer);
+					processTriggerResult(triggerResult, ctx.window);
+				}
 			}
 		}
 
-		for (Long l: toRemove) {
-			watermarkTimers.remove(l);
-		}
 		output.emitWatermark(mark);
 
 		this.currentWatermark = mark.getTimestamp();
@@ -316,20 +316,29 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
 
 	@Override
 	public final void trigger(long time) throws Exception {
-		Set<Long> toRemove = new HashSet<>();
-
-		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
-			long actualTime = triggers.getKey();
-			if (actualTime <= time) {
-				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onProcessingTime(actualTime);
-					processTriggerResult(triggerResult, context.window);
-				}
-				toRemove.add(triggers.getKey());
+		List<Set<Context>> toTrigger = new ArrayList<>();
+
+		Iterator<Map.Entry<Long, Set<Context>>> it = processingTimeTimers.entrySet().iterator();
+
+		while (it.hasNext()) {
+			Map.Entry<Long, Set<Context>> triggers = it.next();
+			if (triggers.getKey() <= time) {
+				toTrigger.add(triggers.getValue());
+				it.remove();
 			}
 		}
-		for (Long l: toRemove) {
-			processingTimeTimers.remove(l);
+
+		for (Set<Context> ctxs: toTrigger) {
+			for (Context ctx: ctxs) {
+				// double check the time. it can happen that the trigger registers a new timer,
+				// in that case the entry is left in the processingTimeTimers set for
+				// performance reasons. We have to check here whether the entry in the set still
+				// reflects the currently set timer in the Context.
+				if (ctx.processingTimeTimer <= time) {
+					Trigger.TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer);
+					processTriggerResult(triggerResult, ctx.window);
+				}
+			}
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/891a1b96/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 1c5a70c..f19e760 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -49,9 +49,12 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -338,34 +341,31 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 	@Override
 	public final void processWatermark(Watermark mark) throws Exception {
-		Set<Long> toRemove = new HashSet<>();
-		Set<Context> toTrigger = new HashSet<>();
+		List<Set<Context>> toTrigger = new ArrayList<>();
 
-		// we cannot call the Trigger in here because trigger methods might register new triggers.
-		// that would lead to concurrent modification errors.
-		for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
+		Iterator<Map.Entry<Long, Set<Context>>> it = watermarkTimers.entrySet().iterator();
+
+		while (it.hasNext()) {
+			Map.Entry<Long, Set<Context>> triggers = it.next();
 			if (triggers.getKey() <= mark.getTimestamp()) {
-				for (Context context: triggers.getValue()) {
-					toTrigger.add(context);
-				}
-				toRemove.add(triggers.getKey());
+				toTrigger.add(triggers.getValue());
+				it.remove();
 			}
 		}
 
-		for (Context context: toTrigger) {
-			// double check the time. it can happen that the trigger registers a new timer,
-			// in that case the entry is left in the watermarkTimers set for performance reasons.
-			// We have to check here whether the entry in the set still reflects the
-			// currently set timer in the Context.
-			if (context.watermarkTimer <= mark.getTimestamp()) {
-				Trigger.TriggerResult triggerResult = context.onEventTime(context.watermarkTimer);
-				processTriggerResult(triggerResult, context.key, context.window);
+		for (Set<Context> ctxs: toTrigger) {
+			for (Context ctx: ctxs) {
+					// double check the time. it can happen that the trigger registers a new timer,
+					// in that case the entry is left in the watermarkTimers set for performance reasons.
+					// We have to check here whether the entry in the set still reflects the
+					// currently set timer in the Context.
+					if (ctx.watermarkTimer <= mark.getTimestamp()) {
+						Trigger.TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer);
+						processTriggerResult(triggerResult, ctx.key, ctx.window);
+					}
 			}
 		}
 
-		for (Long l: toRemove) {
-			watermarkTimers.remove(l);
-		}
 		output.emitWatermark(mark);
 
 		this.currentWatermark = mark.getTimestamp();
@@ -373,21 +373,29 @@ public class WindowOperator<K, IN, OUT, W extends Window>
 
 	@Override
 	public final void trigger(long time) throws Exception {
-		Set<Long> toRemove = new HashSet<>();
-
-		for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
-			long actualTime = triggers.getKey();
-			if (actualTime <= time) {
-				for (Context context: triggers.getValue()) {
-					Trigger.TriggerResult triggerResult = context.onProcessingTime(actualTime);
-					processTriggerResult(triggerResult, context.key, context.window);
-				}
-				toRemove.add(triggers.getKey());
+		List<Set<Context>> toTrigger = new ArrayList<>();
+
+		Iterator<Map.Entry<Long, Set<Context>>> it = processingTimeTimers.entrySet().iterator();
+
+		while (it.hasNext()) {
+			Map.Entry<Long, Set<Context>> triggers = it.next();
+			if (triggers.getKey() <= time) {
+				toTrigger.add(triggers.getValue());
+				it.remove();
 			}
 		}
 
-		for (Long l: toRemove) {
-			processingTimeTimers.remove(l);
+		for (Set<Context> ctxs: toTrigger) {
+			for (Context ctx: ctxs) {
+				// double check the time. it can happen that the trigger registers a new timer,
+				// in that case the entry is left in the processingTimeTimers set for
+				// performance reasons. We have to check here whether the entry in the set still
+				// reflects the currently set timer in the Context.
+				if (ctx.processingTimeTimer <= time) {
+					Trigger.TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer);
+					processTriggerResult(triggerResult, ctx.key, ctx.window);
+				}
+			}
 		}
 	}