You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2015/04/08 18:25:20 UTC
samza git commit: SAMZA-616; make shutdown hook wait for container to finish
Repository: samza
Updated Branches:
refs/heads/master edc2c78f8 -> 81f54a2b6
SAMZA-616; make shutdown hook wait for container to finish
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/81f54a2b
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/81f54a2b
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/81f54a2b
Branch: refs/heads/master
Commit: 81f54a2b60628a33068ea786a73abc185a4e9258
Parents: edc2c78
Author: Tommy Becker <tw...@gmail.com>
Authored: Wed Apr 8 09:25:11 2015 -0700
Committer: Chris Riccomini <cr...@apache.org>
Committed: Wed Apr 8 09:25:11 2015 -0700
----------------------------------------------------------------------
.../versioned/jobs/configuration-table.html | 30 +++++++-----
.../org/apache/samza/config/TaskConfig.scala | 6 +++
.../org/apache/samza/container/RunLoop.scala | 48 ++++++++++----------
.../apache/samza/container/SamzaContainer.scala | 7 ++-
.../apache/samza/container/TestRunLoop.scala | 34 --------------
5 files changed, 54 insertions(+), 71 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/81f54a2b/docs/learn/documentation/versioned/jobs/configuration-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/jobs/configuration-table.html b/docs/learn/documentation/versioned/jobs/configuration-table.html
index e091460..5ebe8a7 100644
--- a/docs/learn/documentation/versioned/jobs/configuration-table.html
+++ b/docs/learn/documentation/versioned/jobs/configuration-table.html
@@ -358,7 +358,7 @@
<td class="default"></td>
<td class="description">
This property is to define how the system deals with deserialization failure situation. If set to true, the system will
- skip the error messages and keep running. If set to false, the system with throw exceptions and fail the container. Default
+ skip the error messages and keep running. If set to false, the system with throw exceptions and fail the container. Default
is false.
</td>
</tr>
@@ -368,7 +368,7 @@
<td class="default"></td>
<td class="description">
This property is to define how the system deals with serialization failure situation. If set to true, the system will
- drop the error messages and keep running. If set to false, the system with throw exceptions and fail the container. Default
+ drop the error messages and keep running. If set to false, the system with throw exceptions and fail the container. Default
is false.
</td>
</tr>
@@ -392,7 +392,7 @@
<td class="default">false</td>
<td class="description">
Defines whether or not to include log4j's LocationInfo data in Log4j StreamAppender messages. LocationInfo includes
- information such as the file, class, and line that wrote a log message. This setting is only active if the Log4j
+ information such as the file, class, and line that wrote a log message. This setting is only active if the Log4j
stream appender is being used. (See <a href="logging.html#stream-log4j-appender">Stream Log4j Appender</a>)
<dl>
<dt>Example: <code>task.log4j.location.info.enabled=true</code></dt>
@@ -404,13 +404,13 @@
<td class="property" id="task-poll-interval-ms">task.poll.interval.ms</td>
<td class="default"></td>
<td class="description">
- Samza's container polls for more messages under two conditions. The first condition arises when there are simply no remaining
- buffered messages to process for any input SystemStreamPartition. The second condition arises when some input
+ Samza's container polls for more messages under two conditions. The first condition arises when there are simply no remaining
+ buffered messages to process for any input SystemStreamPartition. The second condition arises when some input
SystemStreamPartitions have empty buffers, but some do not. In the latter case, a polling interval is defined to determine how
- often to refresh the empty SystemStreamPartition buffers. By default, this interval is 50ms, which means that any empty
- SystemStreamPartition buffer will be refreshed at least every 50ms. A higher value here means that empty SystemStreamPartitions
- will be refreshed less often, which means more latency is introduced, but less CPU and network will be used. Decreasing this
- value means that empty SystemStreamPartitions are refreshed more frequently, thereby introducing less latency, but increasing
+ often to refresh the empty SystemStreamPartition buffers. By default, this interval is 50ms, which means that any empty
+ SystemStreamPartition buffer will be refreshed at least every 50ms. A higher value here means that empty SystemStreamPartitions
+ will be refreshed less often, which means more latency is introduced, but less CPU and network will be used. Decreasing this
+ value means that empty SystemStreamPartitions are refreshed more frequently, thereby introducing less latency, but increasing
CPU and network utilization.
</td>
</tr>
@@ -426,6 +426,14 @@
</tr>
<tr>
+ <td class="property" id="task-shutdown-ms">task.shutdown.ms</td>
+ <td class="default">5000</td>
+ <td class="description">
+ This property controls how long the Samza container will wait for an orderly shutdown of task instances.
+ </td>
+ </tr>
+
+ <tr>
<th colspan="3" class="section" id="streams"><a href="../container/streams.html">Systems (input and output streams)</a></th>
</tr>
@@ -502,7 +510,7 @@
one of the following:
<dl>
<dt><code>upcoming</code></dt>
- <dd>Start processing messages that are published after the job starts. Any messages published while
+ <dd>Start processing messages that are published after the job starts. Any messages published while
the job was not running are not processed.</dd>
<dt><code>oldest</code></dt>
<dd>Start processing at the oldest available message in the system, and
@@ -766,7 +774,7 @@
</tr>
<tr>
- <td class="property" id="store-changelog-partitions">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td>
+ <td class="property" id="store-changelog-replication-factor">stores.<span class="store">store-name</span>.changelog.<br>replication.factor</td>
<td class="default">2</td>
<td class="description">
The property defines the number of replicas to use for the change log stream.
http://git-wip-us.apache.org/repos/asf/samza/blob/81f54a2b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index 1ca9e2c..cd06c06 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -27,6 +27,7 @@ object TaskConfig {
val INPUT_STREAMS = "task.inputs" // streaming.input-streams
val WINDOW_MS = "task.window.ms" // window period in milliseconds
val COMMIT_MS = "task.commit.ms" // commit period in milliseconds
+ val SHUTDOWN_MS = "task.shutdown.ms" // how long to wait for a clean shutdown
val TASK_CLASS = "task.class" // streaming.task-factory-class
val COMMAND_BUILDER = "task.command.class" // streaming.task-factory-class
val LIFECYCLE_LISTENERS = "task.lifecycle.listeners" // li-generator,foo
@@ -79,6 +80,11 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) {
case _ => None
}
+ def getShutdownMs: Option[Long] = getOption(TaskConfig.SHUTDOWN_MS) match {
+ case Some(ms) => Some(ms.toLong)
+ case _ => None
+ }
+
def getLifecycleListeners(): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENERS)
def getLifecycleListenerClass(name: String): Option[String] = getOption(TaskConfig.LIFECYCLE_LISTENER format name)
http://git-wip-us.apache.org/repos/asf/samza/blob/81f54a2b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
index 4098235..4c0faf6 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/RunLoop.scala
@@ -39,7 +39,8 @@ class RunLoop(
val metrics: SamzaContainerMetrics,
val windowMs: Long = -1,
val commitMs: Long = 60000,
- val clock: () => Long = { System.currentTimeMillis }) extends Runnable with TimerUtils with Logging {
+ val clock: () => Long = { System.currentTimeMillis },
+ val shutdownMs: Long = 5000) extends Runnable with TimerUtils with Logging {
private var lastWindowMs = 0L
private var lastCommitMs = 0L
@@ -56,39 +57,36 @@ class RunLoop(
taskInstances.values.map { getSystemStreamPartitionToTaskInstance }.flatten.toMap
}
- val shutdownHook = new Thread() {
- override def run() = {
- info("Triggering shutdown in response to shutdown hook")
- shutdownNow = true
- }
- }
-
- protected def addShutdownHook() {
- Runtime.getRuntime().addShutdownHook(shutdownHook)
- }
-
- protected def removeShutdownHook() {
- Runtime.getRuntime().removeShutdownHook(shutdownHook)
- }
/**
* Starts the run loop. Blocks until either the tasks request shutdown, or an
* unhandled exception is thrown.
*/
def run {
- try {
- addShutdownHook()
+ addShutdownHook(Thread.currentThread())
- while (!shutdownNow) {
- process
- window
- commit
- }
- } finally {
- removeShutdownHook()
+ while (!shutdownNow) {
+ process
+ window
+ commit
}
}
+ private def addShutdownHook(runLoopThread: Thread) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ override def run() = {
+ info("Shutting down, will wait up to %s ms" format shutdownMs)
+ shutdownNow = true
+ runLoopThread.join(shutdownMs)
+ if (runLoopThread.isAlive) {
+ warn("Did not shut down within %s ms, exiting" format shutdownMs)
+ } else {
+ info("Shutdown complete")
+ }
+ }
+ })
+ }
+
/**
* Chooses a message from an input stream to process, and calls the
* process() method on the appropriate StreamTask to handle it.
@@ -189,4 +187,4 @@ class RunLoop(
shutdownNow = true
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/81f54a2b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 5416dd6..66640fe 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -400,6 +400,10 @@ object SamzaContainer extends Logging {
info("Got commit milliseconds: %s" format taskCommitMs)
+ val taskShutdownMs = config.getShutdownMs.getOrElse(5000L)
+
+ info("Got shutdown timeout milliseconds: %s" format taskShutdownMs)
+
// Wire up all task-instance-level (unshared) objects.
val taskNames = containerModel
@@ -509,7 +513,8 @@ object SamzaContainer extends Logging {
consumerMultiplexer = consumerMultiplexer,
metrics = samzaContainerMetrics,
windowMs = taskWindowMs,
- commitMs = taskCommitMs)
+ commitMs = taskCommitMs,
+ shutdownMs = taskShutdownMs)
info("Samza container setup complete.")
http://git-wip-us.apache.org/repos/asf/samza/blob/81f54a2b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
index 2a0897f..73ec2b5 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestRunLoop.scala
@@ -211,38 +211,4 @@ class TestRunLoop extends AssertionsForJUnit with MockitoSugar with ScalaTestMat
testMetrics.processMs.getSnapshot.getSize should equal(2)
testMetrics.commitMs.getSnapshot.getSize should equal(2)
}
-
- @Test
- def testShutdownHook: Unit = {
- // The shutdown hook can't be directly tested so we verify that a) both add and remove
- // are called and b) invoking the shutdown hook actually kills the run loop.
- val consumers = mock[SystemConsumers]
- when(consumers.choose).thenReturn(envelope0)
- val testMetrics = new SamzaContainerMetrics
- var addCalled = false
- var removeCalled = false
- val runLoop = new RunLoop(
- taskInstances = getMockTaskInstances,
- consumerMultiplexer = consumers,
- metrics = testMetrics) {
- override def addShutdownHook() {
- addCalled = true
- }
- override def removeShutdownHook() {
- removeCalled = true
- }
- }
-
- val runThread = new Thread(runLoop)
- runThread.start()
-
- runLoop.shutdownHook.start()
- runLoop.shutdownHook.join(1000)
- runThread.join(1000)
-
- assert(addCalled)
- assert(removeCalled)
- assert(!runLoop.shutdownHook.isAlive)
- assert(!runThread.isAlive)
- }
}