You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2020/03/27 22:02:51 UTC
[samza] branch master updated: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor (#1334)
This is an automated email from the ASF dual-hosted git repository.
cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new bbb5bde SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor (#1334)
bbb5bde is described below
commit bbb5bdee7ccbd2b37ff07ef7e832f95b6a4c0ab4
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Fri Mar 27 15:02:43 2020 -0700
SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor (#1334)
API/Usage changes: N/A
---
.../samza/container/ContainerHeartbeatMonitor.java | 17 +++-
.../container/TestContainerHeartbeatMonitor.java | 109 ++++++++++++++++-----
2 files changed, 96 insertions(+), 30 deletions(-)
diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 55fa8da..1a131c3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -19,6 +19,7 @@
package org.apache.samza.container;
+import com.google.common.annotations.VisibleForTesting;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
@@ -33,16 +34,26 @@ import org.slf4j.LoggerFactory;
public class ContainerHeartbeatMonitor {
private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
- private static final int SCHEDULE_MS = 60000;
- private static final int SHUTDOWN_TIMOUT_MS = 120000;
- private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+ @VisibleForTesting
+ static final int SCHEDULE_MS = 60000;
+ @VisibleForTesting
+ static final int SHUTDOWN_TIMOUT_MS = 120000;
+
private final Runnable onContainerExpired;
private final ContainerHeartbeatClient containerHeartbeatClient;
+ private final ScheduledExecutorService scheduler;
private boolean started = false;
public ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient) {
+ this(onContainerExpired, containerHeartbeatClient, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY));
+ }
+
+ @VisibleForTesting
+ ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient,
+ ScheduledExecutorService scheduler) {
this.onContainerExpired = onContainerExpired;
this.containerHeartbeatClient = containerHeartbeatClient;
+ this.scheduler = scheduler;
}
public void start() {
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
index 829a158..65701b3 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -20,44 +20,99 @@
package org.apache.samza.container;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-import junit.framework.Assert;
+import org.junit.Before;
import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
-import static org.mockito.Mockito.*;
public class TestContainerHeartbeatMonitor {
+ @Mock
+ private Runnable onExpired;
+ @Mock
+ private ContainerHeartbeatClient containerHeartbeatClient;
+
+ private ScheduledExecutorService scheduler;
+ /**
+ * Use this to detect when the scheduler has finished executing the fixed-rate task.
+ */
+ private CountDownLatch schedulerFixedRateExecutionLatch;
+
+ private ContainerHeartbeatMonitor containerHeartbeatMonitor;
+
+ @Before
+ public void setup() {
+ MockitoAnnotations.initMocks(this);
+ this.schedulerFixedRateExecutionLatch = new CountDownLatch(1);
+ this.scheduler = buildScheduledExecutorService(this.schedulerFixedRateExecutionLatch);
+ this.containerHeartbeatMonitor =
+ new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler);
+ }
@Test
- public void testCallbackWhenHeartbeatDead()
- throws InterruptedException {
- ContainerHeartbeatClient mockClient = mock(ContainerHeartbeatClient.class);
- CountDownLatch countDownLatch = new CountDownLatch(1);
- Runnable onExpired = () -> {
- countDownLatch.countDown();
- };
- ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(onExpired, mockClient);
+ public void testCallbackWhenHeartbeatDead() throws InterruptedException {
ContainerHeartbeatResponse response = new ContainerHeartbeatResponse(false);
- when(mockClient.requestHeartbeat()).thenReturn(response);
- monitor.start();
- boolean success = countDownLatch.await(2, TimeUnit.SECONDS);
- Assert.assertTrue(success);
+ when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response);
+ this.containerHeartbeatMonitor.start();
+ // wait for the executor to finish the heartbeat check task
+ boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+ assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+ // check that the shutdown task got submitted, but don't actually execute it since it will shut down the process
+ verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+ eq(TimeUnit.MILLISECONDS));
+ verify(this.onExpired).run();
+
+ this.containerHeartbeatMonitor.stop();
+ verify(this.scheduler).shutdown();
}
@Test
- public void testDoesNotCallbackWhenHeartbeatAlive()
- throws InterruptedException {
- ContainerHeartbeatClient client = mock(ContainerHeartbeatClient.class);
- CountDownLatch countDownLatch = new CountDownLatch(1);
- Runnable onExpired = () -> {
- countDownLatch.countDown();
- };
- ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(onExpired, client);
+ public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException {
ContainerHeartbeatResponse response = new ContainerHeartbeatResponse(true);
- when(client.requestHeartbeat()).thenReturn(response);
- monitor.start();
- boolean success = countDownLatch.await(2, TimeUnit.SECONDS);
- Assert.assertFalse(success);
- Assert.assertEquals(1, countDownLatch.getCount());
+ when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response);
+ this.containerHeartbeatMonitor.start();
+ // wait for the executor to finish the heartbeat check task
+ boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+ assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+ // shutdown task should not have been submitted
+ verify(this.scheduler, never()).schedule(any(Runnable.class), anyLong(), any());
+ verify(this.onExpired, never()).run();
+
+ this.containerHeartbeatMonitor.stop();
+ verify(this.scheduler).shutdown();
+ }
+
+ /**
+ * Build a mock {@link ScheduledExecutorService} which will execute a fixed-rate task once. It will count down on
+ * {@code schedulerFixedRateExecutionLatch} when the task is finished executing.
+ * It will not execute any one-shot tasks, but it can be used to verify that the one-shot task was submitted.
+ */
+ private static ScheduledExecutorService buildScheduledExecutorService(
+ CountDownLatch schedulerFixedRateExecutionLatch) {
+ ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class);
+ when(scheduler.scheduleAtFixedRate(any(), eq(0L), eq((long) ContainerHeartbeatMonitor.SCHEDULE_MS),
+ eq(TimeUnit.MILLISECONDS))).thenAnswer(invocation -> {
+ Runnable command = invocation.getArgumentAt(0, Runnable.class);
+ (new Thread(() -> {
+ // just need to invoke the command once for these tests
+ command.run();
+ // notify that the execution is done, so verifications can begin
+ schedulerFixedRateExecutionLatch.countDown();
+ })).start();
+ // return value is not used by ContainerHeartbeatMonitor
+ return null;
+ });
+ return scheduler;
}
}