You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 18:25:20 UTC

samza git commit: SAMZA-616; make shutdown hook wait for container to finish

Repository: samza
Updated Branches:
  refs/heads/master edc2c78f8 -> 81f54a2b6


SAMZA-616; make shutdown hook wait for container to finish


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/81f54a2b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/81f54a2b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/81f54a2b

Branch: refs/heads/master
Commit: 81f54a2b60628a33068ea786a73abc185a4e9258
Parents: edc2c78
Author: Tommy Becker <tw...@gmail.com>
Authored: Wed Apr 8 09:25:11 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Apr 8 09:25:11 2015 -0700

----------------------------------------------------------------------
 .../versioned/jobs/configuration-table.html     | 30 +++++++-----
 .../org/apache/samza/config/TaskConfig.scala    |  6 +++
 .../org/apache/samza/container/RunLoop.scala    | 48 ++++++++++----------
 .../apache/samza/container/SamzaContainer.scala |  7 ++-
 .../apache/samza/container/TestRunLoop.scala    | 34 --------------
 5 files changed, 54 insertions(+), 71 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/81f54a2b/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index e091460..5ebe8a7 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -358,7 +358,7 @@
                     <td class="default"></td>
                     <td class="description">
                         This property is to define how the system deals with deserialization failure situation. If set to true, the system will
-                        skip the error messages and keep running. If set to false, the system with throw exceptions and fail the container. Default 
+                        skip the error messages and keep running. If set to false, the system will throw exceptions and fail the container. Default
                         is false.
                     </td>
                 </tr>
@@ -368,7 +368,7 @@
                     <td class="default"></td>
                     <td class="description">
                         This property is to define how the system deals with serialization failure situation. If set to true, the system will
-                        drop the error messages and keep running. If set to false, the system with throw exceptions and fail the container. Default 
+                        drop the error messages and keep running. If set to false, the system will throw exceptions and fail the container. Default
                         is false.
                     </td>
                 </tr>
@@ -392,7 +392,7 @@
                     <td class="default">false</td>
                     <td class="description">
                         Defines whether or not to include log4j's LocationInfo data in Log4j StreamAppender messages. LocationInfo includes
-                        information such as the file, class, and line that wrote a log message. This setting is only active if the Log4j 
+                        information such as the file, class, and line that wrote a log message. This setting is only active if the Log4j
                         stream appender is being used. (See <a href="logging.html#stream-log4j-appender">Stream Log4j Appender</a>)
                         <dl>
                             <dt>Example: <code>task.log4j.location.info.enabled=true</code></dt>
@@ -404,13 +404,13 @@
                     <td class="property" id="task-poll-interval-ms">task.poll.interval.ms</td>
                     <td class="default"></td>
                     <td class="description">
-                      Samza's container polls for more messages under two conditions. The first condition arises when there are simply no remaining 
-                      buffered messages to process for any input SystemStreamPartition. The second condition arises when some input 
+                      Samza's container polls for more messages under two conditions. The first condition arises when there are simply no remaining
+                      buffered messages to process for any input SystemStreamPartition. The second condition arises when some input
                       SystemStreamPartitions have empty buffers, but some do not. In the latter case, a polling interval is defined to determine how
-                      often to refresh the empty SystemStreamPartition buffers. By default, this interval is 50ms, which means that any empty 
-                      SystemStreamPartition buffer will be refreshed at least every 50ms. A higher value here means that empty SystemStreamPartitions 
-                      will be refreshed less often, which means more latency is introduced, but less CPU and network will be used. Decreasing this 
-                      value means that empty SystemStreamPartitions are refreshed more frequently, thereby introducing less latency, but increasing 
+                      often to refresh the empty SystemStreamPartition buffers. By default, this interval is 50ms, which means that any empty
+                      SystemStreamPartition buffer will be refreshed at least every 50ms. A higher value here means that empty SystemStreamPartitions
+                      will be refreshed less often, which means more latency is introduced, but less CPU and network will be used. Decreasing this
+                      value means that empty SystemStreamPartitions are refreshed more frequently, thereby introducing less latency, but increasing
                       CPU and network utilization.
                     </td>
                 </tr>
@@ -426,6 +426,14 @@
                 </tr>
 
                 <tr>
+                    <td class="property" id="task-shutdown-ms">task.shutdown.ms</td>
+                    <td class="default">5000</td>
+                    <td class="description">
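+                        Maximum time, in milliseconds, that the Samza container waits for an orderly shutdown of task instances once the JVM begins shutting down. If the run loop has not finished within this time, a warning is logged and the JVM is allowed to exit.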
+                    </td>
+                </tr>
+
+                <tr>
                     <th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
                 </tr>
 
@@ -502,7 +510,7 @@
                         one of the following:
                         <dl>
                             <dt><code>upcoming</code></dt>
-                            <dd>Start processing messages that are published after the job starts. Any messages published while 
+                            <dd>Start processing messages that are published after the job starts. Any messages published while
                                 the job was not running are not processed.</dd>
                             <dt><code>oldest</code></dt>
                             <dd>Start processing at the oldest available message in the system, and
@@ -766,7 +774,7 @@
                 </tr>
 
                 <tr>
-                    <td class="property" id="store-changelog-partitions">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td>
+                    <td class="property" id="store-changelog-replication-factor">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td>
                     <td class="default">2</td>
                     <td class="description">
                         The property defines the number of replicas to use for the change log stream.

http://git-wip-us.apache.org/repos/asf/samza/blob/81f54a2b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 1ca9e2c..cd06c06 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -27,6 +27,7 @@ object TaskConfig {
   val INPUT_STREAMS = "task.inputs" // streaming.input-streams
   val WINDOW_MS = "task.window.ms" // window period in milliseconds
   val COMMIT_MS = "task.commit.ms" // commit period in milliseconds
+  val SHUTDOWN_MS = "task.shutdown.ms" // max time in milliseconds to wait for a clean shutdown
   val TASK_CLASS = "task.class" // streaming.task-factory-class
   val COMMAND_BUILDER = "task.command.class" // streaming.task-factory-class
   val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo
@@ -79,6 +80,11 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
     case _ => None
   }
 
+  def getShutdownMs: Option[Long] = getOption(TaskConfig.SHUTDOWN_MS) match {
+    case Some(ms) => Some(ms.toLong)
+    case _ => None
+  }
+
   def getLifecycleListeners(): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENERS)
 
   def getLifecycleListenerClass(name: String): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENER format name)

http://git-wip-us.apache.org/repos/asf/samza/blob/81f54a2b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 4098235..4c0faf6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -39,7 +39,8 @@ class RunLoop(
   val metrics: SamzaContainerMetrics,
   val windowMs: Long = -1,
   val commitMs: Long = 60000,
-  val clock: () => Long = { System.currentTimeMillis }) extends Runnable with TimerUtils with Logging {
+  val clock: () => Long = { System.currentTimeMillis },
+  val shutdownMs: Long = 5000) extends Runnable with TimerUtils with Logging {
 
   private var lastWindowMs = 0L
   private var lastCommitMs = 0L
@@ -56,39 +57,36 @@ class RunLoop(
     taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.toMap
   }
 
-  val shutdownHook = new Thread() {
-    override def run() = {
-      info("Triggering shutdown in response to shutdown hook")
-      shutdownNow = true
-    }
-  }
-
-  protected def addShutdownHook() {
-    Runtime.getRuntime().addShutdownHook(shutdownHook)
-  }
-
-  protected def removeShutdownHook() {
-    Runtime.getRuntime().removeShutdownHook(shutdownHook)
-  }
 
   /**
    * Starts the run loop. Blocks until either the tasks request shutdown, or an
    * unhandled exception is thrown.
    */
   def run {
-    try {
-      addShutdownHook()
+    addShutdownHook(Thread.currentThread())
 
-      while (!shutdownNow) {
-        process
-        window
-        commit
-      }
-    } finally {
-      removeShutdownHook()
+    while (!shutdownNow) {
+      process
+      window
+      commit
     }
   }
 
+  private def addShutdownHook(runLoopThread: Thread) {
+    Runtime.getRuntime().addShutdownHook(new Thread() {
+      override def run() = {
+        info("Shutting down, will wait up to %s ms" format shutdownMs)
+        shutdownNow = true
+        runLoopThread.join(shutdownMs)
+        if (runLoopThread.isAlive) {
+          warn("Did not shut down within %s ms, exiting" format shutdownMs)
+        } else {
+          info("Shutdown complete")
+        }
+      }
+    })
+  }
+
   /**
    * Chooses a message from an input stream to process, and calls the
    * process() method on the appropriate StreamTask to handle it.
@@ -189,4 +187,4 @@ class RunLoop(
       shutdownNow = true
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/81f54a2b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5416dd6..66640fe 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -400,6 +400,10 @@ object SamzaContainer extends Logging {
 
     info("Got commit milliseconds: %s" format taskCommitMs)
 
+    val taskShutdownMs = config.getShutdownMs.getOrElse(5000L)
+
+    info("Got shutdown timeout milliseconds: %s" format taskShutdownMs)
+
     // Wire up all task-instance-level (unshared) objects.
 
     val taskNames = containerModel
@@ -509,7 +513,8 @@ object SamzaContainer extends Logging {
       consumerMultiplexer = consumerMultiplexer,
       metrics = samzaContainerMetrics,
       windowMs = taskWindowMs,
-      commitMs = taskCommitMs)
+      commitMs = taskCommitMs,
+      shutdownMs = taskShutdownMs)
 
     info("Samza container setup complete.")
 

http://git-wip-us.apache.org/repos/asf/samza/blob/81f54a2b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index 2a0897f..73ec2b5 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -211,38 +211,4 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
     testMetrics.processMs.getSnapshot.getSize should equal(2)
     testMetrics.commitMs.getSnapshot.getSize should equal(2)
   }
-
-  @Test
-  def testShutdownHook: Unit = {
-    // The shutdown hook can't be directly tested so we verify that a) both add and remove
-    // are called and b) invoking the shutdown hook actually kills the run loop.
-    val consumers = mock[SystemConsumers]
-    when(consumers.choose).thenReturn(envelope0)
-    val testMetrics = new SamzaContainerMetrics
-    var addCalled = false
-    var removeCalled = false
-    val runLoop = new RunLoop(
-      taskInstances = getMockTaskInstances,
-      consumerMultiplexer = consumers,
-      metrics = testMetrics) {
-      override def addShutdownHook() {
-        addCalled = true
-      }
-      override def removeShutdownHook() {
-        removeCalled = true
-      }
-    }
-
-    val runThread = new Thread(runLoop)
-    runThread.start()
-
-    runLoop.shutdownHook.start()
-    runLoop.shutdownHook.join(1000)
-    runThread.join(1000)
-
-    assert(addCalled)
-    assert(removeCalled)
-    assert(!runLoop.shutdownHook.isAlive)
-    assert(!runThread.isAlive)
-  }
 }