You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/02/26 19:04:56 UTC

[samza] branch master updated: SAMZA-2468: Make Standby containers respond to shutdown request (#1290)

This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new ded197d  SAMZA-2468: Make Standby containers respond to shutdown request (#1290)
ded197d is described below

commit ded197de57a3c218780a1bf81130822e078a960d
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Wed Feb 26 11:04:44 2020 -0800

    SAMZA-2468: Make Standby containers respond to shutdown request (#1290)
    
    Make Standby containers to respond to shutdown request
---
 .../apache/samza/container/SamzaContainer.scala    | 12 +++++--
 .../samza/container/TestSamzaContainer.scala       | 39 ++++++++++++++++++++++
 2 files changed, 48 insertions(+), 3 deletions(-)

diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index a9b0417..c78e841 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -26,7 +26,7 @@ import java.nio.file.Path
 import java.time.Duration
 import java.util
 import java.util.{Base64, Optional}
-import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, ScheduledExecutorService, TimeUnit}
 
 import com.google.common.annotations.VisibleForTesting
 import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -714,6 +714,9 @@ class SamzaContainer(
   var jmxServer: JmxServer = null
 
   @volatile private var status = SamzaContainerStatus.NOT_STARTED
+
+  @volatile private var standbyContainerShutdownLatch = new CountDownLatch(1);
+
   private var exceptionSeen: Throwable = null
   private var containerListener: SamzaContainerListener = null
 
@@ -767,7 +770,7 @@ class SamzaContainer(
       if (taskInstances.size > 0)
         runLoop.run
       else
-        Thread.sleep(Long.MaxValue)
+        standbyContainerShutdownLatch.await() // Standby containers do not spin runLoop, instead they wait on signal to invoke shutdown
     } catch {
       case e: InterruptedException =>
         /*
@@ -865,7 +868,10 @@ class SamzaContainer(
       return
     }
 
-    shutdownRunLoop()
+    if (taskInstances.size > 0)
+      shutdownRunLoop()
+    else
+      standbyContainerShutdownLatch.countDown // Countdown the latch so standby container can invoke a shutdown sequence
   }
 
   // Shutdown Runloop
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 69223df..a36e475 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -126,6 +126,45 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
     verify(this.runLoop).run()
   }
 
+
+  @Test
+  def testShutDownSequenceForStandbyContainers() {
+    class ShutDownSignal(container: SamzaContainer) extends Runnable {
+      def run(): Unit = {
+        Thread.sleep(2000)
+        container.shutdown();
+      }
+    }
+
+    this.samzaContainer = new SamzaContainer(
+      this.config,
+      Map.empty[TaskName, TaskInstance],
+      Map.empty[TaskName, TaskInstanceMetrics],
+      this.runLoop,
+      this.systemAdmins,
+      this.consumerMultiplexer,
+      this.producerMultiplexer,
+      this.metrics,
+      localityManager = this.localityManager,
+      containerContext = this.containerContext,
+      applicationContainerContextOption = Some(this.applicationContainerContext),
+      externalContextOption = None,
+      containerStorageManager = containerStorageManager)
+    this.samzaContainer.setContainerListener(this.samzaContainerListener)
+
+    new ShutDownSignal(samzaContainer).run();
+    this.samzaContainer.run
+
+    verify(this.samzaContainerListener).beforeStart()
+    verify(this.samzaContainerListener).afterStart()
+    verify(this.samzaContainerListener).afterStop()
+    verify(this.runLoop, never()).run()
+    verify(this.systemAdmins).stop()
+    verify(this.containerStorageManager).shutdown()
+  }
+
+
+
   @Test
   def testCleanRun(): Unit = {
     doNothing().when(this.runLoop).run() // run loop completes successfully