You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/03/11 17:07:06 UTC

[2/2] samza git commit: SAMZA-506; gracefully shutdown container on SIGTERM

SAMZA-506; gracefully shutdown container on SIGTERM


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b82d4587
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b82d4587
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b82d4587

Branch: refs/heads/master
Commit: b82d458718e43a4a7a580f5a8e65ee7969b0e0f0
Parents: ff40b12
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Mar 11 09:06:53 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Mar 11 09:06:53 2015 -0700

----------------------------------------------------------------------
 .../org/apache/samza/container/RunLoop.scala    | 31 +++++++++++++++---
 .../apache/samza/container/TestRunLoop.scala    | 34 ++++++++++++++++++++
 2 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/b82d4587/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 499f5c6..4098235 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -45,7 +45,7 @@ class RunLoop(
   private var lastCommitMs = 0L
   private var taskShutdownRequests: Set[TaskName] = Set()
   private var taskCommitRequests: Set[TaskName] = Set()
-  private var shutdownNow = false
+  @volatile private var shutdownNow = false
 
   // Messages come from the chooser with no connection to the TaskInstance they're bound for.
   // Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them.
@@ -56,15 +56,36 @@ class RunLoop(
     taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.toMap
   }
 
+  val shutdownHook = new Thread() {
+    override def run() = {
+      info("Triggering shutdown in response to shutdown hook")
+      shutdownNow = true
+    }
+  }
+
+  protected def addShutdownHook() {
+    Runtime.getRuntime().addShutdownHook(shutdownHook)
+  }
+
+  protected def removeShutdownHook() {
+    Runtime.getRuntime().removeShutdownHook(shutdownHook)
+  }
+
   /**
    * Starts the run loop. Blocks until either the tasks request shutdown, or an
    * unhandled exception is thrown.
    */
   def run {
-    while (!shutdownNow) {
-      process
-      window
-      commit
+    try {
+      addShutdownHook()
+
+      while (!shutdownNow) {
+        process
+        window
+        commit
+      }
+    } finally {
+      removeShutdownHook()
     }
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/b82d4587/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index ea48853..fb1ebdd 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -211,4 +211,38 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
     testMetrics.processMs.getSnapshot.getSize should equal(2)
     testMetrics.commitMs.getSnapshot.getSize should equal(2)
   }
+
+  @Test
+  def testShutdownHook: Unit = {
+    // The shutdown hook can't be directly tested so we verify that a) both add and remove
+    // are called and b) invoking the shutdown hook actually kills the run loop.
+    val consumers = mock[SystemConsumers]
+    when(consumers.choose).thenReturn(envelope0)
+    val testMetrics = new SamzaContainerMetrics
+    var addCalled = false
+    var removeCalled = false
+    val runLoop = new RunLoop(
+      taskInstances = getMockTaskInstances,
+      consumerMultiplexer = consumers,
+      metrics = testMetrics) {
+      override def addShutdownHook() {
+        addCalled = true
+      }
+      override def removeShutdownHook() {
+        removeCalled = true
+      }
+    }
+
+    val runThread = new Thread(runLoop)
+    runThread.start()
+
+    runLoop.shutdownHook.start()
+    runLoop.shutdownHook.join(1000)
+    runThread.join(1000)
+
+    assert(addCalled)
+    assert(removeCalled)
+    assert(!runLoop.shutdownHook.isAlive)
+    assert(!runThread.isAlive)
+  }
 }