You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@beam.apache.org by mx...@apache.org on 2020/05/04 08:51:04 UTC

[beam] 04/04: [BEAM-9874] Support clearing timers in portable batch mode (Spark/Flink)

This is an automated email from the ASF dual-hosted git repository.

mxm pushed a commit to branch release-2.21.0
in repository https://gitbox.apache.org/repos/asf/beam.git

commit fe15aecd618e0a76960eca76ebc5d17b899b293c
Author: Maximilian Michels <mx...@apache.org>
AuthorDate: Sun May 3 12:38:35 2020 +0200

    [BEAM-9874] Support clearing timers in portable batch mode (Spark/Flink)
---
 .../beam/runners/core/InMemoryTimerInternals.java   | 21 ++++++++-------------
 .../functions/FlinkExecutableStageFunction.java     |  6 +++++-
 .../translation/SparkExecutableStageFunction.java   |  6 +++++-
 3 files changed, 18 insertions(+), 15 deletions(-)

diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 21420a7..241b49d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -40,13 +40,13 @@ public class InMemoryTimerInternals implements TimerInternals {
   Table<StateNamespace, String, TimerData> existingTimers = HashBasedTable.create();
 
   /** Pending input watermark timers, in timestamp order. */
-  private NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
+  private final NavigableSet<TimerData> watermarkTimers = new TreeSet<>();
 
   /** Pending processing time timers, in timestamp order. */
-  private NavigableSet<TimerData> processingTimers = new TreeSet<>();
+  private final NavigableSet<TimerData> processingTimers = new TreeSet<>();
 
   /** Pending synchronized processing time timers, in timestamp order. */
-  private NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
+  private final NavigableSet<TimerData> synchronizedProcessingTimers = new TreeSet<>();
 
   /** Current input watermark. */
   private Instant inputWatermarkTime = BoundedWindow.TIMESTAMP_MIN_VALUE;
@@ -68,9 +68,7 @@ public class InMemoryTimerInternals implements TimerInternals {
 
   /** Returns true when there are still timers to be fired. */
   public boolean hasPendingTimers() {
-    return !(watermarkTimers.isEmpty()
-        && processingTimers.isEmpty()
-        && synchronizedProcessingTimers.isEmpty());
+    return !existingTimers.isEmpty();
   }
 
   /**
@@ -167,9 +165,9 @@ public class InMemoryTimerInternals implements TimerInternals {
   @Deprecated
   @Override
   public void deleteTimer(StateNamespace namespace, String timerId, String timerFamilyId) {
-    TimerData existing = existingTimers.get(namespace, timerId + '+' + timerFamilyId);
-    if (existing != null) {
-      deleteTimer(existing);
+    TimerData removedTimer = existingTimers.remove(namespace, timerId + '+' + timerFamilyId);
+    if (removedTimer != null) {
+      timersForDomain(removedTimer.getDomain()).remove(removedTimer);
     }
   }
 
@@ -177,10 +175,7 @@ public class InMemoryTimerInternals implements TimerInternals {
   @Deprecated
   @Override
   public void deleteTimer(TimerData timer) {
-    WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
-    existingTimers.remove(
-        timer.getNamespace(), timer.getTimerId() + '+' + timer.getTimerFamilyId());
-    timersForDomain(timer.getDomain()).remove(timer);
+    deleteTimer(timer.getNamespace(), timer.getTimerId(), timer.getTimerFamilyId());
   }
 
   @Override
diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
index bf3355d..4ee46ae 100644
--- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
+++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkExecutableStageFunction.java
@@ -228,7 +228,11 @@ public class FlinkExecutableStageFunction<InputT> extends AbstractRichFunction
             stageBundleFactory,
             (Timer<?> timer, TimerInternals.TimerData timerData) -> {
               currentTimerKey = timer.getUserKey();
-              timerInternals.setTimer(timerData);
+              if (timer.getClearBit()) {
+                timerInternals.deleteTimer(timerData);
+              } else {
+                timerInternals.setTimer(timerData);
+              }
             },
             windowCoder);
 
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
index a9a0dec..233e095 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkExecutableStageFunction.java
@@ -151,7 +151,11 @@ class SparkExecutableStageFunction<InputT, SideInputT>
                 stageBundleFactory,
                 (Timer<?> timer, TimerInternals.TimerData timerData) -> {
                   currentTimerKey = timer.getUserKey();
-                  timerInternals.setTimer(timerData);
+                  if (timer.getClearBit()) {
+                    timerInternals.deleteTimer(timerData);
+                  } else {
+                    timerInternals.setTimer(timerData);
+                  }
                 },
                 windowCoder);