You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by mx...@apache.org on 2020/05/04 08:51:04 UTC
[beam] 04/04: [BEAM-9874] Support clearing timers in portable batch mode (Spark/Flink)
This is an automated email from the ASF dual-hosted git repository.
mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git
commit fe15aecd618e0a76960eca76ebc5d17b899b293c
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sun May 3 12:38:35 2020 +0200
[BEAM-9874] Support clearing timers in portable batch mode (Spark/Flink)
---
.../beam/runners/core/InMemoryTimerInternals.java | 21 ++++++++-------------
.../functions/FlinkExecutableStageFunction.java | 6 +++++-
.../translation/SparkExecutableStageFunction.java | 6 +++++-
3 files changed, 18 insertions(+), 15 deletions(-)
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 21420a7..241b49d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -40,13 +40,13 @@ public class InMemoryTimerInternals implements TimerInternals {
Table<StateNamespace, String, TimerData> existingTimers = HashBasedTable.create();
/** Pending input watermark timers, in timestamp order. */
- private NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
+ private final NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
/** Pending processing time timers, in timestamp order. */
- private NavigableSet<TimerData> processingTimers = new TreeSet<>();
+ private final NavigableSet<TimerData> processingTimers = new TreeSet<>();
/** Pending synchronized processing time timers, in timestamp order. */
- private NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
+ private final NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
/** Current input watermark. */
private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -68,9 +68,7 @@ public class InMemoryTimerInternals implements TimerInternals {
/** Returns true when there are still timers to be fired. */
public boolean hasPendingTimers() {
- return !(watermarkTimers.isEmpty()
- && processingTimers.isEmpty()
- && synchronizedProcessingTimers.isEmpty());
+ return !existingTimers.isEmpty();
}
/**
@@ -167,9 +165,9 @@ public class InMemoryTimerInternals implements TimerInternals {
@Deprecated
@Override
public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
- TimerData existing = existingTimers.get(namespace, timerId + '+' + timerFamilyId);
- if (existing != null) {
- deleteTimer(existing);
+ TimerData removedTimer = existingTimers.remove(namespace, timerId + '+' + timerFamilyId);
+ if (removedTimer != null) {
+ timersForDomain(removedTimer.getDomain()).remove(removedTimer);
}
}
@@ -177,10 +175,7 @@ public class InMemoryTimerInternals implements TimerInternals {
@Deprecated
@Override
public void deleteTimer(TimerData timer) {
- WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
- existingTimers.remove(
- timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId());
- timersForDomain(timer.getDomain()).remove(timer);
+ deleteTimer(timer.getNamespace(), timer.getTimerId(), timer.getTimerFamilyId());
}
@Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index bf3355d..4ee46ae 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -228,7 +228,11 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
stageBundleFactory,
(Timer<?> timer, TimerInternals.TimerData timerData) -> {
currentTimerKey = timer.getUserKey();
- timerInternals.setTimer(timerData);
+ if (timer.getClearBit()) {
+ timerInternals.deleteTimer(timerData);
+ } else {
+ timerInternals.setTimer(timerData);
+ }
},
windowCoder);
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index a9a0dec..233e095 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -151,7 +151,11 @@ class SparkExecutableStageFunction<InputT, SideInputT>
stageBundleFactory,
(Timer<?> timer, TimerInternals.TimerData timerData) -> {
currentTimerKey = timer.getUserKey();
- timerInternals.setTimer(timerData);
+ if (timer.getClearBit()) {
+ timerInternals.deleteTimer(timerData);
+ } else {
+ timerInternals.setTimer(timerData);
+ }
},
windowCoder);