You are viewing a plain-text version of this content. The canonical link to it was not preserved in this copy.
Posted to commits@flink.apache.org by al...@apache.org on 2015/11/19 12:25:32 UTC
flink git commit: [hotfix] Fix concurrent processing-time Trigger in WindowOperator
Repository: flink
Updated Branches:
refs/heads/master 8b086eb91 -> 891a1b966
[hotfix] Fix concurrent processing-time Trigger in WindowOperator
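This fixes a problem that occurs if a Trigger registers a new
processing-time trigger in its onProcessingTime() method. The problem is
that onProcessingTime() is called while the set of active triggers is
being traversed. If onProcessingTime() tries to register a new
processing-time trigger, this leads to a ConcurrentModificationException.
The fix first collects the due timer entries and removes them from the
timer map via its iterator. Only then are the Trigger callbacks invoked.
The same approach is applied to event-time timers in processWatermark().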
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/891a1b96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/891a1b96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/891a1b96
Branch: refs/heads/master
Commit: 891a1b966a28441f26b8f68b8ee68ef43e79a938
Parents: 8b086eb
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Nov 6 19:12:53 2015 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 19 12:03:23 2015 +0100
----------------------------------------------------------------------
.../windowing/NonKeyedWindowOperator.java | 73 +++++++++++---------
.../operators/windowing/WindowOperator.java | 72 ++++++++++---------
2 files changed, 81 insertions(+), 64 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/891a1b96/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
index 7daaca4..5e4dea7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/NonKeyedWindowOperator.java
@@ -48,9 +48,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -281,34 +284,31 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
@Override
public final void processWatermark(Watermark mark) throws Exception {
- Set<Long> toRemove = new HashSet<>();
- Set<Context> toTrigger = new HashSet<>();
+ List<Set<Context>> toTrigger = new ArrayList<>();
- // we cannot call the Trigger in here because trigger methods might register new triggers.
- // that would lead to concurrent modification errors.
- for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
+ Iterator<Map.Entry<Long, Set<Context>>> it = watermarkTimers.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Long, Set<Context>> triggers = it.next();
if (triggers.getKey() <= mark.getTimestamp()) {
- for (Context context: triggers.getValue()) {
- toTrigger.add(context);
- }
- toRemove.add(triggers.getKey());
+ toTrigger.add(triggers.getValue());
+ it.remove();
}
}
- for (Context context: toTrigger) {
- // double check the time. it can happen that the trigger registers a new timer,
- // in that case the entry is left in the watermarkTimers set for performance reasons.
- // We have to check here whether the entry in the set still reflects the
- // currently set timer in the Context.
- if (context.watermarkTimer <= mark.getTimestamp()) {
- Trigger.TriggerResult triggerResult = context.onEventTime(context.watermarkTimer);
- processTriggerResult(triggerResult, context.window);
+ for (Set<Context> ctxs: toTrigger) {
+ for (Context ctx: ctxs) {
+ // double check the time. it can happen that the trigger registers a new timer,
+ // in that case the entry is left in the watermarkTimers set for performance reasons.
+ // We have to check here whether the entry in the set still reflects the
+ // currently set timer in the Context.
+ if (ctx.watermarkTimer <= mark.getTimestamp()) {
+ Trigger.TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer);
+ processTriggerResult(triggerResult, ctx.window);
+ }
}
}
- for (Long l: toRemove) {
- watermarkTimers.remove(l);
- }
output.emitWatermark(mark);
this.currentWatermark = mark.getTimestamp();
@@ -316,20 +316,29 @@ public class NonKeyedWindowOperator<IN, OUT, W extends Window>
@Override
public final void trigger(long time) throws Exception {
- Set<Long> toRemove = new HashSet<>();
-
- for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
- long actualTime = triggers.getKey();
- if (actualTime <= time) {
- for (Context context: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = context.onProcessingTime(actualTime);
- processTriggerResult(triggerResult, context.window);
- }
- toRemove.add(triggers.getKey());
+ List<Set<Context>> toTrigger = new ArrayList<>();
+
+ Iterator<Map.Entry<Long, Set<Context>>> it = processingTimeTimers.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Long, Set<Context>> triggers = it.next();
+ if (triggers.getKey() <= time) {
+ toTrigger.add(triggers.getValue());
+ it.remove();
}
}
- for (Long l: toRemove) {
- processingTimeTimers.remove(l);
+
+ for (Set<Context> ctxs: toTrigger) {
+ for (Context ctx: ctxs) {
+ // double check the time. it can happen that the trigger registers a new timer,
+ // in that case the entry is left in the processingTimeTimers set for
+ // performance reasons. We have to check here whether the entry in the set still
+ // reflects the currently set timer in the Context.
+ if (ctx.processingTimeTimer <= time) {
+ Trigger.TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer);
+ processTriggerResult(triggerResult, ctx.window);
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/891a1b96/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index 1c5a70c..f19e760 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -49,9 +49,12 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -338,34 +341,31 @@ public class WindowOperator<K, IN, OUT, W extends Window>
@Override
public final void processWatermark(Watermark mark) throws Exception {
- Set<Long> toRemove = new HashSet<>();
- Set<Context> toTrigger = new HashSet<>();
+ List<Set<Context>> toTrigger = new ArrayList<>();
- // we cannot call the Trigger in here because trigger methods might register new triggers.
- // that would lead to concurrent modification errors.
- for (Map.Entry<Long, Set<Context>> triggers: watermarkTimers.entrySet()) {
+ Iterator<Map.Entry<Long, Set<Context>>> it = watermarkTimers.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Long, Set<Context>> triggers = it.next();
if (triggers.getKey() <= mark.getTimestamp()) {
- for (Context context: triggers.getValue()) {
- toTrigger.add(context);
- }
- toRemove.add(triggers.getKey());
+ toTrigger.add(triggers.getValue());
+ it.remove();
}
}
- for (Context context: toTrigger) {
- // double check the time. it can happen that the trigger registers a new timer,
- // in that case the entry is left in the watermarkTimers set for performance reasons.
- // We have to check here whether the entry in the set still reflects the
- // currently set timer in the Context.
- if (context.watermarkTimer <= mark.getTimestamp()) {
- Trigger.TriggerResult triggerResult = context.onEventTime(context.watermarkTimer);
- processTriggerResult(triggerResult, context.key, context.window);
+ for (Set<Context> ctxs: toTrigger) {
+ for (Context ctx: ctxs) {
+ // double check the time. it can happen that the trigger registers a new timer,
+ // in that case the entry is left in the watermarkTimers set for performance reasons.
+ // We have to check here whether the entry in the set still reflects the
+ // currently set timer in the Context.
+ if (ctx.watermarkTimer <= mark.getTimestamp()) {
+ Trigger.TriggerResult triggerResult = ctx.onEventTime(ctx.watermarkTimer);
+ processTriggerResult(triggerResult, ctx.key, ctx.window);
+ }
}
}
- for (Long l: toRemove) {
- watermarkTimers.remove(l);
- }
output.emitWatermark(mark);
this.currentWatermark = mark.getTimestamp();
@@ -373,21 +373,29 @@ public class WindowOperator<K, IN, OUT, W extends Window>
@Override
public final void trigger(long time) throws Exception {
- Set<Long> toRemove = new HashSet<>();
-
- for (Map.Entry<Long, Set<Context>> triggers: processingTimeTimers.entrySet()) {
- long actualTime = triggers.getKey();
- if (actualTime <= time) {
- for (Context context: triggers.getValue()) {
- Trigger.TriggerResult triggerResult = context.onProcessingTime(actualTime);
- processTriggerResult(triggerResult, context.key, context.window);
- }
- toRemove.add(triggers.getKey());
+ List<Set<Context>> toTrigger = new ArrayList<>();
+
+ Iterator<Map.Entry<Long, Set<Context>>> it = processingTimeTimers.entrySet().iterator();
+
+ while (it.hasNext()) {
+ Map.Entry<Long, Set<Context>> triggers = it.next();
+ if (triggers.getKey() <= time) {
+ toTrigger.add(triggers.getValue());
+ it.remove();
}
}
- for (Long l: toRemove) {
- processingTimeTimers.remove(l);
+ for (Set<Context> ctxs: toTrigger) {
+ for (Context ctx: ctxs) {
+ // double check the time. it can happen that the trigger registers a new timer,
+ // in that case the entry is left in the processingTimeTimers set for
+ // performance reasons. We have to check here whether the entry in the set still
+ // reflects the currently set timer in the Context.
+ if (ctx.processingTimeTimer <= time) {
+ Trigger.TriggerResult triggerResult = ctx.onProcessingTime(ctx.processingTimeTimer);
+ processTriggerResult(triggerResult, ctx.key, ctx.window);
+ }
+ }
}
}