You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/02 21:29:26 UTC

[1/2] incubator-beam git commit: This closes #1477

Repository: incubator-beam
Updated Branches:
  refs/heads/master a0884492a -> 37e891fe9


This closes #1477


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/37e891fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/37e891fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/37e891fe

Branch: refs/heads/master
Commit: 37e891fe92e51477c4d92a2781a9475f13873a7f
Parents: a088449 840fb3b
Author: Kenneth Knowles <kl...@google.com>
Authored: Fri Dec 2 13:28:57 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 2 13:28:57 2016 -0800

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       |   4 +-
 .../runners/direct/DirectTimerInternals.java    |   4 +-
 .../apache/beam/sdk/util/TimerInternals.java    | 107 +++++++------------
 .../sdk/util/state/InMemoryTimerInternals.java  |   8 +-
 .../beam/sdk/util/TimerInternalsTest.java       |   4 +-
 5 files changed, 49 insertions(+), 78 deletions(-)
----------------------------------------------------------------------



[2/2] incubator-beam git commit: Add timerId to TimerData

Posted by ke...@apache.org.
Add timerId to TimerData

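When no explicit timerId is supplied, one is generated deterministically from
the time domain and timestamp, so that it is unique per time domain and
timestamp and timer identity remains identical to historical behavior.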


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/840fb3b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/840fb3b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/840fb3b9

Branch: refs/heads/master
Commit: 840fb3b9030908ef50937cc2e4498a2cdcb7b680
Parents: a088449
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 23 14:30:57 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 2 13:28:57 2016 -0800

----------------------------------------------------------------------
 .../operators/ApexGroupByKeyOperator.java       |   4 +-
 .../runners/direct/DirectTimerInternals.java    |   4 +-
 .../apache/beam/sdk/util/TimerInternals.java    | 107 +++++++------------
 .../sdk/util/state/InMemoryTimerInternals.java  |   8 +-
 .../beam/sdk/util/TimerInternalsTest.java       |   4 +-
 5 files changed, 49 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 3b0e4f2..f49c785 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -426,8 +426,8 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
   public class ApexTimerInternals implements TimerInternals {
 
     @Override
-    public void setTimer(TimerData timerKey) {
-      registerActiveTimer(context.element().key(), timerKey);
+    public void setTimer(TimerData timerData) {
+      registerActiveTimer(context.element().key(), timerData);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 4245a87..8970b4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -53,8 +53,8 @@ class DirectTimerInternals implements TimerInternals {
   }
 
   @Override
-  public void setTimer(TimerData timerKey) {
-    timerUpdateBuilder.setTimer(timerKey);
+  public void setTimer(TimerData timerData) {
+    timerUpdateBuilder.setTimer(timerData);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index 5d4a72d..c3e498e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -18,18 +18,16 @@
 package org.apache.beam.sdk.util;
 
 import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.MoreObjects;
+import com.google.auto.value.AutoValue;
 import com.google.common.collect.ComparisonChain;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Objects;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
@@ -50,7 +48,7 @@ import org.joda.time.Instant;
 public interface TimerInternals {
 
   /**
-   * Writes out a timer to be fired when the current time in the specified time domain reaches the
+   * Sets a timer to be fired when the current time in the specified time domain reaches the
    * target timestamp.
    *
    * <p>The combination of {@code namespace} and {@code timerId} uniquely identify a timer.
@@ -63,14 +61,9 @@ public interface TimerInternals {
   void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain);
 
   /**
-   * Writes out a timer to be fired when the watermark reaches the given timestamp, automatically
-   * generating an id for it from the provided {@link TimerData}.
-   *
-   * <p>The {@link TimerData} contains all the fields necessary to set the timer. The timer's ID
-   * is determinstically generated from the {@link TimerData}, so it may be canceled using
-   * the same {@link TimerData}.
+   * Sets the timer described by {@code timerData}.
    */
-  void setTimer(TimerData timerKey);
+  void setTimer(TimerData timerData);
 
   /**
    * Deletes the given timer.
@@ -78,7 +71,7 @@ public interface TimerInternals {
   void deleteTimer(StateNamespace namespace, String timerId);
 
   /**
-   * Deletes the given timer, automatically inferring its ID from the {@link TimerData}.
+   * Deletes the timer with the ID contained in the provided {@link TimerData}.
    */
   void deleteTimer(TimerData timerKey);
 
@@ -163,64 +156,38 @@ public interface TimerInternals {
   /**
    * Data about a timer as represented within {@link TimerInternals}.
    */
-  class TimerData implements Comparable<TimerData> {
-    private final StateNamespace namespace;
-    private final Instant timestamp;
-    private final TimeDomain domain;
+  @AutoValue
+  abstract class TimerData implements Comparable<TimerData> {
 
-    private TimerData(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
-      this.namespace = checkNotNull(namespace);
-      this.timestamp = checkNotNull(timestamp);
-      this.domain = checkNotNull(domain);
-    }
+    public abstract String getTimerId();
 
-    public StateNamespace getNamespace() {
-      return namespace;
-    }
+    public abstract StateNamespace getNamespace();
 
-    public Instant getTimestamp() {
-      return timestamp;
-    }
+    public abstract Instant getTimestamp();
 
-    public TimeDomain getDomain() {
-      return domain;
-    }
+    public abstract TimeDomain getDomain();
 
     /**
-     * Construct the {@code TimerKey} for the given parameters.
+     * Construct a {@link TimerData} for the given parameters, where the timer ID is automatically
+     * generated.
      */
-    public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
-      return new TimerData(namespace, timestamp, domain);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      if (this == obj) {
-        return true;
-      }
-
-      if (!(obj instanceof TimerData)) {
-        return false;
-      }
-
-      TimerData that = (TimerData) obj;
-      return Objects.equals(this.domain, that.domain)
-          && this.timestamp.isEqual(that.timestamp)
-          && Objects.equals(this.namespace, that.namespace);
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(domain, timestamp, namespace);
+    public static TimerData of(
+        String timerId, StateNamespace namespace, Instant timestamp, TimeDomain domain) {
+      return new AutoValue_TimerInternals_TimerData(timerId, namespace, timestamp, domain);
     }
 
-    @Override
-    public String toString() {
-      return MoreObjects.toStringHelper(getClass())
-          .add("namespace", namespace)
-          .add("timestamp", timestamp)
-          .add("domain", domain)
-          .toString();
+    /**
+     * Construct a {@link TimerData} for the given parameters, where the timer ID is
+     * deterministically generated from the {@code timestamp} and {@code domain}.
+     */
+    public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
+      String timerId =
+          new StringBuilder()
+              .append(domain.ordinal())
+              .append(':')
+              .append(timestamp.getMillis())
+              .toString();
+      return of(timerId, namespace, timestamp, domain);
     }
 
     /**
@@ -236,11 +203,11 @@ public interface TimerInternals {
       }
       ComparisonChain chain =
           ComparisonChain.start()
-              .compare(this.timestamp, that.getTimestamp())
-              .compare(this.domain, that.domain);
-      if (chain.result() == 0 && !this.namespace.equals(that.namespace)) {
+              .compare(this.getTimestamp(), that.getTimestamp())
+              .compare(this.getDomain(), that.getDomain());
+      if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) {
         // Obtaining the stringKey may be expensive; only do so if required
-        chain = chain.compare(namespace.stringKey(), that.namespace.stringKey());
+        chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey());
       }
       return chain.result();
     }
@@ -275,20 +242,22 @@ public interface TimerInternals {
     public void encode(TimerData timer, OutputStream outStream, Context context)
         throws CoderException, IOException {
       Context nestedContext = context.nested();
-      STRING_CODER.encode(timer.namespace.stringKey(), outStream, nestedContext);
-      INSTANT_CODER.encode(timer.timestamp, outStream, nestedContext);
-      STRING_CODER.encode(timer.domain.name(), outStream, nestedContext);
+      STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext);
+      STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext);
+      INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext);
+      STRING_CODER.encode(timer.getDomain().name(), outStream, nestedContext);
     }
 
     @Override
     public TimerData decode(InputStream inStream, Context context)
         throws CoderException, IOException {
       Context nestedContext = context.nested();
+      String timerId = STRING_CODER.decode(inStream, nestedContext);
       StateNamespace namespace =
           StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder);
       Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
       TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, nestedContext));
-      return TimerData.of(namespace, timestamp, domain);
+      return TimerData.of(timerId, namespace, timestamp, domain);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
index a3bb45a..60a90f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -104,10 +104,10 @@ public class InMemoryTimerInternals implements TimerInternals {
   }
 
   @Override
-  public void setTimer(TimerData timer) {
-    WindowTracing.trace("TestTimerInternals.setTimer: {}", timer);
-    if (existingTimers.add(timer)) {
-      queue(timer.getDomain()).add(timer);
+  public void setTimer(TimerData timerData) {
+    WindowTracing.trace("TestTimerInternals.setTimer: {}", timerData);
+    if (existingTimers.add(timerData)) {
+      queue(timerData.getDomain()).add(timerData);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
index e8ffdb3..7b56f1c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
@@ -45,12 +45,14 @@ public class TimerInternalsTest {
   public void testTimerDataCoder() throws Exception {
     CoderProperties.coderDecodeEncodeEqual(
         TimerDataCoder.of(GlobalWindow.Coder.INSTANCE),
-        TimerData.of(StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME));
+        TimerData.of(
+            "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME));
 
     Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
     CoderProperties.coderDecodeEncodeEqual(
         TimerDataCoder.of(windowCoder),
         TimerData.of(
+            "another-id",
             StateNamespaces.window(
                 windowCoder, new IntervalWindow(new Instant(0), new Instant(100))),
             new Instant(99), TimeDomain.PROCESSING_TIME));