You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@samza.apache.org by sh...@apache.org on 2019/01/22 20:56:07 UTC
samza git commit: SAMZA-2078: Add zookeeper session metrics in standalone
Repository: samza
Updated Branches:
refs/heads/master 5ce320c26 -> 3a2010604
SAMZA-2078: Add zookeeper session metrics in standalone
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Reviewers: Jagadish <jv...@linkedin.com>
Closes #888 from shanthoosh/SAMZA-2078 and squashes the following commits:
00f2e58b [Shanthoosh Venkataraman] Address review comments.
b59ea623 [Shanthoosh Venkataraman] SAMZA-2078: Add zookeeper session metrics to ZkJobCoordinator.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3a201060
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3a201060
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3a201060
Branch: refs/heads/master
Commit: 3a2010604468769577bd6e541d5f0e47be322f6c
Parents: 5ce320c
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Tue Jan 22 12:55:48 2019 -0800
Committer: Shanthoosh Venkataraman <sp...@usc.edu>
Committed: Tue Jan 22 12:55:48 2019 -0800
----------------------------------------------------------------------
.../org/apache/samza/zk/ZkJobCoordinator.java | 10 ++++
.../org/apache/samza/zk/ZkSessionMetrics.java | 60 ++++++++++++++++++++
.../apache/samza/zk/TestZkJobCoordinator.java | 36 ++++++++++++
3 files changed, 106 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/samza/blob/3a201060/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index cedca3c..9bfc3f8 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -113,6 +113,9 @@ public class ZkJobCoordinator implements JobCoordinator {
private String cachedJobModelVersion = null;
@VisibleForTesting
+ ZkSessionMetrics zkSessionMetrics;
+
+ @VisibleForTesting
ScheduleAfterDebounceTime debounceTimer;
@VisibleForTesting
@@ -121,6 +124,7 @@ public class ZkJobCoordinator implements JobCoordinator {
ZkJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
this.config = config;
this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
+ this.zkSessionMetrics = new ZkSessionMetrics(metricsRegistry);
this.processorId = processorId;
this.zkUtils = zkUtils;
@@ -554,6 +558,7 @@ public class ZkJobCoordinator implements JobCoordinator {
switch (state) {
case Expired:
// if the session has expired it means that all the registration's ephemeral nodes are gone.
+ zkSessionMetrics.zkSessionExpirations.inc();
LOG.warn("Got " + state.toString() + " event for processor=" + processorId + ". Stopping the container and unregister the processor node.");
// increase generation of the ZK session. All the callbacks from the previous generation will be ignored.
@@ -587,6 +592,7 @@ public class ZkJobCoordinator implements JobCoordinator {
return;
case Disconnected:
// if the session has expired it means that all the registration's ephemeral nodes are gone.
+ zkSessionMetrics.zkSessionDisconnects.inc();
LOG.warn("Got " + state.toString() + " event for processor=" + processorId + ". Scheduling a coordinator stop.");
// If the connection is not restored after debounceTimeMs, the process is considered dead.
@@ -595,10 +601,12 @@ public class ZkJobCoordinator implements JobCoordinator {
case AuthFailed:
case NoSyncConnected:
case Unknown:
+ zkSessionMetrics.zkSessionErrors.inc();
LOG.warn("Got unexpected failure event " + state.toString() + " for processor=" + processorId + ". Stopping the job coordinator.");
debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
return;
case SyncConnected:
+ zkSessionMetrics.zkSyncConnected.inc();
LOG.info("Got syncconnected event for processor=" + processorId + ".");
debounceTimer.cancelAction(ZK_SESSION_ERROR);
return;
@@ -610,6 +618,7 @@ public class ZkJobCoordinator implements JobCoordinator {
@Override
public void handleNewSession() {
+ zkSessionMetrics.zkNewSessions.inc();
LOG.info("Got new session created event for processor=" + processorId);
debounceTimer.cancelAllScheduledActions();
LOG.info("register zk controller for the new session");
@@ -620,6 +629,7 @@ public class ZkJobCoordinator implements JobCoordinator {
@Override
public void handleSessionEstablishmentError(Throwable error) {
// this means we cannot connect to zookeeper to establish a session
+ zkSessionMetrics.zkSessionErrors.inc();
LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
}
http://git-wip-us.apache.org/repos/asf/samza/blob/3a201060/samza-core/src/main/java/org/apache/samza/zk/ZkSessionMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkSessionMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkSessionMetrics.java
new file mode 100644
index 0000000..3f2a8a8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkSessionMetrics.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.MetricsRegistry;
+
+public class ZkSessionMetrics extends MetricsBase {
+
+ /**
+ * Number of zookeeper session expirations, i.e. Expired session-state events received.
+ */
+ public final Counter zkSessionExpirations;
+
+ /**
+ * Number of zookeeper client disconnects, i.e. Disconnected session-state events received.
+ */
+ public final Counter zkSessionDisconnects;
+
+ /**
+ * Number of zookeeper session errors: AuthFailed, NoSyncConnected and Unknown session-state events, plus session establishment failures.
+ */
+ public final Counter zkSessionErrors;
+
+ /**
+ * Number of new zookeeper client sessions created (new-session callbacks received).
+ */
+ public final Counter zkNewSessions;
+
+ /**
+ * Number of SyncConnected session-state events received from zookeeper.
+ */
+ public final Counter zkSyncConnected;
+
+ public ZkSessionMetrics(MetricsRegistry registry) {
+ super(registry);
+ this.zkSessionExpirations = newCounter("zk-session-expirations");
+ this.zkSessionDisconnects = newCounter("zk-session-disconnects");
+ this.zkSessionErrors = newCounter("zk-session-errors");
+ this.zkNewSessions = newCounter("zk-new-sessions");
+ this.zkSyncConnected = newCounter("zk-sync-connected");
+ }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/3a201060/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index e0a0941..1ade66d 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -23,12 +23,15 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.SamzaException;
import org.apache.samza.config.MapConfig;
import org.apache.samza.coordinator.StreamPartitionCountMonitor;
import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.util.NoOpMetricsRegistry;
import org.apache.samza.zk.ZkJobCoordinator.ZkSessionStateChangedListener;
import org.apache.zookeeper.Watcher;
+import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
@@ -82,6 +85,7 @@ public class TestZkJobCoordinator {
ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
zkJobCoordinator.debounceTimer = mockDebounceTimer;
+ zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
@@ -89,6 +93,38 @@ public class TestZkJobCoordinator {
verify(zkUtils).incGeneration();
verify(mockDebounceTimer).cancelAllScheduledActions();
verify(mockDebounceTimer).scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
+ Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionExpirations.getCount());
+ }
+
+ @Test
+ public void testZookeeperSessionMetricsAreUpdatedCorrectly() { // each session event must bump exactly its own counter
+ ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+ ZkClient mockZkClient = Mockito.mock(ZkClient.class);
+ when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+
+ ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
+ when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+ when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+ when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
+
+ ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class); // scheduled stop() actions are never executed
+
+ ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+ zkJobCoordinator.debounceTimer = mockDebounceTimer;
+ zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap()); // fresh counters, isolated from the constructor's registry
+ final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
+
+ zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Disconnected);
+ zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.SyncConnected);
+ zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.AuthFailed);
+
+ Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionErrors.getCount()); // AuthFailed counts as a session error
+
+ zkSessionStateChangedListener.handleSessionEstablishmentError(new SamzaException("Test exception"));
+
+ Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionDisconnects.getCount());
+ Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSyncConnected.getCount());
+ Assert.assertEquals(2, zkJobCoordinator.zkSessionMetrics.zkSessionErrors.getCount()); // establishment error shares the errors counter
}
@Test