You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/07/16 07:43:11 UTC

samza git commit: SAMZA-670: added JMX access information to the dashboard

Repository: samza
Updated Branches:
  refs/heads/master 9709d9d46 -> 4f7bbb054


SAMZA-670: added JMX access information to the dashboard


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4f7bbb05
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4f7bbb05
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4f7bbb05

Branch: refs/heads/master
Commit: 4f7bbb054ac7c5f6bb5528189d49f0b4942faa7e
Parents: 9709d9d
Author: József Márton Jung <j....@levi9.com>
Authored: Wed Jul 15 22:38:12 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Jul 15 22:38:12 2015 -0700

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |  1 +
 .../apache/samza/container/LocalityManager.java | 34 ++++++++++++--------
 .../stream/CoordinatorStreamMessage.java        | 21 +++++++++---
 .../org/apache/samza/job/model/JobModel.java    | 34 +++++++++++++++++---
 .../apache/samza/container/SamzaContainer.scala | 15 +++++----
 .../samza/coordinator/JobCoordinator.scala      | 17 ++--------
 .../samza/job/local/ThreadJobFactory.scala      |  4 +--
 .../org/apache/samza/metrics/JmxServer.scala    |  2 ++
 .../samza/container/TestSamzaContainer.scala    |  3 +-
 .../resources/scalate/WEB-INF/views/index.scaml | 11 +++++++
 .../apache/samza/job/yarn/SamzaAppMaster.scala  |  5 +++
 .../samza/job/yarn/SamzaAppMasterState.scala    |  8 ++---
 12 files changed, 106 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 3374f0c..de835c7 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -55,6 +55,7 @@
             <allow class="org.apache.samza.Partition" />
             <allow class="org.apache.samza.container.TaskName" />
             <allow class="org.apache.samza.system.SystemStreamPartition" />
+            <allow class="org.apache.samza.container.LocalityManager" />
         </subpackage>
     </subpackage>
 

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index e661e12..55c258f 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -38,13 +38,13 @@ public class LocalityManager {
   private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
   private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
   private static final String SOURCE = "SamzaContainer-";
-  private Map<Integer, String> containerToHostMapping;
+  private Map<Integer, Map<String, String>> containerToHostMapping;
 
   public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
                          CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
     this.coordinatorStreamConsumer = coordinatorStreamConsumer;
     this.coordinatorStreamProducer = coordinatorStreamProducer;
-    this.containerToHostMapping = new HashMap<Integer, String>();
+    this.containerToHostMapping = new HashMap<>();
   }
 
   public void start() {
@@ -65,26 +65,34 @@ public class LocalityManager {
     coordinatorStreamProducer.register(LocalityManager.SOURCE + sourceSuffix);
   }
 
-  public Map<Integer, String> readContainerLocality() {
-    Map<Integer, String> allMappings = new HashMap<Integer, String>();
+  public Map<Integer, Map<String, String>> readContainerLocality() {
+    Map<Integer, Map<String, String>> allMappings = new HashMap<>();
     for (CoordinatorStreamMessage message: coordinatorStreamConsumer.getBootstrappedStream(SetContainerHostMapping.TYPE)) {
       SetContainerHostMapping mapping = new SetContainerHostMapping(message);
-      allMappings.put(Integer.parseInt(mapping.getKey()), mapping.getHostLocality());
+      Map<String, String> localityMappings = new HashMap<>();
+      localityMappings.put(SetContainerHostMapping.IP_KEY, mapping.getHostLocality());
+      localityMappings.put(SetContainerHostMapping.JMX_URL_KEY, mapping.getJmxUrl());
+      localityMappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, mapping.getJmxTunnelingUrl());
+      log.info(String.format("Read locality for container %s: %s", mapping.getKey(), localityMappings));
+      allMappings.put(Integer.parseInt(mapping.getKey()), localityMappings);
     }
     containerToHostMapping = Collections.unmodifiableMap(allMappings);
     return allMappings;
   }
 
-
-  public void writeContainerToHostMapping(Integer containerId, String hostHttpAddress) {
-    String existingMapping = containerToHostMapping.get(containerId);
-    if (existingMapping != null && !existingMapping.equals(hostHttpAddress)) {
-      log.info("Container {} moved from {} to {}", new Object[]{containerId, existingMapping, hostHttpAddress});
+  public void writeContainerToHostMapping(Integer containerId, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) {
+    Map<String, String> existingMappings = containerToHostMapping.get(containerId);
+    String existingIpMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.IP_KEY) : null;
+    if (existingIpMapping != null && !existingIpMapping.equals(hostHttpAddress)) {
+      log.info("Container {} moved from {} to {}", new Object[]{containerId, existingIpMapping, hostHttpAddress});
     } else {
       log.info("Container {} started at {}", containerId, hostHttpAddress);
     }
-    coordinatorStreamProducer.send(new SetContainerHostMapping(SOURCE + containerId, String.valueOf(containerId), hostHttpAddress));
-    containerToHostMapping.put(containerId, hostHttpAddress);
+    coordinatorStreamProducer.send(new SetContainerHostMapping(SOURCE + containerId, String.valueOf(containerId), hostHttpAddress, jmxAddress, jmxTunnelingAddress));
+    Map<String, String> mappings = new HashMap<>();
+    mappings.put(SetContainerHostMapping.IP_KEY, hostHttpAddress);
+    mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress);
+    mappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
+    containerToHostMapping.put(containerId, mappings);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
index 6c1e488..6bd1bd3 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -485,24 +485,29 @@ public class CoordinatorStreamMessage {
    *     Source: "SamzaContainer-$ContainerId"
    *     MessageMap:
    *     {
-   *         ip: InetAddressString
+   *         ip: InetAddressString,
+   *         jmx-url: jmxAddressString,
+   *         jmx-tunneling-url: jmxTunnelingAddressString
    *     }
    * }
    * */
   public static class SetContainerHostMapping extends CoordinatorStreamMessage {
     public static final String TYPE = "set-container-host-assignment";
-    private static final String IP_KEY = "ip";
+    public static final String IP_KEY = "ip";
+    public static final String JMX_URL_KEY = "jmx-url";
+    public static final String JMX_TUNNELING_URL_KEY = "jmx-tunneling-url";
 
     public SetContainerHostMapping(CoordinatorStreamMessage message) {
       super(message.getKeyArray(), message.getMessageMap());
     }
 
-    public SetContainerHostMapping(String source, String key, String hostHttpAddress) {
+    public SetContainerHostMapping(String source, String key, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) {
       super(source);
       setType(TYPE);
       setKey(key);
       putMessageValue(IP_KEY, hostHttpAddress);
-
+      putMessageValue(JMX_URL_KEY, jmxAddress);
+      putMessageValue(JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
     }
 
     public String getHostLocality() {
@@ -510,5 +515,13 @@ public class CoordinatorStreamMessage {
 
     }
 
+    public String getJmxUrl() {
+      return getMessageValue(JMX_URL_KEY);
+    }
+
+    public String getJmxTunnelingUrl() {
+      return getMessageValue(JMX_TUNNELING_URL_KEY);
+    }
+
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index 95a2ce5..ad6387d 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -20,9 +20,9 @@
 package org.apache.samza.job.model;
 
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
 import org.apache.samza.config.Config;
+import org.apache.samza.container.LocalityManager;
 
 /**
  * <p>
@@ -37,20 +37,22 @@ import org.apache.samza.config.Config;
  * </p>
  */
 public class JobModel {
+  private static final String EMPTY_STRING = "";
   private final Config config;
   private final Map<Integer, ContainerModel> containers;
-  private final Map<Integer, String> containerToHostMapping;
+
+  private final LocalityManager localityManager;
 
   public int maxChangeLogStreamPartitions;
 
   public JobModel(Config config, Map<Integer, ContainerModel> containers) {
-    this(config, containers, new HashMap<Integer, String>());
+    this(config, containers, null);
   }
 
-  public JobModel(Config config, Map<Integer, ContainerModel> containers, Map<Integer, String> containerToHostMapping) {
+  public JobModel(Config config, Map<Integer, ContainerModel> containers, LocalityManager localityManager) {
     this.config = config;
     this.containers = Collections.unmodifiableMap(containers);
-    this.containerToHostMapping = Collections.unmodifiableMap(containerToHostMapping);
+    this.localityManager = localityManager;
 
     // Compute the number of change log stream partitions as the maximum partition-id
     // of all total number of tasks of the job; Increment by 1 because partition ids
@@ -68,6 +70,28 @@ public class JobModel {
     return config;
   }
 
+  /**
+   * Returns the container to host mapping for a given container ID and mapping key
+   *
+   * @param containerId the ID of the container
+   * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping}
+   * @return the value if it exists for a given container and key, otherwise an empty string
+   */
+  public String getContainerToHostValue(Integer containerId, String key) {
+    if (localityManager == null) {
+      return EMPTY_STRING;
+    }
+    final Map<String, String> mappings = localityManager.readContainerLocality().get(containerId);
+    if (mappings == null) {
+      return EMPTY_STRING;
+    }
+    if (!mappings.containsKey(key)) {
+      return EMPTY_STRING;
+    }
+    return mappings.get(key);
+  }
+
+
   public Map<Integer, ContainerModel> getContainers() {
     return containers;
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index cbacd18..27b2517 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -92,7 +92,7 @@ object SamzaContainer extends Logging {
 
     try {
       jmxServer = newJmxServer()
-      SamzaContainer(containerModel, jobModel).run
+      SamzaContainer(containerModel, jobModel, jmxServer).run
     } finally {
       if (jmxServer != null) {
         jmxServer.stop
@@ -133,7 +133,7 @@ object SamzaContainer extends Logging {
     serde
   }
 
-  def apply(containerModel: ContainerModel, jobModel: JobModel) = {
+  def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = {
     val config = jobModel.getConfig
     val containerId = containerModel.getContainerId
     val containerName = "samza-container-%s" format containerId
@@ -407,7 +407,6 @@ object SamzaContainer extends Logging {
       .values
       .map(_.getTaskName)
       .toSet
-
     val containerContext = new SamzaContainerContext(containerId, config, taskNames)
 
     val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
@@ -541,7 +540,8 @@ object SamzaContainer extends Logging {
       localityManager = localityManager,
       metrics = samzaContainerMetrics,
       reporters = reporters,
-      jvm = jvm)
+      jvm = jvm,
+      jmxServer = jmxServer)
   }
 }
 
@@ -552,6 +552,7 @@ class SamzaContainer(
   consumerMultiplexer: SystemConsumers,
   producerMultiplexer: SystemProducers,
   metrics: SamzaContainerMetrics,
+  jmxServer: JmxServer,
   offsetManager: OffsetManager = new OffsetManager,
   localityManager: LocalityManager = null,
   reporters: Map[String, MetricsReporter] = Map(),
@@ -625,10 +626,12 @@ class SamzaContainer(
       localityManager.start
       localityManager.register(String.valueOf(containerContext.id))
 
-      info("Writing container locality to Coordinator Stream")
+      info("Writing container locality and JMX address to Coordinator Stream")
       try {
         val hostInetAddress = InetAddress.getLocalHost.getHostAddress
-        localityManager.writeContainerToHostMapping(containerContext.id, hostInetAddress)
+        val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else ""
+        val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else ""
+        localityManager.writeContainerToHostMapping(containerContext.id, hostInetAddress, jmxUrl, jmxTunnelingUrl)
       } catch {
         case uhe: UnknownHostException =>
           warn("Received UnknownHostException when persisting locality info for container %d: %s" format (containerContext.id, uhe.getMessage))  //No-op

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 73c58a7..f621611 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -21,7 +21,7 @@ package org.apache.samza.coordinator
 
 
 import org.apache.samza.config.Config
-import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
+import org.apache.samza.job.model.{JobModel, TaskModel}
 import org.apache.samza.SamzaException
 import org.apache.samza.container.grouper.task.GroupByContainerCount
 import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
@@ -42,7 +42,7 @@ import org.apache.samza.coordinator.server.HttpServer
 import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
 import org.apache.samza.coordinator.server.JobServlet
 import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamMessage, CoordinatorStreamSystemFactory}
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
 import org.apache.samza.config.ConfigRewriter
 
 /**
@@ -67,8 +67,6 @@ object JobCoordinator extends Logging {
     coordinatorSystemConsumer.start
     debug("Bootstrapping coordinator system stream.")
     coordinatorSystemConsumer.bootstrap
-    debug("Stopping coordinator system stream.")
-    coordinatorSystemConsumer.stop
     val config = coordinatorSystemConsumer.getConfig
     info("Got config: %s" format config)
     val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
@@ -276,16 +274,7 @@ object JobCoordinator extends Logging {
       val containerModels = containerGrouper.group(taskModels).map
               { case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
 
-      val containerLocality = if(localityManager != null) {
-        localityManager.readContainerLocality()
-      } else {
-        new util.HashMap[Integer, String]()
-      }
-
-      containerLocality.foreach{case (container: Integer, location: String) =>
-        info("Container id %d  -->  %s" format (container.intValue(), location))
-      }
-      new JobModel(config, containerModels, containerLocality)
+      new JobModel(config, containerModels, localityManager)
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 3f2f70e..5acfe87 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -20,7 +20,7 @@
 package org.apache.samza.job.local
 
 
-import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap}
 import org.apache.samza.util.Logging
 import org.apache.samza.SamzaException
 import org.apache.samza.config.Config
@@ -48,7 +48,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
 
     try {
       coordinator.start
-      new ThreadJob(SamzaContainer(containerModel, coordinator.jobModel))
+      new ThreadJob(SamzaContainer(containerModel, coordinator.jobModel, new JmxServer))
     } finally {
       coordinator.stop
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
index f343faf..de45123 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
@@ -115,4 +115,6 @@ class JmxServer(requestedPort: Int) extends Logging {
   def stop = jmxServer.stop
 
   override def toString = "JmxServer registry port=%d server port=%d url=%s" format (getRegistryPort, getServerPort, getJmxUrl)
+
+  def getTunnelingJmxUrl = getJmxUrl.replaceAll("localhost", hostname)
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 9fb1aa9..84fdeaa 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -166,7 +166,8 @@ class TestSamzaContainer extends AssertionsForJUnit {
       runLoop = runLoop,
       consumerMultiplexer = consumerMultiplexer,
       producerMultiplexer = producerMultiplexer,
-      metrics = new SamzaContainerMetrics
+      metrics = new SamzaContainerMetrics,
+      jmxServer = null
     )
     try {
       container.run

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index cf0d2fc..41303f7 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -89,6 +89,12 @@
             %td.key Application master container
             %td
               %a(target="_blank" href="http://#{state.nodeHost}:#{state.nodeHttpPort.toString}/node/containerlogs/#{state.containerId.toString}/#{username}")= state.containerId.toString
+          %tr
+            %td.key JMX server url
+            %td= state.jmxUrl
+          %tr
+            %td.key JMX server tunneling url
+            %td= state.jmxTunnelingUrl
 
     %div.tab-pane#containers
       %h2 Containers
@@ -116,6 +122,7 @@
             %th Node
             %th Start Time
             %th Up Time
+            %th JMX access
         %tbody
           - for((containerId, container) <- state.runningContainers)
             %tr
@@ -128,6 +135,10 @@
                 Start time: #{container.startTimeStr()}
               %td
                 Up time: #{container.upTimeStr()}
+              %td
+                Ordinary: #{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping.JMX_URL_KEY)}
+                Tunneling: #{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)}
+
 
     %div.tab-pane#task-groups
       %h2 Task Groups

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index 20aa373..af42c6a 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -91,6 +91,11 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler {
     try {
       // wire up all of the yarn event listeners
       val state = new SamzaAppMasterState(jobCoordinator, -1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
+      if (jmxServer.isDefined) {
+        state.jmxUrl = jmxServer.get.getJmxUrl
+        state.jmxTunnelingUrl = jmxServer.get.getTunnelingJmxUrl
+      }
+
       val service = new SamzaAppMasterService(config, state, registry, clientHelper)
       val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient)
       val metrics = new SamzaAppMasterMetrics(config, state, registry)

http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index 1445605..f667c83 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -21,11 +21,7 @@ package org.apache.samza.job.yarn
 import org.apache.samza.util.Logging
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
 import org.apache.hadoop.yarn.api.records.ContainerId
-import java.util
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.container.TaskName
 import java.net.URL
-import org.apache.samza.job.model.JobModel
 import org.apache.samza.coordinator.JobCoordinator
 
 /**
@@ -53,4 +49,8 @@ class SamzaAppMasterState(val jobCoordinator: JobCoordinator, val taskId: Int, v
 
   // controlled on startup
   var appAttemptId = containerId.getApplicationAttemptId
+
+  // JMX address
+  var jmxUrl = ""
+  var jmxTunnelingUrl = ""
 }