You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by cr...@apache.org on 2014/08/19 23:33:54 UTC

git commit: SAMZA-382; properly shut down JMX server when an exception occurs in samza container

Repository: incubator-samza
Updated Branches:
  refs/heads/master 0d544aed1 -> 42df2b44f


SAMZA-382; properly shut down JMX server when an exception occurs in samza container


Project: http://git-wip-us.apache.org/repos/asf/incubator-samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-samza/commit/42df2b44
Tree: http://git-wip-us.apache.org/repos/asf/incubator-samza/tree/42df2b44
Diff: http://git-wip-us.apache.org/repos/asf/incubator-samza/diff/42df2b44

Branch: refs/heads/master
Commit: 42df2b44f1552b213547079ca17b6eb0221577e2
Parents: 0d544ae
Author: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Authored: Tue Aug 19 14:33:45 2014 -0700
Committer: Chris Riccomini <cr...@criccomi-mn.linkedin.biz>
Committed: Tue Aug 19 14:33:45 2014 -0700

----------------------------------------------------------------------
 .../apache/samza/container/SamzaContainer.scala | 34 ++++++++++++--------
 .../samza/container/TestSamzaContainer.scala    | 24 ++++++++++++--
 2 files changed, 41 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/42df2b44/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 0ab8a55..04edf50 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -82,21 +82,27 @@ object SamzaContainer extends Logging {
   }
 
   def main(args: Array[String]) {
-    val jmxServer = new JmxServer
-    val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME)
-
-    /**
-     * If the compressed option is enabled in config, de-compress the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS'
-     * properties. Note: This is a temporary workaround to reduce the size of the config and hence size
-     * of the environment variable(s) exported while starting a Samza container (SAMZA-337)
-     */
-    val isCompressed = System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE")
-    val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed)
-    val config = JsonConfigSerializer.fromJson(configStr)
-    val sspTaskNames = getTaskNameToSystemStreamPartition(getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed))
-    val taskNameToChangeLogPartitionMapping = getTaskNameToChangeLogPartitionMapping(getParameter(System.getenv(ShellCommandConfig.ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING), isCompressed))
+    safeMain()
+  }
 
+  def safeMain(jmxServer: JmxServer = new JmxServer) {
+    // Break out the main method to make the JmxServer injectable so we can 
+    // validate that we don't leak JMX non-daemon threads if we have an 
+    // exception in the main method.
     try {
+      val containerName = System.getenv(ShellCommandConfig.ENV_CONTAINER_NAME)
+
+      /**
+       * If the compressed option is enabled in config, de-compress the 'ENV_CONFIG' and 'ENV_SYSTEM_STREAMS'
+       * properties. Note: This is a temporary workaround to reduce the size of the config and hence size
+       * of the environment variable(s) exported while starting a Samza container (SAMZA-337)
+       */
+      val isCompressed = System.getenv(ShellCommandConfig.ENV_COMPRESS_CONFIG).equals("TRUE")
+      val configStr = getParameter(System.getenv(ShellCommandConfig.ENV_CONFIG), isCompressed)
+      val config = JsonConfigSerializer.fromJson(configStr)
+      val sspTaskNames = getTaskNameToSystemStreamPartition(getParameter(System.getenv(ShellCommandConfig.ENV_SYSTEM_STREAMS), isCompressed))
+      val taskNameToChangeLogPartitionMapping = getTaskNameToChangeLogPartitionMapping(getParameter(System.getenv(ShellCommandConfig.ENV_TASK_NAME_TO_CHANGELOG_PARTITION_MAPPING), isCompressed))
+
       SamzaContainer(containerName, sspTaskNames, taskNameToChangeLogPartitionMapping, config).run
     } finally {
       jmxServer.stop
@@ -104,7 +110,7 @@ object SamzaContainer extends Logging {
   }
 
   def getTaskNameToSystemStreamPartition(SSPTaskNamesJSON: String) = {
-    // Covert into a standard Java map
+    // Convert into a standard Java map
     val sspTaskNamesAsJava: Map[TaskName, Set[SystemStreamPartition]] = ShellCommandBuilder.deserializeSystemStreamPartitionSetFromJSON(SSPTaskNamesJSON)
 
     // From that map build the TaskNamesToSystemStreamPartitions

http://git-wip-us.apache.org/repos/asf/incubator-samza/blob/42df2b44/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 9fc6771..8a04a8a 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -42,8 +42,27 @@ import org.apache.samza.util.SinglePartitionWithoutOffsetsSystemAdmin
 import org.apache.samza.system.SystemStream
 import org.apache.samza.system.StreamMetadataCache
 import org.apache.samza.task.TaskInstanceCollector
+import org.scalatest.junit.AssertionsForJUnit
+import org.apache.samza.metrics.JmxServer
+
+class TestSamzaContainer extends AssertionsForJUnit {
+  @Test
+  def testJmxServerShutdownOnException {
+    var stopped = false
+    val jmxServer = new JmxServer {
+      override def stop {
+        super.stop
+        stopped = true
+      }
+    }
+    intercept[Exception] {
+      // Calling safeMain will trigger an NPE because the container calls
+      // .equals on the ENV_COMPRESS_CONFIG environment variable, which isn't set.
+      SamzaContainer.safeMain(jmxServer)
+    }
+    assertTrue(stopped)
+  }
 
-class TestSamzaContainer {
   @Test
   def testGetInputStreamMetadata {
     val inputStreams = Set(
@@ -98,8 +117,7 @@ class TestSamzaContainer {
     val runLoop = new RunLoop(
       taskInstances = Map(taskName -> taskInstance),
       consumerMultiplexer = consumerMultiplexer,
-      metrics = new SamzaContainerMetrics
-    )
+      metrics = new SamzaContainerMetrics)
     val container = new SamzaContainer(
       Map(taskName -> taskInstance),
       runLoop,