You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by bh...@apache.org on 2020/02/26 19:04:56 UTC
[samza] branch master updated: SAMZA-2468: Make Standby containers respond to shutdown request (#1290)
This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new ded197d SAMZA-2468: Make Standby containers respond to shutdown request (#1290)
ded197d is described below
commit ded197de57a3c218780a1bf81130822e078a960d
Author: Sanil Jain <sa...@gmail.com>
AuthorDate: Wed Feb 26 11:04:44 2020 -0800
SAMZA-2468: Make Standby containers respond to shutdown request (#1290)
Make Standby containers respond to a shutdown request.
---
.../apache/samza/container/SamzaContainer.scala | 12 +++++--
.../samza/container/TestSamzaContainer.scala | 39 ++++++++++++++++++++++
2 files changed, 48 insertions(+), 3 deletions(-)
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index a9b0417..c78e841 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -26,7 +26,7 @@ import java.nio.file.Path
import java.time.Duration
import java.util
import java.util.{Base64, Optional}
-import java.util.concurrent.{ExecutorService, Executors, ScheduledExecutorService, TimeUnit}
+import java.util.concurrent.{CountDownLatch, ExecutorService, Executors, ScheduledExecutorService, TimeUnit}
import com.google.common.annotations.VisibleForTesting
import com.google.common.util.concurrent.ThreadFactoryBuilder
@@ -714,6 +714,9 @@ class SamzaContainer(
var jmxServer: JmxServer = null
@volatile private var status = SamzaContainerStatus.NOT_STARTED
+
+ @volatile private var standbyContainerShutdownLatch = new CountDownLatch(1);
+
private var exceptionSeen: Throwable = null
private var containerListener: SamzaContainerListener = null
@@ -767,7 +770,7 @@ class SamzaContainer(
if (taskInstances.size > 0)
runLoop.run
else
- Thread.sleep(Long.MaxValue)
+ standbyContainerShutdownLatch.await() // Standby containers do not spin runLoop, instead they wait on signal to invoke shutdown
} catch {
case e: InterruptedException =>
/*
@@ -865,7 +868,10 @@ class SamzaContainer(
return
}
- shutdownRunLoop()
+ if (taskInstances.size > 0)
+ shutdownRunLoop()
+ else
+ standbyContainerShutdownLatch.countDown // Countdown the latch so standby container can invoke a shutdown sequence
}
// Shutdown Runloop
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 69223df..a36e475 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -126,6 +126,45 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
verify(this.runLoop).run()
}
+
+ @Test
+ def testShutDownSequenceForStandbyContainers() {
+ class ShutDownSignal(container: SamzaContainer) extends Runnable {
+ def run(): Unit = {
+ Thread.sleep(2000)
+ container.shutdown();
+ }
+ }
+
+ this.samzaContainer = new SamzaContainer(
+ this.config,
+ Map.empty[TaskName, TaskInstance],
+ Map.empty[TaskName, TaskInstanceMetrics],
+ this.runLoop,
+ this.systemAdmins,
+ this.consumerMultiplexer,
+ this.producerMultiplexer,
+ this.metrics,
+ localityManager = this.localityManager,
+ containerContext = this.containerContext,
+ applicationContainerContextOption = Some(this.applicationContainerContext),
+ externalContextOption = None,
+ containerStorageManager = containerStorageManager)
+ this.samzaContainer.setContainerListener(this.samzaContainerListener)
+
+ new ShutDownSignal(samzaContainer).run();
+ this.samzaContainer.run
+
+ verify(this.samzaContainerListener).beforeStart()
+ verify(this.samzaContainerListener).afterStart()
+ verify(this.samzaContainerListener).afterStop()
+ verify(this.runLoop, never()).run()
+ verify(this.systemAdmins).stop()
+ verify(this.containerStorageManager).shutdown()
+ }
+
+
+
@Test
def testCleanRun(): Unit = {
doNothing().when(this.runLoop).run() // run loop completes successfully