You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/08/19 23:33:54 UTC
git commit: SAMZA-382;
properly shutdown jmx server when an exception occurs in samza
container
Repository: incubator-samza
Updated Branches:
refs/heads/master 0d544aed1 -> 42df2b44f
SAMZA-382; properly shutdown jmx server when an exception occurs in samza container
Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/42df2b44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/42df2b44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/42df2b44
Branch: refs/heads/master
Commit: 42df2b44f1552b213547079ca17b6eb0221577e2
Parents: 0d544ae
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Aug 19 14:33:45 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Aug 19 14:33:45 2014 -0700
----------------------------------------------------------------------
.../apache/samza/container/SamzaContainer.scala | 34 ++++++++++++--------
.../samza/container/TestSamzaContainer.scala | 24 ++++++++++++--
2 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/42df2b44/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 0ab8a55..04edf50 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -82,21 +82,27 @@ object SamzaContainer extends Logging {
}
def main(args: Array[String]) {
- val jmxServer = new JmxServer
- val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME)
-
- /**
- * If the compressed option is enabled in config, de-compress the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS'
- * properties. Note: This is a temporary workaround to reduce the size of the config and hence size
- * of the environment variable(s) exported while starting a Samza container (SAMZA-337)
- */
- val isCompressed = System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE")
- val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed)
- val config = JsonConfigSerializer.fromJson(configStr)
- val sspTaskNames = getTaskNameToSystemStreamPartition(getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed))
- val taskNameToChangeLogPartitionMapping = getTaskNameToChangeLogPartitionMapping(getParameter(System.getenv(ShellCommandConfig.ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING), isCompressed))
+ safeMain()
+ }
+ def safeMain(jmxServer: JmxServer = new JmxServer) {
+ // Break out the main method to make the JmxServer injectable so we can
+ // validate that we don't leak JMX non-daemon threads if we have an
+ // exception in the main method.
try {
+ val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME)
+
+ /**
+ * If the compressed option is enabled in config, de-compress the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS'
+ * properties. Note: This is a temporary workaround to reduce the size of the config and hence size
+ * of the environment variable(s) exported while starting a Samza container (SAMZA-337)
+ */
+ val isCompressed = System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE")
+ val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed)
+ val config = JsonConfigSerializer.fromJson(configStr)
+ val sspTaskNames = getTaskNameToSystemStreamPartition(getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed))
+ val taskNameToChangeLogPartitionMapping = getTaskNameToChangeLogPartitionMapping(getParameter(System.getenv(ShellCommandConfig.ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING), isCompressed))
+
SamzaContainer(containerName, sspTaskNames, taskNameToChangeLogPartitionMapping, config).run
} finally {
jmxServer.stop
@@ -104,7 +110,7 @@ object SamzaContainer extends Logging {
}
def getTaskNameToSystemStreamPartition(SSPTaskNamesJSON: String) = {
- // Covert into a standard Java map
+ // Convert into a standard Java map
val sspTaskNamesAsJava: Map[TaskName, Set[SystemStreamPartition]] = ShellCommandBuilder.deserializeSystemStreamPartitionSetFromJSON(SSPTaskNamesJSON)
// From that map build the TaskNamesToSystemStreamPartitions
http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/42df2b44/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 9fc6771..8a04a8a 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -42,8 +42,27 @@ import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
import org.apache.samza.system.SystemStream
import org.apache.samza.system.StreamMetadataCache
import org.apache.samza.task.TaskInstanceCollector
+import org.scalatest.junit.AssertionsForJUnit
+import org.apache.samza.metrics.JmxServer
+
+class TestSamzaContainer extends AssertionsForJUnit {
+ @Test
+ def testJmxServerShutdownOnException {
+ var stopped = false
+ val jmxServer = new JmxServer {
+ override def stop {
+ super.stop
+ stopped = true
+ }
+ }
+ intercept[Exception] {
+ // Calling main will trigger an NPE since the container checks for an
+ // isCompressed environment variable, which isn't set.
+ SamzaContainer.safeMain(jmxServer)
+ }
+ assertTrue(stopped)
+ }
-class TestSamzaContainer {
@Test
def testGetInputStreamMetadata {
val inputStreams = Set(
@@ -98,8 +117,7 @@ class TestSamzaContainer {
val runLoop = new RunLoop(
taskInstances = Map(taskName -> taskInstance),
consumerMultiplexer = consumerMultiplexer,
- metrics = new SamzaContainerMetrics
- )
+ metrics = new SamzaContainerMetrics)
val container = new SamzaContainer(
Map(taskName -> taskInstance),
runLoop,