You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@beam.apache.org by pe...@apache.org on 2017/08/20 15:03:30 UTC

[30/53] [abbrv] beam git commit: jstorm-runner: support deleteTimer in JStormTimerInternals.

jstorm-runner: support deleteTimer in JStormTimerInternals.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/18198330
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/18198330
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/18198330

Branch: refs/heads/jstorm-runner
Commit: 18198330d42a13d3d8dd96cccdbd07ba077b9408
Parents: af5221c
Author: Pei He <pe...@apache.org>
Authored: Tue Jul 18 20:07:19 2017 +0800
Committer: Pei He <pe...@apache.org>
Committed: Sat Aug 19 12:02:58 2017 +0800

----------------------------------------------------------------------
 .../runners/jstorm/translation/JStormTimerInternals.java    | 3 +--
 .../beam/runners/jstorm/translation/TimerService.java       | 2 ++
 .../beam/runners/jstorm/translation/TimerServiceImpl.java   | 9 +++++++++
 3 files changed, 12 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
index 4c96541..0e9ee35 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/JStormTimerInternals.java
@@ -69,8 +69,7 @@ class JStormTimerInternals<K> implements TimerInternals {
   @Override
   @Deprecated
   public void deleteTimer(TimerData timerData) {
-    throw new UnsupportedOperationException(
-        "Canceling of a timer is not yet supported.");
+    timerService.deleteTimer(timerData);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
index 29345aa..24a9050 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerService.java
@@ -48,4 +48,6 @@ interface TimerService extends Serializable {
   void setTimer(Object key, TimerInternals.TimerData timerData, DoFnExecutor doFnExecutor);
 
   void fireTimers(long newWatermark);
+
+  void deleteTimer(TimerInternals.TimerData timerData);
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/18198330/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
----------------------------------------------------------------------
diff --git a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
index c2600e5..6b463db 100644
--- a/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
+++ b/runners/jstorm/src/main/java/org/apache/beam/runners/jstorm/translation/TimerServiceImpl.java
@@ -152,4 +152,13 @@ class TimerServiceImpl implements TimerService {
     keyedExecutors.add(new Pair<>(doFnExecutor.getInternalDoFnExecutorId(), key));
     timerDataToKeyedExecutors.put(timerData, keyedExecutors);
   }
+
+  @Override
+  public void deleteTimer(TimerInternals.TimerData timerData) {
+    checkArgument(
+        TimeDomain.EVENT_TIME.equals(timerData.getDomain()),
+        String.format("Does not support domain: %s.", timerData.getDomain()));
+    eventTimeTimersQueue.remove(timerData);
+    timerDataToKeyedExecutors.remove(timerData);
+  }
 }