You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/11/14 00:50:54 UTC

helix git commit: Adding new attribute outstanding request gauge to ZkClient Monitor.

Repository: helix
Updated Branches:
  refs/heads/master 00e50db3a -> 3cf29010c


Add a new attribute, the outstanding request gauge, to the ZkClient Monitor.

This attribute helps debug ZkClient-related issues, for example when the connection is lost and requests are stuck.
Also upgrade ZkClientMonitor to be a Dynamic MBean Provider.
Related tests are added to TestRawZkClient and TestZkClientMonitor.

Additional change:
Remove the data size test in TestRawZkClient: auto compression is now enabled, so this test case is no longer valid.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3cf29010
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3cf29010
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3cf29010

Branch: refs/heads/master
Commit: 3cf29010c03ad6db1498261a37304eb2949b1b24
Parents: 00e50db
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Thu Nov 1 22:23:40 2018 -0700
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Tue Nov 13 16:50:47 2018 -0800

----------------------------------------------------------------------
 .../helix/manager/zk/zookeeper/ZkClient.java    |  73 ++++----
 .../monitoring/mbeans/ZkClientMonitor.java      | 166 ++++++++++++-------
 .../monitoring/mbeans/ZkClientMonitorMBean.java |  30 ----
 .../monitoring/mbeans/ZkClientPathMonitor.java  |  25 ++-
 .../helix/manager/zk/TestRawZkClient.java       |  87 ++++++++--
 .../monitoring/mbeans/TestZkClientMonitor.java  |  33 +++-
 6 files changed, 259 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3cf29010/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 4d2a93d..132d9e3 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -147,8 +147,9 @@ public class ZkClient implements Watcher {
       if (monitorKey != null && !monitorKey.isEmpty() && monitorType != null && !monitorType
           .isEmpty()) {
         _monitor =
-            new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly);
-        _monitor.setZkEventThread(_eventThread);
+            new ZkClientMonitor(monitorType, monitorKey, monitorInstanceName, monitorRootPathOnly,
+                _eventThread);
+        _monitor.register();
       } else {
         LOG.info("ZkClient monitor key or type is not provided. Skip monitoring.");
       }
@@ -1085,38 +1086,46 @@ public class ZkClient implements Watcher {
       throw new IllegalArgumentException("Must not be done in the zookeeper event thread.");
     }
     final long operationStartTime = System.currentTimeMillis();
-    while (true) {
-      if (isClosed()) {
-        throw new IllegalStateException("ZkClient already closed!");
-      }
-      try {
-        final ZkConnection zkConnection = (ZkConnection) getConnection();
-        // Validate that the connection is not null before trigger callback
-        if (zkConnection == null || zkConnection.getZookeeper() == null) {
-          throw new IllegalStateException(
-              "ZkConnection is in invalid state! Please close this ZkClient and create new client.");
+    if (_monitor != null) {
+      _monitor.increaseOutstandingRequestGauge();
+    }
+    try {
+      while (true) {
+        if (isClosed()) {
+          throw new IllegalStateException("ZkClient already closed!");
+        }
+        try {
+          final ZkConnection zkConnection = (ZkConnection) getConnection();
+          // Validate that the connection is not null before trigger callback
+          if (zkConnection == null || zkConnection.getZookeeper() == null) {
+            throw new IllegalStateException(
+                "ZkConnection is in invalid state! Please close this ZkClient and create new client.");
+          }
+          return callable.call();
+        } catch (ConnectionLossException e) {
+          // we give the event thread some time to update the status to 'Disconnected'
+          Thread.yield();
+          waitForRetry();
+        } catch (SessionExpiredException e) {
+          // we give the event thread some time to update the status to 'Expired'
+          Thread.yield();
+          waitForRetry();
+        } catch (KeeperException e) {
+          throw ZkException.create(e);
+        } catch (InterruptedException e) {
+          throw new ZkInterruptedException(e);
+        } catch (Exception e) {
+          throw ExceptionUtil.convertToRuntimeException(e);
+        }
+        // before attempting a retry, check whether retry timeout has elapsed
+        if (System.currentTimeMillis() - operationStartTime > _operationRetryTimeoutInMillis) {
+          throw new ZkTimeoutException("Operation cannot be retried because of retry timeout (" + _operationRetryTimeoutInMillis
+              + " milli seconds)");
         }
-        return callable.call();
-      } catch (ConnectionLossException e) {
-        // we give the event thread some time to update the status to 'Disconnected'
-        Thread.yield();
-        waitForRetry();
-      } catch (SessionExpiredException e) {
-        // we give the event thread some time to update the status to 'Expired'
-        Thread.yield();
-        waitForRetry();
-      } catch (KeeperException e) {
-        throw ZkException.create(e);
-      } catch (InterruptedException e) {
-        throw new ZkInterruptedException(e);
-      } catch (Exception e) {
-        throw ExceptionUtil.convertToRuntimeException(e);
       }
-      // before attempting a retry, check whether retry timeout has elapsed
-      if (System.currentTimeMillis() - operationStartTime > _operationRetryTimeoutInMillis) {
-        throw new ZkTimeoutException(
-            "Operation cannot be retried because of retry timeout (" + _operationRetryTimeoutInMillis
-                + " milli seconds)");
+    } finally {
+      if (_monitor != null) {
+        _monitor.decreaseOutstandingRequestGauge();
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/helix/blob/3cf29010/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
index febc4f3..0fe6911 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitor.java
@@ -19,36 +19,49 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import org.apache.helix.HelixException;
-
 import javax.management.JMException;
+import javax.management.MBeanAttributeInfo;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.helix.HelixException;
 import org.apache.helix.manager.zk.zookeeper.ZkEventThread;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
-public class ZkClientMonitor implements ZkClientMonitorMBean {
+public class ZkClientMonitor extends DynamicMBeanProvider {
   public static final String MONITOR_TYPE = "Type";
   public static final String MONITOR_KEY = "Key";
+  protected static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client Monitor";
 
   public enum AccessType {
-    READ,
-    WRITE
+    READ, WRITE
   }
 
-  private ObjectName _objectName;
   private String _sensorName;
+  private String _monitorType;
+  private String _monitorKey;
+  private String _monitorInstanceName;
+  private boolean _monitorRootOnly;
 
-  private long _stateChangeEventCounter;
-  private long _dataChangeEventCounter;
-  private ZkEventThread _zkEventThread;
+  private SimpleDynamicMetric<Long> _stateChangeEventCounter;
+  private SimpleDynamicMetric<Long> _dataChangeEventCounter;
+  private SimpleDynamicMetric<Long> _outstandingRequestGauge;
+
+  private ZkThreadMetric _zkEventThreadMetric;
 
   private Map<ZkClientPathMonitor.PredefinedPath, ZkClientPathMonitor> _zkClientPathMonitorMap =
       new ConcurrentHashMap<>();
 
   public ZkClientMonitor(String monitorType, String monitorKey, String monitorInstanceName,
-      boolean monitorRootPathOnly) throws JMException {
+      boolean monitorRootOnly, ZkEventThread zkEventThread) {
     if (monitorKey == null || monitorKey.isEmpty() || monitorType == null || monitorType
         .isEmpty()) {
       throw new HelixException("Cannot create ZkClientMonitor without monitor key and type.");
@@ -56,24 +69,19 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
 
     _sensorName =
         String.format("%s.%s.%s", MonitorDomainNames.HelixZkClient.name(), monitorType, monitorKey);
-
-    _objectName =
-        MBeanRegistrar.register(this, getObjectName(monitorType, monitorKey, monitorInstanceName));
-
-    for (ZkClientPathMonitor.PredefinedPath path : ZkClientPathMonitor.PredefinedPath.values()) {
-      // If monitor root path only, check if the current path is Root.
-      // Otherwise, add monitors for every path.
-      if (!monitorRootPathOnly || path.equals(ZkClientPathMonitor.PredefinedPath.Root)) {
-        _zkClientPathMonitorMap.put(path,
-            new ZkClientPathMonitor(path, monitorType, monitorKey, monitorInstanceName).register());
-      }
+    _monitorType = monitorType;
+    _monitorKey = monitorKey;
+    _monitorInstanceName = monitorInstanceName;
+    _monitorRootOnly = monitorRootOnly;
+
+    _stateChangeEventCounter = new SimpleDynamicMetric("StateChangeEventCounter", 0l);
+    _dataChangeEventCounter = new SimpleDynamicMetric("DataChangeEventCounter", 0l);
+    _outstandingRequestGauge = new SimpleDynamicMetric("OutstandingRequestGauge", 0l);
+    if (zkEventThread != null) {
+      _zkEventThreadMetric = new ZkThreadMetric(zkEventThread);
     }
   }
 
-  public void setZkEventThread(ZkEventThread zkEventThread) {
-    _zkEventThread = zkEventThread;
-  }
-
   protected static ObjectName getObjectName(String monitorType, String monitorKey,
       String monitorInstanceName) throws MalformedObjectNameException {
     return MBeanRegistrar
@@ -82,11 +90,34 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
             (monitorKey + (monitorInstanceName == null ? "" : "." + monitorInstanceName)));
   }
 
+  @Override
+  public DynamicMBeanProvider register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_dataChangeEventCounter);
+    attributeList.add(_outstandingRequestGauge);
+    attributeList.add(_stateChangeEventCounter);
+    if (_zkEventThreadMetric != null) {
+      attributeList.add(_zkEventThreadMetric);
+    }
+    doRegister(attributeList, MBEAN_DESCRIPTION,
+        getObjectName(_monitorType, _monitorKey, _monitorInstanceName));
+    for (ZkClientPathMonitor.PredefinedPath path : ZkClientPathMonitor.PredefinedPath.values()) {
+      // If monitor root path only, check if the current path is Root.
+      // Otherwise, add monitors for every path.
+      if (!_monitorRootOnly || path.equals(ZkClientPathMonitor.PredefinedPath.Root)) {
+        _zkClientPathMonitorMap.put(path,
+            new ZkClientPathMonitor(path, _monitorType, _monitorKey, _monitorInstanceName)
+                .register());
+      }
+    }
+    return this;
+  }
+
   /**
    * After unregistered, the MBean can't be registered again, a new monitor has be to created.
    */
   public void unregister() {
-    MBeanRegistrar.unregister(_objectName);
+    super.unregister();
     for (ZkClientPathMonitor zkClientPathMonitor : _zkClientPathMonitorMap.values()) {
       zkClientPathMonitor.unregister();
     }
@@ -98,48 +129,27 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
   }
 
   public void increaseStateChangeEventCounter() {
-    _stateChangeEventCounter++;
-  }
-
-  @Override
-  public long getStateChangeEventCounter() {
-    return _stateChangeEventCounter;
+    synchronized (_stateChangeEventCounter) {
+      _stateChangeEventCounter.updateValue(_stateChangeEventCounter.getValue() + 1);
+    }
   }
 
   public void increaseDataChangeEventCounter() {
-    _dataChangeEventCounter++;
-  }
-
-  @Override
-  public long getDataChangeEventCounter() {
-    return _dataChangeEventCounter;
-  }
-
-  @Override
-  public long getPendingCallbackGauge() {
-    if (_zkEventThread != null) {
-      return _zkEventThread.getPendingEventsCount();
+    synchronized (_dataChangeEventCounter) {
+      _dataChangeEventCounter.updateValue(_dataChangeEventCounter.getValue() + 1);
     }
-
-    return -1;
   }
 
-  @Override
-  public long getTotalCallbackCounter() {
-    if (_zkEventThread != null) {
-      return _zkEventThread.getTotalEventCount();
+  public void increaseOutstandingRequestGauge() {
+    synchronized (_outstandingRequestGauge) {
+      _outstandingRequestGauge.updateValue(_outstandingRequestGauge.getValue() + 1);
     }
-
-    return -1;
   }
 
-  @Override
-  public long getTotalCallbackHandledCounter() {
-    if (_zkEventThread != null) {
-      return _zkEventThread.getTotalHandledEventCount();
+  public void decreaseOutstandingRequestGauge() {
+    synchronized (_outstandingRequestGauge) {
+      _outstandingRequestGauge.updateValue(_outstandingRequestGauge.getValue() - 1);
     }
-
-    return -1;
   }
 
   private void record(String path, int bytes, long latencyMilliSec, boolean isFailure,
@@ -163,7 +173,6 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
     case WRITE:
       record(path, dataSize, System.currentTimeMillis() - startTimeMilliSec, false, false);
       return;
-
     default:
       return;
     }
@@ -181,4 +190,43 @@ public class ZkClientMonitor implements ZkClientMonitorMBean {
       return;
     }
   }
+
+  class ZkThreadMetric extends DynamicMetric<ZkEventThread, ZkEventThread> {
+    public ZkThreadMetric(ZkEventThread eventThread) {
+      super("ZkEventThead", eventThread);
+    }
+
+    @Override
+    protected Set<MBeanAttributeInfo> generateAttributeInfos(String metricName,
+        ZkEventThread eventThread) {
+      Set<MBeanAttributeInfo> attributeInfoSet = new HashSet<>();
+      attributeInfoSet.add(new MBeanAttributeInfo("PendingCallbackGauge", Long.TYPE.getName(),
+          DEFAULT_ATTRIBUTE_DESCRIPTION, true, false, false));
+      attributeInfoSet.add(new MBeanAttributeInfo("TotalCallbackCounter", Long.TYPE.getName(),
+          DEFAULT_ATTRIBUTE_DESCRIPTION, true, false, false));
+      attributeInfoSet.add(
+          new MBeanAttributeInfo("TotalCallbackHandledCounter", Long.TYPE.getName(),
+              DEFAULT_ATTRIBUTE_DESCRIPTION, true, false, false));
+      return attributeInfoSet;
+    }
+
+    @Override
+    public Object getAttributeValue(String attributeName) {
+      switch (attributeName) {
+      case "PendingCallbackGauge":
+        return getMetricObject().getPendingEventsCount();
+      case "TotalCallbackCounter":
+        return getMetricObject().getTotalEventCount();
+      case "TotalCallbackHandledCounter":
+        return getMetricObject().getTotalHandledEventCount();
+      default:
+        throw new HelixException("Unknown attribute name: " + attributeName);
+      }
+    }
+
+    @Override
+    public void updateValue(ZkEventThread newEventThread) {
+      setMetricObject(newEventThread);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3cf29010/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java
deleted file mode 100644
index 67eaecb..0000000
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientMonitorMBean.java
+++ /dev/null
@@ -1,30 +0,0 @@
-package org.apache.helix.monitoring.mbeans;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import org.apache.helix.monitoring.SensorNameProvider;
-
-public interface ZkClientMonitorMBean extends SensorNameProvider {
-  long getStateChangeEventCounter();
-  long getDataChangeEventCounter();
-  long getPendingCallbackGauge();
-  long getTotalCallbackCounter();
-  long getTotalCallbackHandledCounter();
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/3cf29010/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
index 2c06eb0..bc294e3 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ZkClientPathMonitor.java
@@ -19,23 +19,21 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import javax.management.JMException;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
 import com.codahale.metrics.Histogram;
 import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
-import java.util.concurrent.TimeUnit;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
 import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
 
-import javax.management.JMException;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-import java.util.ArrayList;
-import java.util.List;
-
 public class ZkClientPathMonitor extends DynamicMBeanProvider {
   public static final String MONITOR_PATH = "PATH";
-  private static final String MBEAN_DESCRIPTION = "Helix Zookeeper Client Monitor";
   private final String _sensorName;
   private final String _type;
   private final String _key;
@@ -128,15 +126,16 @@ public class ZkClientPathMonitor extends DynamicMBeanProvider {
     attributeList.add(_readBytesGauge);
     attributeList.add(_writeBytesGauge);
 
-    ObjectName objectName = new ObjectName(String.format("%s,%s=%s",
-        ZkClientMonitor.getObjectName(_type, _key, _instanceName).toString(),
-        MONITOR_PATH, _path.name()));
-    doRegister(attributeList, MBEAN_DESCRIPTION, objectName);
+    ObjectName objectName = new ObjectName(String
+        .format("%s,%s=%s", ZkClientMonitor.getObjectName(_type, _key, _instanceName).toString(),
+            MONITOR_PATH, _path.name()));
+    doRegister(attributeList, ZkClientMonitor.MBEAN_DESCRIPTION, objectName);
 
     return this;
   }
 
-  protected void record(int bytes, long latencyMilliSec, boolean isFailure, boolean isRead) {
+  protected synchronized void record(int bytes, long latencyMilliSec, boolean isFailure,
+      boolean isRead) {
     if (isFailure) {
       increaseFailureCounter(isRead);
     } else {

http://git-wip-us.apache.org/repos/asf/helix/blob/3cf29010/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
index d0cf004..7b232fe 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestRawZkClient.java
@@ -20,6 +20,9 @@ package org.apache.helix.manager.zk;
  */
 
 import java.lang.management.ManagementFactory;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.Lock;
@@ -29,7 +32,9 @@ import javax.management.ObjectName;
 import org.I0Itec.zkclient.IZkDataListener;
 import org.I0Itec.zkclient.IZkStateListener;
 import org.I0Itec.zkclient.ZkConnection;
+import org.I0Itec.zkclient.ZkServer;
 import org.apache.helix.HelixException;
+import org.apache.helix.TestHelper;
 import org.apache.helix.ZNRecord;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.monitoring.mbeans.MBeanRegistrar;
@@ -53,6 +58,12 @@ import org.testng.annotations.Test;
 public class TestRawZkClient extends ZkUnitTestBase {
   private static Logger LOG = LoggerFactory.getLogger(TestRawZkClient.class);
 
+  private final String TEST_TAG = "test_monitor";
+  private final String TEST_DATA = "testData";
+  private final String TEST_ROOT = "/my_cluster/IDEALSTATES";
+  private final String TEST_NODE = "/test_zkclient_monitor";
+  private final String TEST_PATH = TEST_ROOT + TEST_NODE;
+
   ZkClient _zkClient;
 
   @BeforeClass
@@ -129,21 +140,9 @@ public class TestRawZkClient extends ZkUnitTestBase {
     System.out.println("After session expiry sessionId= " + zookeeper.getSessionId());
   }
 
-  @Test(expectedExceptions = HelixException.class, expectedExceptionsMessageRegExp = "Data size larger than 1M.*")
-  void testDataSizeLimit() {
-    ZNRecord data = new ZNRecord(new String(new char[1024 * 1024]));
-    _zkClient.writeData("/test", data, -1);
-  }
-
   @Test
   public void testZkClientMonitor() throws Exception {
-    final String TEST_TAG = "test_monitor";
-    final String TEST_KEY = "test_key";
-    final String TEST_DATA = "testData";
-    final String TEST_ROOT = "/my_cluster/IDEALSTATES";
-    final String TEST_NODE = "/test_zkclient_monitor";
-    final String TEST_PATH = TEST_ROOT + TEST_NODE;
-
+    final String TEST_KEY = "testZkClientMonitor";
     ZkClient.Builder builder = new ZkClient.Builder();
     builder.setZkServer(ZK_ADDR).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG)
         .setMonitorRootPathOnly(false);
@@ -174,6 +173,11 @@ public class TestRawZkClient extends ZkUnitTestBase {
     Assert.assertTrue(beanServer.isRegistered(rootname));
     Assert.assertTrue(beanServer.isRegistered(idealStatename));
 
+    Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(name, "StateChangeEventCounter"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(name, "OutstandingRequestGauge"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackCounter"), 0);
+
     // Test exists
     Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadCounter"), 0);
     Assert.assertEquals((long) beanServer.getAttribute(rootname, "ReadTotalLatencyCounter"), 0);
@@ -290,5 +294,62 @@ public class TestRawZkClient extends ZkUnitTestBase {
     _zkClient.delete(TEST_PATH);
     Assert.assertTrue(callbackFinish.await(10, TimeUnit.SECONDS));
     Assert.assertEquals((long) beanServer.getAttribute(name, "DataChangeEventCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(name, "OutstandingRequestGauge"), 0);
+    Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(name, "TotalCallbackHandledCounter"), 1);
+    Assert.assertEquals((long) beanServer.getAttribute(name, "PendingCallbackGauge"), 0);
+  }
+
+  @Test(dependsOnMethods = "testZkClientMonitor")
+  void testPendingRequestGauge() throws Exception {
+    final String TEST_KEY = "testPendingRequestGauge";
+
+    final MBeanServer beanServer = ManagementFactory.getPlatformMBeanServer();
+    final ObjectName name = MBeanRegistrar
+        .buildObjectName(MonitorDomainNames.HelixZkClient.name(), ZkClientMonitor.MONITOR_TYPE,
+            TEST_TAG, ZkClientMonitor.MONITOR_KEY, TEST_KEY);
+
+    final int zkPort = TestHelper.getRandomPort();
+    final String zkAddr = String.format("localhost:%d", zkPort);
+    final ZkServer zkServer = TestHelper.startZkServer(zkAddr);
+
+    try {
+      ZkClient.Builder builder = new ZkClient.Builder();
+      builder.setZkServer(zkAddr).setMonitorKey(TEST_KEY).setMonitorType(TEST_TAG).setMonitorRootPathOnly(true);
+      final ZkClient zkClient = builder.build();
+
+      zkServer.shutdown();
+      zkClient.waitForKeeperState(KeeperState.Disconnected, 5000, TimeUnit.MILLISECONDS);
+      Assert.assertFalse(zkClient.waitUntilConnected(0, TimeUnit.MILLISECONDS));
+
+      Assert.assertEquals((long) beanServer.getAttribute(name, "OutstandingRequestGauge"), 0);
+
+      // Request a read in a separate thread. This will be a pending request
+      ExecutorService executorService = Executors.newSingleThreadExecutor();
+      executorService.submit(new Runnable() {
+        @Override
+        public void run() {
+          zkClient.exists(TEST_ROOT);
+        }
+      });
+      Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+        @Override
+        public boolean verify() throws Exception {
+          return (long) beanServer.getAttribute(name, "OutstandingRequestGauge") == 1;
+        }
+      }, 1000));
+
+      zkServer.start();
+      Assert.assertTrue(zkClient.waitUntilConnected(5000, TimeUnit.MILLISECONDS));
+      Assert.assertTrue(TestHelper.verify(new TestHelper.Verifier() {
+        @Override
+        public boolean verify() throws Exception {
+          return (long) beanServer.getAttribute(name, "OutstandingRequestGauge") == 0;
+        }
+      }, 2000));
+      zkClient.close();
+    } finally {
+      zkServer.shutdown();
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/3cf29010/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
index 4d37edd..988ef60 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestZkClientMonitor.java
@@ -20,15 +20,13 @@ package org.apache.helix.monitoring.mbeans;
  */
 
 import java.lang.management.ManagementFactory;
-import javax.management.JMException;
-import javax.management.MBeanServer;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+import javax.management.*;
+
+import org.apache.helix.manager.zk.zookeeper.ZkEventThread;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 public class TestZkClientMonitor {
-
   private MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer();
 
   private ObjectName buildObjectName(String tag, String key, String instance) throws MalformedObjectNameException {
@@ -58,7 +56,9 @@ public class TestZkClientMonitor {
     final String TEST_TAG_1 = "test_tag_1";
     final String TEST_KEY_1 = "test_key_1";
 
-    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, null, true);
+    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, null, true, null);
+    Assert.assertFalse(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null)));
+    monitor.register();
     Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null)));
 
     // no per-path monitor items created since "monitorRootPathOnly" = true
@@ -66,7 +66,8 @@ public class TestZkClientMonitor {
         buildPathMonitorObjectName(TEST_TAG_1, TEST_KEY_1, null,
             ZkClientPathMonitor.PredefinedPath.IdealStates.name())));
 
-    ZkClientMonitor monitorDuplicate = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, null, true);
+    ZkClientMonitor monitorDuplicate = new ZkClientMonitor(TEST_TAG_1, TEST_KEY_1, null, true, null);
+    monitorDuplicate.register();
     Assert.assertTrue(_beanServer.isRegistered(buildObjectName(TEST_TAG_1, TEST_KEY_1, null, 1)));
 
     monitor.unregister();
@@ -82,7 +83,9 @@ public class TestZkClientMonitor {
     final String TEST_KEY = "test_key_3";
     final String TEST_INSTANCE = "test_instance_3";
 
-    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY, TEST_INSTANCE, false);
+    ZkClientMonitor monitor = new ZkClientMonitor(TEST_TAG, TEST_KEY, TEST_INSTANCE, false, null);
+    monitor.register();
+
     ObjectName name = buildObjectName(TEST_TAG, TEST_KEY, TEST_INSTANCE);
     ObjectName rootName = buildPathMonitorObjectName(TEST_TAG, TEST_KEY,
         TEST_INSTANCE, ZkClientPathMonitor.PredefinedPath.Root.name());
@@ -97,6 +100,20 @@ public class TestZkClientMonitor {
     long eventCount = (long) _beanServer.getAttribute(name, "DataChangeEventCounter");
     Assert.assertEquals(eventCount, 1);
 
+    monitor.increaseStateChangeEventCounter();
+    long stateChangeCount = (long) _beanServer.getAttribute(name, "StateChangeEventCounter");
+    Assert.assertEquals(stateChangeCount, 1);
+
+    monitor.increaseOutstandingRequestGauge();
+    long requestGauge = (long) _beanServer.getAttribute(name, "OutstandingRequestGauge");
+    Assert.assertEquals(requestGauge, 1);
+
+    monitor.decreaseOutstandingRequestGauge();
+    requestGauge = (long) _beanServer.getAttribute(name, "OutstandingRequestGauge");
+    Assert.assertEquals(requestGauge, 0);
+
+    Assert.assertNull(_beanServer.getAttribute(name, "PendingCallbackGauge"));
+
     monitor.record("TEST/IDEALSTATES/myResource", 0, System.currentTimeMillis() - 10,
         ZkClientMonitor.AccessType.READ);
     Assert.assertEquals((long) _beanServer.getAttribute(rootName, "ReadCounter"), 1);