Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/05/07 21:17:21 UTC

[GitHub] [samza] xinyuiscool commented on a change in pull request #1355: SAMZA-2519: Support duplicate timer registration

xinyuiscool commented on a change in pull request #1355:
URL: https://github.com/apache/samza/pull/1355#discussion_r421798671



##########
File path: samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
##########
@@ -57,9 +58,33 @@ private EpochTimeScheduler(ScheduledExecutorService executor) {
     this.executor = executor;
   }
 
+  @VisibleForTesting
+  Map<Object, ScheduledFuture> getScheduledFutures() {
+    return scheduledFutures;
+  }
+
   public <K> void setTimer(K key, long timestamp, ScheduledCallback<K> callback) {
-    checkState(!scheduledFutures.containsKey(key),
-        String.format("Duplicate key %s registration for the same timer", key));
+    if (scheduledFutures.containsKey(key)) {
+      LOG.warn("Registering duplicate callback for key: {}. Attempting to cancel the previous callback", key);
+      ScheduledFuture<?> scheduledFuture = scheduledFutures.get(key);
+
+      /*
+       * We can have a race between the time we check for the presence of the key and the time we attempt to cancel;
+       * Hence we check for non-null criteria to ensure the executor hasn't kicked off the callback for the key which
+       * removes the future from the map before invoking onTimer.
+       *  1. In the event that callback is running then we will not attempt to interrupt the action and
+       *     cancel will return as unsuccessful.
+       *  2. In case of the callback successfully executed, we want to allow duplicate registration to keep the
+       *     behavior consistent with the scenario where the callback is already executed or in progress even before
+       *     we entered this condition.
+       */
+      if (scheduledFuture != null
+          && !scheduledFuture.cancel(false)
+          && !scheduledFuture.isDone()) {
+        LOG.debug("Failed to cancel the previous callback successfully. Ignoring the current request to register new callback");

Review comment:
       This should probably be logged at warn level rather than debug, since the new registration request is being ignored here.
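
       For illustration, here is a minimal, self-contained sketch of the duplicate-registration path with the failure case logged at warning level. It is not the Samza implementation: the class name `DuplicateTimerSketch`, the boolean return value, the relative `delayMs` parameter, and the use of `java.util.logging` (where `LOG.warning` stands in for an SLF4J-style `LOG.warn`) are all assumptions made to keep the example runnable on its own.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Hypothetical stand-in for the scheduler under review; not the Samza class.
class DuplicateTimerSketch {
  private static final Logger LOG = Logger.getLogger(DuplicateTimerSketch.class.getName());

  private final ScheduledExecutorService executor;
  private final Map<Object, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<>();

  DuplicateTimerSketch(ScheduledExecutorService executor) {
    this.executor = executor;
  }

  Map<Object, ScheduledFuture<?>> getScheduledFutures() {
    return scheduledFutures;
  }

  /** Returns true if the timer was registered, false if the request was ignored (assumed contract). */
  boolean setTimer(Object key, long delayMs, Runnable callback) {
    // Read the future once so the null check and the cancel act on the same reference.
    ScheduledFuture<?> previous = scheduledFutures.get(key);
    if (previous != null) {
      LOG.warning("Registering duplicate callback for key: " + key
          + ". Attempting to cancel the previous callback");
      if (!previous.cancel(false) && !previous.isDone()) {
        // Warning level, because the caller's new registration is dropped.
        LOG.warning("Failed to cancel the previous callback for key: " + key
            + ". Ignoring the current request to register a new callback");
        return false;
      }
    }
    // The callback removes its own entry from the map before running, as in the hunk above.
    ScheduledFuture<?> future = executor.schedule(() -> {
      scheduledFutures.remove(key);
      callback.run();
    }, delayMs, TimeUnit.MILLISECONDS);
    scheduledFutures.put(key, future);
    return true;
  }
}
```

       Registering the same key twice while the first timer is still pending cancels the first future and leaves a single entry in the map for the second one. In the actual change, the equivalent edit would simply be switching the `LOG.debug` call in the failure branch to `LOG.warn`.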



