You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by ke...@apache.org on 2016/12/21 18:17:51 UTC
[1/2] incubator-beam git commit: Require TimeDomain to delete a timer
Repository: incubator-beam
Updated Branches:
refs/heads/master 0d0a5e287 -> 4843dc59c
Require TimeDomain to delete a timer
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/35a02740
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/35a02740
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/35a02740
Branch: refs/heads/master
Commit: 35a02740748182ee52729d8bfb621a3c342b8312
Parents: 0d0a5e2
Author: Kenneth Knowles <kl...@google.com>
Authored: Tue Dec 20 20:09:25 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 08:20:28 2016 -0800
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 8 ++++++++
.../beam/runners/core/InMemoryTimerInternals.java | 8 ++++++++
.../beam/runners/direct/DirectTimerInternals.java | 8 ++++++++
.../wrappers/streaming/WindowDoFnOperator.java | 9 +++++++++
.../org/apache/beam/sdk/util/TimerInternals.java | 17 +++++++++++++++--
5 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
index 48ac177..49ec1c8 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java
@@ -425,12 +425,19 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
*/
public class ApexTimerInternals implements TimerInternals {
+ @Deprecated
@Override
public void setTimer(TimerData timerData) {
registerActiveTimer(context.element().key(), timerData);
}
@Override
+ public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+ }
+
+ @Deprecated
+ @Override
public void deleteTimer(TimerData timerKey) {
unregisterActiveTimer(context.element().key(), timerKey);
}
@@ -463,6 +470,7 @@ public class ApexGroupByKeyOperator<K, V> implements Operator {
throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
}
+ @Deprecated
@Override
public void deleteTimer(StateNamespace namespace, String timerId) {
throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
index 5fcd088..5ddd5a7 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/InMemoryTimerInternals.java
@@ -107,6 +107,7 @@ public class InMemoryTimerInternals implements TimerInternals {
throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
}
+ @Deprecated
@Override
public void setTimer(TimerData timerData) {
WindowTracing.trace("{}.setTimer: {}", getClass().getSimpleName(), timerData);
@@ -116,10 +117,17 @@ public class InMemoryTimerInternals implements TimerInternals {
}
@Override
+ public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
+ }
+
+ @Deprecated
+ @Override
public void deleteTimer(StateNamespace namespace, String timerId) {
throw new UnsupportedOperationException("Canceling a timer by ID is not yet supported.");
}
+ @Deprecated
@Override
public void deleteTimer(TimerData timer) {
WindowTracing.trace("{}.deleteTimer: {}", getClass().getSimpleName(), timer);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
index 8970b4b..5ca276d 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectTimerInternals.java
@@ -52,16 +52,24 @@ class DirectTimerInternals implements TimerInternals {
throw new UnsupportedOperationException("Setting timer by ID not yet supported.");
}
+ @Deprecated
@Override
public void setTimer(TimerData timerData) {
timerUpdateBuilder.setTimer(timerData);
}
@Override
+ public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
+ }
+
+ @Deprecated
+ @Override
public void deleteTimer(StateNamespace namespace, String timerId) {
throw new UnsupportedOperationException("Canceling of timer by ID is not yet supported.");
}
+ @Deprecated
@Override
public void deleteTimer(TimerData timerKey) {
timerUpdateBuilder.deletedTimer(timerKey);
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
index 9cea529..5398d7b 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java
@@ -455,6 +455,7 @@ public class WindowDoFnOperator<K, InputT, OutputT>
throw new UnsupportedOperationException("Setting a timer by ID is not yet supported.");
}
+ @Deprecated
@Override
public void setTimer(TimerData timerKey) {
if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
@@ -468,11 +469,19 @@ public class WindowDoFnOperator<K, InputT, OutputT>
}
@Override
+ public void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain) {
+ throw new UnsupportedOperationException(
+ "Canceling of a timer by ID is not yet supported.");
+ }
+
+ @Deprecated
+ @Override
public void deleteTimer(StateNamespace namespace, String timerId) {
throw new UnsupportedOperationException(
"Canceling of a timer by ID is not yet supported.");
}
+ @Deprecated
@Override
public void deleteTimer(TimerData timerKey) {
if (timerKey.getDomain().equals(TimeDomain.EVENT_TIME)) {
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/35a02740/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
index c3e498e..0bfcddc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/TimerInternals.java
@@ -34,6 +34,7 @@ import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.InstantCoder;
import org.apache.beam.sdk.coders.StandardCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.state.StateNamespace;
import org.apache.beam.sdk.util.state.StateNamespaces;
@@ -61,18 +62,30 @@ public interface TimerInternals {
void setTimer(StateNamespace namespace, String timerId, Instant target, TimeDomain timeDomain);
/**
- * Sets the timer described by {@code timerData}.
+ * @deprecated use {@link #setTimer(StateNamespace, String, Instant, TimeDomain)}.
*/
+ @Deprecated
void setTimer(TimerData timerData);
/**
* Deletes the given timer.
+ *
+ * <p>A timer's ID is enforced to be unique in validation of a {@link DoFn}, but runners
+ * often manage timers for different time domains in very different ways, thus the
+ * {@link TimeDomain} is a required parameter.
+ */
+ void deleteTimer(StateNamespace namespace, String timerId, TimeDomain timeDomain);
+
+ /**
+ * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}.
*/
+ @Deprecated
void deleteTimer(StateNamespace namespace, String timerId);
/**
- * Deletes the timer with the ID contained in the provided {@link TimerData}.
+ * @deprecated use {@link #deleteTimer(StateNamespace, String, TimeDomain)}.
*/
+ @Deprecated
void deleteTimer(TimerData timerKey);
/**
[2/2] incubator-beam git commit: This closes #1673: Require TimeDomain to delete a timer
Posted by ke...@apache.org.
This closes #1673: Require TimeDomain to delete a timer
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4843dc59
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4843dc59
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4843dc59
Branch: refs/heads/master
Commit: 4843dc59c6e87ea0be75f7abd1e312bf5bc5a529
Parents: 0d0a5e2 35a0274
Author: Kenneth Knowles <kl...@google.com>
Authored: Wed Dec 21 10:15:56 2016 -0800
Committer: Kenneth Knowles <kl...@google.com>
Committed: Wed Dec 21 10:15:56 2016 -0800
----------------------------------------------------------------------
.../operators/ApexGroupByKeyOperator.java | 8 ++++++++
.../beam/runners/core/InMemoryTimerInternals.java | 8 ++++++++
.../beam/runners/direct/DirectTimerInternals.java | 8 ++++++++
.../wrappers/streaming/WindowDoFnOperator.java | 9 +++++++++
.../org/apache/beam/sdk/util/TimerInternals.java | 17 +++++++++++++++--
5 files changed, 48 insertions(+), 2 deletions(-)
----------------------------------------------------------------------