You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ca...@apache.org on 2020/03/27 22:02:51 UTC

[samza] branch master updated: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor (#1334)

This is an automated email from the ASF dual-hosted git repository.

cameronlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new bbb5bde  SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor (#1334)
bbb5bde is described below

commit bbb5bdee7ccbd2b37ff07ef7e832f95b6a4c0ab4
Author: Cameron Lee <ca...@linkedin.com>
AuthorDate: Fri Mar 27 15:02:43 2020 -0700

    SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor (#1334)
    
    API/Usage changes: N/A
---
 .../samza/container/ContainerHeartbeatMonitor.java |  17 +++-
 .../container/TestContainerHeartbeatMonitor.java   | 109 ++++++++++++++++-----
 2 files changed, 96 insertions(+), 30 deletions(-)

diff --git a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
index 55fa8da..1a131c3 100644
--- a/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
+++ b/samza-core/src/main/java/org/apache/samza/container/ContainerHeartbeatMonitor.java
@@ -19,6 +19,7 @@
 
 package org.apache.samza.container;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ThreadFactory;
@@ -33,16 +34,26 @@ import org.slf4j.LoggerFactory;
 public class ContainerHeartbeatMonitor {
   private static final Logger LOG = LoggerFactory.getLogger(ContainerHeartbeatMonitor.class);
   private static final ThreadFactory THREAD_FACTORY = new HeartbeatThreadFactory();
-  private static final int SCHEDULE_MS = 60000;
-  private static final int SHUTDOWN_TIMOUT_MS = 120000;
-  private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY);
+  @VisibleForTesting
+  static final int SCHEDULE_MS = 60000;
+  @VisibleForTesting
+  static final int SHUTDOWN_TIMOUT_MS = 120000;
+
   private final Runnable onContainerExpired;
   private final ContainerHeartbeatClient containerHeartbeatClient;
+  private final ScheduledExecutorService scheduler;
   private boolean started = false;
 
   public ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient) {
+    this(onContainerExpired, containerHeartbeatClient, Executors.newSingleThreadScheduledExecutor(THREAD_FACTORY));
+  }
+
+  @VisibleForTesting
+  ContainerHeartbeatMonitor(Runnable onContainerExpired, ContainerHeartbeatClient containerHeartbeatClient,
+      ScheduledExecutorService scheduler) {
     this.onContainerExpired = onContainerExpired;
     this.containerHeartbeatClient = containerHeartbeatClient;
+    this.scheduler = scheduler;
   }
 
   public void start() {
diff --git a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
index 829a158..65701b3 100644
--- a/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
+++ b/samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
@@ -20,44 +20,99 @@
 package org.apache.samza.container;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import junit.framework.Assert;
+import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-import static org.mockito.Mockito.*;
 
 public class TestContainerHeartbeatMonitor {
+  @Mock
+  private Runnable onExpired;
+  @Mock
+  private ContainerHeartbeatClient containerHeartbeatClient;
+
+  private ScheduledExecutorService scheduler;
+  /**
+   * Use this to detect when the scheduler has finished executing the fixed-rate task.
+   */
+  private CountDownLatch schedulerFixedRateExecutionLatch;
+
+  private ContainerHeartbeatMonitor containerHeartbeatMonitor;
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    this.schedulerFixedRateExecutionLatch = new CountDownLatch(1);
+    this.scheduler = buildScheduledExecutorService(this.schedulerFixedRateExecutionLatch);
+    this.containerHeartbeatMonitor =
+        new ContainerHeartbeatMonitor(this.onExpired, this.containerHeartbeatClient, this.scheduler);
+  }
 
   @Test
-  public void testCallbackWhenHeartbeatDead()
-      throws InterruptedException {
-    ContainerHeartbeatClient mockClient = mock(ContainerHeartbeatClient.class);
-    CountDownLatch countDownLatch = new CountDownLatch(1);
-    Runnable onExpired = () -> {
-      countDownLatch.countDown();
-    };
-    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(onExpired, mockClient);
+  public void testCallbackWhenHeartbeatDead() throws InterruptedException {
     ContainerHeartbeatResponse response = new ContainerHeartbeatResponse(false);
-    when(mockClient.requestHeartbeat()).thenReturn(response);
-    monitor.start();
-    boolean success = countDownLatch.await(2, TimeUnit.SECONDS);
-    Assert.assertTrue(success);
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response);
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // check that the shutdown task got submitted, but don't actually execute it since it will shut down the process
+    verify(this.scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
+    verify(this.onExpired).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
   }
 
   @Test
-  public void testDoesNotCallbackWhenHeartbeatAlive()
-      throws InterruptedException {
-    ContainerHeartbeatClient client = mock(ContainerHeartbeatClient.class);
-    CountDownLatch countDownLatch = new CountDownLatch(1);
-    Runnable onExpired = () -> {
-      countDownLatch.countDown();
-    };
-    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(onExpired, client);
+  public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException {
     ContainerHeartbeatResponse response = new ContainerHeartbeatResponse(true);
-    when(client.requestHeartbeat()).thenReturn(response);
-    monitor.start();
-    boolean success = countDownLatch.await(2, TimeUnit.SECONDS);
-    Assert.assertFalse(success);
-    Assert.assertEquals(1, countDownLatch.getCount());
+    when(this.containerHeartbeatClient.requestHeartbeat()).thenReturn(response);
+    this.containerHeartbeatMonitor.start();
+    // wait for the executor to finish the heartbeat check task
+    boolean fixedRateTaskCompleted = this.schedulerFixedRateExecutionLatch.await(2, TimeUnit.SECONDS);
+    assertTrue("Did not complete heartbeat check", fixedRateTaskCompleted);
+    // shutdown task should not have been submitted
+    verify(this.scheduler, never()).schedule(any(Runnable.class), anyLong(), any());
+    verify(this.onExpired, never()).run();
+
+    this.containerHeartbeatMonitor.stop();
+    verify(this.scheduler).shutdown();
+  }
+
+  /**
+   * Build a mock {@link ScheduledExecutorService} which will execute a fixed-rate task once. It will count down on
+   * {@code schedulerFixedRateExecutionLatch} when the task is finished executing.
+   * It will not execute any one-shot tasks, but it can be used to verify that the one-shot task was submitted.
+   */
+  private static ScheduledExecutorService buildScheduledExecutorService(
+      CountDownLatch schedulerFixedRateExecutionLatch) {
+    ScheduledExecutorService scheduler = mock(ScheduledExecutorService.class);
+    when(scheduler.scheduleAtFixedRate(any(), eq(0L), eq((long) ContainerHeartbeatMonitor.SCHEDULE_MS),
+        eq(TimeUnit.MILLISECONDS))).thenAnswer(invocation -> {
+            Runnable command = invocation.getArgumentAt(0, Runnable.class);
+            (new Thread(() -> {
+                // just need to invoke the command once for these tests
+                command.run();
+                // notify that the execution is done, so verifications can begin
+                schedulerFixedRateExecutionLatch.countDown();
+              })).start();
+            // return value is not used by ContainerHeartbeatMonitor
+            return null;
+          });
+    return scheduler;
   }
 }