You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this copy.)
Posted to commits@samza.apache.org by sh...@apache.org on 2019/01/22 20:56:07 UTC

samza git commit: SAMZA-2078: Add zookeeper session metrics in standalone

Repository: samza
Updated Branches:
  refs/heads/master 5ce320c26 -> 3a2010604


SAMZA-2078: Add zookeeper session metrics in standalone

Author: Shanthoosh Venkataraman <sp...@usc.edu>

Reviewers: Jagadish <jv...@linkedin.com>

Closes #888 from shanthoosh/SAMZA-2078 and squashes the following commits:

00f2e58b [Shanthoosh Venkataraman] Address review comments.
b59ea623 [Shanthoosh Venkataraman] SAMZA-2078: Add zookeeper session metrics to ZkJobCoordinator.


Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/3a201060
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/3a201060
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/3a201060

Branch: refs/heads/master
Commit: 3a2010604468769577bd6e541d5f0e47be322f6c
Parents: 5ce320c
Author: Shanthoosh Venkataraman <sp...@usc.edu>
Authored: Tue Jan 22 12:55:48 2019 -0800
Committer: Shanthoosh Venkataraman <sp...@usc.edu>
Committed: Tue Jan 22 12:55:48 2019 -0800

----------------------------------------------------------------------
 .../org/apache/samza/zk/ZkJobCoordinator.java   | 10 ++++
 .../org/apache/samza/zk/ZkSessionMetrics.java   | 60 ++++++++++++++++++++
 .../apache/samza/zk/TestZkJobCoordinator.java   | 36 ++++++++++++
 3 files changed, 106 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/3a201060/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
index cedca3c..9bfc3f8 100644
--- a/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java
@@ -113,6 +113,9 @@ public class ZkJobCoordinator implements JobCoordinator {
   private String cachedJobModelVersion = null;
 
   @VisibleForTesting
+  ZkSessionMetrics zkSessionMetrics;
+
+  @VisibleForTesting
   ScheduleAfterDebounceTime debounceTimer;
 
   @VisibleForTesting
@@ -121,6 +124,7 @@ public class ZkJobCoordinator implements JobCoordinator {
   ZkJobCoordinator(String processorId, Config config, MetricsRegistry metricsRegistry, ZkUtils zkUtils) {
     this.config = config;
     this.metrics = new ZkJobCoordinatorMetrics(metricsRegistry);
+    this.zkSessionMetrics = new ZkSessionMetrics(metricsRegistry);
 
     this.processorId = processorId;
     this.zkUtils = zkUtils;
@@ -554,6 +558,7 @@ public class ZkJobCoordinator implements JobCoordinator {
       switch (state) {
         case Expired:
           // if the session has expired it means that all the registration's ephemeral nodes are gone.
+          zkSessionMetrics.zkSessionExpirations.inc();
           LOG.warn("Got " + state.toString() + " event for processor=" + processorId + ". Stopping the container and unregister the processor node.");
 
           // increase generation of the ZK session. All the callbacks from the previous generation will be ignored.
@@ -587,6 +592,7 @@ public class ZkJobCoordinator implements JobCoordinator {
           return;
         case Disconnected:
           // if the session has expired it means that all the registration's ephemeral nodes are gone.
+          zkSessionMetrics.zkSessionDisconnects.inc();
           LOG.warn("Got " + state.toString() + " event for processor=" + processorId + ". Scheduling a coordinator stop.");
 
           // If the connection is not restored after debounceTimeMs, the process is considered dead.
@@ -595,10 +601,12 @@ public class ZkJobCoordinator implements JobCoordinator {
         case AuthFailed:
         case NoSyncConnected:
         case Unknown:
+          zkSessionMetrics.zkSessionErrors.inc();
           LOG.warn("Got unexpected failure event " + state.toString() + " for processor=" + processorId + ". Stopping the job coordinator.");
           debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
           return;
         case SyncConnected:
+          zkSessionMetrics.zkSyncConnected.inc();
           LOG.info("Got syncconnected event for processor=" + processorId + ".");
           debounceTimer.cancelAction(ZK_SESSION_ERROR);
           return;
@@ -610,6 +618,7 @@ public class ZkJobCoordinator implements JobCoordinator {
 
     @Override
     public void handleNewSession() {
+      zkSessionMetrics.zkNewSessions.inc();
       LOG.info("Got new session created event for processor=" + processorId);
       debounceTimer.cancelAllScheduledActions();
       LOG.info("register zk controller for the new session");
@@ -620,6 +629,7 @@ public class ZkJobCoordinator implements JobCoordinator {
     @Override
     public void handleSessionEstablishmentError(Throwable error) {
       // this means we cannot connect to zookeeper to establish a session
+      zkSessionMetrics.zkSessionErrors.inc(); // same counter as the AuthFailed/NoSyncConnected/Unknown state events
       LOG.info("handleSessionEstablishmentError received for processor=" + processorId, error);
       debounceTimer.scheduleAfterDebounceTime(ZK_SESSION_ERROR, 0, () -> stop());
     }

http://git-wip-us.apache.org/repos/asf/samza/blob/3a201060/samza-core/src/main/java/org/apache/samza/zk/ZkSessionMetrics.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/zk/ZkSessionMetrics.java b/samza-core/src/main/java/org/apache/samza/zk/ZkSessionMetrics.java
new file mode 100644
index 0000000..3f2a8a8
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/zk/ZkSessionMetrics.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.zk;
+
+import org.apache.samza.metrics.Counter;
+import org.apache.samza.metrics.MetricsBase;
+import org.apache.samza.metrics.MetricsRegistry;
+/** Counters for ZooKeeper client session events observed by {@link ZkJobCoordinator}. */
+public class ZkSessionMetrics extends MetricsBase {
+
+  /**
+   * Number of zookeeper client session expirations (Expired state events).
+   */
+  public final Counter zkSessionExpirations;
+
+  /**
+   * Number of zookeeper client session disconnects (Disconnected state events).
+   */
+  public final Counter zkSessionDisconnects;
+
+  /**
+   * Number of zookeeper session errors (AuthFailed/NoSyncConnected/Unknown states, establishment failures).
+   */
+  public final Counter zkSessionErrors;
+
+  /**
+   * Number of new zookeeper client sessions created.
+   */
+  public final Counter zkNewSessions;
+
+  /**
+   * Number of zookeeper SyncConnected state events.
+   */
+  public final Counter zkSyncConnected;
+
+  public ZkSessionMetrics(MetricsRegistry registry) {
+    super(registry);
+    this.zkSessionExpirations = newCounter("zk-session-expirations");
+    this.zkSessionDisconnects = newCounter("zk-session-disconnects");
+    this.zkSessionErrors = newCounter("zk-session-errors");
+    this.zkNewSessions = newCounter("zk-new-sessions");
+    this.zkSyncConnected = newCounter("zk-sync-connected");
+  }
+}

http://git-wip-us.apache.org/repos/asf/samza/blob/3a201060/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
index e0a0941..1ade66d 100644
--- a/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
+++ b/samza-core/src/test/java/org/apache/samza/zk/TestZkJobCoordinator.java
@@ -23,12 +23,15 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.I0Itec.zkclient.ZkClient;
+import org.apache.samza.SamzaException;
 import org.apache.samza.config.MapConfig;
 import org.apache.samza.coordinator.StreamPartitionCountMonitor;
 import org.apache.samza.job.model.JobModel;
+import org.apache.samza.metrics.MetricsRegistryMap;
 import org.apache.samza.util.NoOpMetricsRegistry;
 import org.apache.samza.zk.ZkJobCoordinator.ZkSessionStateChangedListener;
 import org.apache.zookeeper.Watcher;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -82,6 +85,7 @@ public class TestZkJobCoordinator {
 
     ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
     zkJobCoordinator.debounceTimer = mockDebounceTimer;
+    zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
     final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
 
     zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Expired);
@@ -89,6 +93,38 @@ public class TestZkJobCoordinator {
     verify(zkUtils).incGeneration();
     verify(mockDebounceTimer).cancelAllScheduledActions();
     verify(mockDebounceTimer).scheduleAfterDebounceTime(Mockito.eq("ZK_SESSION_EXPIRED"), Mockito.eq(0L), Mockito.any(Runnable.class));
+    Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionExpirations.getCount());
+  }
+
+  @Test
+  public void testZookeeperSessionMetricsAreUpdatedCorrectly() {
+    ZkKeyBuilder keyBuilder = Mockito.mock(ZkKeyBuilder.class);
+    ZkClient mockZkClient = Mockito.mock(ZkClient.class);
+    when(keyBuilder.getJobModelVersionBarrierPrefix()).thenReturn(TEST_BARRIER_ROOT);
+
+    ZkUtils zkUtils = Mockito.mock(ZkUtils.class);
+    when(zkUtils.getKeyBuilder()).thenReturn(keyBuilder);
+    when(zkUtils.getZkClient()).thenReturn(mockZkClient);
+    when(zkUtils.getJobModel(TEST_JOB_MODEL_VERSION)).thenReturn(new JobModel(new MapConfig(), new HashMap<>()));
+    // The debounce timer is a mock, so actions scheduled on it (e.g. stop()) are never executed.
+    ScheduleAfterDebounceTime mockDebounceTimer = Mockito.mock(ScheduleAfterDebounceTime.class);
+
+    ZkJobCoordinator zkJobCoordinator = Mockito.spy(new ZkJobCoordinator("TEST_PROCESSOR_ID", new MapConfig(), new NoOpMetricsRegistry(), zkUtils));
+    zkJobCoordinator.debounceTimer = mockDebounceTimer;
+    zkJobCoordinator.zkSessionMetrics = new ZkSessionMetrics(new MetricsRegistryMap());
+    final ZkSessionStateChangedListener zkSessionStateChangedListener = zkJobCoordinator.new ZkSessionStateChangedListener();
+    // Each state event bumps its own counter; AuthFailed is counted as a session error.
+    zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.Disconnected);
+    zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.SyncConnected);
+    zkSessionStateChangedListener.handleStateChanged(Watcher.Event.KeeperState.AuthFailed);
+
+    Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionErrors.getCount());
+    // A session establishment failure increments the same session-error counter.
+    zkSessionStateChangedListener.handleSessionEstablishmentError(new SamzaException("Test exception"));
+
+    Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSessionDisconnects.getCount());
+    Assert.assertEquals(1, zkJobCoordinator.zkSessionMetrics.zkSyncConnected.getCount());
+    Assert.assertEquals(2, zkJobCoordinator.zkSessionMetrics.zkSessionErrors.getCount());
   }
 
   @Test