You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/05/08 17:20:41 UTC
[samza] branch master updated: SAMZA-2519: Support duplicate timer
registration (#1355)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new a29f29a SAMZA-2519: Support duplicate timer registration (#1355)
a29f29a is described below
commit a29f29a6d56a4dfb1c062a97728bcb11bd824275
Author: mynameborat <bh...@gmail.com>
AuthorDate: Fri May 8 10:20:34 2020 -0700
SAMZA-2519: Support duplicate timer registration (#1355)
---
.../apache/samza/scheduler/EpochTimeScheduler.java | 35 +++++-
.../samza/scheduler/TestEpochTimeScheduler.java | 121 +++++++++++++++++++++
2 files changed, 151 insertions(+), 5 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
index cbebbde..4b1e281 100644
--- a/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
+++ b/samza-core/src/main/java/org/apache/samza/scheduler/EpochTimeScheduler.java
@@ -19,14 +19,15 @@
package org.apache.samza.scheduler;
+import com.google.common.annotations.VisibleForTesting;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-
-import static com.google.common.base.Preconditions.checkState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Per-task scheduler for keyed timers.
@@ -36,7 +37,7 @@ import static com.google.common.base.Preconditions.checkState;
* 3) triggers listener whenever a timer fires.
*/
public class EpochTimeScheduler {
-
+ private static final Logger LOG = LoggerFactory.getLogger(EpochTimeScheduler.class);
/**
* For run loop to listen to timer firing so it can schedule the callbacks.
*/
@@ -57,9 +58,33 @@ public class EpochTimeScheduler {
this.executor = executor;
}
+ @VisibleForTesting // package-private accessor; exists so tests can inspect pending timers
+ Map<Object, ScheduledFuture> getScheduledFutures() {
+ return scheduledFutures; // hands back the scheduler's own map instance, not a copy
+ }
+
public <K> void setTimer(K key, long timestamp, ScheduledCallback<K> callback) {
- checkState(!scheduledFutures.containsKey(key),
- String.format("Duplicate key %s registration for the same timer", key));
+ if (scheduledFutures.containsKey(key)) {
+ LOG.warn("Registering duplicate callback for key: {}. Attempting to cancel the previous callback", key);
+ ScheduledFuture<?> scheduledFuture = scheduledFutures.get(key);
+
+ /*
+ * We can have a race between the time we check for the presence of the key and the time we attempt to cancel;
+ * Hence we check for non-null criteria to ensure the executor hasn't kicked off the callback for the key which
+ * removes the future from the map before invoking onTimer.
+ * 1. In the event that callback is running then we will not attempt to interrupt the action and
+ * cancel will return as unsuccessful.
+ * 2. In case of the callback successfully executed, we want to allow duplicate registration to keep the
+ * behavior consistent with the scenario where the callback is already executed or in progress even before
+ * we entered this condition.
+ */
+ if (scheduledFuture != null
+ && !scheduledFuture.cancel(false)
+ && !scheduledFuture.isDone()) {
+ LOG.warn("Failed to cancel the previous callback successfully. Ignoring the current request to register new callback");
+ return;
+ }
+ }
final long delay = timestamp - System.currentTimeMillis();
final ScheduledFuture<?> scheduledFuture = executor.schedule(() -> {
diff --git a/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
index 5db908c..4fd3dcf 100644
--- a/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
+++ b/samza-core/src/test/java/org/apache/samza/scheduler/TestEpochTimeScheduler.java
@@ -21,6 +21,8 @@ package org.apache.samza.scheduler;
import java.util.ArrayList;
import java.util.List;
+import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import org.apache.samza.task.MessageCollector;
@@ -57,6 +59,125 @@ public class TestEpochTimeScheduler {
}
@Test
+ @SuppressWarnings("unchecked")
+ public void testDuplicateTimerWithCancelableCallback() { // duplicate key whose first future cancels cleanly
+ final String timerKey = "timer-1";
+ ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
+ ScheduledFuture mockScheduledFuture2 = mock(ScheduledFuture.class);
+ ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+ when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(true); // first future reports a successful cancel
+ when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
+ .thenReturn(mockScheduledFuture1) // 1st schedule(): task is handed back but never run
+ .thenAnswer(invocation -> { // 2nd schedule(): run the submitted task inline
+ Object[] args = invocation.getArguments();
+ Runnable runnable = (Runnable) args[0];
+ runnable.run();
+ return mockScheduledFuture2;
+ });
+
+ EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
+ long timestamp = System.currentTimeMillis() + 10000;
+
+ ScheduledCallback<String> expectedScheduledCallback = mock(ScheduledCallback.class);
+ scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+ scheduler.setTimer(timerKey, timestamp, expectedScheduledCallback); // same key again: the duplicate registration
+
+ // verify the interactions with the scheduled future and the scheduler
+ verify(executor, times(2)).schedule((Runnable) anyObject(), anyLong(), anyObject());
+ verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
+
+ // verify the ready timer and its callback contents to ensure the second invocation callback overwrites the
+ // first callback
+ Set<Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback>> readyTimers =
+ scheduler.removeReadyTimers().entrySet();
+ assertEquals("Only one timer should be ready to be fired", 1, readyTimers.size()); // (message, expected, actual)
+
+ Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback> timerEntry = readyTimers.iterator().next();
+ assertEquals("Expected the scheduled callback from the second invocation",
+ expectedScheduledCallback,
+ timerEntry.getValue());
+ assertEquals("Expected timer-1 as the key for ready timer",
+ timerKey,
+ timerEntry.getKey().getKey());
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testDuplicateTimerWithUnsuccessfulCancellation() { // duplicate key whose first future cannot be cancelled
+ final String timerKey = "timer-1";
+ ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
+ ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+ when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(false); // cancel fails ...
+ when(mockScheduledFuture1.isDone()).thenReturn(false); // ... and the future is not finished either
+ when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
+ .thenReturn(mockScheduledFuture1);
+
+ EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
+ long timestamp = System.currentTimeMillis() + 10000;
+
+ scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+ scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class)); // same key again: the duplicate registration
+
+ // verify the interactions with the scheduled future and the scheduler
+ verify(executor, times(1)).schedule((Runnable) anyObject(), anyLong(), anyObject()); // second request was not scheduled
+ verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
+ verify(mockScheduledFuture1, times(1)).isDone();
+
+ Map<Object, ScheduledFuture> scheduledFutures = scheduler.getScheduledFutures();
+ assertTrue("Expected the timer to be in the queue", scheduledFutures.containsKey(timerKey));
+ assertEquals("Expected the scheduled callback from the first invocation",
+ mockScheduledFuture1,
+ scheduledFutures.get(timerKey));
+ }
+
+ @Test
+ public void testDuplicateTimerWithFinishedCallbacks() { // duplicate key whose first future has already completed
+ final String timerKey = "timer-1";
+ ScheduledFuture mockScheduledFuture1 = mock(ScheduledFuture.class);
+ ScheduledFuture mockScheduledFuture2 = mock(ScheduledFuture.class);
+ ScheduledExecutorService executor = mock(ScheduledExecutorService.class);
+
+ when(mockScheduledFuture1.cancel(anyBoolean())).thenReturn(false); // cancel fails ...
+ when(mockScheduledFuture1.isDone()).thenReturn(true); // ... because the future is already done
+ when(executor.schedule((Runnable) anyObject(), anyLong(), anyObject()))
+ .thenReturn(mockScheduledFuture1) // 1st schedule(): task is handed back but never run
+ .thenAnswer(invocation -> { // 2nd schedule(): run the submitted task inline
+ Object[] args = invocation.getArguments();
+ Runnable runnable = (Runnable) args[0];
+ runnable.run();
+ return mockScheduledFuture2;
+ });
+
+ EpochTimeScheduler scheduler = EpochTimeScheduler.create(executor);
+ long timestamp = System.currentTimeMillis() + 10000;
+
+ ScheduledCallback<String> expectedScheduledCallback = mock(ScheduledCallback.class);
+ scheduler.setTimer(timerKey, timestamp, mock(ScheduledCallback.class));
+ scheduler.setTimer(timerKey, timestamp, expectedScheduledCallback); // same key again: the duplicate registration
+
+ // verify the interactions with the scheduled future and the scheduler
+ verify(executor, times(2)).schedule((Runnable) anyObject(), anyLong(), anyObject());
+ verify(mockScheduledFuture1, times(1)).cancel(anyBoolean());
+ verify(mockScheduledFuture1, times(1)).isDone();
+
+ // verify the ready timer and its callback contents to ensure the second invocation callback overwrites the
+ // first callback
+ Set<Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback>> readyTimers =
+ scheduler.removeReadyTimers().entrySet();
+ assertEquals("Only one timer should be ready to be fired", 1, readyTimers.size()); // (message, expected, actual)
+
+ Map.Entry<EpochTimeScheduler.TimerKey<?>, ScheduledCallback> timerEntry = readyTimers.iterator().next();
+ assertEquals("Expected the scheduled callback from the second invocation",
+ expectedScheduledCallback,
+ timerEntry.getValue());
+ assertEquals("Expected timer-1 as the key for ready timer",
+ timerKey,
+ timerEntry.getKey().getKey());
+ }
+
+ @Test
public void testSingleTimer() {
EpochTimeScheduler scheduler = EpochTimeScheduler.create(createExecutorService());
List<String> results = new ArrayList<>();