You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/31 03:16:37 UTC

[1/2] incubator-beam git commit: Add setTimer and deleteTimer by ID in TimerInternals

Repository: incubator-beam
Updated Branches:
  refs/heads/master beccdc686 -> 7160ee9e4


Add setTimer and deleteTimer by ID in TimerInternals

For users, timers will have IDs. This is a step towards
that API; our current API more or less treats the timer's
timestamp as its ID. We may keep that API or remove it later.
This change adds the more general API.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/371d56f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/371d56f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/371d56f2

Branch: refs/heads/master
Commit: 371d56f2c5e36b59596db90ca032ef6c7d8026e9
Parents: beccdc6
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 25 12:28:14 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 30 20:14:03 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/DirectTimerInternals.java    | 13 ++++++++++
 .../wrappers/streaming/WindowDoFnOperator.java  | 13 ++++++++++
 .../apache/beam/sdk/util/TimerInternals.java    | 27 +++++++++++++++++---
 .../sdk/util/state/InMemoryTimerInternals.java  | 12 +++++++++
 4 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/371d56f2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 3158577..4245a87 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -21,7 +21,9 @@ import javax.annotation.Nullable;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
 import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
 import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
 import org.joda.time.Instant;
 
 /**
@@ -45,11 +47,22 @@ class DirectTimerInternals implements TimerInternals {
   }
 
   @Override
+  public void setTimer(StateNamespace namespace, String timerId, Instant target,
+      TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
+  }
+
+  @Override
   public void setTimer(TimerData timerKey) {
     timerUpdateBuilder.setTimer(timerKey);
   }
 
   @Override
+  public void deleteTimer(StateNamespace namespace, String timerId) {
+    throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+  }
+
+  @Override
   public void deleteTimer(TimerData timerKey) {
     timerUpdateBuilder.deletedTimer(timerKey);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/371d56f2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index e06a783..5debd4b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.util.WindowedValue;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.state.StateInternals;
 import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.StateNamespace;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
@@ -451,6 +452,12 @@ public class WindowDoFnOperator<K, InputT, OutputT>
     public TimerInternals timerInternals() {
       return new TimerInternals() {
         @Override
+        public void setTimer(
+            StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+          throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+        }
+
+        @Override
         public void setTimer(TimerData timerKey) {
           if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
             registerEventTimeTimer(timerKey);
@@ -463,6 +470,12 @@ public class WindowDoFnOperator<K, InputT, OutputT>
         }
 
         @Override
+        public void deleteTimer(StateNamespace namespace, String timerId) {
+          throw new UnsupportedOperationException(
+              "Canceling of a timer by ID is not yet supported.");
+        }
+
+        @Override
         public void deleteTimer(TimerData timerKey) {
           if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
             deleteEventTimeTimer(timerKey);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/371d56f2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index 58678f8..8015116 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -50,17 +50,36 @@ import org.joda.time.Instant;
 public interface TimerInternals {
 
   /**
-   * Writes out a timer to be fired when the watermark reaches the given
-   * timestamp.
+   * Writes out a timer to be fired when the current time in the specified time domain reaches the
+   * target timestamp.
    *
-   * <p>The combination of {@code namespace}, {@code timestamp} and {@code domain} uniquely
-   * identify a timer. Multiple timers set for the same parameters can be safely deduplicated.
+   * <p>The combination of {@code namespace} and {@code timerId} uniquely identifies a timer.
+   *
+   * <p>If a timer is set and then set again before it fires, later settings should clear the prior
+   * setting.
+   *
+   * <p>It is an error to set a timer for two different time domains.
+   */
+  void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain);
+
+  /**
+   * Writes out a timer to be fired when the watermark reaches the given timestamp, automatically
+   * generating an id for it from the provided {@link TimerData}.
+   *
+   * <p>The {@link TimerData} contains all the fields necessary to set the timer. The timer's ID
+   * is deterministically generated from the {@link TimerData}, so it may be canceled using
+   * the same {@link TimerData}.
    */
   void setTimer(TimerData timerKey);
 
   /**
    * Deletes the given timer.
    */
+  void deleteTimer(StateNamespace namespace, String timerId);
+
+  /**
+   * Deletes the given timer, automatically inferring its ID from the {@link TimerData}.
+   */
   void deleteTimer(TimerData timerKey);
 
   /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/371d56f2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
index dcab5fe..a3bb45a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -36,6 +36,7 @@ import org.joda.time.Instant;
  * computation and key in a Windmill-like streaming environment.
  */
 public class InMemoryTimerInternals implements TimerInternals {
+
   /** At most one timer per timestamp is kept. */
   private Set<TimerData> existingTimers = new HashSet<>();
 
@@ -97,6 +98,12 @@ public class InMemoryTimerInternals implements TimerInternals {
   }
 
   @Override
+  public void setTimer(StateNamespace namespace, String timerId, Instant target,
+      TimeDomain timeDomain) {
+    throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+  }
+
+  @Override
   public void setTimer(TimerData timer) {
     WindowTracing.trace("TestTimerInternals.setTimer: {}", timer);
     if (existingTimers.add(timer)) {
@@ -105,6 +112,11 @@ public class InMemoryTimerInternals implements TimerInternals {
   }
 
   @Override
+  public void deleteTimer(StateNamespace namespace, String timerId) {
+    throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+  }
+
+  @Override
   public void deleteTimer(TimerData timer) {
     WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer);
     existingTimers.remove(timer);


[2/2] incubator-beam git commit: This closes #1217

Posted by ke...@apache.org.
This closes #1217


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7160ee9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7160ee9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7160ee9e

Branch: refs/heads/master
Commit: 7160ee9e4e4592464fda7aa14a30c85cd29a258e
Parents: beccdc6 371d56f
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 30 20:15:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 30 20:15:26 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/DirectTimerInternals.java    | 13 ++++++++++
 .../wrappers/streaming/WindowDoFnOperator.java  | 13 ++++++++++
 .../apache/beam/sdk/util/TimerInternals.java    | 27 +++++++++++++++++---
 .../sdk/util/state/InMemoryTimerInternals.java  | 12 +++++++++
 4 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------