You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ji...@apache.org on 2018/07/17 19:06:09 UTC

[1/4] helix git commit: Refine RoutingTable refresh() logic.

Repository: helix
Updated Branches:
  refs/heads/master 3ba447f97 -> 993beb383


Refine RoutingTable refresh() logic.

Simplify the constructors.
Refine Log string to a clearer statement.
Fix a potential bug where an empty instanceConfig is recorded in the RoutingTable.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/acea2f16
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/acea2f16
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/acea2f16

Branch: refs/heads/master
Commit: acea2f16bce6c439fdc8d1dfb8ad37650679f822
Parents: 3ba447f
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Fri Jun 29 15:07:53 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Tue Jul 17 11:52:25 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/spectator/RoutingTable.java    | 86 ++++++++++----------
 .../spectator/TestRoutingTableProvider.java     | 70 ++++++++++------
 2 files changed, 88 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/acea2f16/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
index e3a3349..115131b 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
@@ -47,6 +47,7 @@ class RoutingTable {
   private final Map<String, ResourceInfo> _resourceInfoMap;
   // mapping a resource group name to a resourceGroupInfo
   private final Map<String, ResourceGroupInfo> _resourceGroupInfoMap;
+
   private final Collection<LiveInstance> _liveInstances;
   private final Collection<InstanceConfig> _instanceConfigs;
   private final Collection<ExternalView> _externalViews;
@@ -56,32 +57,29 @@ class RoutingTable {
         Collections.<LiveInstance>emptyList());
   }
 
-  public RoutingTable(Collection<ExternalView> externalViews, Collection<InstanceConfig> instanceConfigs,
-      Collection<LiveInstance> liveInstances) {
-    _externalViews = externalViews;
-    _resourceInfoMap = new HashMap<>();
-    _resourceGroupInfoMap = new HashMap<>();
-    _liveInstances = new HashSet<>(liveInstances);
-    _instanceConfigs = new HashSet<>(instanceConfigs);
-    refresh(externalViews);
+  public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
+      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+    // TODO Aggregate currentState to an ExternalView in the RoutingTable, so there is no need to refresh according to the currentStateMap. - jjwang
+    this(Collections.<ExternalView>emptyList(), instanceConfigs, liveInstances);
+    refresh(currentStateMap);
   }
 
-  public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
+  public RoutingTable(Collection<ExternalView> externalViews,
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
-    _externalViews = Collections.emptyList();
     _resourceInfoMap = new HashMap<>();
     _resourceGroupInfoMap = new HashMap<>();
-    _liveInstances = liveInstances;
-    _instanceConfigs = instanceConfigs;
-    refresh(currentStateMap);
+    _liveInstances = new HashSet<>(liveInstances);
+    _instanceConfigs = new HashSet<>(instanceConfigs);
+    _externalViews = new HashSet<>(externalViews);
+    refresh(externalViews);
   }
 
   private void refresh(Collection<ExternalView> externalViewList) {
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
-    for (InstanceConfig config : _instanceConfigs) {
-      instanceConfigMap.put(config.getId(), config);
-    }
-    if (externalViewList != null) {
+    if (externalViewList != null && !externalViewList.isEmpty()) {
+      for (InstanceConfig config : _instanceConfigs) {
+        instanceConfigMap.put(config.getId(), config);
+      }
       for (ExternalView extView : externalViewList) {
         String resourceName = extView.getId();
         for (String partitionName : extView.getPartitionSet()) {
@@ -97,8 +95,10 @@ class RoutingTable {
                 addEntry(resourceName, partitionName, currentState, instanceConfig);
               }
             } else {
-              logger.error("Invalid instance name. " + instanceName
-                  + " .Not found in /cluster/configs/. instanceName: ");
+              logger.warn(
+                  "Participant {} is not found with proper configuration information. It might already be removed from the cluster. "
+                      + "Skip recording partition assignment entry: Partition {}, Participant {}, State {}.",
+                  instanceName, partitionName, instanceName, stateMap.get(instanceName));
             }
           }
         }
@@ -108,32 +108,36 @@ class RoutingTable {
 
   private void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
     Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
-    for (InstanceConfig config : _instanceConfigs) {
-      instanceConfigMap.put(config.getId(), config);
-    }
-
-    for (LiveInstance liveInstance : _liveInstances) {
-      String instanceName = liveInstance.getInstanceName();
-      String sessionId = liveInstance.getSessionId();
-      InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
-      if (instanceConfig == null) {
-        logger.error("Invalid instance name. " + instanceName
-            + " .Not found in /cluster/configs/. instanceName: ");
+    if (currentStateMap != null && !currentStateMap.isEmpty()) {
+      for (InstanceConfig config : _instanceConfigs) {
+        instanceConfigMap.put(config.getId(), config);
       }
+      for (LiveInstance liveInstance : _liveInstances) {
+        String instanceName = liveInstance.getInstanceName();
+        String sessionId = liveInstance.getSessionId();
+        InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
+        if (instanceConfig == null) {
+          logger.warn(
+              "Participant {} is not found with proper configuration information. It might already be removed from the cluster. "
+                  + "Skip recording partition assignments that are related to this instance.",
+              instanceName);
+          continue;
+        }
 
-      Map<String, CurrentState> currentStates = Collections.emptyMap();
-      if (currentStateMap.containsKey(instanceName) && currentStateMap.get(instanceName)
-          .containsKey(sessionId)) {
-        currentStates = currentStateMap.get(instanceName).get(sessionId);
-      }
+        Map<String, CurrentState> currentStates = Collections.emptyMap();
+        if (currentStateMap.containsKey(instanceName) && currentStateMap.get(instanceName)
+            .containsKey(sessionId)) {
+          currentStates = currentStateMap.get(instanceName).get(sessionId);
+        }
 
-      for (CurrentState currentState : currentStates.values()) {
-        String resourceName = currentState.getResourceName();
-        Map<String, String> stateMap = currentState.getPartitionStateMap();
+        for (CurrentState currentState : currentStates.values()) {
+          String resourceName = currentState.getResourceName();
+          Map<String, String> stateMap = currentState.getPartitionStateMap();
 
-        for (String partitionName : stateMap.keySet()) {
-          String state = stateMap.get(partitionName);
-          addEntry(resourceName, partitionName, state, instanceConfig);
+          for (String partitionName : stateMap.keySet()) {
+            String state = stateMap.get(partitionName);
+            addEntry(resourceName, partitionName, state, instanceConfig);
+          }
         }
       }
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/acea2f16/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
index dbe621c..01fa6df 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
@@ -11,6 +11,7 @@ import java.util.Set;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
 import org.apache.helix.api.listeners.RoutingTableChangeListener;
 import org.apache.helix.common.ZkTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -41,12 +42,13 @@ public class TestRoutingTableProvider extends ZkTestBase {
   static final int REPLICA_NUMBER = 3;
 
   private HelixManager _spectator;
-  private List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  private List<MockParticipantManager> _participants = new ArrayList<>();
   private List<String> _instances = new ArrayList<>();
   private ClusterControllerManager _controller;
   private ZkHelixClusterVerifier _clusterVerifier;
-  private RoutingTableProvider _routingTableProvider;
-  private RoutingTableProvider _routingTableProvider2;
+  private RoutingTableProvider _routingTableProvider_default;
+  private RoutingTableProvider _routingTableProvider_ev;
+  private RoutingTableProvider _routingTableProvider_cs;
   private boolean _listenerTestResult = true;
 
   class MockRoutingTableChangeListener implements RoutingTableChangeListener {
@@ -101,15 +103,16 @@ public class TestRoutingTableProvider extends ZkTestBase {
     _controller.syncStart();
 
     // start speculator
-    _routingTableProvider = new RoutingTableProvider();
+    _routingTableProvider_default = new RoutingTableProvider();
     _spectator = HelixManagerFactory
         .getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, ZK_ADDR);
     _spectator.connect();
-    _spectator.addExternalViewChangeListener(_routingTableProvider);
-    _spectator.addLiveInstanceChangeListener(_routingTableProvider);
-    _spectator.addInstanceConfigChangeListener(_routingTableProvider);
+    _spectator.addExternalViewChangeListener(_routingTableProvider_default);
+    _spectator.addLiveInstanceChangeListener(_routingTableProvider_default);
+    _spectator.addInstanceConfigChangeListener(_routingTableProvider_default);
 
-    _routingTableProvider2 = new RoutingTableProvider(_spectator);
+    _routingTableProvider_ev = new RoutingTableProvider(_spectator);
+    _routingTableProvider_cs = new RoutingTableProvider(_spectator, PropertyType.CURRENTSTATES);
 
     _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
@@ -122,26 +125,31 @@ public class TestRoutingTableProvider extends ZkTestBase {
       p.syncStop();
     }
     _controller.syncStop();
-    _routingTableProvider.shutdown();
-    _routingTableProvider2.shutdown();
+    _routingTableProvider_default.shutdown();
+    _routingTableProvider_ev.shutdown();
     _spectator.disconnect();
     deleteCluster(CLUSTER_NAME);
   }
 
   @Test
   public void testRoutingTable() {
-    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size());
-    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider_default.getLiveInstances().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider_default.getInstanceConfigs().size(), _instances.size());
 
-    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size());
-    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider_ev.getLiveInstances().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider_ev.getInstanceConfigs().size(), _instances.size());
 
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
+    Assert.assertEquals(_routingTableProvider_cs.getLiveInstances().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider_cs.getInstanceConfigs().size(), _instances.size());
+
+    validateRoutingTable(_routingTableProvider_default, Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(1), _instances.get(2)));
+    validateRoutingTable(_routingTableProvider_ev, Sets.newSet(_instances.get(0)),
         Sets.newSet(_instances.get(1), _instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
+    validateRoutingTable(_routingTableProvider_cs, Sets.newSet(_instances.get(0)),
         Sets.newSet(_instances.get(1), _instances.get(2)));
 
-    Collection<String> databases = _routingTableProvider.getResources();
+    Collection<String> databases = _routingTableProvider_default.getResources();
     Assert.assertEquals(databases.size(), 1);
   }
 
@@ -152,9 +160,11 @@ public class TestRoutingTableProvider extends ZkTestBase {
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false);
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(1)),
+    validateRoutingTable(_routingTableProvider_default, Sets.newSet(_instances.get(1)),
+        Sets.newSet(_instances.get(2)));
+    validateRoutingTable(_routingTableProvider_ev, Sets.newSet(_instances.get(1)),
         Sets.newSet(_instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(1)),
+    validateRoutingTable(_routingTableProvider_cs, Sets.newSet(_instances.get(1)),
         Sets.newSet(_instances.get(2)));
   }
 
@@ -165,8 +175,9 @@ public class TestRoutingTableProvider extends ZkTestBase {
     Map<String, Set<String>> context = new HashMap<>();
     context.put("MASTER", Sets.newSet(_instances.get(0)));
     context.put("SLAVE", Sets.newSet(_instances.get(1), _instances.get(2)));
-    _routingTableProvider.addRoutingTableChangeListener(routingTableChangeListener, context);
-    _routingTableProvider.addRoutingTableChangeListener(new MockRoutingTableChangeListener(), null);
+    _routingTableProvider_default.addRoutingTableChangeListener(routingTableChangeListener, context);
+    _routingTableProvider_default
+        .addRoutingTableChangeListener(new MockRoutingTableChangeListener(), null);
     // reenable the master instance to cause change
     String prevMasterInstance = _instances.get(0);
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
@@ -182,15 +193,20 @@ public class TestRoutingTableProvider extends ZkTestBase {
 
     Assert.assertTrue(_clusterVerifier.verifyByPolling());
 
-    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size() - 1);
-    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider_default.getLiveInstances().size(), _instances.size() - 1);
+    Assert.assertEquals(_routingTableProvider_default.getInstanceConfigs().size(), _instances.size());
 
-    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size() - 1);
-    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider_ev.getLiveInstances().size(), _instances.size() - 1);
+    Assert.assertEquals(_routingTableProvider_ev.getInstanceConfigs().size(), _instances.size());
 
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
+    Assert.assertEquals(_routingTableProvider_cs.getLiveInstances().size(), _instances.size() - 1);
+    Assert.assertEquals(_routingTableProvider_cs.getInstanceConfigs().size(), _instances.size());
+
+    validateRoutingTable(_routingTableProvider_default, Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(2)));
+    validateRoutingTable(_routingTableProvider_ev, Sets.newSet(_instances.get(0)),
         Sets.newSet(_instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
+    validateRoutingTable(_routingTableProvider_cs, Sets.newSet(_instances.get(0)),
         Sets.newSet(_instances.get(2)));
   }
 


[2/4] helix git commit: Fix 2 test bugs that fail mvn test.

Posted by ji...@apache.org.
Fix 2 test bugs that fail mvn test.

TestZkReconnect has a race condition in which the expected flag may be set earlier than the test assumes. Change the test to wait until the flag is set.
BestPossibleExternalViewVerifier has an issue: when the best possible state cannot be calculated, the verifier always returns false. The expected behavior is to return true if the external view does not exist.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/964b802c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/964b802c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/964b802c

Branch: refs/heads/master
Commit: 964b802cfb5ea32bd1ff4f780c0efff97ceb64b3
Parents: acea2f1
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Mon Jul 9 15:08:12 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Tue Jul 17 11:52:50 2018 -0700

----------------------------------------------------------------------
 .../BestPossibleExternalViewVerifier.java            | 10 +++++-----
 .../org/apache/helix/manager/zk/TestZkReconnect.java | 15 ++++++++++++---
 2 files changed, 17 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/964b802c/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index fa5d29f..6e73df6 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -82,7 +82,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     _expectLiveInstances = expectLiveInstances;
     _clusterDataCache = new ClusterDataCache();
   }
-  
+
   public static class Builder {
     private String _clusterName;
     private Map<String, Map<String, String>> _errStates;
@@ -260,15 +260,15 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
       }
 
       for (String resourceName : idealStates.keySet()) {
-        ExternalView extView = extViews.get(resourceName);
         IdealState is = idealStates.get(resourceName);
+        ExternalView extView = extViews.get(resourceName);
         if (extView == null) {
           if (is.isExternalViewDisabled()) {
             continue;
-          } else {
-            LOG.error("externalView for " + resourceName + " is not available");
-            return false;
           }
+          LOG.warn("externalView for " + resourceName
+              + " is not available, check if best possible state is available.");
+          extView = new ExternalView(resourceName);
         }
 
         // step 0: remove empty map and DROPPED state from best possible state

http://git-wip-us.apache.org/repos/asf/helix/blob/964b802c/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
index 303cf68..1f72948 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkReconnect.java
@@ -109,13 +109,22 @@ public class TestZkReconnect {
       ZkHelixPropertyStore propertyStore = controller.getHelixPropertyStore();
       propertyStore.get("/", null, 0);
 
-      onConnectedFlag.set(false);
+      TestHelper.verify(new TestHelper.Verifier() {
+        @Override
+        public boolean verify() throws Exception {
+          return onConnectedFlag.getAndSet(false);
+        }
+      }, 1000);
 
       // Inject expire to test handler
       // onDisconnectedFlag should be set within onDisconnected handler
       controller.handleSessionEstablishmentError(new Exception("For testing"));
-      Thread.sleep(10);
-      Assert.assertTrue(onDisconnectedFlag.get());
+      TestHelper.verify(new TestHelper.Verifier() {
+        @Override
+        public boolean verify() throws Exception {
+          return onDisconnectedFlag.get();
+        }
+      }, 1000);
       Assert.assertFalse(onConnectedFlag.get());
       Assert.assertFalse(controller.isConnected());
 


[3/4] helix git commit: Fix the issue that resource MBean may not be cleaned up when the resource is dropped.

Posted by ji...@apache.org.
Fix the issue that resource MBean may not be cleaned up when the resource is dropped.

If a resource is not successfully created on any participant, and it is removed in this situation, the corresponding MBean may be left over by the controller.
This fix ensures that all resource MBeans that are no longer related to any living resource are removed.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/3deeeaba
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/3deeeaba
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/3deeeaba

Branch: refs/heads/master
Commit: 3deeeabaa988bf40c0ba953209dd6b26df984552
Parents: 964b802
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Thu Jul 12 15:20:19 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Tue Jul 17 11:53:26 2018 -0700

----------------------------------------------------------------------
 .../stages/ExternalViewComputeStage.java        | 71 ++++++++++-----
 .../monitoring/mbeans/ClusterStatusMonitor.java | 53 +++++++----
 .../mbeans/TestDropResourceMetricsReset.java    | 94 ++++++++++++++++----
 3 files changed, 160 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/3deeeaba/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index bf7be01..d901327 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -19,21 +19,41 @@ package org.apache.helix.controller.stages;
  * under the License.
  */
 
-import org.apache.helix.*;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZNRecordDelta;
 import org.apache.helix.ZNRecordDelta.MergeOperation;
 import org.apache.helix.controller.LogUtil;
 import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
 import org.apache.helix.controller.pipeline.AsyncWorkerType;
 import org.apache.helix.controller.pipeline.StageException;
 import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
-import org.apache.helix.model.*;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Message;
 import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.model.StatusUpdate;
 import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
 
 public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
   private static Logger LOG = LoggerFactory.getLogger(ExternalViewComputeStage.class);
@@ -60,8 +80,11 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
 
     CurrentStateOutput currentStateOutput =
         event.getAttribute(AttributeName.CURRENT_STATE.name());
+    ClusterStatusMonitor clusterStatusMonitor =
+        event.getAttribute(AttributeName.clusterStatusMonitor.name());
 
     List<ExternalView> newExtViews = new ArrayList<>();
+    Set<String> monitoringResources = new HashSet<>();
 
     Map<String, ExternalView> curExtViews = cache.getExternalViews();
 
@@ -100,24 +123,19 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
       IdealState idealState = cache.getIdealState(resourceName);
       if (!cache.isTaskCache()) {
         ResourceConfig resourceConfig = cache.getResourceConfig(resourceName);
-        ClusterStatusMonitor clusterStatusMonitor =
-            event.getAttribute(AttributeName.clusterStatusMonitor.name());
         if (clusterStatusMonitor != null) {
-          if (idealState != null && (resourceConfig == null || !resourceConfig
-              .isMonitoringDisabled())) {
-            if (!idealState.getStateModelDefRef()
-                .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
-              StateModelDefinition stateModelDef =
-                  cache.getStateModelDef(idealState.getStateModelDefRef());
-              clusterStatusMonitor
-                  .setResourceStatus(view, cache.getIdealState(view.getResourceName()),
-                      stateModelDef);
-              clusterStatusMonitor
-                  .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount);
-            }
-          } else {
-            // Drop the metrics if the resource is dropped, or the MonitorDisabled is changed to true.
-            clusterStatusMonitor.unregisterResource(view.getResourceName());
+          if (idealState != null // has ideal state
+              && (resourceConfig == null || !resourceConfig.isMonitoringDisabled()) // monitoring not disabled
+              && !idealState.getStateModelDefRef() // and not a job resource
+              .equalsIgnoreCase(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+            StateModelDefinition stateModelDef =
+                cache.getStateModelDef(idealState.getStateModelDefRef());
+            clusterStatusMonitor
+                .setResourceStatus(view, cache.getIdealState(view.getResourceName()),
+                    stateModelDef);
+            clusterStatusMonitor
+                .updatePendingMessages(resource.getResourceName(), totalPendingMessageCount);
+            monitoringResources.add(resourceName);
           }
         }
       }
@@ -145,7 +163,12 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
       }
     }
 
-    List<String> externalviewsToRemove = new ArrayList<>();
+    // Keep MBeans for existing resources and unregister MBeans for dropped resources
+    if (clusterStatusMonitor != null) {
+      clusterStatusMonitor.retainResourceMonitor(monitoringResources);
+    }
+
+    List<String> externalViewsToRemove = new ArrayList<>();
     // TODO: consider not setting the externalview of SCHEDULER_TASK_QUEUE at all.
     // Are there any entity that will be interested in its change?
 
@@ -163,7 +186,7 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
           LogUtil
               .logInfo(LOG, _eventId, "Remove externalView for resource: " + resourceName);
           dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
-          externalviewsToRemove.add(resourceName);
+          externalViewsToRemove.add(resourceName);
         }
       } else {
         keys.add(keyBuilder.externalView(resourceName));
@@ -181,10 +204,10 @@ public class ExternalViewComputeStage extends AbstractAsyncBaseStage {
       if (!resourceMap.keySet().contains(resourceName)) {
         LogUtil.logInfo(LOG, _eventId, "Remove externalView for resource: " + resourceName);
         dataAccessor.removeProperty(keyBuilder.externalView(resourceName));
-        externalviewsToRemove.add(resourceName);
+        externalViewsToRemove.add(resourceName);
       }
     }
-    cache.removeExternalViews(externalviewsToRemove);
+    cache.removeExternalViews(externalViewsToRemove);
   }
 
   private void updateScheduledTaskStatus(ExternalView ev, HelixManager manager,

http://git-wip-us.apache.org/repos/asf/helix/blob/3deeeaba/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 52d06e3..e3655d8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -19,12 +19,22 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
+import com.google.common.base.Joiner;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import java.util.concurrent.atomic.AtomicLong;
 import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.model.*;
-import org.apache.helix.task.*;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.task.WorkflowContext;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,8 +43,16 @@ import javax.management.MBeanServer;
 import javax.management.MalformedObjectNameException;
 import javax.management.ObjectName;
 import java.lang.management.ManagementFactory;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 
 
 public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
@@ -398,20 +416,21 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
   }
 
   /**
-   * Indicate that a resource has been dropped, thus making it OK to drop its metrics
-   * @param resourceName the resource that has been dropped
+   * Clean up resource monitors. Keep only the monitors whose resources exist in the input set.
+   * @param resourceNames the resources that still exist
    */
-  public void unregisterResource(String resourceName) {
-    if (_resourceMbeanMap.containsKey(resourceName)) {
-      synchronized (this) {
-        if (_resourceMbeanMap.containsKey(resourceName)) {
-          try {
-            unregisterResources(Arrays.asList(resourceName));
-          } catch (MalformedObjectNameException e) {
-            LOG.error("Could not unregister beans for " + resourceName, e);
-          }
-        }
-      }
+  public void retainResourceMonitor(Set<String> resourceNames) {
+    Set<String> resourcesToRemove = new HashSet<>();
+    synchronized (this) {
+      resourcesToRemove.addAll(_resourceMbeanMap.keySet());
+    }
+    resourcesToRemove.removeAll(resourceNames);
+
+    try {
+      unregisterResources(resourcesToRemove);
+    } catch (MalformedObjectNameException e) {
+      LOG.error(String.format("Could not unregister beans for the following resources: %s",
+          Joiner.on(',').join(resourcesToRemove)), e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/3deeeaba/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
index b815160..04e79e6 100644
--- a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestDropResourceMetricsReset.java
@@ -19,15 +19,6 @@ package org.apache.helix.monitoring.mbeans;
  * under the License.
  */
 
-import java.io.IOException;
-import java.util.Date;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import javax.management.InstanceNotFoundException;
-import javax.management.MBeanServerConnection;
-import javax.management.MBeanServerNotification;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
 import org.apache.helix.TestHelper;
 import org.apache.helix.ZkUnitTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
@@ -35,23 +26,39 @@ import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterSetup;
 import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import javax.management.InstanceNotFoundException;
+import javax.management.MBeanServerConnection;
+import javax.management.MBeanServerNotification;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.util.Date;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 public class TestDropResourceMetricsReset extends ZkUnitTestBase {
-  private final CountDownLatch _registerLatch = new CountDownLatch(1);
-  private final CountDownLatch _unregisterLatch = new CountDownLatch(1);
+  private CountDownLatch _registerLatch;
+  private CountDownLatch _unregisterLatch;
+  private String _className = TestHelper.getTestClassName();
+
+  @BeforeMethod
+  public void beforeMethod() {
+    _registerLatch = new CountDownLatch(1);
+    _unregisterLatch = new CountDownLatch(1);
+  }
 
   @Test
   public void testBasic() throws Exception {
     final int NUM_PARTICIPANTS = 4;
     final int NUM_PARTITIONS = 64;
     final int NUM_REPLICAS = 1;
-    final String RESOURCE_NAME = "TestDB0";
+    final String RESOURCE_NAME = "BasicDB0";
 
-    String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
-    String clusterName = className + "_" + methodName;
-    System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+    String clusterName = _className + "_" + methodName;
 
     ParticipantMonitorListener listener =
         new ParticipantMonitorListener("ClusterStatus", clusterName, RESOURCE_NAME);
@@ -59,7 +66,7 @@ public class TestDropResourceMetricsReset extends ZkUnitTestBase {
     // Set up cluster
     TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
         "localhost", // participant name prefix
-        "TestDB", // resource name prefix
+        "BasicDB", // resource name prefix
         1, // resources
         NUM_PARTITIONS, // partitions per resource
         NUM_PARTICIPANTS, // number of nodes
@@ -97,7 +104,60 @@ public class TestDropResourceMetricsReset extends ZkUnitTestBase {
       participant.syncStop();
     }
     TestHelper.dropCluster(clusterName, _gZkClient);
-    System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis()));
+  }
+
+  @Test (dependsOnMethods = "testBasic")
+  public void testDropWithNoCurrentState() throws Exception {
+    final int NUM_PARTICIPANTS = 1;
+    final int NUM_PARTITIONS = 1;
+    final int NUM_REPLICAS = 1;
+    final String RESOURCE_NAME = "TestDB0";
+
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = _className + "_" + methodName;
+
+    ParticipantMonitorListener listener =
+        new ParticipantMonitorListener("ClusterStatus", clusterName, RESOURCE_NAME);
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestDB", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "MasterSlave", RebalanceMode.FULL_AUTO, // use FULL_AUTO mode to test node tagging
+        true); // do rebalance
+
+    // Start participants and controller
+    ClusterSetup setupTool = new ClusterSetup(_gZkClient);
+    MockParticipantManager participant =
+        new MockParticipantManager(ZK_ADDR, clusterName, "localhost_12918");
+    participant.syncStart();
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_0");
+    controller.syncStart();
+
+    // Verify that the bean was created
+    boolean noTimeout = _registerLatch.await(30000, TimeUnit.MILLISECONDS);
+    Assert.assertTrue(noTimeout);
+
+    // stop the participant, so the resource does not exist in any current states.
+    participant.syncStop();
+
+    // Drop the resource
+    setupTool.dropResourceFromCluster(clusterName, RESOURCE_NAME);
+
+    // Verify that the bean was removed
+    noTimeout = _unregisterLatch.await(30000, TimeUnit.MILLISECONDS);
+    Assert.assertTrue(noTimeout);
+
+    // Clean up
+    listener.disconnect();
+    controller.syncStop();
+
+    TestHelper.dropCluster(clusterName, _gZkClient);
   }
 
   private ObjectName getObjectName(String resourceName, String clusterName)


[4/4] helix git commit: Adding RoutingTableProvider monitor for tracking refresh actions and routing table update callbacks.

Posted by ji...@apache.org.
Adding RoutingTableProvider monitor for tracking refresh actions and routing table update callbacks.

The monitor contains the following metrics:
DataRefreshLatencyGauge
CallbackCounter
EventQueueSizeGauge
DataRefreshCounter

Also add tests for this monitor.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/993beb38
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/993beb38
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/993beb38

Branch: refs/heads/master
Commit: 993beb3834f4013de8d6d8221bd71ccdced93632
Parents: 3deeeab
Author: Jiajun Wang <jj...@linkedin.com>
Authored: Thu Jul 12 10:33:21 2018 -0700
Committer: jiajunwang <er...@gmail.com>
Committed: Tue Jul 17 11:57:12 2018 -0700

----------------------------------------------------------------------
 .../monitoring/mbeans/MonitorDomainNames.java   |   1 +
 .../mbeans/RoutingTableProviderMonitor.java     | 100 +++++++++++++++++++
 .../helix/spectator/RoutingTableProvider.java   |  28 +++++-
 ...TestRoutingTableProviderPeriodicRefresh.java |   4 +-
 .../mbeans/TestRoutingTableProviderMonitor.java | 100 +++++++++++++++++++
 5 files changed, 228 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
index c28570d..73bf057 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/MonitorDomainNames.java
@@ -27,5 +27,6 @@ public enum MonitorDomainNames {
   HelixZkClient,
   HelixThreadPoolExecutor,
   HelixCallback,
+  RoutingTableProvider,
   CLMParticipantReport
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
new file mode 100644
index 0000000..1c64783
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/RoutingTableProviderMonitor.java
@@ -0,0 +1,100 @@
+package org.apache.helix.monitoring.mbeans;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import com.codahale.metrics.Histogram;
+import com.codahale.metrics.SlidingTimeWindowArrayReservoir;
+import org.apache.helix.PropertyType;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMBeanProvider;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.DynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.HistogramDynamicMetric;
+import org.apache.helix.monitoring.mbeans.dynamicMBeans.SimpleDynamicMetric;
+
+import javax.management.JMException;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+public class RoutingTableProviderMonitor extends DynamicMBeanProvider {
+  public static final String DATA_TYPE_KEY = "DataType";
+  public static final String CLUSTER_KEY = "Cluster";
+  public static final String DEFAULT = "DEFAULT";
+
+  private static final String MBEAN_DESCRIPTION = "Helix RoutingTableProvider Monitor";
+  private final String _sensorName;
+  private final PropertyType _propertyType;
+  private final String _clusterName;
+
+  private SimpleDynamicMetric<Long> _callbackCounter;
+  private SimpleDynamicMetric<Long> _eventQueueSizeGauge;
+  private SimpleDynamicMetric<Long> _dataRefreshCounter;
+  private HistogramDynamicMetric _dataRefreshLatencyGauge;
+
+  public RoutingTableProviderMonitor(final PropertyType propertyType, String clusterName) {
+    _propertyType = propertyType;
+    _clusterName = clusterName == null ? DEFAULT : clusterName;
+
+    // Don't put instanceName into sensor name. This detailed information is in the MBean name already.
+    _sensorName = String
+        .format("%s.%s.%s", MonitorDomainNames.RoutingTableProvider.name(), _clusterName,
+            _propertyType.name());
+
+    _dataRefreshLatencyGauge = new HistogramDynamicMetric("DataRefreshLatencyGauge", new Histogram(
+        new SlidingTimeWindowArrayReservoir(DEFAULT_RESET_INTERVAL_MS, TimeUnit.MILLISECONDS)));
+    _callbackCounter = new SimpleDynamicMetric("CallbackCounter", 0l);
+    _eventQueueSizeGauge = new SimpleDynamicMetric("EventQueueSizeGauge", 0l);
+    _dataRefreshCounter = new SimpleDynamicMetric("DataRefreshCounter", 0l);
+  }
+
+  @Override
+  public String getSensorName() {
+    return _sensorName;
+  }
+
+  private ObjectName getMBeanName() throws MalformedObjectNameException {
+    return new ObjectName(String
+        .format("%s:%s=%s,%s=%s", MonitorDomainNames.RoutingTableProvider.name(), CLUSTER_KEY,
+            _clusterName, DATA_TYPE_KEY, _propertyType.name()));
+  }
+
+  public void increaseCallbackCounters(long currentQueueSize) {
+    _callbackCounter.updateValue(_callbackCounter.getValue() + 1);
+    _eventQueueSizeGauge.updateValue(currentQueueSize);
+  }
+
+  public void increaseDataRefreshCounters(long startTime) {
+    _dataRefreshCounter.updateValue(_dataRefreshCounter.getValue() + 1);
+    _dataRefreshLatencyGauge.updateValue(System.currentTimeMillis() - startTime);
+  }
+
+  @Override
+  public RoutingTableProviderMonitor register() throws JMException {
+    List<DynamicMetric<?, ?>> attributeList = new ArrayList<>();
+    attributeList.add(_dataRefreshLatencyGauge);
+    attributeList.add(_callbackCounter);
+    attributeList.add(_eventQueueSizeGauge);
+    attributeList.add(_dataRefreshCounter);
+
+    doRegister(attributeList, MBEAN_DESCRIPTION, getMBeanName());
+    return this;
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 57b2fad..cc373db 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -30,6 +30,8 @@ import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.management.JMException;
+
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
@@ -52,6 +54,7 @@ import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
+import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,6 +68,7 @@ public class RoutingTableProvider
   private final RouterUpdater _routerUpdater;
   private final PropertyType _sourceDataType;
   private final Map<RoutingTableChangeListener, ListenerContext> _routingTableChangeListenerMap;
+  private final RoutingTableProviderMonitor _monitor;
 
   // For periodic refresh
   private long _lastRefreshTimestamp;
@@ -101,6 +105,14 @@ public class RoutingTableProvider
     _sourceDataType = sourceDataType;
     _routingTableChangeListenerMap = new ConcurrentHashMap<>();
     String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
+
+    _monitor = new RoutingTableProviderMonitor(_sourceDataType, clusterName);
+    try {
+      _monitor.register();
+    } catch (JMException e) {
+      logger.error("Failed to register RoutingTableProvider monitor MBean.", e);
+    }
+
     _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
     _routerUpdater.start();
 
@@ -189,6 +201,9 @@ public class RoutingTableProvider
       _periodicRefreshExecutor.shutdown();
     }
     _routerUpdater.shutdown();
+
+    _monitor.unregister();
+
     if (_helixManager != null) {
       PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
       switch (_sourceDataType) {
@@ -511,7 +526,7 @@ public class RoutingTableProvider
     _routingTableRef.set(newRoutingTable);
   }
 
-  public void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) {
+  protected void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) {
     HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
@@ -520,7 +535,7 @@ public class RoutingTableProvider
     refresh(externalViewList, configList, liveInstances);
   }
 
-  public void refresh(Collection<ExternalView> externalViews,
+  protected void refresh(Collection<ExternalView> externalViews,
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
     long startTime = System.currentTimeMillis();
     RoutingTable newRoutingTable = new RoutingTable(externalViews, instanceConfigs, liveInstances);
@@ -581,6 +596,9 @@ public class RoutingTableProvider
           logger.error(String.format("HelixManager is not connected for router update event: %s", event));
           throw new HelixException("HelixManager is not connected for router update event.");
         }
+
+        long startTime = System.currentTimeMillis();
+
         _dataCache.refresh(manager.getHelixDataAccessor());
         switch (_sourceDataType) {
           case EXTERNALVIEW:
@@ -599,6 +617,8 @@ public class RoutingTableProvider
             logger.warn("Unsupported source data type: {}, stop refreshing the routing table!",
                 _sourceDataType);
         }
+
+        _monitor.increaseDataRefreshCounters(startTime);
       }
     }
 
@@ -616,6 +636,8 @@ public class RoutingTableProvider
       event.addAttribute(AttributeName.helixmanager.name(), context.getManager());
       event.addAttribute(AttributeName.changeContext.name(), context);
       queueEvent(event);
+
+      _monitor.increaseCallbackCounters(_eventQueue.size());
     }
   }
 
@@ -630,4 +652,4 @@ public class RoutingTableProvider
       return _context;
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
index c78b8e6..e2b6df1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
@@ -142,7 +142,7 @@ public class TestRoutingTableProviderPeriodicRefresh extends ZkTestBase {
     }
 
     @Override
-    public synchronized void refresh(List<ExternalView> externalViewList,
+    protected synchronized void refresh(List<ExternalView> externalViewList,
         NotificationContext changeContext) {
       super.refresh(externalViewList, changeContext);
       _refreshCount++;
@@ -152,7 +152,7 @@ public class TestRoutingTableProviderPeriodicRefresh extends ZkTestBase {
     }
 
     @Override
-    public synchronized void refresh(Collection<ExternalView> externalViews,
+    protected synchronized void refresh(Collection<ExternalView> externalViews,
         Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
       super.refresh(externalViews, instanceConfigs, liveInstances);
       _refreshCount++;

http://git-wip-us.apache.org/repos/asf/helix/blob/993beb38/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
new file mode 100644
index 0000000..05240c1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/monitoring/mbeans/TestRoutingTableProviderMonitor.java
@@ -0,0 +1,100 @@
+package org.apache.helix.monitoring.mbeans;
+
+import org.apache.helix.PropertyType;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import javax.management.JMException;
+import javax.management.MBeanServer;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.lang.management.ManagementFactory;
+import java.util.HashSet;
+import java.util.Set;
+
+public class TestRoutingTableProviderMonitor {
+
+  private MBeanServer _beanServer = ManagementFactory.getPlatformMBeanServer();
+
+  private final String TEST_CLUSTER = "test_cluster";
+
+  private ObjectName buildObjectName(PropertyType type, String cluster)
+      throws MalformedObjectNameException {
+    return MBeanRegistrar.buildObjectName(MonitorDomainNames.RoutingTableProvider.name(),
+        RoutingTableProviderMonitor.CLUSTER_KEY, cluster, RoutingTableProviderMonitor.DATA_TYPE_KEY,
+        type.name());
+  }
+
+  private ObjectName buildObjectName(PropertyType type, String cluster, int num)
+      throws MalformedObjectNameException {
+    ObjectName objectName = buildObjectName(type, cluster);
+    if (num > 0) {
+      return new ObjectName(String
+          .format("%s,%s=%s", objectName.toString(), MBeanRegistrar.DUPLICATE,
+              String.valueOf(num)));
+    } else {
+      return objectName;
+    }
+  }
+
+  @Test
+  public void testMBeanRegisteration() throws JMException {
+    Set<RoutingTableProviderMonitor> monitors = new HashSet<>();
+    for (PropertyType type : PropertyType.values()) {
+      monitors.add(new RoutingTableProviderMonitor(type, TEST_CLUSTER).register());
+      Assert.assertTrue(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER)));
+    }
+
+    for (PropertyType type : PropertyType.values()) {
+      monitors.add(new RoutingTableProviderMonitor(type, TEST_CLUSTER).register());
+      Assert.assertTrue(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER, 1)));
+    }
+
+    for (PropertyType type : PropertyType.values()) {
+      monitors.add(new RoutingTableProviderMonitor(type, TEST_CLUSTER).register());
+      Assert.assertTrue(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER, 2)));
+    }
+
+    // Un-register all monitors
+    for (RoutingTableProviderMonitor monitor : monitors) {
+      monitor.unregister();
+    }
+
+    for (PropertyType type : PropertyType.values()) {
+      Assert.assertFalse(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER)));
+      Assert.assertFalse(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER, 1)));
+      Assert.assertFalse(_beanServer.isRegistered(buildObjectName(type, TEST_CLUSTER, 2)));
+    }
+  }
+
+  @Test
+  public void testMetrics() throws JMException, InterruptedException {
+    PropertyType type = PropertyType.EXTERNALVIEW;
+    RoutingTableProviderMonitor monitor = new RoutingTableProviderMonitor(type, TEST_CLUSTER);
+    monitor.register();
+    ObjectName name = buildObjectName(type, TEST_CLUSTER);
+
+    monitor.increaseCallbackCounters(10);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, "CallbackCounter"), 1);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, "EventQueueSizeGauge"), 10);
+    monitor.increaseCallbackCounters(15);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, "CallbackCounter"), 2);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, "EventQueueSizeGauge"), 15);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshLatencyGauge.Max"), 0);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshCounter"), 0);
+
+    long startTime = System.currentTimeMillis();
+    Thread.sleep(5);
+    monitor.increaseDataRefreshCounters(startTime);
+    long latency = (long) _beanServer.getAttribute(name, "DataRefreshLatencyGauge.Max");
+    Assert.assertTrue(latency >= 5 && latency <= System.currentTimeMillis() - startTime);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshCounter"), 1);
+
+    monitor.increaseDataRefreshCounters(startTime);
+    long newLatency = (long) _beanServer.getAttribute(name, "DataRefreshLatencyGauge.Max");
+    Assert.assertTrue(newLatency >= latency);
+    Assert.assertEquals((long) _beanServer.getAttribute(name, "DataRefreshCounter"), 2);
+
+    monitor.unregister();
+  }
+}