You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/06 16:41:13 UTC
[23/50] [abbrv] incubator-beam git commit: Add timerId to TimerData
Add timerId to TimerData
This timerId is generated to be identical to historical behavior, and
to be unique per time domain and timestamp.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/840fb3b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/840fb3b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/840fb3b9
Branch: refs/heads/gearpump-runner
Commit: 840fb3b9030908ef50937cc2e4498a2cdcb7b680
Parents: a088449
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Nov 23 14:30:57 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Fri Dec 2 13:28:57 2016 -0800
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 4 +-
.../runners/direct/DirectTimerInternals.java | 4 +-
.../apache/beam/sdk/util/TimerInternals.java | 107 +++++++------------
.../sdk/util/state/InMemoryTimerInternals.java | 8 +-
.../beam/sdk/util/TimerInternalsTest.java | 4 +-
5 files changed, 49 insertions(+), 78 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 3b0e4f2..f49c785 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -426,8 +426,8 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
public class ApexTimerInternals implements TimerInternals {
@Override
- public void setTimer(TimerData timerKey) {
- registerActiveTimer(context.element().key(), timerKey);
+ public void setTimer(TimerData timerData) {
+ registerActiveTimer(context.element().key(), timerData);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 4245a87..8970b4b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -53,8 +53,8 @@ class DirectTimerInternals implements TimerInternals {
}
@Override
- public void setTimer(TimerData timerKey) {
- timerUpdateBuilder.setTimer(timerKey);
+ public void setTimer(TimerData timerData) {
+ timerUpdateBuilder.setTimer(timerData);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index 5d4a72d..c3e498e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -18,18 +18,16 @@
package org.apache.beam.sdk.util;
import static com.google.common.base.Preconditions.checkArgument;
-import static com.google.common.base.Preconditions.checkNotNull;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
-import com.google.common.base.MoreObjects;
+import com.google.auto.value.AutoValue;
import com.google.common.collect.ComparisonChain;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Arrays;
import java.util.List;
-import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CoderException;
@@ -50,7 +48,7 @@ import org.joda.time.Instant;
public interface TimerInternals {
/**
- * Writes out a timer to be fired when the current time in the specified time domain reaches the
+ * Sets a timer to be fired when the current time in the specified time domain reaches the
* target timestamp.
*
* <p>The combination of {@code namespace} and {@code timerId} uniquely identify a timer.
@@ -63,14 +61,9 @@ public interface TimerInternals {
void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain);
/**
- * Writes out a timer to be fired when the watermark reaches the given timestamp, automatically
- * generating an id for it from the provided {@link TimerData}.
- *
- * <p>The {@link TimerData} contains all the fields necessary to set the timer. The timer's ID
- * is determinstically generated from the {@link TimerData}, so it may be canceled using
- * the same {@link TimerData}.
+ * Sets the timer described by {@code timerData}.
*/
- void setTimer(TimerData timerKey);
+ void setTimer(TimerData timerData);
/**
* Deletes the given timer.
@@ -78,7 +71,7 @@ public interface TimerInternals {
void deleteTimer(StateNamespace namespace, String timerId);
/**
- * Deletes the given timer, automatically inferring its ID from the {@link TimerData}.
+ * Deletes the timer with the ID contained in the provided {@link TimerData}.
*/
void deleteTimer(TimerData timerKey);
@@ -163,64 +156,38 @@ public interface TimerInternals {
/**
* Data about a timer as represented within {@link TimerInternals}.
*/
- class TimerData implements Comparable<TimerData> {
- private final StateNamespace namespace;
- private final Instant timestamp;
- private final TimeDomain domain;
+ @AutoValue
+ abstract class TimerData implements Comparable<TimerData> {
- private TimerData(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
- this.namespace = checkNotNull(namespace);
- this.timestamp = checkNotNull(timestamp);
- this.domain = checkNotNull(domain);
- }
+ public abstract String getTimerId();
- public StateNamespace getNamespace() {
- return namespace;
- }
+ public abstract StateNamespace getNamespace();
- public Instant getTimestamp() {
- return timestamp;
- }
+ public abstract Instant getTimestamp();
- public TimeDomain getDomain() {
- return domain;
- }
+ public abstract TimeDomain getDomain();
/**
- * Construct the {@code TimerKey} for the given parameters.
+ * Construct a {@link TimerData} for the given parameters, where the timer ID is automatically
+ * generated.
*/
- public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
- return new TimerData(namespace, timestamp, domain);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
-
- if (!(obj instanceof TimerData)) {
- return false;
- }
-
- TimerData that = (TimerData) obj;
- return Objects.equals(this.domain, that.domain)
- && this.timestamp.isEqual(that.timestamp)
- && Objects.equals(this.namespace, that.namespace);
- }
-
- @Override
- public int hashCode() {
- return Objects.hash(domain, timestamp, namespace);
+ public static TimerData of(
+ String timerId, StateNamespace namespace, Instant timestamp, TimeDomain domain) {
+ return new AutoValue_TimerInternals_TimerData(timerId, namespace, timestamp, domain);
}
- @Override
- public String toString() {
- return MoreObjects.toStringHelper(getClass())
- .add("namespace", namespace)
- .add("timestamp", timestamp)
- .add("domain", domain)
- .toString();
+ /**
+ * Construct a {@link TimerData} for the given parameters, where the timer ID is
+ * deterministically generated from the {@code timestamp} and {@code domain}.
+ */
+ public static TimerData of(StateNamespace namespace, Instant timestamp, TimeDomain domain) {
+ String timerId =
+ new StringBuilder()
+ .append(domain.ordinal())
+ .append(':')
+ .append(timestamp.getMillis())
+ .toString();
+ return of(timerId, namespace, timestamp, domain);
}
/**
@@ -236,11 +203,11 @@ public interface TimerInternals {
}
ComparisonChain chain =
ComparisonChain.start()
- .compare(this.timestamp, that.getTimestamp())
- .compare(this.domain, that.domain);
- if (chain.result() == 0 && !this.namespace.equals(that.namespace)) {
+ .compare(this.getTimestamp(), that.getTimestamp())
+ .compare(this.getDomain(), that.getDomain());
+ if (chain.result() == 0 && !this.getNamespace().equals(that.getNamespace())) {
// Obtaining the stringKey may be expensive; only do so if required
- chain = chain.compare(namespace.stringKey(), that.namespace.stringKey());
+ chain = chain.compare(getNamespace().stringKey(), that.getNamespace().stringKey());
}
return chain.result();
}
@@ -275,20 +242,22 @@ public interface TimerInternals {
public void encode(TimerData timer, OutputStream outStream, Context context)
throws CoderException, IOException {
Context nestedContext = context.nested();
- STRING_CODER.encode(timer.namespace.stringKey(), outStream, nestedContext);
- INSTANT_CODER.encode(timer.timestamp, outStream, nestedContext);
- STRING_CODER.encode(timer.domain.name(), outStream, nestedContext);
+ STRING_CODER.encode(timer.getTimerId(), outStream, nestedContext);
+ STRING_CODER.encode(timer.getNamespace().stringKey(), outStream, nestedContext);
+ INSTANT_CODER.encode(timer.getTimestamp(), outStream, nestedContext);
+ STRING_CODER.encode(timer.getDomain().name(), outStream, nestedContext);
}
@Override
public TimerData decode(InputStream inStream, Context context)
throws CoderException, IOException {
Context nestedContext = context.nested();
+ String timerId = STRING_CODER.decode(inStream, nestedContext);
StateNamespace namespace =
StateNamespaces.fromString(STRING_CODER.decode(inStream, nestedContext), windowCoder);
Instant timestamp = INSTANT_CODER.decode(inStream, nestedContext);
TimeDomain domain = TimeDomain.valueOf(STRING_CODER.decode(inStream, nestedContext));
- return TimerData.of(namespace, timestamp, domain);
+ return TimerData.of(timerId, namespace, timestamp, domain);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
index a3bb45a..60a90f5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -104,10 +104,10 @@ public class InMemoryTimerInternals implements TimerInternals {
}
@Override
- public void setTimer(TimerData timer) {
- WindowTracing.trace("TestTimerInternals.setTimer: {}", timer);
- if (existingTimers.add(timer)) {
- queue(timer.getDomain()).add(timer);
+ public void setTimer(TimerData timerData) {
+ WindowTracing.trace("TestTimerInternals.setTimer: {}", timerData);
+ if (existingTimers.add(timerData)) {
+ queue(timerData.getDomain()).add(timerData);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/840fb3b9/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
index e8ffdb3..7b56f1c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/TimerInternalsTest.java
@@ -45,12 +45,14 @@ public class TimerInternalsTest {
public void testTimerDataCoder() throws Exception {
CoderProperties.coderDecodeEncodeEqual(
TimerDataCoder.of(GlobalWindow.Coder.INSTANCE),
- TimerData.of(StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME));
+ TimerData.of(
+ "arbitrary-id", StateNamespaces.global(), new Instant(0), TimeDomain.EVENT_TIME));
Coder<IntervalWindow> windowCoder = IntervalWindow.getCoder();
CoderProperties.coderDecodeEncodeEqual(
TimerDataCoder.of(windowCoder),
TimerData.of(
+ "another-id",
StateNamespaces.window(
windowCoder, new IntervalWindow(new Instant(0), new Instant(100))),
new Instant(99), TimeDomain.PROCESSING_TIME));