You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2017/03/17 21:46:47 UTC

[16/50] [abbrv] beam git commit: Move GC timer checking to StatefulDoFnRunner.CleanupTimer

Move GC timer checking to StatefulDoFnRunner.CleanupTimer


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/bf6d2748
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/bf6d2748
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/bf6d2748

Branch: refs/heads/gearpump-runner
Commit: bf6d2748c8876a7415290069163625598928f02f
Parents: 2c2424c
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Fri Mar 10 08:29:27 2017 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Fri Mar 10 11:09:04 2017 +0100

----------------------------------------------------------------------
 .../beam/runners/core/StatefulDoFnRunner.java   | 29 ++++++++++++++++----
 1 file changed, 23 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/bf6d2748/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
index 154d8bc..926345e 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/StatefulDoFnRunner.java
@@ -115,15 +115,12 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
   @Override
   public void onTimer(
       String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) {
-    boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
-    Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
-    if (isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp)) {
+    if (cleanupTimer.isForWindow(timerId, window, timestamp, timeDomain)) {
       stateCleaner.clearForWindow(window);
       // There should invoke the onWindowExpiration of DoFn
     } else {
-      if (isEventTimer || !dropLateData(window)) {
-        doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
-      }
+      // A timer can never be late, because setting timers after the GC time is not allowed.
+      doFnRunner.onTimer(timerId, window, timestamp, timeDomain);
     }
   }
 
@@ -151,6 +148,16 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
      * Set the garbage collect time of the window to timer.
      */
     void setForWindow(BoundedWindow window);
+
+    /**
+     * Checks whether the given timer is a cleanup timer for the window.
+     */
+    boolean isForWindow(
+        String timerId,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain);
+
   }
 
   /**
@@ -191,6 +198,16 @@ public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow>
           GC_TIMER_ID, gcTime, TimeDomain.EVENT_TIME);
     }
 
+    @Override
+    public boolean isForWindow(
+        String timerId,
+        BoundedWindow window,
+        Instant timestamp,
+        TimeDomain timeDomain) {
+      boolean isEventTimer = timeDomain.equals(TimeDomain.EVENT_TIME);
+      Instant gcTime = window.maxTimestamp().plus(windowingStrategy.getAllowedLateness());
+      return isEventTimer && GC_TIMER_ID.equals(timerId) && gcTime.equals(timestamp);
+    }
   }
 
   /**