You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@samza.apache.org by na...@apache.org on 2017/06/26 23:35:38 UTC

samza git commit: SAMZA-1324 : Add a metricsreporter lifecycle for JobCoordinator component of StreamProcessor

Repository: samza
Updated Branches:
  refs/heads/master fde243475 -> 4072c9e85


SAMZA-1324 : Add a metricsreporter lifecycle for JobCoordinator component of StreamProcessor

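Added a metrics class (ZkJobCoordinatorMetrics) for the ZK-based job coordinator. It reports ZooKeeper reads, writes, subscriptions and connection errors, leadership status, barrier events, and barrier rebalancing time. Also added a container-startup-time metric to SamzaContainerMetrics.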

Author: PawasChhokra <Jaimatadi1$>
Author: Pawas Chhokra <pc...@pchhokra-mn1.linkedin.biz>
Author: PawasChhokra <pa...@gmail.com>

Reviewers: Navina Ramesh <na...@apache.org>

Closes #223 from PawasChhokra/ZkJobCoordinatorMetrics


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4072c9e8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4072c9e8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4072c9e8

Branch: refs/heads/master
Commit: 4072c9e85c86212372eb527509ffa9c3eba97315
Parents: fde2434
Author: PawasChhokra <pa...@gmail.com>
Authored: Mon Jun 26 16:35:26 2017 -0700
Committer: navina <na...@apache.org>
Committed: Mon Jun 26 16:35:26 2017 -0700

----------------------------------------------------------------------
 .../versioned/container/metrics-table.html      | 45 +++++++++++
 .../apache/samza/container/LocalityManager.java |  1 +
 .../samza/zk/ZkBarrierForVersionUpgrade.java    | 10 +--
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 46 +++++++++--
 .../samza/zk/ZkJobCoordinatorFactory.java       |  4 +-
 .../samza/zk/ZkJobCoordinatorMetrics.java       | 83 ++++++++++++++++++++
 .../main/java/org/apache/samza/zk/ZkUtils.java  | 30 +++++++
 .../apache/samza/container/SamzaContainer.scala |  2 +
 .../samza/container/SamzaContainerMetrics.scala |  1 +
 .../zk/TestZkBarrierForVersionUpgrade.java      |  5 +-
 .../apache/samza/zk/TestZkLeaderElector.java    |  3 +-
 .../java/org/apache/samza/zk/TestZkUtils.java   |  5 +-
 12 files changed, 218 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/docs/learn/documentation/versioned/container/metrics-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index 2eb46e3..7fbbc40 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -142,6 +142,7 @@
     <li><a href="#bootstrapping-chooser-metrics">BootstrappingChooserMetrics</a></li>
     <li><a href="#hdfs-system-producer-metrics">HdfsSystemProducerMetrics</a></li>
     <li><a href="#elasticsearch-system-producer-metrics">ElasticsearchSystemProducerMetrics</a></li>
+    <li><a href="#zookeeper-job-coordinator-metrics">ZkJobCoordinatorMetrics</a></li>
 </ul>
 <p>Words highlighted like <span class="system">this</span> are placeholders for your own variable names defined in configuration file or system variables defined while starting the job.</p>
 <p id="average-time" style="color: #00a">Note: Average time is calculated for the current time window (set to 300 seconds)</p>
@@ -215,6 +216,10 @@
         <td>physical-memory-mb</td>
         <td>The physical memory used by the Samza container process (native + on heap) (in megabytes)</td>
     </tr>
+    <tr>
+        <td>container-startup-time</td>
+        <td><a href="#average-time">Average time</a> taken for the container to start up</td>
+    </tr>
 
     <tr>
         <th colspan="2" class="section" id="job-coordinator">job-coordinator</th>
@@ -887,6 +892,46 @@
         <td><span class="system">system</span>-version-conflicts</td>
         <td>Number of times the request could not be completed due to a conflict with the current state of the document</td>
     </tr>
+
+    <tr>
+        <th colspan="2" class="section" id="zookeeper-job-coordinator-metrics">org.apache.samza.zk.ZkJobCoordinatorMetrics</th>
+    </tr>
+    <tr>
+        <td>reads</td>
+        <td>Number of reads from Zookeeper</td>
+    </tr>
+    <tr>
+        <td>writes</td>
+        <td>Number of writes to Zookeeper</td>
+    </tr>
+    <tr>
+        <td>subscriptions</td>
+        <td>Number of subscriptions to znodes in Zookeeper</td>
+    </tr>
+    <tr>
+        <td>zk-connection-error</td>
+        <td>Number of Zookeeper connection errors</td>
+    </tr>
+    <tr>
+        <td>is-leader</td>
+        <td>Denotes if the processor is a leader or not</td>
+    </tr>
+    <tr>
+        <td>barrier-creation</td>
+        <td>Number of times a barrier was created by the leader</td>
+    </tr>
+    <tr>
+        <td>barrier-state-change</td>
+        <td>Number of times the barrier state changed</td>
+    </tr>
+    <tr>
+        <td>barrier-error</td>
+        <td>Number of times the barrier encountered an error while attaining consensus on the job model version</td>
+    </tr>
+    <tr>
+        <td>single-barrier-rebalancing-time</td>
+        <td><a href="#average-time">Average time</a> taken for all the processors to get the latest version of the job model after a single processor change (when no barrier timeout occurs)</td>
+    </tr>
     </tbody>
 </table>
 </body>

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 22380d3..bafebcc 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -54,6 +54,7 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
     this.taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamProducer, coordinatorStreamConsumer);
   }
 
+
   /**
    * Special constructor that creates a write-only {@link LocalityManager} that only writes
    * to coordinator stream in {@link SamzaContainer}

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 581387d..c1343b1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -93,7 +93,7 @@ public class ZkBarrierForVersionUpgrade {
 
     // subscribe for participant's list changes
     LOG.info("Subscribing for child changes at " + barrierParticipantsPath);
-    zkUtils.getZkClient().subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants));
+    zkUtils.subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants));
 
     barrierListenerOptional.ifPresent(zkBarrierListener -> zkBarrierListener.onBarrierCreated(version));
   }
@@ -106,7 +106,7 @@ public class ZkBarrierForVersionUpgrade {
    */
   public void join(String version, String participantId) {
     String barrierDonePath = keyBuilder.getBarrierStatePath(version);
-    zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version));
+    zkUtils.subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version));
 
     // TODO: Handle ZkNodeExistsException - SAMZA-1304
     zkUtils.getZkClient().createPersistent(
@@ -119,7 +119,7 @@ public class ZkBarrierForVersionUpgrade {
    * @param version Version associated with the Barrier
    */
   public void expire(String version) {
-    zkUtils.getZkClient().writeData(
+    zkUtils.writeData(
         keyBuilder.getBarrierStatePath(version),
         State.TIMED_OUT);
 
@@ -150,8 +150,8 @@ public class ZkBarrierForVersionUpgrade {
       if (currentChildren.size() == names.size() && CollectionUtils.containsAll(currentChildren, names)) {
         String barrierDonePath = keyBuilder.getBarrierStatePath(barrierVersion);
         LOG.info("Writing BARRIER DONE to " + barrierDonePath);
-        zkUtils.getZkClient().writeData(barrierDonePath, State.DONE); // this will trigger notifications
-        zkUtils.getZkClient().unsubscribeChildChanges(barrierDonePath, this);
+        zkUtils.writeData(barrierDonePath, State.DONE); // this will trigger notifications
+        zkUtils.unsubscribeChildChanges(barrierDonePath, this);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index cb32252..f2fc3de 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -21,11 +21,13 @@ package org.apache.samza.zk;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MetricsConfig;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorListener;
@@ -33,9 +35,13 @@ import org.apache.samza.coordinator.JobModelManager;
 import org.apache.samza.coordinator.LeaderElector;
 import org.apache.samza.coordinator.LeaderElectorListener;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
 import org.apache.samza.runtime.ProcessorIdGenerator;
 import org.apache.samza.system.StreamMetadataCache;
 import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.MetricsReporterLoader;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -48,32 +54,33 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
   // with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
   private static final int METADATA_CACHE_TTL_MS = 5000;
 
-
   private final ZkUtils zkUtils;
   private final String processorId;
   private final ZkController zkController;
 
   private final Config config;
   private final ZkBarrierForVersionUpgrade barrier;
+  private final ZkJobCoordinatorMetrics metrics;
+  private final Map<String, MetricsReporter> reporters;
 
   private StreamMetadataCache streamMetadataCache = null;
   private ScheduleAfterDebounceTime debounceTimer = null;
   private JobCoordinatorListener coordinatorListener = null;
   private JobModel newJobModel;
-
   private int debounceTimeMs;
 
-  public ZkJobCoordinator(Config config) {
+  public ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry) {
     this.config = config;
     ZkConfig zkConfig = new ZkConfig(config);
     ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
+    this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
     this.zkUtils = new ZkUtils(
         keyBuilder,
         ZkCoordinationServiceFactory.createZkClient(
             zkConfig.getZkConnect(),
             zkConfig.getZkSessionTimeoutMs(),
             zkConfig.getZkConnectionTimeoutMs()),
-        zkConfig.getZkConnectionTimeoutMs());
+        zkConfig.getZkConnectionTimeoutMs(), metrics);
 
     this.processorId = createProcessorId(config);
     LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
@@ -84,11 +91,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
         zkUtils,
         new ZkBarrierListenerImpl());
     this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
+    this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
 
   }
 
   @Override
   public void start() {
+    startMetrics();
     streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
 
     debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
@@ -104,15 +113,30 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     if (coordinatorListener != null) {
       coordinatorListener.onJobModelExpired();
     }
-
+    // Reset the isLeader gauge on shutdown, since a stopped processor is no longer the leader
+    metrics.isLeader.set(false);
     debounceTimer.stopScheduler();
     zkController.stop();
 
+    shutdownMetrics();
     if (coordinatorListener != null) {
       coordinatorListener.onCoordinatorStop();
     }
   }
 
+  private void startMetrics() {
+    for (MetricsReporter reporter: reporters.values()) {
+      reporter.register("job-coordinator-" + processorId, (ReadableMetricsRegistry) metrics.getMetricsRegistry());
+      reporter.start();
+    }
+  }
+
+  private void shutdownMetrics() {
+    for (MetricsReporter reporter: reporters.values()) {
+      reporter.stop();
+    }
+  }
+
   @Override
   public void setListener(JobCoordinatorListener listener) {
     this.coordinatorListener = listener;
@@ -143,7 +167,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
     // Generate the JobModel
     JobModel jobModel = generateNewJobModel(currentProcessorIds);
-
     // Assign the next version of JobModel
     String currentJMVersion  = zkUtils.getJobModelVersion();
     String nextJMVersion;
@@ -171,6 +194,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () ->
       {
         LOG.info("pid=" + processorId + "new JobModel available");
+
         // stop current work
         if (coordinatorListener != null) {
           coordinatorListener.onJobModelExpired();
@@ -236,6 +260,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     @Override
     public void onBecomingLeader() {
       LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
+      metrics.isLeader.set(true);
       zkController.subscribeToProcessorChange();
       debounceTimer.scheduleAfterDebounceTime(
         ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
@@ -248,8 +273,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
   class ZkBarrierListenerImpl implements ZkBarrierListener {
     private final String barrierAction = "BarrierAction";
+    private long startTime = 0;
+
     @Override
     public void onBarrierCreated(String version) {
+      // Start the timer for rebalancing
+      startTime = System.nanoTime();
+
+      metrics.barrierCreation.inc();
       debounceTimer.scheduleAfterDebounceTime(
           barrierAction,
         (new ZkConfig(config)).getZkBarrierTimeoutMs(),
@@ -259,6 +290,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
 
     public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) {
       LOG.info("JobModel version " + version + " obtained consensus successfully!");
+      metrics.barrierStateChange.inc();
+      metrics.singleBarrierRebalancingTime.update(System.nanoTime() - startTime);
       if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
         debounceTimer.scheduleAfterDebounceTime(
             barrierAction,
@@ -278,6 +311,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
     @Override
     public void onBarrierError(String version, Throwable t) {
       LOG.error("Encountered error while attaining consensus on JobModel version " + version);
+      metrics.barrierError.inc();
       stop();
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index d2e0d14..c077f94 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -22,6 +22,8 @@ package org.apache.samza.zk;
 import org.apache.samza.config.Config;
 import org.apache.samza.coordinator.JobCoordinator;
 import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metrics.MetricsRegistryMap;
+
 
 public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
   /**
@@ -32,6 +34,6 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
    */
   @Override
   public JobCoordinator getJobCoordinator(Config config) {
-    return new ZkJobCoordinator(config);
+    return new ZkJobCoordinator(config, new MetricsRegistryMap());
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
new file mode 100644
index 0000000..3437602
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+
+
+public class ZkJobCoordinatorMetrics extends MetricsBase {
+
+  private final MetricsRegistry metricsRegistry;
+
+  public final Counter reads;
+  public final Counter writes;
+  public final Counter subscriptions;
+  public final Counter zkConnectionError;
+
+  /**
+   * Denotes if the processor is a leader or not
+   */
+  public final Gauge<Boolean> isLeader;
+
+  /**
+   * Number of times a barrier was created by the leader
+   */
+  public final Counter barrierCreation;
+
+  /**
+   * Number of times the barrier state changed
+   */
+  public final Counter barrierStateChange;
+
+  /**
+   * Number of times the barrier encountered an error while attaining consensus on the job model version
+   */
+  public final Counter barrierError;
+
+  /**
+   * Average time taken for all the processors to get the latest version of the job model after a single
+   * processor change (when no barrier timeout occurs)
+   */
+  public final Timer singleBarrierRebalancingTime;
+
+  public ZkJobCoordinatorMetrics(MetricsRegistry metricsRegistry) {
+    super(metricsRegistry);
+    this.metricsRegistry = metricsRegistry;
+    this.reads = newCounter("reads");
+    this.writes = newCounter("writes");
+    this.subscriptions = newCounter("subscriptions");
+    this.zkConnectionError = newCounter("zk-connection-error");
+    this.isLeader = newGauge("is-leader", false);
+    this.barrierCreation = newCounter("barrier-creation");
+    this.barrierStateChange = newCounter("barrier-state-change");
+    this.barrierError = newCounter("barrier-error");
+    this.singleBarrierRebalancingTime = newTimer("single-barrier-rebalancing-time");
+  }
+
+  public MetricsRegistry getMetricsRegistry() {
+    return this.metricsRegistry;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 677ce54..6560cb4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -65,6 +65,7 @@ public class ZkUtils {
   private volatile String ephemeralPath = null;
   private final ZkKeyBuilder keyBuilder;
   private final int connectionTimeoutMs;
+  private ZkJobCoordinatorMetrics metrics;
 
   public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
     this.keyBuilder = zkKeyBuilder;
@@ -72,9 +73,17 @@ public class ZkUtils {
     this.zkClient = zkClient;
   }
 
+  public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, ZkJobCoordinatorMetrics metrics) {
+    this.keyBuilder = zkKeyBuilder;
+    this.connectionTimeoutMs = connectionTimeoutMs;
+    this.zkClient = zkClient;
+    this.metrics = metrics;
+  }
+
   public void connect() throws ZkInterruptedException {
     boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS);
     if (!isConnected) {
+      metrics.zkConnectionError.inc();
       throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
     }
   }
@@ -135,6 +144,7 @@ public class ZkUtils {
    */
   String readProcessorData(String fullPath) {
     String data = zkClient.<String>readData(fullPath, true);
+    metrics.reads.inc();
     if (data == null) {
       throw new SamzaException(String.format("Cannot read ZK node:", fullPath));
     }
@@ -177,6 +187,21 @@ public class ZkUtils {
 
   public void subscribeDataChanges(String path, IZkDataListener dataListener) {
     zkClient.subscribeDataChanges(path, dataListener);
+    metrics.subscriptions.inc();
+  }
+
+  public void subscribeChildChanges(String path, IZkChildListener listener) {
+    zkClient.subscribeChildChanges(path, listener);
+    metrics.subscriptions.inc();
+  }
+
+  public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
+    zkClient.unsubscribeChildChanges(path, childListener);
+  }
+
+  public void writeData(String path, Object object) {
+    zkClient.writeData(path, object);
+    metrics.writes.inc();
   }
 
   public boolean exists(String path) {
@@ -194,6 +219,7 @@ public class ZkUtils {
   public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
     LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
     zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
+    metrics.subscriptions.inc();
   }
 
   /**
@@ -224,6 +250,7 @@ public class ZkUtils {
   public JobModel getJobModel(String jobModelVersion) {
     LOG.info("read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
     Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
+    metrics.reads.inc();
     ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
     JobModel jm;
     try {
@@ -250,6 +277,7 @@ public class ZkUtils {
   public void publishJobModelVersion(String oldVersion, String newVersion) {
     Stat stat = new Stat();
     String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
+    metrics.reads.inc();
     LOG.info("publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
         .getVersion() + ")");
 
@@ -261,6 +289,7 @@ public class ZkUtils {
     int dataVersion = stat.getVersion();
     try {
       stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
+      metrics.writes.inc();
     } catch (Exception e) {
       String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
       LOG.error(msg, e);
@@ -290,5 +319,6 @@ public class ZkUtils {
   public void subscribeToProcessorChange(IZkChildListener listener) {
     LOG.info("subscribing for child change at:" + keyBuilder.getProcessorsPath());
     zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
+    metrics.subscriptions.inc();
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 4f5df94..3bf5c95 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -668,6 +668,7 @@ class SamzaContainer(
     try {
       info("Starting container.")
 
+      val startTime = System.nanoTime()
       status = SamzaContainerStatus.STARTING
 
       jmxServer = new JmxServer()
@@ -689,6 +690,7 @@ class SamzaContainer(
       if (containerListener != null) {
         containerListener.onContainerStart()
       }
+      metrics.containerStartupTime.update(System.nanoTime() - startTime)
       runLoop.run
     } catch {
       case e: Throwable =>

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 18664d8..d080939 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -37,6 +37,7 @@ class SamzaContainerMetrics(
   val processNs = newTimer("process-ns")
   val commitNs = newTimer("commit-ns")
   val blockNs = newTimer("block-ns")
+  val containerStartupTime = newTimer("container-startup-time")
   val utilization = newGauge("event-loop-utilization", 0.0F)
   val diskUsageBytes = newGauge("disk-usage-bytes", 0L)
   val diskQuotaBytes = newGauge("disk-quota-bytes", Long.MaxValue)

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 63d6663..49cd280 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -22,6 +22,7 @@ import junit.framework.Assert;
 import org.I0Itec.zkclient.ZkClient;
 import org.apache.samza.config.ZkConfig;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -52,9 +53,9 @@ public class TestZkBarrierForVersionUpgrade {
   @Before
   public void testSetup() {
     ZkClient zkClient = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
-    this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+    this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
     ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
-    this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+    this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
   }
 
   @After

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 393d733..993297b 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -28,6 +28,7 @@ import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkNodeExistsException;
 import org.apache.samza.SamzaException;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -436,6 +437,6 @@ public class TestZkLeaderElector {
     return new ZkUtils(
         KEY_BUILDER,
         zkClient,
-        CONNECTION_TIMEOUT_MS);
+        CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index b7a0eb8..9e33484 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -31,6 +31,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.job.model.ContainerModel;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -70,7 +71,7 @@ public class TestZkUtils {
     zkUtils = new ZkUtils(
         KEY_BUILDER,
         zkClient,
-        SESSION_TIMEOUT_MS);
+        SESSION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
 
     zkUtils.connect();
   }
@@ -116,7 +117,7 @@ public class TestZkUtils {
     Assert.assertEquals(" ID1 didn't match", "1", l.get(0));
     Assert.assertEquals(" ID2 didn't match", "2", l.get(1));
   }
-  
+
   @Test
   public void testSubscribeToJobModelVersionChange() {