You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by ya...@apache.org on 2015/07/16 07:43:11 UTC
samza git commit: SAMZA-670: added JMX access information to the
dashboard
Repository: samza
Updated Branches:
refs/heads/master 9709d9d46 -> 4f7bbb054
SAMZA-670: added JMX access information to the dashboard
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4f7bbb05
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4f7bbb05
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4f7bbb05
Branch: refs/heads/master
Commit: 4f7bbb054ac7c5f6bb5528189d49f0b4942faa7e
Parents: 9709d9d
Author: József Márton Jung <j....@levi9.com>
Authored: Wed Jul 15 22:38:12 2015 -0700
Committer: Yan Fang <ya...@gmail.com>
Committed: Wed Jul 15 22:38:12 2015 -0700
----------------------------------------------------------------------
checkstyle/import-control.xml | 1 +
.../apache/samza/container/LocalityManager.java | 34 ++++++++++++--------
.../stream/CoordinatorStreamMessage.java | 21 +++++++++---
.../org/apache/samza/job/model/JobModel.java | 34 +++++++++++++++++---
.../apache/samza/container/SamzaContainer.scala | 15 +++++----
.../samza/coordinator/JobCoordinator.scala | 17 ++--------
.../samza/job/local/ThreadJobFactory.scala | 4 +--
.../org/apache/samza/metrics/JmxServer.scala | 2 ++
.../samza/container/TestSamzaContainer.scala | 3 +-
.../resources/scalate/WEB-INF/views/index.scaml | 11 +++++++
.../apache/samza/job/yarn/SamzaAppMaster.scala | 5 +++
.../samza/job/yarn/SamzaAppMasterState.scala | 8 ++---
12 files changed, 106 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 3374f0c..de835c7 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -55,6 +55,7 @@
<allow class="org.apache.samza.Partition" />
<allow class="org.apache.samza.container.TaskName" />
<allow class="org.apache.samza.system.SystemStreamPartition" />
+ <allow class="org.apache.samza.container.LocalityManager" />
</subpackage>
</subpackage>
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index e661e12..55c258f 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -38,13 +38,13 @@ public class LocalityManager {
private final CoordinatorStreamSystemConsumer coordinatorStreamConsumer;
private final CoordinatorStreamSystemProducer coordinatorStreamProducer;
private static final String SOURCE = "SamzaContainer-";
- private Map<Integer, String> containerToHostMapping;
+ private Map<Integer, Map<String, String>> containerToHostMapping;
public LocalityManager(CoordinatorStreamSystemProducer coordinatorStreamProducer,
CoordinatorStreamSystemConsumer coordinatorStreamConsumer) {
this.coordinatorStreamConsumer = coordinatorStreamConsumer;
this.coordinatorStreamProducer = coordinatorStreamProducer;
- this.containerToHostMapping = new HashMap<Integer, String>();
+ this.containerToHostMapping = new HashMap<>();
}
public void start() {
@@ -65,26 +65,34 @@ public class LocalityManager {
coordinatorStreamProducer.register(LocalityManager.SOURCE + sourceSuffix);
}
- public Map<Integer, String> readContainerLocality() {
- Map<Integer, String> allMappings = new HashMap<Integer, String>();
+ public Map<Integer, Map<String, String>> readContainerLocality() {
+ Map<Integer, Map<String, String>> allMappings = new HashMap<>();
for (CoordinatorStreamMessage message: coordinatorStreamConsumer.getBootstrappedStream(SetContainerHostMapping.TYPE)) {
SetContainerHostMapping mapping = new SetContainerHostMapping(message);
- allMappings.put(Integer.parseInt(mapping.getKey()), mapping.getHostLocality());
+ Map<String, String> localityMappings = new HashMap<>();
+ localityMappings.put(SetContainerHostMapping.IP_KEY, mapping.getHostLocality());
+ localityMappings.put(SetContainerHostMapping.JMX_URL_KEY, mapping.getJmxUrl());
+ localityMappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, mapping.getJmxTunnelingUrl());
+ log.info(String.format("Read locality for container %s: %s", mapping.getKey(), localityMappings));
+ allMappings.put(Integer.parseInt(mapping.getKey()), localityMappings);
}
containerToHostMapping = Collections.unmodifiableMap(allMappings);
return allMappings;
}
-
- public void writeContainerToHostMapping(Integer containerId, String hostHttpAddress) {
- String existingMapping = containerToHostMapping.get(containerId);
- if (existingMapping != null && !existingMapping.equals(hostHttpAddress)) {
- log.info("Container {} moved from {} to {}", new Object[]{containerId, existingMapping, hostHttpAddress});
+ public void writeContainerToHostMapping(Integer containerId, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) {
+ Map<String, String> existingMappings = containerToHostMapping.get(containerId);
+ String existingIpMapping = existingMappings != null ? existingMappings.get(SetContainerHostMapping.IP_KEY) : null;
+ if (existingIpMapping != null && !existingIpMapping.equals(hostHttpAddress)) {
+ log.info("Container {} moved from {} to {}", new Object[]{containerId, existingIpMapping, hostHttpAddress});
} else {
log.info("Container {} started at {}", containerId, hostHttpAddress);
}
- coordinatorStreamProducer.send(new SetContainerHostMapping(SOURCE + containerId, String.valueOf(containerId), hostHttpAddress));
- containerToHostMapping.put(containerId, hostHttpAddress);
+ coordinatorStreamProducer.send(new SetContainerHostMapping(SOURCE + containerId, String.valueOf(containerId), hostHttpAddress, jmxAddress, jmxTunnelingAddress));
+ Map<String, String> mappings = new HashMap<>();
+ mappings.put(SetContainerHostMapping.IP_KEY, hostHttpAddress);
+ mappings.put(SetContainerHostMapping.JMX_URL_KEY, jmxAddress);
+ mappings.put(SetContainerHostMapping.JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
+ containerToHostMapping.put(containerId, mappings);
}
-
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
index 6c1e488..6bd1bd3 100644
--- a/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
+++ b/samza-core/src/main/java/org/apache/samza/coordinator/stream/CoordinatorStreamMessage.java
@@ -485,24 +485,29 @@ public class CoordinatorStreamMessage {
* Source: "SamzaContainer-$ContainerId"
* MessageMap:
* {
- * ip: InetAddressString
+ * ip: InetAddressString,
+ * jmx-url: jmxAddressString
+ * jmx-tunneling-url: jmxTunnelingAddressString
* }
* }
* */
public static class SetContainerHostMapping extends CoordinatorStreamMessage {
public static final String TYPE = "set-container-host-assignment";
- private static final String IP_KEY = "ip";
+ public static final String IP_KEY = "ip";
+ public static final String JMX_URL_KEY = "jmx-url";
+ public static final String JMX_TUNNELING_URL_KEY = "jmx-tunneling-url";
public SetContainerHostMapping(CoordinatorStreamMessage message) {
super(message.getKeyArray(), message.getMessageMap());
}
- public SetContainerHostMapping(String source, String key, String hostHttpAddress) {
+ public SetContainerHostMapping(String source, String key, String hostHttpAddress, String jmxAddress, String jmxTunnelingAddress) {
super(source);
setType(TYPE);
setKey(key);
putMessageValue(IP_KEY, hostHttpAddress);
-
+ putMessageValue(JMX_URL_KEY, jmxAddress);
+ putMessageValue(JMX_TUNNELING_URL_KEY, jmxTunnelingAddress);
}
public String getHostLocality() {
@@ -510,5 +515,13 @@ public class CoordinatorStreamMessage {
}
+ public String getJmxUrl() {
+ return getMessageValue(JMX_URL_KEY);
+ }
+
+ public String getJmxTunnelingUrl() {
+ return getMessageValue(JMX_TUNNELING_URL_KEY);
+ }
+
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
index 95a2ce5..ad6387d 100644
--- a/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
+++ b/samza-core/src/main/java/org/apache/samza/job/model/JobModel.java
@@ -20,9 +20,9 @@
package org.apache.samza.job.model;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import org.apache.samza.config.Config;
+import org.apache.samza.container.LocalityManager;
/**
* <p>
@@ -37,20 +37,22 @@ import org.apache.samza.config.Config;
* </p>
*/
public class JobModel {
+ private static final String EMPTY_STRING = "";
private final Config config;
private final Map<Integer, ContainerModel> containers;
- private final Map<Integer, String> containerToHostMapping;
+
+ private final LocalityManager localityManager;
public int maxChangeLogStreamPartitions;
public JobModel(Config config, Map<Integer, ContainerModel> containers) {
- this(config, containers, new HashMap<Integer, String>());
+ this(config, containers, null);
}
- public JobModel(Config config, Map<Integer, ContainerModel> containers, Map<Integer, String> containerToHostMapping) {
+ public JobModel(Config config, Map<Integer, ContainerModel> containers, LocalityManager localityManager) {
this.config = config;
this.containers = Collections.unmodifiableMap(containers);
- this.containerToHostMapping = Collections.unmodifiableMap(containerToHostMapping);
+ this.localityManager = localityManager;
// Compute the number of change log stream partitions as the maximum partition-id
// of all total number of tasks of the job; Increment by 1 because partition ids
@@ -68,6 +70,28 @@ public class JobModel {
return config;
}
+ /**
+ * Returns the container to host mapping for a given container ID and mapping key
+ *
+ * @param containerId the ID of the container
+ * @param key mapping key which is one of the keys declared in {@link org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping}
+ * @return the value if it exists for a given container and key, otherwise an empty string
+ */
+ public String getContainerToHostValue(Integer containerId, String key) {
+ if (localityManager == null) {
+ return EMPTY_STRING;
+ }
+ final Map<String, String> mappings = localityManager.readContainerLocality().get(containerId);
+ if (mappings == null) {
+ return EMPTY_STRING;
+ }
+ if (!mappings.containsKey(key)) {
+ return EMPTY_STRING;
+ }
+ return mappings.get(key);
+ }
+
+
public Map<Integer, ContainerModel> getContainers() {
return containers;
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index cbacd18..27b2517 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -92,7 +92,7 @@ object SamzaContainer extends Logging {
try {
jmxServer = newJmxServer()
- SamzaContainer(containerModel, jobModel).run
+ SamzaContainer(containerModel, jobModel, jmxServer).run
} finally {
if (jmxServer != null) {
jmxServer.stop
@@ -133,7 +133,7 @@ object SamzaContainer extends Logging {
serde
}
- def apply(containerModel: ContainerModel, jobModel: JobModel) = {
+ def apply(containerModel: ContainerModel, jobModel: JobModel, jmxServer: JmxServer) = {
val config = jobModel.getConfig
val containerId = containerModel.getContainerId
val containerName = "samza-container-%s" format containerId
@@ -407,7 +407,6 @@ object SamzaContainer extends Logging {
.values
.map(_.getTaskName)
.toSet
-
val containerContext = new SamzaContainerContext(containerId, config, taskNames)
val taskInstances: Map[TaskName, TaskInstance] = containerModel.getTasks.values.map(taskModel => {
@@ -541,7 +540,8 @@ object SamzaContainer extends Logging {
localityManager = localityManager,
metrics = samzaContainerMetrics,
reporters = reporters,
- jvm = jvm)
+ jvm = jvm,
+ jmxServer = jmxServer)
}
}
@@ -552,6 +552,7 @@ class SamzaContainer(
consumerMultiplexer: SystemConsumers,
producerMultiplexer: SystemProducers,
metrics: SamzaContainerMetrics,
+ jmxServer: JmxServer,
offsetManager: OffsetManager = new OffsetManager,
localityManager: LocalityManager = null,
reporters: Map[String, MetricsReporter] = Map(),
@@ -625,10 +626,12 @@ class SamzaContainer(
localityManager.start
localityManager.register(String.valueOf(containerContext.id))
- info("Writing container locality to Coordinator Stream")
+ info("Writing container locality and JMX address to Coordinator Stream")
try {
val hostInetAddress = InetAddress.getLocalHost.getHostAddress
- localityManager.writeContainerToHostMapping(containerContext.id, hostInetAddress)
+ val jmxUrl = if (jmxServer != null) jmxServer.getJmxUrl else ""
+ val jmxTunnelingUrl = if (jmxServer != null) jmxServer.getTunnelingJmxUrl else ""
+ localityManager.writeContainerToHostMapping(containerContext.id, hostInetAddress, jmxUrl, jmxTunnelingUrl)
} catch {
case uhe: UnknownHostException =>
warn("Received UnknownHostException when persisting locality info for container %d: %s" format (containerContext.id, uhe.getMessage)) //No-op
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
index 73c58a7..f621611 100644
--- a/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
+++ b/samza-core/src/main/scala/org/apache/samza/coordinator/JobCoordinator.scala
@@ -21,7 +21,7 @@ package org.apache.samza.coordinator
import org.apache.samza.config.Config
-import org.apache.samza.job.model.{ContainerModel, JobModel, TaskModel}
+import org.apache.samza.job.model.{JobModel, TaskModel}
import org.apache.samza.SamzaException
import org.apache.samza.container.grouper.task.GroupByContainerCount
import org.apache.samza.container.grouper.stream.SystemStreamPartitionGrouperFactory
@@ -42,7 +42,7 @@ import org.apache.samza.coordinator.server.HttpServer
import org.apache.samza.checkpoint.{Checkpoint, CheckpointManager}
import org.apache.samza.coordinator.server.JobServlet
import org.apache.samza.config.SystemConfig.Config2System
-import org.apache.samza.coordinator.stream.{CoordinatorStreamSystemConsumer, CoordinatorStreamSystemProducer, CoordinatorStreamMessage, CoordinatorStreamSystemFactory}
+import org.apache.samza.coordinator.stream.CoordinatorStreamSystemFactory
import org.apache.samza.config.ConfigRewriter
/**
@@ -67,8 +67,6 @@ object JobCoordinator extends Logging {
coordinatorSystemConsumer.start
debug("Bootstrapping coordinator system stream.")
coordinatorSystemConsumer.bootstrap
- debug("Stopping coordinator system stream.")
- coordinatorSystemConsumer.stop
val config = coordinatorSystemConsumer.getConfig
info("Got config: %s" format config)
val checkpointManager = new CheckpointManager(coordinatorSystemProducer, coordinatorSystemConsumer, "Job-coordinator")
@@ -276,16 +274,7 @@ object JobCoordinator extends Logging {
val containerModels = containerGrouper.group(taskModels).map
{ case (containerModel) => Integer.valueOf(containerModel.getContainerId) -> containerModel }.toMap
- val containerLocality = if(localityManager != null) {
- localityManager.readContainerLocality()
- } else {
- new util.HashMap[Integer, String]()
- }
-
- containerLocality.foreach{case (container: Integer, location: String) =>
- info("Container id %d --> %s" format (container.intValue(), location))
- }
- new JobModel(config, containerModels, containerLocality)
+ new JobModel(config, containerModels, localityManager)
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 3f2f70e..5acfe87 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -20,7 +20,7 @@
package org.apache.samza.job.local
-import org.apache.samza.metrics.MetricsRegistryMap
+import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap}
import org.apache.samza.util.Logging
import org.apache.samza.SamzaException
import org.apache.samza.config.Config
@@ -48,7 +48,7 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
try {
coordinator.start
- new ThreadJob(SamzaContainer(containerModel, coordinator.jobModel))
+ new ThreadJob(SamzaContainer(containerModel, coordinator.jobModel, new JmxServer))
} finally {
coordinator.stop
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala b/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
index f343faf..de45123 100644
--- a/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/metrics/JmxServer.scala
@@ -115,4 +115,6 @@ class JmxServer(requestedPort: Int) extends Logging {
def stop = jmxServer.stop
override def toString = "JmxServer registry port=%d server port=%d url=%s" format (getRegistryPort, getServerPort, getJmxUrl)
+
+ def getTunnelingJmxUrl = getJmxUrl.replaceAll("localhost", hostname)
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
index 9fb1aa9..84fdeaa 100644
--- a/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
+++ b/samza-core/src/test/scala/org/apache/samza/container/TestSamzaContainer.scala
@@ -166,7 +166,8 @@ class TestSamzaContainer extends AssertionsForJUnit {
runLoop = runLoop,
consumerMultiplexer = consumerMultiplexer,
producerMultiplexer = producerMultiplexer,
- metrics = new SamzaContainerMetrics
+ metrics = new SamzaContainerMetrics,
+ jmxServer = null
)
try {
container.run
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
index cf0d2fc..41303f7 100644
--- a/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
+++ b/samza-yarn/src/main/resources/scalate/WEB-INF/views/index.scaml
@@ -89,6 +89,12 @@
%td.key Application master container
%td
%a(target="_blank" href="http://#{state.nodeHost}:#{state.nodeHttpPort.toString}/node/containerlogs/#{state.containerId.toString}/#{username}")= state.containerId.toString
+ %tr
+ %td.key JMX server url
+ %td= state.jmxUrl
+ %tr
+ %td.key JMX server tunneling url
+ %td= state.jmxTunnelingUrl
%div.tab-pane#containers
%h2 Containers
@@ -116,6 +122,7 @@
%th Node
%th Start Time
%th Up Time
+ %th JMX access
%tbody
- for((containerId, container) <- state.runningContainers)
%tr
@@ -128,6 +135,10 @@
Start time: #{container.startTimeStr()}
%td
Up time: #{container.upTimeStr()}
+ %td
+ Ordinary: #{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping.JMX_URL_KEY)}
+ Tunneling: #{state.jobCoordinator.jobModel.getContainerToHostValue(containerId, org.apache.samza.coordinator.stream.CoordinatorStreamMessage.SetContainerHostMapping.JMX_TUNNELING_URL_KEY)}
+
%div.tab-pane#task-groups
%h2 Task Groups
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
index 20aa373..af42c6a 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMaster.scala
@@ -91,6 +91,11 @@ object SamzaAppMaster extends Logging with AMRMClientAsync.CallbackHandler {
try {
// wire up all of the yarn event listeners
val state = new SamzaAppMasterState(jobCoordinator, -1, containerId, nodeHostString, nodePortString.toInt, nodeHttpPortString.toInt)
+ if (jmxServer.isDefined) {
+ state.jmxUrl = jmxServer.get.getJmxUrl
+ state.jmxTunnelingUrl = jmxServer.get.getTunnelingJmxUrl
+ }
+
val service = new SamzaAppMasterService(config, state, registry, clientHelper)
val lifecycle = new SamzaAppMasterLifecycle(containerMem, containerCpu, state, amClient)
val metrics = new SamzaAppMasterMetrics(config, state, registry)
http://git-wip-us.apache.org/repos/asf/samza/blob/4f7bbb05/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
index 1445605..f667c83 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/SamzaAppMasterState.scala
@@ -21,11 +21,7 @@ package org.apache.samza.job.yarn
import org.apache.samza.util.Logging
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus
import org.apache.hadoop.yarn.api.records.ContainerId
-import java.util
-import org.apache.samza.system.SystemStreamPartition
-import org.apache.samza.container.TaskName
import java.net.URL
-import org.apache.samza.job.model.JobModel
import org.apache.samza.coordinator.JobCoordinator
/**
@@ -53,4 +49,8 @@ class SamzaAppMasterState(val jobCoordinator: JobCoordinator, val taskId: Int, v
// controlled on startup
var appAttemptId = containerId.getApplicationAttemptId
+
+ // JMX address
+ var jmxUrl = ""
+ var jmxTunnelingUrl = ""
}