You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/10/31 03:16:37 UTC
[1/2] incubator-beam git commit: Add setTimer and deleteTimer by ID
in TimerInternals
Repository: incubator-beam
Updated Branches:
refs/heads/master beccdc686 -> 7160ee9e4
Add setTimer and deleteTimer by ID in TimerInternals
For users, timers will have IDs. This is a step towards
that API; our current API treats the timer's timestamp
as its ID, more-or-less. We can leave that API or not. This
change adds the more general API.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/371d56f2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/371d56f2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/371d56f2
Branch: refs/heads/master
Commit: 371d56f2c5e36b59596db90ca032ef6c7d8026e9
Parents: beccdc6
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Oct 25 12:28:14 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 30 20:14:03 2016 -0700
----------------------------------------------------------------------
.../runners/direct/DirectTimerInternals.java | 13 ++++++++++
.../wrappers/streaming/WindowDoFnOperator.java | 13 ++++++++++
.../apache/beam/sdk/util/TimerInternals.java | 27 +++++++++++++++++---
.../sdk/util/state/InMemoryTimerInternals.java | 12 +++++++++
4 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/371d56f2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 3158577..4245a87 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -21,7 +21,9 @@ import javax.annotation.Nullable;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate;
import org.apache.beam.runners.direct.WatermarkManager.TimerUpdate.TimerUpdateBuilder;
import org.apache.beam.runners.direct.WatermarkManager.TransformWatermarks;
+import org.apache.beam.sdk.util.TimeDomain;
import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.state.StateNamespace;
import org.joda.time.Instant;
/**
@@ -45,11 +47,22 @@ class DirectTimerInternals implements TimerInternals {
}
@Override
+ public void setTimer(StateNamespace namespace, String timerId, Instant target,
+ TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
+ }
+
+ @Override
public void setTimer(TimerData timerKey) {
timerUpdateBuilder.setTimer(timerKey);
}
@Override
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+ }
+
+ @Override
public void deleteTimer(TimerData timerKey) {
timerUpdateBuilder.deletedTimer(timerKey);
}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/371d56f2/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index e06a783..5debd4b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -55,6 +55,7 @@ import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.util.WindowingStrategy;
import org.apache.beam.sdk.util.state.StateInternals;
import org.apache.beam.sdk.util.state.StateInternalsFactory;
+import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
@@ -451,6 +452,12 @@ public class WindowDoFnOperator<K, InputT, OutputT>
public TimerInternals timerInternals() {
return new TimerInternals() {
@Override
+ public void setTimer(
+ StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+ }
+
+ @Override
public void setTimer(TimerData timerKey) {
if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
registerEventTimeTimer(timerKey);
@@ -463,6 +470,12 @@ public class WindowDoFnOperator<K, InputT, OutputT>
}
@Override
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer by ID is not yet supported.");
+ }
+
+ @Override
public void deleteTimer(TimerData timerKey) {
if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
deleteEventTimeTimer(timerKey);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/371d56f2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index 58678f8..8015116 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -50,17 +50,36 @@ import org.joda.time.Instant;
public interface TimerInternals {
/**
- * Writes out a timer to be fired when the watermark reaches the given
- * timestamp.
+ * Writes out a timer to be fired when the current time in the specified time domain reaches the
+ * target timestamp.
*
- * <p>The combination of {@code namespace}, {@code timestamp} and {@code domain} uniquely
- * identify a timer. Multiple timers set for the same parameters can be safely deduplicated.
+ * <p>The combination of {@code namespace} and {@code timerId} uniquely identify a timer.
+ *
+ * <p>If a timer is set and then set again before it fires, later settings should clear the prior
+ * setting.
+ *
+ * <p>It is an error to set a timer for two different time domains.
+ */
+ void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain);
+
+ /**
+ * Writes out a timer to be fired when the watermark reaches the given timestamp, automatically
+ * generating an id for it from the provided {@link TimerData}.
+ *
+ * <p>The {@link TimerData} contains all the fields necessary to set the timer. The timer's ID
+ * is determinstically generated from the {@link TimerData}, so it may be canceled using
+ * the same {@link TimerData}.
*/
void setTimer(TimerData timerKey);
/**
* Deletes the given timer.
*/
+ void deleteTimer(StateNamespace namespace, String timerId);
+
+ /**
+ * Deletes the given timer, automatically inferring its ID from the {@link TimerData}.
+ */
void deleteTimer(TimerData timerKey);
/**
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/371d56f2/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
index dcab5fe..a3bb45a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/state/InMemoryTimerInternals.java
@@ -36,6 +36,7 @@ import org.joda.time.Instant;
* computation and key in a Windmill-like streaming environment.
*/
public class InMemoryTimerInternals implements TimerInternals {
+
/** At most one timer per timestamp is kept. */
private Set<TimerData> existingTimers = new HashSet<>();
@@ -97,6 +98,12 @@ public class InMemoryTimerInternals implements TimerInternals {
}
@Override
+ public void setTimer(StateNamespace namespace, String timerId, Instant target,
+ TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
+ }
+
+ @Override
public void setTimer(TimerData timer) {
WindowTracing.trace("TestTimerInternals.setTimer: {}", timer);
if (existingTimers.add(timer)) {
@@ -105,6 +112,11 @@ public class InMemoryTimerInternals implements TimerInternals {
}
@Override
+ public void deleteTimer(StateNamespace namespace, String timerId) {
+ throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+ }
+
+ @Override
public void deleteTimer(TimerData timer) {
WindowTracing.trace("TestTimerInternals.deleteTimer: {}", timer);
existingTimers.remove(timer);
[2/2] incubator-beam git commit: This closes #1217
Posted by ke...@apache.org.
This closes #1217
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7160ee9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7160ee9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7160ee9e
Branch: refs/heads/master
Commit: 7160ee9e4e4592464fda7aa14a30c85cd29a258e
Parents: beccdc6 371d56f
Author: Kenneth Knowles <kl...@google.com>
Authored: Sun Oct 30 20:15:26 2016 -0700
Committer: Kenneth Knowles <kl...@google.com>
Committed: Sun Oct 30 20:15:26 2016 -0700
----------------------------------------------------------------------
.../runners/direct/DirectTimerInternals.java | 13 ++++++++++
.../wrappers/streaming/WindowDoFnOperator.java | 13 ++++++++++
.../apache/beam/sdk/util/TimerInternals.java | 27 +++++++++++++++++---
.../sdk/util/state/InMemoryTimerInternals.java | 12 +++++++++
4 files changed, 61 insertions(+), 4 deletions(-)
----------------------------------------------------------------------