You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/05/08 17:20:41 UTC

[samza] branch master updated: SAMZA-2519: Support duplicate timer registration (#1355)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new a29f29a  SAMZA-2519: Support duplicate timer registration (#1355)
a29f29a is described below

commit a29f29a6d56a4dfb1c062a97728bcb11bd824275
Author: mynameborat <bh...@gmail.com>
AuthorDate: Fri May 8 10:20:34 2020 -0700

    SAMZA-2519: Support duplicate timer registration (#1355)
---
 .../apache/samza/scheduler/EpochTimeScheduler.java |  35 +++++-
 .../samza/scheduler/TestEpochTimeScheduler.java    | 121 +++++++++++++++++++++
 2 files changed, 151 insertions(+), 5 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
index cbebbde..4b1e281 100644
--- a/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
+++ b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
@@ -19,14 +19,15 @@
 
 package org.apache.samza.scheduler;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Map;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Per-task scheduler for keyed timers.
@@ -36,7 +37,7 @@ import static com.google.common.base.Preconditions.checkState;
  * 3) triggers listener whenever a timer fires.
  */
 public class EpochTimeScheduler {
-
+  private static final Logger LOG = LoggerFactory.getLogger(EpochTimeScheduler.class);
   /**
    * For run loop to listen to timer firing so it can schedule the callbacks.
    */
@@ -57,9 +58,33 @@ public class EpochTimeScheduler {
     this.executor = executor;
   }
 
+  @VisibleForTesting
+  Map<Object, ScheduledFuture> getScheduledFutures() {
+    return scheduledFutures;
+  }
+
   public <K> void setTimer(K key, long timestamp, ScheduledCallback<K> callback) {
-    checkState(!scheduledFutures.containsKey(key),
-        String.format("Duplicate key %s registration for the same timer", key));
+    if (scheduledFutures.containsKey(key)) {
+      LOG.warn("Registering duplicate callback for key: {}. Attempting to cancel the previous callback", key);
+      ScheduledFuture<?> scheduledFuture = scheduledFutures.get(key);
+
+      /*
+       * We can have a race between the time we check for the presence of the key and the time we attempt to cancel;
+       * Hence we check for non-null criteria to ensure the executor hasn't kicked off the callback for the key which
+       * removes the future from the map before invoking onTimer.
+       *  1. In the event that callback is running then we will not attempt to interrupt the action and
+       *     cancel will return as unsuccessful.
+       *  2. In case of the callback successfully executed, we want to allow duplicate registration to keep the
+       *     behavior consistent with the scenario where the callback is already executed or in progress even before
+       *     we entered this condition.
+       */
+      if (scheduledFuture != null
+          && !scheduledFuture.cancel(false)
+          && !scheduledFuture.isDone()) {
+        LOG.warn("Failed to cancel the previous callback successfully. Ignoring the current request to register new callback");
+        return;
+      }
+    }
 
     final long delay = timestamp - System.currentTimeMillis();
     final ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
diff --git a/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
index 5db908c..4fd3dcf 100644
--- a/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
+++ b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
@@ -21,6 +21,8 @@ package org.apache.samza.scheduler;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import org.apache.samza.task.MessageCollector;
@@ -57,6 +59,125 @@ public class TestEpochTimeScheduler {
   }
 
   @Test
+  @SuppressWarnings("unchecked")
+  public void testDuplicateTimerWithCancelableCallback() {
+    final String timerKey = "timer-1";
+    ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
+    ScheduledFuture mockScheduledFuture2 = mock(ScheduledFuture.class);
+    ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+    when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(true);
+    when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
+        .thenReturn(mockScheduledFuture1)
+        .thenAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            Runnable runnable = (Runnable) args[0];
+            runnable.run();
+            return mockScheduledFuture2;
+          });
+
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
+    long timestamp = System.currentTimeMillis() + 10000;
+
+    ScheduledCallback<String> expectedScheduledCallback = mock(ScheduledCallback.class);
+    scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+    scheduler.setTimer(timerKey, timestamp, expectedScheduledCallback);
+
+    // verify the interactions with the scheduled future and the scheduler
+    verify(executor, times(2)).schedule((Runnable) anyObject(), anyLong(), anyObject());
+    verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
+
+    // verify the ready timer and its callback contents to ensure the second invocation callback overwrites the
+    // first callback
+    Set<Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback>> readyTimers =
+        scheduler.removeReadyTimers().entrySet();
+    assertEquals("Only one timer should be ready to be fired", readyTimers.size(), 1);
+
+    Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback> timerEntry = readyTimers.iterator().next();
+    assertEquals("Expected the scheduled callback from the second invocation",
+        timerEntry.getValue(),
+        expectedScheduledCallback);
+    assertEquals("Expected timer-1 as the key for ready timer",
+        timerEntry.getKey().getKey(),
+        timerKey);
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testDuplicateTimerWithUnsuccessfulCancellation() {
+    final String timerKey = "timer-1";
+    ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
+    ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+    when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(false);
+    when(mockScheduledFuture1.isDone()).thenReturn(false);
+    when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
+        .thenReturn(mockScheduledFuture1);
+
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
+    long timestamp = System.currentTimeMillis() + 10000;
+
+    scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+    scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+
+    // verify the interactions with the scheduled future and the scheduler
+    verify(executor, times(1)).schedule((Runnable) anyObject(), anyLong(), anyObject());
+    verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
+    verify(mockScheduledFuture1, times(1)).isDone();
+
+    Map<Object, ScheduledFuture> scheduledFutures = scheduler.getScheduledFutures();
+    assertTrue("Expected the timer to be in the queue", scheduledFutures.containsKey(timerKey));
+    assertEquals("Expected the scheduled callback from the first invocation",
+        scheduledFutures.get(timerKey),
+        mockScheduledFuture1);
+  }
+
+  @Test
+  public void testDuplicateTimerWithFinishedCallbacks() {
+    final String timerKey = "timer-1";
+    ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
+    ScheduledFuture mockScheduledFuture2 = mock(ScheduledFuture.class);
+    ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+    when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(false);
+    when(mockScheduledFuture1.isDone()).thenReturn(true);
+    when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
+        .thenReturn(mockScheduledFuture1)
+        .thenAnswer(invocation -> {
+            Object[] args = invocation.getArguments();
+            Runnable runnable = (Runnable) args[0];
+            runnable.run();
+            return mockScheduledFuture2;
+          });
+
+    EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
+    long timestamp = System.currentTimeMillis() + 10000;
+
+    ScheduledCallback<String> expectedScheduledCallback = mock(ScheduledCallback.class);
+    scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+    scheduler.setTimer(timerKey, timestamp, expectedScheduledCallback);
+
+    // verify the interactions with the scheduled future and the scheduler
+    verify(executor, times(2)).schedule((Runnable) anyObject(), anyLong(), anyObject());
+    verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
+    verify(mockScheduledFuture1, times(1)).isDone();
+
+    // verify the ready timer and its callback contents to ensure the second invocation callback overwrites the
+    // first callback
+    Set<Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback>> readyTimers =
+        scheduler.removeReadyTimers().entrySet();
+    assertEquals("Only one timer should be ready to be fired", readyTimers.size(), 1);
+
+    Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback> timerEntry = readyTimers.iterator().next();
+    assertEquals("Expected the scheduled callback from the second invocation",
+        timerEntry.getValue(),
+        expectedScheduledCallback);
+    assertEquals("Expected timer-1 as the key for ready timer",
+        timerEntry.getKey().getKey(),
+        timerKey);
+  }
+
+  @Test
   public void testSingleTimer() {
     EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
     List<String> results = new ArrayList<>();