You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:30 UTC
[30/53] [abbrv] beam git commit: jstorm-runner: support deleteTimer in JStormTimerInternals.
jstorm-runner: support deleteTimer in JStormTimerInternals.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/18198330
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/18198330
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/18198330
Branch: refs/heads/jstorm-runner
Commit: 18198330d42a13d3d8dd96cccdbd07ba077b9408
Parents: af5221c
Author: Pei He <pe...@apache.org>
Authored: Tue Jul 18 20:07:19 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:58 2017 +0800
----------------------------------------------------------------------
.../runners/jstorm/translation/JStormTimerInternals.java | 3 +--
.../beam/runners/jstorm/translation/TimerService.java | 2 ++
.../beam/runners/jstorm/translation/TimerServiceImpl.java | 9 +++++++++
3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
index 4c96541..0e9ee35 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
@@ -69,8 +69,7 @@ class JStormTimerInternals<K> implements TimerInternals {
@Override
@Deprecated
public void deleteTimer(TimerData timerData) {
- throw new UnsupportedOperationException(
- "Canceling of a timer is not yet supported.");
+ timerService.deleteTimer(timerData);
}
@Override
http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
index 29345aa..24a9050 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
@@ -48,4 +48,6 @@ interface TimerService extends Serializable {
void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
void fireTimers(long newWatermark);
+
+ void deleteTimer(TimerInternals.TimerData timerData);
}
http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
index c2600e5..6b463db 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
@@ -152,4 +152,13 @@ class TimerServiceImpl implements TimerService {
keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
timerDataToKeyedExecutors.put(timerData, keyedExecutors);
}
+
+ @Override
+ public void deleteTimer(TimerInternals.TimerData timerData) {
+ checkArgument(
+ TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
+ String.format("Does not support domain: %s.", timerData.getDomain()));
+ eventTimeTimersQueue.remove(timerData);
+ timerDataToKeyedExecutors.remove(timerData);
+ }
}