You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 01:25:37 UTC
[05/36] samza git commit: SAMZA-506;
gracefully shutdown container on SIGTERM
SAMZA-506; gracefully shutdown container on SIGTERM
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/b82d4587
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/b82d4587
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/b82d4587
Branch: refs/heads/samza-sql
Commit: b82d458718e43a4a7a580f5a8e65ee7969b0e0f0
Parents: ff40b12
Author: Ewen Cheslack-Postava <me...@ewencp.org>
Authored: Wed Mar 11 09:06:53 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Mar 11 09:06:53 2015 -0700
----------------------------------------------------------------------
.../org/apache/samza/container/RunLoop.scala | 31 +++++++++++++++---
.../apache/samza/container/TestRunLoop.scala | 34 ++++++++++++++++++++
2 files changed, 60 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/b82d4587/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 499f5c6..4098235 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -45,7 +45,7 @@ class RunLoop(
private var lastCommitMs = 0L
private var taskShutdownRequests: Set[TaskName] = Set()
private var taskCommitRequests: Set[TaskName] = Set()
- private var shutdownNow = false
+ @volatile private var shutdownNow = false
// Messages come from the chooser with no connection to the TaskInstance they're bound for.
// Keep a mapping of SystemStreamPartition to TaskInstance to efficiently route them.
@@ -56,15 +56,36 @@ class RunLoop(
taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.toMap
}
+ val shutdownHook = new Thread() {
+ override def run() = {
+ info("Triggering shutdown in response to shutdown hook")
+ shutdownNow = true
+ }
+ }
+
+ protected def addShutdownHook() {
+ Runtime.getRuntime().addShutdownHook(shutdownHook)
+ }
+
+ protected def removeShutdownHook() {
+ Runtime.getRuntime().removeShutdownHook(shutdownHook)
+ }
+
/**
* Starts the run loop. Blocks until either the tasks request shutdown, or an
* unhandled exception is thrown.
*/
def run {
- while (!shutdownNow) {
- process
- window
- commit
+ try {
+ addShutdownHook()
+
+ while (!shutdownNow) {
+ process
+ window
+ commit
+ }
+ } finally {
+ removeShutdownHook()
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/b82d4587/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index ea48853..fb1ebdd 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -211,4 +211,38 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ShouldMatche
testMetrics.processMs.getSnapshot.getSize should equal(2)
testMetrics.commitMs.getSnapshot.getSize should equal(2)
}
+
+ @Test
+ def testShutdownHook: Unit = {
+ // The shutdown hook can't be directly tested so we verify that a) both add and remove
+ // are called and b) invoking the shutdown hook actually kills the run loop.
+ val consumers = mock[SystemConsumers]
+ when(consumers.choose).thenReturn(envelope0)
+ val testMetrics = new SamzaContainerMetrics
+ var addCalled = false
+ var removeCalled = false
+ val runLoop = new RunLoop(
+ taskInstances = getMockTaskInstances,
+ consumerMultiplexer = consumers,
+ metrics = testMetrics) {
+ override def addShutdownHook() {
+ addCalled = true
+ }
+ override def removeShutdownHook() {
+ removeCalled = true
+ }
+ }
+
+ val runThread = new Thread(runLoop)
+ runThread.start()
+
+ runLoop.shutdownHook.start()
+ runLoop.shutdownHook.join(1000)
+ runThread.join(1000)
+
+ assert(addCalled)
+ assert(removeCalled)
+ assert(!runLoop.shutdownHook.isAlive)
+ assert(!runThread.isAlive)
+ }
}