You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by na...@apache.org on 2017/06/26 23:35:38 UTC
samza git commit: SAMZA-1324 : Add a metricsreporter lifecycle for
JobCoordinator component of StreamProcessor
Repository: samza
Updated Branches:
refs/heads/master fde243475 -> 4072c9e85
SAMZA-1324 : Add a metricsreporter lifecycle for JobCoordinator component of StreamProcessor
Added a metrics class for ZK based job coordinator that reports a few metrics.
Author: PawasChhokra <Jaimatadi1$>
Author: Pawas Chhokra <pc...@pchhokra-mn1.linkedin.biz>
Author: PawasChhokra <pa...@gmail.com>
Reviewers: Navina Ramesh <na...@apache.org>
Closes #223 from PawasChhokra/ZkJobCoordinatorMetrics
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/4072c9e8
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/4072c9e8
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/4072c9e8
Branch: refs/heads/master
Commit: 4072c9e85c86212372eb527509ffa9c3eba97315
Parents: fde2434
Author: PawasChhokra <pa...@gmail.com>
Authored: Mon Jun 26 16:35:26 2017 -0700
Committer: navina <na...@apache.org>
Committed: Mon Jun 26 16:35:26 2017 -0700
----------------------------------------------------------------------
.../versioned/container/metrics-table.html | 45 +++++++++++
.../apache/samza/container/LocalityManager.java | 1 +
.../samza/zk/ZkBarrierForVersionUpgrade.java | 10 +--
.../org/apache/samza/zk/ZkJobCoordinator.java | 46 +++++++++--
.../samza/zk/ZkJobCoordinatorFactory.java | 4 +-
.../samza/zk/ZkJobCoordinatorMetrics.java | 83 ++++++++++++++++++++
.../main/java/org/apache/samza/zk/ZkUtils.java | 30 +++++++
.../apache/samza/container/SamzaContainer.scala | 2 +
.../samza/container/SamzaContainerMetrics.scala | 1 +
.../zk/TestZkBarrierForVersionUpgrade.java | 5 +-
.../apache/samza/zk/TestZkLeaderElector.java | 3 +-
.../java/org/apache/samza/zk/TestZkUtils.java | 5 +-
12 files changed, 218 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/docs/learn/documentation/versioned/container/metrics-table.html
----------------------------------------------------------------------
diff --git a/docs/learn/documentation/versioned/container/metrics-table.html b/docs/learn/documentation/versioned/container/metrics-table.html
index 2eb46e3..7fbbc40 100644
--- a/docs/learn/documentation/versioned/container/metrics-table.html
+++ b/docs/learn/documentation/versioned/container/metrics-table.html
@@ -142,6 +142,7 @@
<li><a href="#bootstrapping-chooser-metrics">BootstrappingChooserMetrics</a></li>
<li><a href="#hdfs-system-producer-metrics">HdfsSystemProducerMetrics</a></li>
<li><a href="#elasticsearch-system-producer-metrics">ElasticsearchSystemProducerMetrics</a></li>
+ <li><a href="#zookeeper-job-coordinator-metrics">ZkJobCoordinatorMetrics</a></li>
</ul>
<p>Words highlighted like <span class="system">this</span> are placeholders for your own variable names defined in configuration file or system variables defined while starting the job.</p>
<p id="average-time" style="color: #00a">Note: Average time is calculated for the current time window (set to 300 seconds)</p>
@@ -215,6 +216,10 @@
<td>physical-memory-mb</td>
<td>The physical memory used by the Samza container process (native + on heap) (in megabytes)</td>
</tr>
+ <tr>
+ <td>container-startup-time</td>
+ <td><a href="#average-time">Average time</a> spent for the container to startup</td>
+ </tr>
<tr>
<th colspan="2" class="section" id="job-coordinator">job-coordinator</th>
@@ -887,6 +892,46 @@
<td><span class="system">system</span>-version-conflicts</td>
<td>Number of times the request could not be completed due to a conflict with the current state of the document</td>
</tr>
+
+ <tr>
+ <th colspan="2" class="section" id="zookeeper-job-coordinator-metrics">org.apache.samza.zk.ZkJobCoordinatorMetrics</th>
+ </tr>
+ <tr>
+ <td>reads</td>
+ <td>Number of reads from Zookeeper</td>
+ </tr>
+ <tr>
+ <td>writes</td>
+ <td>Number of writes to Zookeeper</td>
+ </tr>
+ <tr>
+ <td>subscriptions</td>
+ <td>Number of subscriptions to znodes in Zookeeper</td>
+ </tr>
+ <tr>
+ <td>zk-connection-error</td>
+ <td>Number of Zookeeper connection errors</td>
+ </tr>
+ <tr>
+ <td>is-leader</td>
+ <td>Denotes if the processor is a leader or not</td>
+ </tr>
+ <tr>
+ <td>barrier-creation</td>
+ <td>Number of times a barrier was created by the leader</td>
+ </tr>
+ <tr>
+ <td>barrier-state-change</td>
+ <td>Number of times the barrier state changed</td>
+ </tr>
+ <tr>
+ <td>barrier-error</td>
+ <td>Number of times the barrier encountered an error while attaining consensus on the job model version</td>
+ </tr>
+ <tr>
+ <td>single-barrier-rebalancing-time</td>
+ <td><a href="#average-time">Average time</a> taken for all the processors to get the latest version of the job model after single processor change (without the occurence of a barrier timeout)</td>
+ </tr>
</tbody>
</table>
</body>
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
index 22380d3..bafebcc 100644
--- a/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
+++ b/samza-core/src/main/java/org/apache/samza/container/LocalityManager.java
@@ -54,6 +54,7 @@ public class LocalityManager extends AbstractCoordinatorStreamManager {
this.taskAssignmentManager = new TaskAssignmentManager(coordinatorStreamProducer, coordinatorStreamConsumer);
}
+
/**
* Special constructor that creates a write-only {@link LocalityManager} that only writes
* to coordinator stream in {@link SamzaContainer}
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
index 581387d..c1343b1 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkBarrierForVersionUpgrade.java
@@ -93,7 +93,7 @@ public class ZkBarrierForVersionUpgrade {
// subscribe for participant's list changes
LOG.info("Subscribing for child changes at " + barrierParticipantsPath);
- zkUtils.getZkClient().subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants));
+ zkUtils.subscribeChildChanges(barrierParticipantsPath, new ZkBarrierChangeHandler(version, participants));
barrierListenerOptional.ifPresent(zkBarrierListener -> zkBarrierListener.onBarrierCreated(version));
}
@@ -106,7 +106,7 @@ public class ZkBarrierForVersionUpgrade {
*/
public void join(String version, String participantId) {
String barrierDonePath = keyBuilder.getBarrierStatePath(version);
- zkUtils.getZkClient().subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version));
+ zkUtils.subscribeDataChanges(barrierDonePath, new ZkBarrierReachedHandler(barrierDonePath, version));
// TODO: Handle ZkNodeExistsException - SAMZA-1304
zkUtils.getZkClient().createPersistent(
@@ -119,7 +119,7 @@ public class ZkBarrierForVersionUpgrade {
* @param version Version associated with the Barrier
*/
public void expire(String version) {
- zkUtils.getZkClient().writeData(
+ zkUtils.writeData(
keyBuilder.getBarrierStatePath(version),
State.TIMED_OUT);
@@ -150,8 +150,8 @@ public class ZkBarrierForVersionUpgrade {
if (currentChildren.size() == names.size() && CollectionUtils.containsAll(currentChildren, names)) {
String barrierDonePath = keyBuilder.getBarrierStatePath(barrierVersion);
LOG.info("Writing BARRIER DONE to " + barrierDonePath);
- zkUtils.getZkClient().writeData(barrierDonePath, State.DONE); // this will trigger notifications
- zkUtils.getZkClient().unsubscribeChildChanges(barrierDonePath, this);
+ zkUtils.writeData(barrierDonePath, State.DONE); // this will trigger notifications
+ zkUtils.unsubscribeChildChanges(barrierDonePath, this);
}
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index cb32252..f2fc3de 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -21,11 +21,13 @@ package org.apache.samza.zk;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.samza.config.ApplicationConfig;
import org.apache.samza.config.Config;
import org.apache.samza.config.ConfigException;
import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.MetricsConfig;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorListener;
@@ -33,9 +35,13 @@ import org.apache.samza.coordinator.JobModelManager;
import org.apache.samza.coordinator.LeaderElector;
import org.apache.samza.coordinator.LeaderElectorListener;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsReporter;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.ReadableMetricsRegistry;
import org.apache.samza.runtime.ProcessorIdGenerator;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.util.ClassLoaderHelper;
+import org.apache.samza.util.MetricsReporterLoader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -48,32 +54,33 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
// with locality. Since host-affinity is not yet implemented, this can be fixed as part of SAMZA-1197
private static final int METADATA_CACHE_TTL_MS = 5000;
-
private final ZkUtils zkUtils;
private final String processorId;
private final ZkController zkController;
private final Config config;
private final ZkBarrierForVersionUpgrade barrier;
+ private final ZkJobCoordinatorMetrics metrics;
+ private final Map<String, MetricsReporter> reporters;
private StreamMetadataCache streamMetadataCache = null;
private ScheduleAfterDebounceTime debounceTimer = null;
private JobCoordinatorListener coordinatorListener = null;
private JobModel newJobModel;
-
private int debounceTimeMs;
- public ZkJobCoordinator(Config config) {
+ public ZkJobCoordinator(Config config, MetricsRegistry metricsRegistry) {
this.config = config;
ZkConfig zkConfig = new ZkConfig(config);
ZkKeyBuilder keyBuilder = new ZkKeyBuilder(new ApplicationConfig(config).getGlobalAppId());
+ this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
this.zkUtils = new ZkUtils(
keyBuilder,
ZkCoordinationServiceFactory.createZkClient(
zkConfig.getZkConnect(),
zkConfig.getZkSessionTimeoutMs(),
zkConfig.getZkConnectionTimeoutMs()),
- zkConfig.getZkConnectionTimeoutMs());
+ zkConfig.getZkConnectionTimeoutMs(), metrics);
this.processorId = createProcessorId(config);
LeaderElector leaderElector = new ZkLeaderElector(processorId, zkUtils);
@@ -84,11 +91,13 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
zkUtils,
new ZkBarrierListenerImpl());
this.debounceTimeMs = new JobConfig(config).getDebounceTimeMs();
+ this.reporters = MetricsReporterLoader.getMetricsReporters(new MetricsConfig(config), processorId);
}
@Override
public void start() {
+ startMetrics();
streamMetadataCache = StreamMetadataCache.apply(METADATA_CACHE_TTL_MS, config);
debounceTimer = new ScheduleAfterDebounceTime(throwable -> {
@@ -104,15 +113,30 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
if (coordinatorListener != null) {
coordinatorListener.onJobModelExpired();
}
-
+ //Setting the isLeader metric to false when the stream processor shuts down because it does not remain the leader anymore
+ metrics.isLeader.set(false);
debounceTimer.stopScheduler();
zkController.stop();
+ shutdownMetrics();
if (coordinatorListener != null) {
coordinatorListener.onCoordinatorStop();
}
}
+ private void startMetrics() {
+ for (MetricsReporter reporter: reporters.values()) {
+ reporter.register("job-coordinator-" + processorId, (ReadableMetricsRegistry) metrics.getMetricsRegistry());
+ reporter.start();
+ }
+ }
+
+ private void shutdownMetrics() {
+ for (MetricsReporter reporter: reporters.values()) {
+ reporter.stop();
+ }
+ }
+
@Override
public void setListener(JobCoordinatorListener listener) {
this.coordinatorListener = listener;
@@ -143,7 +167,6 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
// Generate the JobModel
JobModel jobModel = generateNewJobModel(currentProcessorIds);
-
// Assign the next version of JobModel
String currentJMVersion = zkUtils.getJobModelVersion();
String nextJMVersion;
@@ -171,6 +194,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
debounceTimer.scheduleAfterDebounceTime(ScheduleAfterDebounceTime.JOB_MODEL_VERSION_CHANGE, 0, () ->
{
LOG.info("pid=" + processorId + "new JobModel available");
+
// stop current work
if (coordinatorListener != null) {
coordinatorListener.onJobModelExpired();
@@ -236,6 +260,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
@Override
public void onBecomingLeader() {
LOG.info("ZkJobCoordinator::onBecomeLeader - I became the leader!");
+ metrics.isLeader.set(true);
zkController.subscribeToProcessorChange();
debounceTimer.scheduleAfterDebounceTime(
ScheduleAfterDebounceTime.ON_PROCESSOR_CHANGE,
@@ -248,8 +273,14 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
class ZkBarrierListenerImpl implements ZkBarrierListener {
private final String barrierAction = "BarrierAction";
+ private long startTime = 0;
+
@Override
public void onBarrierCreated(String version) {
+ // Start the timer for rebalancing
+ startTime = System.nanoTime();
+
+ metrics.barrierCreation.inc();
debounceTimer.scheduleAfterDebounceTime(
barrierAction,
(new ZkConfig(config)).getZkBarrierTimeoutMs(),
@@ -259,6 +290,8 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
public void onBarrierStateChanged(final String version, ZkBarrierForVersionUpgrade.State state) {
LOG.info("JobModel version " + version + " obtained consensus successfully!");
+ metrics.barrierStateChange.inc();
+ metrics.singleBarrierRebalancingTime.update(System.nanoTime() - startTime);
if (ZkBarrierForVersionUpgrade.State.DONE.equals(state)) {
debounceTimer.scheduleAfterDebounceTime(
barrierAction,
@@ -278,6 +311,7 @@ public class ZkJobCoordinator implements JobCoordinator, ZkControllerListener {
@Override
public void onBarrierError(String version, Throwable t) {
LOG.error("Encountered error while attaining consensus on JobModel version " + version);
+ metrics.barrierError.inc();
stop();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
index d2e0d14..c077f94 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorFactory.java
@@ -22,6 +22,8 @@ package org.apache.samza.zk;
import org.apache.samza.config.Config;
import org.apache.samza.coordinator.JobCoordinator;
import org.apache.samza.coordinator.JobCoordinatorFactory;
+import org.apache.samza.metrics.MetricsRegistryMap;
+
public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
/**
@@ -32,6 +34,6 @@ public class ZkJobCoordinatorFactory implements JobCoordinatorFactory {
*/
@Override
public JobCoordinator getJobCoordinator(Config config) {
- return new ZkJobCoordinator(config);
+ return new ZkJobCoordinator(config, new MetricsRegistryMap());
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
new file mode 100644
index 0000000..3437602
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinatorMetrics.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.samza.zk;
+
+
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.Gauge;
+import org.apache.samza.metrics.MetricsRegistry;
+import org.apache.samza.metrics.Timer;
+
+
+public class ZkJobCoordinatorMetrics extends MetricsBase {
+
+ private final MetricsRegistry metricsRegistry;
+
+ public final Counter reads;
+ public final Counter writes;
+ public final Counter subscriptions;
+ public final Counter zkConnectionError;
+
+ /**
+ * Denotes if the processor is a leader or not
+ */
+ public final Gauge<Boolean> isLeader;
+
+ /**
+ * Number of times a barrier was created by the leader
+ */
+ public final Counter barrierCreation;
+
+ /**
+ * Number of times the barrier state changed
+ */
+ public final Counter barrierStateChange;
+
+ /**
+ * Number of times the barrier encountered an error while attaining consensus on the job model version
+ */
+ public final Counter barrierError;
+
+ /**
+ * Average time taken for all the processors to get the latest version of the job model after single
+ * processor change (without the occurence of a barrier timeout)
+ */
+ public final Timer singleBarrierRebalancingTime;
+
+ public ZkJobCoordinatorMetrics(MetricsRegistry metricsRegistry) {
+ super(metricsRegistry);
+ this.metricsRegistry = metricsRegistry;
+ this.reads = newCounter("reads");
+ this.writes = newCounter("writes");
+ this.subscriptions = newCounter("subscriptions");
+ this.zkConnectionError = newCounter("zk-connection-error");
+ this.isLeader = newGauge("is-leader", false);
+ this.barrierCreation = newCounter("barrier-creation");
+ this.barrierStateChange = newCounter("barrier-state-change");
+ this.barrierError = newCounter("barrier-error");
+ this.singleBarrierRebalancingTime = newTimer("single-barrier-rebalancing-time");
+ }
+
+ public MetricsRegistry getMetricsRegistry() {
+ return this.metricsRegistry;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
index 677ce54..6560cb4 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkUtils.java
@@ -65,6 +65,7 @@ public class ZkUtils {
private volatile String ephemeralPath = null;
private final ZkKeyBuilder keyBuilder;
private final int connectionTimeoutMs;
+ private ZkJobCoordinatorMetrics metrics;
public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs) {
this.keyBuilder = zkKeyBuilder;
@@ -72,9 +73,17 @@ public class ZkUtils {
this.zkClient = zkClient;
}
+ public ZkUtils(ZkKeyBuilder zkKeyBuilder, ZkClient zkClient, int connectionTimeoutMs, ZkJobCoordinatorMetrics metrics) {
+ this.keyBuilder = zkKeyBuilder;
+ this.connectionTimeoutMs = connectionTimeoutMs;
+ this.zkClient = zkClient;
+ this.metrics = metrics;
+ }
+
public void connect() throws ZkInterruptedException {
boolean isConnected = zkClient.waitUntilConnected(connectionTimeoutMs, TimeUnit.MILLISECONDS);
if (!isConnected) {
+ metrics.zkConnectionError.inc();
throw new RuntimeException("Unable to connect to Zookeeper within connectionTimeout " + connectionTimeoutMs + "ms. Shutting down!");
}
}
@@ -135,6 +144,7 @@ public class ZkUtils {
*/
String readProcessorData(String fullPath) {
String data = zkClient.<String>readData(fullPath, true);
+ metrics.reads.inc();
if (data == null) {
throw new SamzaException(String.format("Cannot read ZK node:", fullPath));
}
@@ -177,6 +187,21 @@ public class ZkUtils {
public void subscribeDataChanges(String path, IZkDataListener dataListener) {
zkClient.subscribeDataChanges(path, dataListener);
+ metrics.subscriptions.inc();
+ }
+
+ public void subscribeChildChanges(String path, IZkChildListener listener) {
+ zkClient.subscribeChildChanges(path, listener);
+ metrics.subscriptions.inc();
+ }
+
+ public void unsubscribeChildChanges(String path, IZkChildListener childListener) {
+ zkClient.unsubscribeChildChanges(path, childListener);
+ }
+
+ public void writeData(String path, Object object) {
+ zkClient.writeData(path, object);
+ metrics.writes.inc();
}
public boolean exists(String path) {
@@ -194,6 +219,7 @@ public class ZkUtils {
public void subscribeToJobModelVersionChange(IZkDataListener dataListener) {
LOG.info(" subscribing for jm version change at:" + keyBuilder.getJobModelVersionPath());
zkClient.subscribeDataChanges(keyBuilder.getJobModelVersionPath(), dataListener);
+ metrics.subscriptions.inc();
}
/**
@@ -224,6 +250,7 @@ public class ZkUtils {
public JobModel getJobModel(String jobModelVersion) {
LOG.info("read the model ver=" + jobModelVersion + " from " + keyBuilder.getJobModelPath(jobModelVersion));
Object data = zkClient.readData(keyBuilder.getJobModelPath(jobModelVersion));
+ metrics.reads.inc();
ObjectMapper mmapper = SamzaObjectMapper.getObjectMapper();
JobModel jm;
try {
@@ -250,6 +277,7 @@ public class ZkUtils {
public void publishJobModelVersion(String oldVersion, String newVersion) {
Stat stat = new Stat();
String currentVersion = zkClient.<String>readData(keyBuilder.getJobModelVersionPath(), stat);
+ metrics.reads.inc();
LOG.info("publishing new version: " + newVersion + "; oldVersion = " + oldVersion + "(" + stat
.getVersion() + ")");
@@ -261,6 +289,7 @@ public class ZkUtils {
int dataVersion = stat.getVersion();
try {
stat = zkClient.writeDataReturnStat(keyBuilder.getJobModelVersionPath(), newVersion, dataVersion);
+ metrics.writes.inc();
} catch (Exception e) {
String msg = "publish job model version failed for new version = " + newVersion + "; old version = " + oldVersion;
LOG.error(msg, e);
@@ -290,5 +319,6 @@ public class ZkUtils {
public void subscribeToProcessorChange(IZkChildListener listener) {
LOG.info("subscribing for child change at:" + keyBuilder.getProcessorsPath());
zkClient.subscribeChildChanges(keyBuilder.getProcessorsPath(), listener);
+ metrics.subscriptions.inc();
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 4f5df94..3bf5c95 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -668,6 +668,7 @@ class SamzaContainer(
try {
info("Starting container.")
+ val startTime = System.nanoTime()
status = SamzaContainerStatus.STARTING
jmxServer = new JmxServer()
@@ -689,6 +690,7 @@ class SamzaContainer(
if (containerListener != null) {
containerListener.onContainerStart()
}
+ metrics.containerStartupTime.update(System.nanoTime() - startTime)
runLoop.run
} catch {
case e: Throwable =>
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
index 18664d8..d080939 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainerMetrics.scala
@@ -37,6 +37,7 @@ class SamzaContainerMetrics(
val processNs = newTimer("process-ns")
val commitNs = newTimer("commit-ns")
val blockNs = newTimer("block-ns")
+ val containerStartupTime = newTimer("container-startup-time")
val utilization = newGauge("event-loop-utilization", 0.0F)
val diskUsageBytes = newGauge("disk-usage-bytes", 0L)
val diskQuotaBytes = newGauge("disk-quota-bytes", Long.MaxValue)
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
index 63d6663..49cd280 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkBarrierForVersionUpgrade.java
@@ -22,6 +22,7 @@ import junit.framework.Assert;
import org.I0Itec.zkclient.ZkClient;
import org.apache.samza.config.ZkConfig;
import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -52,9 +53,9 @@ public class TestZkBarrierForVersionUpgrade {
@Before
public void testSetup() {
ZkClient zkClient = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
- this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+ this.zkUtils = new ZkUtils(new ZkKeyBuilder("group1"), zkClient, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
ZkClient zkClient1 = new ZkClient(testZkConnectionString, ZkConfig.DEFAULT_SESSION_TIMEOUT_MS, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
- this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS);
+ this.zkUtils1 = new ZkUtils(new ZkKeyBuilder("group1"), zkClient1, ZkConfig.DEFAULT_CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
}
@After
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
index 393d733..993297b 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkLeaderElector.java
@@ -28,6 +28,7 @@ import org.I0Itec.zkclient.ZkClient;
import org.I0Itec.zkclient.exception.ZkNodeExistsException;
import org.apache.samza.SamzaException;
import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -436,6 +437,6 @@ public class TestZkLeaderElector {
return new ZkUtils(
KEY_BUILDER,
zkClient,
- CONNECTION_TIMEOUT_MS);
+ CONNECTION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
}
}
http://git-wip-us.apache.org/repos/asf/samza/blob/4072c9e8/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
index b7a0eb8..9e33484 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkUtils.java
@@ -31,6 +31,7 @@ import org.apache.samza.config.MapConfig;
import org.apache.samza.job.model.ContainerModel;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.testUtils.EmbeddedZookeeper;
+import org.apache.samza.util.NoOpMetricsRegistry;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
@@ -70,7 +71,7 @@ public class TestZkUtils {
zkUtils = new ZkUtils(
KEY_BUILDER,
zkClient,
- SESSION_TIMEOUT_MS);
+ SESSION_TIMEOUT_MS, new ZkJobCoordinatorMetrics(new NoOpMetricsRegistry()));
zkUtils.connect();
}
@@ -116,7 +117,7 @@ public class TestZkUtils {
Assert.assertEquals(" ID1 didn't match", "1", l.get(0));
Assert.assertEquals(" ID2 didn't match", "2", l.get(1));
}
-
+
@Test
public void testSubscribeToJobModelVersionChange() {