Posted to commits@samza.apache.org by GitBox <gi...@apache.org> on 2020/03/27 18:47:45 UTC

[GitHub] [samza] cameronlee314 opened a new pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor

cameronlee314 opened a new pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor
URL: https://github.com/apache/samza/pull/1334
 
 
   Symptom: In some environments, the samza-core unit test suite fails with `Process 'Gradle Test Executor N' finished with non-zero exit value 1 This problem might be caused by incorrect test process configuration.`
   Cause: `ContainerHeartbeatMonitor` submits a delayed "force shutdown" task (which calls `System.exit(1)`) to shut down the process when the job coordinator dies. In `TestContainerHeartbeatMonitor`, this "force shutdown" task is not cancelled when the test completes. If the remaining samza-core tests take longer than the shutdown timeout (2 minutes), the shutdown task kills the test suite process with exit code 1. In some environments, `TestContainerHeartbeatMonitor` runs late enough in the suite (the ordering of test classes appears to be non-deterministic) that the suite finishes successfully before the 2-minute timeout expires, so the error is not always seen.
   Fix: In the test, mock the executor service so that the "force shutdown" task never runs.
   Tests: Reduced the force shutdown timeout to 3 seconds to consistently reproduce the issue before the change. Then applied the change, kept the 3-second timeout, and verified that the issue no longer occurred.
   API/Usage changes: N/A
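A related pitfall, sketched under the assumption of a plain JDK `ScheduledThreadPoolExecutor` (the class that `Executors.newSingleThreadScheduledExecutor()` wraps) rather than Samza's actual scheduler setup: by default, a delayed task that is already queued still runs after `shutdown()`. So merely shutting the scheduler down is not guaranteed to cancel a pending "force shutdown" task unless the executor is configured with `setExecuteExistingDelayedTasksAfterShutdownPolicy(false)`. The class and method names below are hypothetical, and the stand-in task sets a flag instead of calling `System.exit(1)`.

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DelayedTaskAfterShutdown {
  /** Returns true if a delayed task scheduled before shutdown() still runs afterwards. */
  static boolean delayedTaskRunsAfterShutdown(boolean executeDelayedAfterShutdown)
      throws InterruptedException {
    ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
    // The JDK default for this policy is true.
    scheduler.setExecuteExistingDelayedTasksAfterShutdownPolicy(executeDelayedAfterShutdown);
    AtomicBoolean ran = new AtomicBoolean(false);
    // Stand-in for the force-shutdown task; a real one would call System.exit(1).
    scheduler.schedule(() -> ran.set(true), 200, TimeUnit.MILLISECONDS);
    scheduler.shutdown();
    // Wait long enough for the delayed task to fire if it is still queued.
    scheduler.awaitTermination(2, TimeUnit.SECONDS);
    return ran.get();
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(delayedTaskRunsAfterShutdown(true));  // true: default policy keeps the task
    System.out.println(delayedTaskRunsAfterShutdown(false)); // false: shutdown() drops it
  }
}
```

Calling `shutdownNow()` instead would also drop the pending task; it is returned in the list of tasks that never commenced execution.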

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

[GitHub] [samza] mynameborat commented on a change in pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor
URL: https://github.com/apache/samza/pull/1334#discussion_r399517388
 
 

 ##########
 File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
 ##########
 @@ -20,44 +20,77 @@
 package org.apache.samza.container;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import junit.framework.Assert;
 import org.junit.Test;
 
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
-public class TestContainerHeartbeatMonitor {
 
+public class TestContainerHeartbeatMonitor {
   @Test
-  public void testCallbackWhenHeartbeatDead()
-      throws InterruptedException {
+  public void testCallbackWhenHeartbeatDead() throws InterruptedException {
     ContainerHeartbeatClient mockClient = mock(ContainerHeartbeatClient.class);
     CountDownLatch countDownLatch = new CountDownLatch(1);
-    Runnable onExpired = () -> {
-      countDownLatch.countDown();
-    };
-    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(onExpired, mockClient);
+    Runnable onExpired = countDownLatch::countDown;
     ContainerHeartbeatResponse response = new ContainerHeartbeatResponse(false);
     when(mockClient.requestHeartbeat()).thenReturn(response);
+    ScheduledExecutorService scheduler = buildScheduledExecutorService();
+    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(onExpired, mockClient, scheduler);
     monitor.start();
     boolean success = countDownLatch.await(2, TimeUnit.SECONDS);
-    Assert.assertTrue(success);
+    assertTrue(success);
+    // check that the shutdown task got submitted, but don't actually execute it since it will shut down the process
+    verify(scheduler).schedule(any(Runnable.class), eq((long) ContainerHeartbeatMonitor.SHUTDOWN_TIMOUT_MS),
+        eq(TimeUnit.MILLISECONDS));
+
+    monitor.stop();
+    verify(scheduler).shutdown();
   }
 
   @Test
-  public void testDoesNotCallbackWhenHeartbeatAlive()
-      throws InterruptedException {
+  public void testDoesNotCallbackWhenHeartbeatAlive() throws InterruptedException {
     ContainerHeartbeatClient client = mock(ContainerHeartbeatClient.class);
     CountDownLatch countDownLatch = new CountDownLatch(1);
-    Runnable onExpired = () -> {
-      countDownLatch.countDown();
-    };
-    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(onExpired, client);
+    Runnable onExpired = countDownLatch::countDown;
     ContainerHeartbeatResponse response = new ContainerHeartbeatResponse(true);
     when(client.requestHeartbeat()).thenReturn(response);
+    ScheduledExecutorService scheduler = buildScheduledExecutorService();
+    ContainerHeartbeatMonitor monitor = new ContainerHeartbeatMonitor(onExpired, client, scheduler);
     monitor.start();
     boolean success = countDownLatch.await(2, TimeUnit.SECONDS);
 
 Review comment:
   I see you are the author of the tests. But do we need to wait 2 seconds in this test? Isn't it sufficient to verify below that no tasks were submitted to the scheduler?

[GitHub] [samza] mynameborat commented on a change in pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor
URL: https://github.com/apache/samza/pull/1334#discussion_r399522802
 
 

 ##########
 File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
 ##########
 
 Review comment:
   I see, that makes sense. Then can we have `buildScheduledExecutorService` take a `CountDownLatch` and call `run` on the `Runnable` directly, instead of spinning up a thread and then decrementing the latch?
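The suggestion above can be sketched in plain Java without Mockito. Everything here is hypothetical: `MonitorSketch` is a simplified stand-in for `ContainerHeartbeatMonitor` (its structure is an assumption, not Samza's code), and `InlineScheduler` is a test double that runs the periodic heartbeat check once, synchronously, on the caller's thread, and records delayed one-shot tasks (such as the force-shutdown task) without ever running them. Because the check has already finished when `start()` returns, a test can assert immediately instead of waiting on a latch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

public class InlineSchedulerDemo {
  /** Hypothetical stand-in for ContainerHeartbeatMonitor; not the real Samza class. */
  static class MonitorSketch {
    static final long SHUTDOWN_TIMEOUT_MS = 120_000; // the 2-minute timeout from the PR description
    private final BooleanSupplier heartbeatAlive;
    private final Runnable onExpired;
    private final ScheduledExecutorService scheduler;

    MonitorSketch(BooleanSupplier heartbeatAlive, Runnable onExpired, ScheduledExecutorService scheduler) {
      this.heartbeatAlive = heartbeatAlive;
      this.onExpired = onExpired;
      this.scheduler = scheduler;
    }

    void start() {
      scheduler.scheduleAtFixedRate(() -> {
        if (!heartbeatAlive.getAsBoolean()) {
          // Force-shutdown safety net that must never run inside a test JVM.
          scheduler.schedule(() -> System.exit(1), SHUTDOWN_TIMEOUT_MS, TimeUnit.MILLISECONDS);
          onExpired.run();
        }
      }, 0, 60_000, TimeUnit.MILLISECONDS);
    }

    void stop() {
      scheduler.shutdown();
    }
  }

  /** Test double: one inline tick for periodic tasks; delayed tasks are recorded, never run. */
  static class InlineScheduler extends ScheduledThreadPoolExecutor {
    final List<Runnable> delayedTasks = new ArrayList<>();

    InlineScheduler() {
      super(1);
    }

    @Override
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
      command.run(); // a single synchronous tick on the caller's thread
      return super.schedule(() -> { }, 0, TimeUnit.MILLISECONDS); // no-op placeholder future
    }

    // Note: ScheduledThreadPoolExecutor routes execute()/submit() through schedule(), so those are recorded too.
    @Override
    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
      delayedTasks.add(command); // captured, never executed
      return super.schedule(() -> { }, 0, TimeUnit.MILLISECONDS);
    }
  }

  public static void main(String[] args) {
    InlineScheduler deadScheduler = new InlineScheduler();
    AtomicBoolean expired = new AtomicBoolean(false);
    MonitorSketch dead = new MonitorSketch(() -> false, () -> expired.set(true), deadScheduler);
    dead.start(); // the heartbeat check has already run when start() returns
    System.out.println(expired.get() + " " + deadScheduler.delayedTasks.size()); // true 1
    dead.stop();

    InlineScheduler aliveScheduler = new InlineScheduler();
    AtomicBoolean aliveExpired = new AtomicBoolean(false);
    MonitorSketch alive = new MonitorSketch(() -> true, () -> aliveExpired.set(true), aliveScheduler);
    alive.start();
    System.out.println(aliveExpired.get() + " " + aliveScheduler.delayedTasks.size()); // false 0
    alive.stop();
  }
}
```

With Mockito, the same effect would come from stubbing `scheduleAtFixedRate` with an answer that calls `run()` on the submitted `Runnable`; subclassing `ScheduledThreadPoolExecutor` here is only a shortcut for getting real `ScheduledFuture` placeholders.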

[GitHub] [samza] cameronlee314 commented on a change in pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor
URL: https://github.com/apache/samza/pull/1334#discussion_r399551266
 
 

 ##########
 File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
 ##########
 
 Review comment:
   Good idea. This prevents the extra 2s wait and it allows for some common initialization. Updated the tests.

[GitHub] [samza] cameronlee314 merged pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor

Posted by GitBox <gi...@apache.org>.
cameronlee314 merged pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor
URL: https://github.com/apache/samza/pull/1334
 
 
   

[GitHub] [samza] cameronlee314 commented on a change in pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor

Posted by GitBox <gi...@apache.org>.
cameronlee314 commented on a change in pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor
URL: https://github.com/apache/samza/pull/1334#discussion_r399520833
 
 

 ##########
 File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
 ##########
 
 Review comment:
   The task executes asynchronously, so the 2-second wait gives it time to show that it doesn't actually execute. If we checked for no task submission immediately and a bug caused the task to be submitted, the test could still pass, because the verification would happen before the async execution.
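The race described in this reply can be reproduced deterministically in a small sketch. Assumptions: the "buggy" submission is simulated by a background task that sets a flag, and a hypothetical `gate` latch holds that task back until after the immediate check, forcing the unlucky interleaving that a real test would hit only occasionally. The immediate check sees nothing; the check made after waiting sees the bad submission.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class AsyncNegativeCheck {
  /** Returns {immediateCheck, checkAfterWaiting} for a simulated buggy async submission. */
  static boolean[] run() throws InterruptedException {
    ExecutorService background = Executors.newSingleThreadExecutor();
    AtomicBoolean shutdownTaskSubmitted = new AtomicBoolean(false);
    CountDownLatch gate = new CountDownLatch(1); // hypothetical: pins the background work
    CountDownLatch done = new CountDownLatch(1);
    background.execute(() -> {
      try {
        gate.await();
        shutdownTaskSubmitted.set(true); // the bug: a submission that should never happen
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      } finally {
        done.countDown();
      }
    });
    // Checked "immediately": the background task is still blocked, so this sees false.
    boolean immediateCheck = shutdownTaskSubmitted.get();
    gate.countDown();
    done.await(2, TimeUnit.SECONDS); // give the async work time to happen
    boolean checkAfterWaiting = shutdownTaskSubmitted.get();
    background.shutdown();
    return new boolean[] {immediateCheck, checkAfterWaiting};
  }

  public static void main(String[] args) throws InterruptedException {
    boolean[] r = run();
    System.out.println("immediate=" + r[0] + ", afterWait=" + r[1]); // immediate=false, afterWait=true
  }
}
```

In the sketch, the `done` latch signals when the background work has finished. A test that asserts something never happens has no such signal, which is why it falls back to waiting a fixed interval, or to running the task synchronously so that no waiting is needed.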

[GitHub] [samza] mynameborat commented on a change in pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor

Posted by GitBox <gi...@apache.org>.
mynameborat commented on a change in pull request #1334: SAMZA-2496: TestContainerHeartbeatMonitor does not properly stop the ContainerHeartbeatMonitor
URL: https://github.com/apache/samza/pull/1334#discussion_r399517388
 
 

 ##########
 File path: samza-core/src/test/java/org/apache/samza/container/TestContainerHeartbeatMonitor.java
 ##########
 
 Review comment:
   I see you are not the author of the tests. But do we need to wait 2 seconds in this test? Isn't it sufficient to verify below that no tasks were submitted to the scheduler?
