You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/05/05 20:22:43 UTC

samza git commit: SAMZA-1228 : StreamProcessor should stop JmxServer

Repository: samza
Updated Branches:
  refs/heads/master 7dbc4c95f -> 67b195363


SAMZA-1228 : StreamProcessor should stop JmxServer

This is not the solution posted in SAMZA-1228. For now, we are moving jmxserver lifecycle to be within the container. Ideally, it should be within the Streamprocessor so that the job coordinator can also be associated with the same instance.

Author: Navina Ramesh <na...@apache.org>

Reviewers: Xinyu Liu <xi...@linkedin.com>, Prateek Maheshwari <pm...@linkedin.com>

Closes #162 from navina/SAMZA-1228


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/67b19536
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/67b19536
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/67b19536

Branch: refs/heads/master
Commit: 67b195363a921de537003faf72615b9349998996
Parents: 7dbc4c9
Author: Navina Ramesh <na...@apache.org>
Authored: Fri May 5 13:22:35 2017 -0700
Committer: nramesh <nr...@linkedin.com>
Committed: Fri May 5 13:22:35 2017 -0700

----------------------------------------------------------------------
 .../apache/samza/processor/StreamProcessor.java |  7 +--
 .../samza/runtime/LocalContainerRunner.java     | 63 +++++++++-----------
 .../apache/samza/container/SamzaContainer.scala |  9 ++-
 .../samza/job/local/ThreadJobFactory.scala      |  1 -
 .../samza/processor/TestStreamProcessor.java    |  4 +-
 .../samza/container/TestSamzaContainer.scala    | 18 ++----
 .../processor/StreamProcessorTestUtils.scala    |  3 +-
 7 files changed, 43 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
index 2ee76dc..5c6a474 100644
--- a/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
+++ b/samza-core/src/main/java/org/apache/samza/processor/StreamProcessor.java
@@ -33,7 +33,6 @@ import org.apache.samza.coordinator.JobCoordinatorFactory;
 import org.apache.samza.coordinator.JobCoordinatorListener;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.AsyncStreamTaskFactory;
 import org.apache.samza.task.StreamTaskFactory;
@@ -199,12 +198,11 @@ public class StreamProcessor {
 
   }
 
-  SamzaContainer createSamzaContainer(ContainerModel containerModel, int maxChangelogStreamPartitions, JmxServer jmxServer) {
+  SamzaContainer createSamzaContainer(ContainerModel containerModel, int maxChangelogStreamPartitions) {
     return SamzaContainer.apply(
         containerModel,
         config,
         maxChangelogStreamPartitions,
-        jmxServer,
         Util.<String, MetricsReporter>javaMapAsScalaMap(customMetricsReporter),
         taskFactory);
   }
@@ -297,8 +295,7 @@ public class StreamProcessor {
 
           container = createSamzaContainer(
               jobModel.getContainers().get(processorId),
-              jobModel.maxChangeLogStreamPartitions,
-              new JmxServer());
+              jobModel.maxChangeLogStreamPartitions);
           container.setContainerListener(containerListener);
           LOGGER.info("Starting container " + container.toString());
           executorService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
index 920cc3d..e02ee23 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/LocalContainerRunner.java
@@ -28,12 +28,11 @@ import org.apache.samza.config.ShellCommandConfig;
 import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.container.SamzaContainer$;
 import org.apache.samza.container.SamzaContainerExceptionHandler;
+import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.job.ApplicationStatus;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsReporter;
-import org.apache.samza.container.SamzaContainerListener;
 import org.apache.samza.task.TaskFactoryUtil;
 import org.apache.samza.util.ScalaToJavaUtils;
 import org.apache.samza.util.Util;
@@ -66,44 +65,36 @@ public class LocalContainerRunner extends AbstractApplicationRunner {
 
   @Override
   public void run(StreamApplication streamApp) {
-    JmxServer jmxServer = null;
-    try {
-      jmxServer = new JmxServer();
-      ContainerModel containerModel = jobModel.getContainers().get(containerId);
-      Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
+    ContainerModel containerModel = jobModel.getContainers().get(containerId);
+    Object taskFactory = TaskFactoryUtil.createTaskFactory(config, streamApp, this);
 
-      SamzaContainer container = SamzaContainer$.MODULE$.apply(
-          containerModel,
-          config,
-          jobModel.maxChangeLogStreamPartitions,
-          jmxServer,
-          Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
-          taskFactory);
-      container.setContainerListener(
-          new SamzaContainerListener() {
-            @Override
-            public void onContainerStart() {
-              log.info("Container Started");
-            }
+    SamzaContainer container = SamzaContainer$.MODULE$.apply(
+        containerModel,
+        config,
+        jobModel.maxChangeLogStreamPartitions,
+        Util.<String, MetricsReporter>javaMapAsScalaMap(new HashMap<>()),
+        taskFactory);
+    container.setContainerListener(
+        new SamzaContainerListener() {
+          @Override
+          public void onContainerStart() {
+            log.info("Container Started");
+          }
 
-            @Override
-            public void onContainerStop(boolean invokedExternally) {
-              log.info("Container Stopped");
-            }
+          @Override
+          public void onContainerStop(boolean invokedExternally) {
+            log.info("Container Stopped");
+          }
 
-            @Override
-            public void onContainerFailed(Throwable t) {
-              log.info("Container Failed");
-              containerException = t;
-            }
-          });
+          @Override
+          public void onContainerFailed(Throwable t) {
+            log.info("Container Failed");
+            containerException = t;
+          }
+        });
+
+    container.run();
 
-      container.run();
-    } finally {
-      if (jmxServer != null) {
-        jmxServer.stop();
-      }
-    }
     if (containerException != null) {
       log.error("Container stopped with Exception. Exiting process now.", containerException);
       System.exit(1);

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index c7b2b7c..957cd2b 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -110,7 +110,6 @@ object SamzaContainer extends Logging {
     containerModel: ContainerModel,
     config: Config,
     maxChangeLogStreamPartitions: Int,
-    jmxServer: JmxServer,
     customReporters: Map[String, MetricsReporter] = Map[String, MetricsReporter](),
     taskFactory: Object) = {
     val containerId = containerModel.getProcessorId()
@@ -601,7 +600,6 @@ object SamzaContainer extends Logging {
       metrics = samzaContainerMetrics,
       reporters = reporters,
       jvm = jvm,
-      jmxServer = jmxServer,
       diskSpaceMonitor = diskSpaceMonitor,
       hostStatisticsMonitor = memoryStatisticsMonitor,
       taskThreadPool = taskThreadPool)
@@ -615,7 +613,6 @@ class SamzaContainer(
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
   metrics: SamzaContainerMetrics,
-  jmxServer: JmxServer,
   diskSpaceMonitor: DiskSpaceMonitor = null,
   hostStatisticsMonitor: SystemStatisticsMonitor = null,
   offsetManager: OffsetManager = new OffsetManager,
@@ -627,6 +624,7 @@ class SamzaContainer(
 
   val shutdownMs = containerContext.config.getShutdownMs.getOrElse(5000L)
   var shutdownHookThread: Thread = null
+  var jmxServer: JmxServer = null
 
   @volatile private var status = SamzaContainerStatus.NOT_STARTED
   private var exceptionSeen: Throwable = null
@@ -645,6 +643,8 @@ class SamzaContainer(
 
       status = SamzaContainerStatus.STARTING
 
+      jmxServer = new JmxServer()
+
       startMetrics
       startOffsetManager
       startLocalityManager
@@ -673,10 +673,13 @@ class SamzaContainer(
         status = SamzaContainerStatus.FAILED
         exceptionSeen = e
     }
+
     try {
       info("Shutting down.")
       removeShutdownHook
 
+      jmxServer.stop
+
       shutdownConsumers
       shutdownTask
       shutdownStores

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index cb36863..b8522b9 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -70,7 +70,6 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
         containerModel,
         config,
         jobModel.maxChangeLogStreamPartitions,
-        jmxServer,
         Map[String, MetricsReporter](),
         taskFactory)
       container.setContainerListener(containerListener)

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
index 4a654dc..7aadd28 100644
--- a/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
+++ b/samza-core/src/test/java/org/apache/samza/processor/TestStreamProcessor.java
@@ -26,7 +26,6 @@ import org.apache.samza.container.SamzaContainer;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
-import org.apache.samza.metrics.JmxServer;
 import org.apache.samza.metrics.MetricsReporter;
 import org.apache.samza.task.StreamTask;
 import org.apache.samza.task.StreamTaskFactory;
@@ -62,8 +61,7 @@ public class TestStreamProcessor {
     @Override
     SamzaContainer createSamzaContainer(
         ContainerModel containerModel,
-        int maxChangelogStreamPartitions,
-        JmxServer jmxServer) {
+        int maxChangelogStreamPartitions) {
       RunLoop mockRunLoop = mock(RunLoop.class);
       doAnswer(invocation ->
         {

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index bc4c47c..e03498c 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -191,8 +191,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       runLoop = runLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
 
     val containerListener = new SamzaContainerListener {
       override def onContainerFailed(t: Throwable): Unit = {
@@ -272,8 +271,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       runLoop = mockRunLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
     val containerListener = new SamzaContainerListener {
       override def onContainerFailed(t: Throwable): Unit = {
         onContainerFailedCalled = true
@@ -354,8 +352,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       runLoop = runLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
     val containerListener = new SamzaContainerListener {
       override def onContainerFailed(t: Throwable): Unit = {
         onContainerFailedCalled = true
@@ -437,8 +434,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       runLoop = mockRunLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
       val containerListener = new SamzaContainerListener {
         override def onContainerFailed(t: Throwable): Unit = {
           onContainerFailedCalled = true
@@ -514,8 +510,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       runLoop = mockRunLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
 
     val containerListener = new SamzaContainerListener {
         override def onContainerFailed(t: Throwable): Unit = {
@@ -582,8 +577,7 @@ class TestSamzaContainer extends AssertionsForJUnit with MockitoSugar {
       runLoop = null,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = containerMetrics,
-      jmxServer = null)
+      metrics = containerMetrics)
 
     container.startStores
     assertNotNull(containerMetrics.taskStoreRestorationMetrics)

http://git-wip-us.apache.org/repos/asf/samza/blob/67b19536/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
index f5a8ee5..f437bfc 100644
--- a/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
+++ b/samza-core/src/test/scala/org/apache/samza/processor/StreamProcessorTestUtils.scala
@@ -57,8 +57,7 @@ object StreamProcessorTestUtils {
       runLoop = mockRunloop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics,
-      jmxServer = null)
+      metrics = new SamzaContainerMetrics)
     if (containerListener != null) {
       container.setContainerListener(containerListener)
     }