You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/03/23 19:29:22 UTC

[1/6] helix git commit: Fix NPE for RoutingTableProvider listener

Repository: helix
Updated Branches:
  refs/heads/master 623330e3a -> 1c78fef91


Fix NPE for RoutingTableProvider listener


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1c78fef9
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1c78fef9
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1c78fef9

Branch: refs/heads/master
Commit: 1c78fef91e48345876ccdb038aa4ca1099a3bacd
Parents: 90730bf
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Mar 12 16:15:30 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 23 12:12:44 2018 -0700

----------------------------------------------------------------------
 .../helix/spectator/RoutingTableProvider.java   | 22 +++++++++++++++-----
 .../spectator/TestRoutingTableProvider.java     |  2 +-
 2 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1c78fef9/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 004d7ca..8907922 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -60,7 +60,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   private final HelixManager _helixManager;
   private final RouterUpdater _routerUpdater;
   private final PropertyType _sourceDataType;
-  private final Map<RoutingTableChangeListener, Object> _routingTableChangeListenerMap;
+  private final Map<RoutingTableChangeListener, ListenerContext> _routingTableChangeListenerMap;
 
   public RoutingTableProvider() {
     this(null);
@@ -173,7 +173,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
    */
   public void addRoutingTableChangeListener(final RoutingTableChangeListener routingTableChangeListener,
       Object context) {
-    _routingTableChangeListenerMap.put(routingTableChangeListener, context);
+    _routingTableChangeListenerMap.put(routingTableChangeListener, new ListenerContext(context));
   }
 
   /**
@@ -494,10 +494,10 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   }
 
   private void notifyRoutingTableChange() {
-    for (Map.Entry<RoutingTableChangeListener, Object> entry : _routingTableChangeListenerMap
+    for (Map.Entry<RoutingTableChangeListener, ListenerContext> entry : _routingTableChangeListenerMap
         .entrySet()) {
-      entry.getKey()
-          .onRoutingTableChange(new RoutingTableSnapshot(_routingTableRef.get()), entry.getValue());
+      entry.getKey().onRoutingTableChange(new RoutingTableSnapshot(_routingTableRef.get()),
+          entry.getValue().getContext());
     }
   }
 
@@ -561,4 +561,16 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
       queueEvent(event);
     }
   }
+
+  private class ListenerContext {
+    private Object _context;
+
+    public ListenerContext(Object context) {
+      _context = context;
+    }
+
+    public Object getContext() {
+      return _context;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/1c78fef9/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
index 6fa7f6d..ff35dbd 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
@@ -164,7 +164,7 @@ public class TestRoutingTableProvider extends ZkIntegrationTestBase {
     context.put("MASTER", Sets.newSet(_instances.get(0)));
     context.put("SLAVE", Sets.newSet(_instances.get(1), _instances.get(2)));
     _routingTableProvider.addRoutingTableChangeListener(routingTableChangeListener, context);
-
+    _routingTableProvider.addRoutingTableChangeListener(new MockRoutingTableChangeListener(), null);
     // reenable the master instance to cause change
     String prevMasterInstance = _instances.get(0);
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);


[3/6] helix git commit: Allow to get all resources from RoutingTableProvider class.

Posted by lx...@apache.org.
Allow to get all resources from RoutingTableProvider class.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6f6ab65a
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6f6ab65a
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6f6ab65a

Branch: refs/heads/master
Commit: 6f6ab65a6e6c4526f7c6957f2f0cdfaed78e44d5
Parents: 623330e
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Feb 8 15:52:01 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 23 12:12:44 2018 -0700

----------------------------------------------------------------------
 .../apache/helix/spectator/RoutingTable.java    | 15 +++++---
 .../helix/spectator/RoutingTableProvider.java   | 11 +++++-
 .../java/org/apache/helix/TestRoutingTable.java | 36 +++++++++++++++++---
 .../Spectator/TestRoutingTableProvider.java     |  4 +++
 4 files changed, 56 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/6f6ab65a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
index 564a218..f704ee4 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
@@ -211,7 +211,7 @@ class RoutingTable {
     if (instanceList == null) {
       instanceList = Collections.emptyList();
     }
-    return instanceList;
+    return Collections.unmodifiableList(instanceList);
   }
 
   /**
@@ -219,7 +219,7 @@ class RoutingTable {
    * @return
    */
   protected Collection<LiveInstance> getLiveInstances() {
-    return _liveInstances;
+    return Collections.unmodifiableCollection(_liveInstances);
   }
 
   /**
@@ -227,7 +227,14 @@ class RoutingTable {
    * @return
    */
   protected Collection<InstanceConfig> getInstanceConfigs() {
-    return _instanceConfigs;
+    return Collections.unmodifiableCollection(_instanceConfigs);
+  }
+
+  /**
+   * Return names of all resources (shown in ExternalView) in this cluster.
+   */
+  protected Collection<String> getResources() {
+    return Collections.unmodifiableCollection(_resourceInfoMap.keySet());
   }
 
   /**
@@ -261,7 +268,7 @@ class RoutingTable {
       return Collections.emptyList();
     }
 
-    return instanceList;
+    return Collections.unmodifiableList(instanceList);
   }
 
   private void refresh(Collection<ExternalView> externalViewList,

http://git-wip-us.apache.org/repos/asf/helix/blob/6f6ab65a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index cd4e3d2..be85b65 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -20,6 +20,7 @@ package org.apache.helix.spectator;
  */
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
@@ -140,7 +141,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
    */
   public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
       String partitionName, String state) {
-    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName, state);
+    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName,
+        state);
   }
 
   /**
@@ -229,6 +231,13 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     return _routingTableRef.get().getInstanceConfigs();
   }
 
+  /**
+   * Return names of all resources (shown in ExternalView) in this cluster.
+   */
+  public Collection<String> getResources() {
+    return _routingTableRef.get().getResources();
+  }
+
   @Override
   @PreFetch(enabled = false)
   public void onExternalViewChange(List<ExternalView> externalViewList,

http://git-wip-us.apache.org/repos/asf/helix/blob/6f6ab65a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java
index 11f3701..8987147 100644
--- a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java
+++ b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java
@@ -20,8 +20,10 @@ package org.apache.helix;
  */
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -36,6 +38,7 @@ import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.spectator.RoutingTableProvider;
+import org.testng.Assert;
 import org.testng.AssertJUnit;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -60,7 +63,6 @@ public class TestRoutingTable {
             @SuppressWarnings("unchecked")
             @Override
             public <T extends HelixProperty> List<T> getChildValues(PropertyKey key)
-            // public List<ZNRecord> getChildValues(PropertyType type, String... keys)
             {
               PropertyType type = key.getType();
               String[] keys = key.getParams();
@@ -104,7 +106,7 @@ public class TestRoutingTable {
 
     // one master
     add(record, "TESTDB_0", "localhost_8900", "MASTER");
-    List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+    List<ExternalView> externalViewList = new ArrayList<>();
     externalViewList.add(new ExternalView(record));
     routingTable.onExternalViewChange(externalViewList, changeContext);
 
@@ -119,6 +121,7 @@ public class TestRoutingTable {
     externalViewList = new ArrayList<ExternalView>();
     externalViewList.add(new ExternalView(record));
     routingTable.onExternalViewChange(externalViewList, changeContext);
+
     instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER");
     AssertJUnit.assertNotNull(instances);
     AssertJUnit.assertEquals(instances.size(), 2);
@@ -129,7 +132,7 @@ public class TestRoutingTable {
 
     // updates
     add(record, "TESTDB_0", "localhost_8901", "SLAVE");
-    externalViewList = new ArrayList<ExternalView>();
+    externalViewList = new ArrayList<>();
     externalViewList.add(new ExternalView(record));
     routingTable.onExternalViewChange(externalViewList, changeContext);
     instances = routingTable.getInstances("TESTDB", "TESTDB_0", "SLAVE");
@@ -137,6 +140,29 @@ public class TestRoutingTable {
     AssertJUnit.assertEquals(instances.size(), 1);
   }
 
+
+  @Test()
+  public void testGetResources() {
+    RoutingTableProvider routingTable = new RoutingTableProvider();
+    List<ExternalView> externalViewList = new ArrayList<>();
+    Set<String> databases = new HashSet<>();
+
+    for (int i = 0; i < 5; i++) {
+      String db = "TESTDB" + i;
+      ZNRecord record = new ZNRecord(db);
+      // one master partition and one slave partition per resource
+      add(record, db+"_0", "localhost_8900", "MASTER");
+      add(record, db+"_1", "localhost_8901", "SLAVE");
+      externalViewList.add(new ExternalView(record));
+      databases.add(db);
+    }
+
+    routingTable.onExternalViewChange(externalViewList, changeContext);
+    Collection<String> resources = routingTable.getResources();
+    Assert.assertEquals(databases.size(), externalViewList.size());
+    Assert.assertEquals(databases, new HashSet<>(resources));
+  }
+
   @Test()
   public void testStateUnitGroupDeletion() throws InterruptedException {
     List<InstanceConfig> instances;
@@ -207,7 +233,7 @@ public class TestRoutingTable {
   @Test()
   public void testMultiThread() throws Exception {
     final RoutingTableProvider routingTable = new RoutingTableProvider();
-    List<ExternalView> externalViewList = new ArrayList<ExternalView>();
+    List<ExternalView> externalViewList = new ArrayList<>();
     ZNRecord record = new ZNRecord("TESTDB");
     for (int i = 0; i < 1000; i++) {
       add(record, "TESTDB_" + i, "localhost_8900", "MASTER");
@@ -258,7 +284,7 @@ public class TestRoutingTable {
   private void add(ZNRecord record, String stateUnitKey, String instanceName, String state) {
     Map<String, String> stateUnitKeyMap = record.getMapField(stateUnitKey);
     if (stateUnitKeyMap == null) {
-      stateUnitKeyMap = new HashMap<String, String>();
+      stateUnitKeyMap = new HashMap<>();
       record.setMapField(stateUnitKey, stateUnitKeyMap);
     }
     stateUnitKeyMap.put(instanceName, state);

http://git-wip-us.apache.org/repos/asf/helix/blob/6f6ab65a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java
index aa731e5..0f72091 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java
@@ -2,6 +2,7 @@ package org.apache.helix.integration.Spectator;
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
@@ -112,6 +113,9 @@ public class TestRoutingTableProvider extends ZkIntegrationTestBase {
         Sets.newSet(_instances.get(1), _instances.get(2)));
     validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
         Sets.newSet(_instances.get(1), _instances.get(2)));
+
+    Collection<String> databases = _routingTableProvider.getResources();
+    Assert.assertEquals(databases.size(), 1);
   }
 
   @Test(dependsOnMethods = { "testRoutingTable" })


[2/6] helix git commit: Support RoutingTableChangeListener in RoutingTableProvider. Add test for it.

Posted by lx...@apache.org.
Support RoutingTableChangeListener in RoutingTableProvider. Add test for it.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1c04f2f8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1c04f2f8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1c04f2f8

Branch: refs/heads/master
Commit: 1c04f2f8b352e37fe9d64b40a7c9d14cbfb15e4c
Parents: 73d243f
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Feb 28 14:12:31 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 23 12:12:44 2018 -0700

----------------------------------------------------------------------
 .../listeners/RoutingTableChangeListener.java   | 17 ++++++++
 .../helix/spectator/RoutingTableProvider.java   | 35 ++++++++++++++++
 .../spectator/TestRoutingTableProvider.java     | 43 +++++++++++++++++++-
 3 files changed, 93 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1c04f2f8/helix-core/src/main/java/org/apache/helix/api/listeners/RoutingTableChangeListener.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/listeners/RoutingTableChangeListener.java b/helix-core/src/main/java/org/apache/helix/api/listeners/RoutingTableChangeListener.java
new file mode 100644
index 0000000..96015c1
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/listeners/RoutingTableChangeListener.java
@@ -0,0 +1,17 @@
+package org.apache.helix.api.listeners;
+
+import org.apache.helix.spectator.RoutingTableSnapshot;
+
+/**
+ * Interface to implement in order to be notified when the RoutingTable changes.
+ */
+public interface RoutingTableChangeListener {
+
+  /**
+   * Invoked when the RoutingTable changes.
+   *
+   * @param routingTableSnapshot
+   * @param context
+   */
+  void onRoutingTableChange(RoutingTableSnapshot routingTableSnapshot, Object context);
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/1c04f2f8/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 3e01ea4..004d7ca 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.helix.HelixConstants;
@@ -40,6 +41,7 @@ import org.apache.helix.NotificationContext;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.api.listeners.PreFetch;
+import org.apache.helix.api.listeners.RoutingTableChangeListener;
 import org.apache.helix.common.ClusterEventProcessor;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
@@ -58,6 +60,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   private final HelixManager _helixManager;
   private final RouterUpdater _routerUpdater;
   private final PropertyType _sourceDataType;
+  private final Map<RoutingTableChangeListener, Object> _routingTableChangeListenerMap;
 
   public RoutingTableProvider() {
     this(null);
@@ -72,6 +75,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     _routingTableRef = new AtomicReference<>(new RoutingTable());
     _helixManager = helixManager;
     _sourceDataType = sourceDataType;
+    _routingTableChangeListenerMap = new ConcurrentHashMap<>();
     String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
     _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
     _routerUpdater.start();
@@ -162,6 +166,27 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   }
 
   /**
+   * Add RoutingTableChangeListener with user defined context
+   *
+   * @param routingTableChangeListener
+   * @param context user defined context
+   */
+  public void addRoutingTableChangeListener(final RoutingTableChangeListener routingTableChangeListener,
+      Object context) {
+    _routingTableChangeListenerMap.put(routingTableChangeListener, context);
+  }
+
+  /**
+   * Remove RoutingTableChangeListener
+   *
+   * @param routingTableChangeListener
+   */
+  public Object removeRoutingTableChangeListener(
+      final RoutingTableChangeListener routingTableChangeListener) {
+    return _routingTableChangeListenerMap.remove(routingTableChangeListener);
+  }
+
+  /**
    * returns the instances for {resource,partition} pair that are in a specific
    * {state}
    *
@@ -454,6 +479,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     _routingTableRef.set(newRoutingTable);
     logger.info("Refreshed the RoutingTable for cluster " + (_helixManager != null ? _helixManager
         .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - startTime) + "ms.");
+    notifyRoutingTableChange();
   }
 
   protected void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
@@ -464,6 +490,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     _routingTableRef.set(newRoutingTable);
     logger.info("Refresh the RoutingTable for cluster " + (_helixManager != null ? _helixManager
         .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - startTime) + "ms.");
+    notifyRoutingTableChange();
+  }
+
+  private void notifyRoutingTableChange() {
+    for (Map.Entry<RoutingTableChangeListener, Object> entry : _routingTableChangeListenerMap
+        .entrySet()) {
+      entry.getKey()
+          .onRoutingTableChange(new RoutingTableSnapshot(_routingTableRef.get()), entry.getValue());
+    }
   }
 
   private class RouterUpdater extends ClusterEventProcessor {

http://git-wip-us.apache.org/repos/asf/helix/blob/1c04f2f8/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
index eb8f4b1..6fa7f6d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
@@ -3,12 +3,15 @@ package org.apache.helix.integration.spectator;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import org.apache.helix.HelixManager;
 import org.apache.helix.HelixManagerFactory;
 import org.apache.helix.InstanceType;
+import org.apache.helix.api.listeners.RoutingTableChangeListener;
 import org.apache.helix.integration.common.ZkIntegrationTestBase;
 import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
@@ -16,6 +19,7 @@ import org.apache.helix.model.BuiltInStateModelDefinitions;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.spectator.RoutingTableSnapshot;
 import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
 import org.mockito.internal.util.collections.Sets;
@@ -43,6 +47,28 @@ public class TestRoutingTableProvider extends ZkIntegrationTestBase {
   private HelixClusterVerifier _clusterVerifier;
   private RoutingTableProvider _routingTableProvider;
   private RoutingTableProvider _routingTableProvider2;
+  private boolean _listenerTestResult = true;
+
+  class MockRoutingTableChangeListener implements RoutingTableChangeListener {
+    @Override
+    public void onRoutingTableChange(RoutingTableSnapshot routingTableSnapshot, Object context) {
+      Set<String> masterInstances = new HashSet<>();
+      Set<String> slaveInstances = new HashSet<>();
+      for (InstanceConfig config : routingTableSnapshot
+          .getInstancesForResource(TEST_DB, "MASTER")) {
+        masterInstances.add(config.getInstanceName());
+      }
+      for (InstanceConfig config : routingTableSnapshot.getInstancesForResource(TEST_DB, "SLAVE")) {
+        slaveInstances.add(config.getInstanceName());
+      }
+      if (!masterInstances.equals(Map.class.cast(context).get("MASTER")) || !slaveInstances
+          .equals(Map.class.cast(context).get("SLAVE"))) {
+        _listenerTestResult = false;
+      } else {
+        _listenerTestResult = true;
+      }
+    }
+  }
 
   @BeforeClass
   public void beforeClass() throws Exception {
@@ -130,12 +156,25 @@ public class TestRoutingTableProvider extends ZkIntegrationTestBase {
         Sets.newSet(_instances.get(2)));
   }
 
+
   @Test(dependsOnMethods = { "testDisableInstance" })
-  public void testShutdownInstance() throws InterruptedException {
-    // reenable the first instance
+  public void testRoutingTableListener() throws InterruptedException {
+    RoutingTableChangeListener routingTableChangeListener = new MockRoutingTableChangeListener();
+    Map<String, Set<String>> context = new HashMap<>();
+    context.put("MASTER", Sets.newSet(_instances.get(0)));
+    context.put("SLAVE", Sets.newSet(_instances.get(1), _instances.get(2)));
+    _routingTableProvider.addRoutingTableChangeListener(routingTableChangeListener, context);
+
+    // reenable the master instance to cause change
     String prevMasterInstance = _instances.get(0);
     _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
+    Assert.assertTrue(_clusterVerifier.verify());
+    Assert.assertTrue(_listenerTestResult);
+  }
 
+
+  @Test(dependsOnMethods = { "testRoutingTableListener" })
+  public void testShutdownInstance() throws InterruptedException {
     // shutdown second instance.
     _participants.get(1).syncStop();
 


[4/6] helix git commit: Change RoutingTableProvider to support direct aggregating routing information from CurrentStates in each liveinstance. When sourceDataType is set as CurrentState, RoutingTableProvider will listen on CurrentStateChanges and refresh

Posted by lx...@apache.org.
Change RoutingTableProvider to support directly aggregating routing information from the CurrentStates of each live instance. When sourceDataType is set to CurrentState, RoutingTableProvider will listen for CurrentState changes and refresh the routing table from CurrentStates upon changes.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/e5728469
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/e5728469
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/e5728469

Branch: refs/heads/master
Commit: e5728469e02f196690654b4f7f2ed8ca9130a631
Parents: 6f6ab65
Author: Lei Xia <lx...@linkedin.com>
Authored: Tue Mar 20 15:04:30 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 23 12:12:44 2018 -0700

----------------------------------------------------------------------
 .../helix/common/ClusterEventProcessor.java     |   5 +
 .../common/caches/BasicClusterDataCache.java    |  66 +++---
 .../controller/stages/ClusterEventType.java     |   1 +
 .../helix/spectator/RoutingDataCache.java       |  83 ++++++-
 .../apache/helix/spectator/RoutingTable.java    | 106 ++++++---
 .../helix/spectator/RoutingTableProvider.java   | 221 +++++++++++++++++--
 .../java/org/apache/helix/MockAccessor.java     |   7 +-
 .../Spectator/TestRoutingTableProvider.java     | 177 ---------------
 .../TestRoutingTableProviderWithSourceType.java | 152 -------------
 .../spectator/TestRoutingTableProvider.java     | 176 +++++++++++++++
 ...stRoutingTableProviderFromCurrentStates.java | 162 ++++++++++++++
 .../TestRoutingTableProviderFromTargetEV.java   | 152 +++++++++++++
 12 files changed, 882 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
index 6001edc..e4ceb85 100644
--- a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
+++ b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java
@@ -54,4 +54,9 @@ public abstract class ClusterEventProcessor extends Thread {
   public void queueEvent(ClusterEvent event) {
     _eventQueue.put(event);
   }
+
+  public void shutdown() {
+    _eventQueue.clear();
+    this.interrupt();
+  }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index f470272..e3acf14 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.PropertyKey;
-import org.apache.helix.PropertyType;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -37,28 +36,23 @@ import org.slf4j.LoggerFactory;
  * Cache the basic cluster data, including LiveInstances, InstanceConfigs and ExternalViews.
  */
 public class BasicClusterDataCache {
-  protected final Logger LOG = LoggerFactory.getLogger(this.getClass().getName());
+  private static Logger LOG = LoggerFactory.getLogger(BasicClusterDataCache.class.getName());
 
-  private Map<String, LiveInstance> _liveInstanceMap;
-  private Map<String, InstanceConfig> _instanceConfigMap;
-  private Map<String, ExternalView> _externalViewMap;
-  private final PropertyType _sourceDataType;
+  protected Map<String, LiveInstance> _liveInstanceMap;
+  protected Map<String, InstanceConfig> _instanceConfigMap;
+  protected Map<String, ExternalView> _externalViewMap;
 
   protected String _clusterName;
 
   protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
 
   public BasicClusterDataCache(String clusterName) {
-    this(clusterName, PropertyType.EXTERNALVIEW);
-  }
-
-  public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) {
     _propertyDataChangedMap = new ConcurrentHashMap<>();
     _liveInstanceMap = new HashMap<>();
     _instanceConfigMap = new HashMap<>();
     _externalViewMap = new HashMap<>();
     _clusterName = clusterName;
-    _sourceDataType = sourceDataType;
+    requireFullRefresh();
   }
 
   /**
@@ -69,54 +63,44 @@ public class BasicClusterDataCache {
    * @return
    */
   public synchronized void refresh(HelixDataAccessor accessor) {
-    LOG.info("START: ClusterDataCache.refresh() for cluster " + _clusterName);
+    LOG.info("START: BasicClusterDataCache.refresh() for cluster " + _clusterName);
     long startTime = System.currentTimeMillis();
     PropertyKey.Builder keyBuilder = accessor.keyBuilder();
 
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false));
-      switch (_sourceDataType) {
-        case EXTERNALVIEW:
-          _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
-          break;
-        case TARGETEXTERNALVIEW:
-          _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews());
-          break;
-        default:
-          break;
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
+      _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
+      LOG.info("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
             System.currentTimeMillis() - start) + " ms");
-      }
     }
 
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
+      long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
       _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
-      LOG.debug("Reload LiveInstances: " + _liveInstanceMap.keySet());
+      LOG.info("Reload LiveInstances: " + _liveInstanceMap.keySet() + ". Takes " + (
+          System.currentTimeMillis() - start) + " ms");
     }
 
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) {
+      long start = System.currentTimeMillis();
       _propertyDataChangedMap
           .put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
       _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
-      LOG.debug("Reload InstanceConfig: " + _instanceConfigMap.keySet());
+      LOG.info("Reload InstanceConfig: " + _instanceConfigMap.keySet() + ". Takes " + (
+          System.currentTimeMillis() - start) + " ms");
     }
 
     long endTime = System.currentTimeMillis();
     LOG.info(
-        "END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
+        "END: BasicClusterDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
             - startTime) + " ms");
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("LiveInstances: " + _liveInstanceMap.keySet());
-      for (LiveInstance instance : _liveInstanceMap.values()) {
-        LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId());
-      }
-      LOG.debug("ExternalViews: " + _externalViewMap.keySet());
-      LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet());
+      LOG.debug("LiveInstances: " + _liveInstanceMap);
+      LOG.debug("ExternalViews: " + _externalViewMap);
+      LOG.debug("InstanceConfigs: " + _instanceConfigMap);
     }
   }
 
@@ -149,8 +133,20 @@ public class BasicClusterDataCache {
 
   /**
    * Notify the cache that some part of the cluster data has been changed.
+   *
+   * @param changeType the type of cluster data that has changed
+   * @param pathChanged the path that changed; not used by this implementation
+   */
+  public void notifyDataChange(HelixConstants.ChangeType changeType, String pathChanged) {
+    notifyDataChange(changeType);
+  }
+
+  /**
+   * Notify the cache that some part of the cluster data has been changed.
+   *
+   * @param changeType
    */
-  public synchronized void notifyDataChange(HelixConstants.ChangeType changeType) {
+  public void notifyDataChange(HelixConstants.ChangeType changeType) {
     _propertyDataChangedMap.put(changeType, Boolean.valueOf(true));
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index 9ca697f..d19c7e8 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -29,6 +29,7 @@ public enum ClusterEventType {
   LiveInstanceChange,
   MessageChange,
   ExternalViewChange,
+  TargetExternalViewChange,
   Resume,
   PeriodicalRebalance,
   StateVerifier,

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index 332cd8a..ffa2fe7 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -19,24 +19,99 @@ package org.apache.helix.spectator;
  * under the License.
  */
 
+import java.util.Collections;
+import java.util.Map;
 import org.apache.helix.HelixConstants;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyType;
 import org.apache.helix.common.caches.BasicClusterDataCache;
+import org.apache.helix.common.caches.CurrentStateCache;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.LiveInstance;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Cache the cluster data that are needed by RoutingTableProvider.
  */
 public class RoutingDataCache extends BasicClusterDataCache {
+  private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());
+
+  private final PropertyType _sourceDataType;
+  private CurrentStateCache _currentStateCache;
+  private Map<String, ExternalView> _targetExternalViewMap;
+
   public RoutingDataCache(String clusterName, PropertyType sourceDataType) {
-    super(clusterName, sourceDataType);
+    super(clusterName);
+    _sourceDataType = sourceDataType;
+    _currentStateCache = new CurrentStateCache(clusterName);
+    _targetExternalViewMap = Collections.emptyMap();
     requireFullRefresh();
   }
 
   /**
-   * Notify the cache that some part of the cluster data has been changed.
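+   * Refreshes the cached cluster data by re-fetching it from ZooKeeper.
+   *
+   * Only the data flagged as changed, and matching the configured source data type, is reloaded.
+   *
+   * @param accessor the accessor used to read the cluster data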
+   */
+  @Override
+  public synchronized void refresh(HelixDataAccessor accessor) {
+    LOG.info("START: RoutingDataCache.refresh() for cluster " + _clusterName);
+    long startTime = System.currentTimeMillis();
+
+    super.refresh(accessor);
+
+    if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW) && _propertyDataChangedMap
+        .get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) {
+      long start = System.currentTimeMillis();
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+      _propertyDataChangedMap
+          .put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, Boolean.valueOf(false));
+      _targetExternalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews());
+      LOG.info("Reload TargetExternalViews: " + _targetExternalViewMap.keySet() + ". Takes " + (
+          System.currentTimeMillis() - start) + " ms");
+    }
+
+    if (_sourceDataType.equals(PropertyType.CURRENTSTATES) && _propertyDataChangedMap
+        .get(HelixConstants.ChangeType.CURRENT_STATE)) {
+      long start = System.currentTimeMillis();
+      Map<String, LiveInstance> liveInstanceMap = getLiveInstances();
+      _currentStateCache.refresh(accessor, liveInstanceMap);
+      LOG.info("Reload CurrentStates: " + _targetExternalViewMap.keySet() + ". Takes " + (
+          System.currentTimeMillis() - start) + " ms");
+    }
+
+    long endTime = System.currentTimeMillis();
+    LOG.info("END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime
+        - startTime) + " ms");
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("CurrentStates: " + _currentStateCache);
+      LOG.debug("TargetExternalViews: " + _targetExternalViewMap);
+    }
+  }
+
+  /**
+   * Retrieves the TargetExternalView for all resources
+   *
+   * @return an unmodifiable view of the cached TargetExternalView map
+   */
+  public Map<String, ExternalView> getTargetExternalViews() {
+    return Collections.unmodifiableMap(_targetExternalViewMap);
+  }
+
+  /**
+   * Get map of current states in cluster. {InstanceName -> {SessionId -> {ResourceName ->
+   * CurrentState}}}
+   *
+   * @return
    */
-  public void notifyDataChange(HelixConstants.ChangeType changeType, String pathChanged) {
-    _propertyDataChangedMap.put(changeType, Boolean.valueOf(true));
+  public Map<String, Map<String, Map<String, CurrentState>>> getCurrentStatesMap() {
+    return _currentStateCache.getCurrentStatesMap();
   }
 }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
index f704ee4..b705dff 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java
@@ -29,6 +29,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -58,9 +59,81 @@ class RoutingTable {
       Collection<LiveInstance> liveInstances) {
     _resourceInfoMap = new HashMap<>();
     _resourceGroupInfoMap = new HashMap<>();
+    _liveInstances = new HashSet<>(liveInstances);
+    _instanceConfigs = new HashSet<>(instanceConfigs);
+    refresh(externalViews);
+  }
+
+  public RoutingTable(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
+      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+    _resourceInfoMap = new HashMap<>();
+    _resourceGroupInfoMap = new HashMap<>();
     _liveInstances = liveInstances;
     _instanceConfigs = instanceConfigs;
-    refresh(externalViews, instanceConfigs);
+    refresh(currentStateMap);
+  }
+
+  private void refresh(Collection<ExternalView> externalViewList) {
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    for (InstanceConfig config : _instanceConfigs) {
+      instanceConfigMap.put(config.getId(), config);
+    }
+    if (externalViewList != null) {
+      for (ExternalView extView : externalViewList) {
+        String resourceName = extView.getId();
+        for (String partitionName : extView.getPartitionSet()) {
+          Map<String, String> stateMap = extView.getStateMap(partitionName);
+          for (String instanceName : stateMap.keySet()) {
+            String currentState = stateMap.get(instanceName);
+            if (instanceConfigMap.containsKey(instanceName)) {
+              InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
+              if (extView.isGroupRoutingEnabled()) {
+                addEntry(resourceName, extView.getResourceGroupName(),
+                    extView.getInstanceGroupTag(), partitionName, currentState, instanceConfig);
+              } else {
+                addEntry(resourceName, partitionName, currentState, instanceConfig);
+              }
+            } else {
+              logger.error("Invalid instance name. " + instanceName
+                  + " .Not found in /cluster/configs/. instanceName: ");
+            }
+          }
+        }
+      }
+    }
+  }
+
+  private void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) {
+    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
+    for (InstanceConfig config : _instanceConfigs) {
+      instanceConfigMap.put(config.getId(), config);
+    }
+
+    for (LiveInstance liveInstance : _liveInstances) {
+      String instanceName = liveInstance.getInstanceName();
+      String sessionId = liveInstance.getSessionId();
+      InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
+      if (instanceConfig == null) {
+        logger.error("Invalid instance name. " + instanceName
+            + " .Not found in /cluster/configs/. instanceName: ");
+      }
+
+      Map<String, CurrentState> currentStates = Collections.emptyMap();
+      if (currentStateMap.containsKey(instanceName) && currentStateMap.get(instanceName)
+          .containsKey(sessionId)) {
+        currentStates = currentStateMap.get(instanceName).get(sessionId);
+      }
+
+      for (CurrentState currentState : currentStates.values()) {
+        String resourceName = currentState.getResourceName();
+        Map<String, String> stateMap = currentState.getPartitionStateMap();
+
+        for (String partitionName : stateMap.keySet()) {
+          String state = stateMap.get(partitionName);
+          addEntry(resourceName, partitionName, state, instanceConfig);
+        }
+      }
+    }
   }
 
   private void addEntry(String resourceName, String partitionName, String state,
@@ -271,37 +344,6 @@ class RoutingTable {
     return Collections.unmodifiableList(instanceList);
   }
 
-  private void refresh(Collection<ExternalView> externalViewList,
-      Collection<InstanceConfig> instanceConfigList) {
-    Map<String, InstanceConfig> instanceConfigMap = new HashMap<>();
-    for (InstanceConfig config : instanceConfigList) {
-      instanceConfigMap.put(config.getId(), config);
-    }
-    if (externalViewList != null) {
-      for (ExternalView extView : externalViewList) {
-        String resourceName = extView.getId();
-        for (String partitionName : extView.getPartitionSet()) {
-          Map<String, String> stateMap = extView.getStateMap(partitionName);
-          for (String instanceName : stateMap.keySet()) {
-            String currentState = stateMap.get(instanceName);
-            if (instanceConfigMap.containsKey(instanceName)) {
-              InstanceConfig instanceConfig = instanceConfigMap.get(instanceName);
-              if (extView.isGroupRoutingEnabled()) {
-                addEntry(resourceName, extView.getResourceGroupName(),
-                    extView.getInstanceGroupTag(), partitionName, currentState, instanceConfig);
-              } else {
-                addEntry(resourceName, partitionName, currentState, instanceConfig);
-              }
-            } else {
-              logger.error("Invalid instance name. " + instanceName
-                  + " .Not found in /cluster/configs/. instanceName: ");
-            }
-          }
-        }
-      }
-    }
-  }
-
   /**
    * Class to store instances, partitions and their states for each resource.
    */

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index be85b65..f539ac9 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -21,7 +21,9 @@ package org.apache.helix.spectator;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -30,6 +32,7 @@ import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.PropertyType;
 import org.apache.helix.api.listeners.ConfigChangeListener;
+import org.apache.helix.api.listeners.CurrentStateChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.HelixDataAccessor;
@@ -41,6 +44,7 @@ import org.apache.helix.common.ClusterEventProcessor;
 import org.apache.helix.controller.stages.AttributeName;
 import org.apache.helix.controller.stages.ClusterEvent;
 import org.apache.helix.controller.stages.ClusterEventType;
+import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -48,7 +52,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener,
-    ConfigChangeListener, LiveInstanceChangeListener {
+    ConfigChangeListener, LiveInstanceChangeListener, CurrentStateChangeListener {
   private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class);
   private final AtomicReference<RoutingTable> _routingTableRef;
   private final HelixManager _helixManager;
@@ -63,7 +67,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     this(helixManager, PropertyType.EXTERNALVIEW);
   }
 
-  public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType) throws HelixException {
+  public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType)
+      throws HelixException {
     _routingTableRef = new AtomicReference<>(new RoutingTable());
     _helixManager = helixManager;
     _sourceDataType = sourceDataType;
@@ -71,28 +76,77 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
     _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
     _routerUpdater.start();
     if (_helixManager != null) {
-      try {
-        switch (_sourceDataType) {
-        case EXTERNALVIEW:
+      switch (_sourceDataType) {
+      case EXTERNALVIEW:
+        try {
           _helixManager.addExternalViewChangeListener(this);
-          break;
-        case TARGETEXTERNALVIEW:
-          // Check whether target external has been enabled or not
-          if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
-              _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
-              0)) {
-            throw new HelixException("Target External View is not enabled!");
-          }
+        } catch (Exception e) {
+          shutdown();
+          logger.error("Failed to attach ExternalView Listener to HelixManager!");
+          throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
+        }
+        break;
+
+      case TARGETEXTERNALVIEW:
+        // Check whether target external view has been enabled or not
+        if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
+            _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), 0)) {
+          shutdown();
+          throw new HelixException("Target External View is not enabled!");
+        }
+
+        try {
           _helixManager.addTargetExternalViewChangeListener(this);
-          break;
-        default:
-          throw new HelixException("Unsupported source data type: " + sourceDataType);
+        } catch (Exception e) {
+          shutdown();
+          logger.error("Failed to attach TargetExternalView Listener to HelixManager!");
+          throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!", e);
         }
+        break;
+
+      case CURRENTSTATES:
+        // CurrentState change listeners will be added later in LiveInstanceChange call.
+        break;
+
+      default:
+        throw new HelixException("Unsupported source data type: " + sourceDataType);
+      }
+
+      try {
         _helixManager.addInstanceConfigChangeListener(this);
         _helixManager.addLiveInstanceChangeListener(this);
       } catch (Exception e) {
-        logger.error("Failed to attach listeners to HelixManager!");
-        throw new HelixException("Failed to attach listeners to HelixManager!", e);
+        shutdown();
+        logger.error(
+            "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!");
+        throw new HelixException(
+            "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!",
+            e);
+      }
+    }
+  }
+
+  /**
+   * Shut down the current RoutingTableProvider. Once it is shut down, it should never be reused.
+   */
+  public void shutdown() {
+    _routerUpdater.shutdown();
+    if (_helixManager != null) {
+      PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
+      switch (_sourceDataType) {
+        case EXTERNALVIEW:
+          _helixManager.removeListener(keyBuilder.externalViews(), this);
+          break;
+        case TARGETEXTERNALVIEW:
+          _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
+          break;
+        case CURRENTSTATES:
+          NotificationContext context = new NotificationContext(_helixManager);
+          context.setType(NotificationContext.Type.FINALIZE);
+          updateCurrentStatesListeners(Collections.<LiveInstance>emptyList(), context);
+          break;
+        default:
+          break;
       }
     }
   }
@@ -249,12 +303,21 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
       return;
     }
     // Refresh with full list of external view.
-    // keep this here for back-compatibility
     if (externalViewList != null && externalViewList.size() > 0) {
+      // Kept for backward compatibility: an application can call onExternalViewChange directly with the ExternalView list supplied.
       refresh(externalViewList, changeContext);
     } else {
-      _routerUpdater.queueEvent(changeContext, ClusterEventType.ExternalViewChange,
-          HelixConstants.ChangeType.EXTERNAL_VIEW);
+      ClusterEventType eventType;
+      if (_sourceDataType.equals(PropertyType.EXTERNALVIEW)) {
+        eventType = ClusterEventType.ExternalViewChange;
+      } else if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW)) {
+        eventType = ClusterEventType.TargetExternalViewChange;
+      } else {
+        logger.warn("onExternalViewChange called with dis-matched change types. Source data type "
+            + _sourceDataType + ", change type: " + changeType);
+        return;
+      }
+      _routerUpdater.queueEvent(changeContext, eventType, changeType);
     }
   }
 
@@ -274,13 +337,91 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   }
 
   @Override
-  @PreFetch(enabled = false)
+  @PreFetch(enabled = true)
   public void onLiveInstanceChange(List<LiveInstance> liveInstances,
       NotificationContext changeContext) {
+    if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
+      // Go through the live instance list and update CurrentState listeners
+      updateCurrentStatesListeners(liveInstances, changeContext);
+    }
+
     _routerUpdater.queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
         HelixConstants.ChangeType.LIVE_INSTANCE);
   }
 
+  @Override
+  @PreFetch(enabled = false)
+  public void onStateChange(String instanceName,
+      List<CurrentState> statesInfo, NotificationContext changeContext) {
+    if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
+      _routerUpdater.queueEvent(changeContext, ClusterEventType.CurrentStateChange,
+          HelixConstants.ChangeType.CURRENT_STATE);
+    } else {
+      logger.warn(
+          "RoutingTableProvider does not use CurrentStates as source, ignore CurrentState changes!");
+    }
+  }
+
+  final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions = new AtomicReference<>();
+
+  /**
+   * Go through all live instances in the cluster, add CurrentStateChange listener to
+   * them if they are newly added, and remove CurrentStateChange listener if instance is offline.
+   */
+  private void updateCurrentStatesListeners(List<LiveInstance> liveInstances,
+      NotificationContext changeContext) {
+    HelixManager manager = changeContext.getManager();
+    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(manager.getClusterName());
+
+    if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
+      // on finalize, should remove all current-state listeners
+      logger.info("remove current-state listeners. lastSeenSessions: " + _lastSeenSessions);
+      liveInstances = Collections.emptyList();
+    }
+
+    Map<String, LiveInstance> curSessions = new HashMap<>();
+    for (LiveInstance liveInstance : liveInstances) {
+      curSessions.put(liveInstance.getSessionId(), liveInstance);
+    }
+
+    // Go through the live instance list and update CurrentState listeners
+    synchronized (_lastSeenSessions) {
+      Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
+      if (lastSessions == null) {
+        lastSessions = Collections.emptyMap();
+      }
+
+      // add listeners to new live instances
+      for (String session : curSessions.keySet()) {
+        if (!lastSessions.containsKey(session)) {
+          String instanceName = curSessions.get(session).getInstanceName();
+          try {
+            // add current-state listeners for new sessions
+            manager.addCurrentStateChangeListener(this, instanceName, session);
+            logger.info(manager.getInstanceName() + " added current-state listener for instance: "
+                + instanceName + ", session: " + session + ", listener: " + this);
+          } catch (Exception e) {
+            logger.error("Fail to add current state listener for instance: " + instanceName
+                + " with session: " + session, e);
+          }
+        }
+      }
+
+      // remove current-state listener for expired session
+      for (String session : lastSessions.keySet()) {
+        if (!curSessions.containsKey(session)) {
+          String instanceName = lastSessions.get(session).getInstanceName();
+          manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
+          logger.info("remove current-state listener for instance:" + instanceName + ", session: "
+              + session);
+        }
+      }
+
+      // update last-seen
+      _lastSeenSessions.set(curSessions);
+    }
+  }
+
   private void reset() {
     logger.info("Resetting the routing table.");
     RoutingTable newRoutingTable = new RoutingTable();
@@ -298,8 +439,21 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
 
   public void refresh(Collection<ExternalView> externalViews,
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+    long startTime = System.currentTimeMillis();
     RoutingTable newRoutingTable = new RoutingTable(externalViews, instanceConfigs, liveInstances);
     _routingTableRef.set(newRoutingTable);
+    logger.info("Refreshed the RoutingTable for cluster " + (_helixManager != null ? _helixManager
+        .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - startTime) + "ms.");
+  }
+
+  protected void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
+      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
+    long startTime = System.currentTimeMillis();
+    RoutingTable newRoutingTable =
+        new RoutingTable(currentStateMap, instanceConfigs, liveInstances);
+    _routingTableRef.set(newRoutingTable);
+    logger.info("Refresh the RoutingTable for cluster " + (_helixManager != null ? _helixManager
+        .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - startTime) + "ms.");
   }
 
   private class RouterUpdater extends ClusterEventProcessor {
@@ -324,8 +478,27 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
           throw new HelixException("HelixManager is null for router update event.");
         }
         _dataCache.refresh(manager.getHelixDataAccessor());
-        refresh(_dataCache.getExternalViews().values(), _dataCache.getInstanceConfigMap().values(),
-            _dataCache.getLiveInstances().values());
+
+        switch (_sourceDataType) {
+          case EXTERNALVIEW:
+            refresh(_dataCache.getExternalViews().values(),
+                _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
+            break;
+
+          case TARGETEXTERNALVIEW:
+            refresh(_dataCache.getTargetExternalViews().values(),
+                _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
+            break;
+
+          case CURRENTSTATES:
+            refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
+                _dataCache.getLiveInstances().values());
+            break;
+
+          default:
+            logger.warn("Unsupported source data type: " + _sourceDataType
+                + ", stop refreshing the routing table!");
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/MockAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/MockAccessor.java b/helix-core/src/test/java/org/apache/helix/MockAccessor.java
index 583dbd8..14c1845 100644
--- a/helix-core/src/test/java/org/apache/helix/MockAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/MockAccessor.java
@@ -205,8 +205,11 @@ public class MockAccessor implements HelixDataAccessor {
 
   @Override
   public <T extends HelixProperty> boolean[] setChildren(List<PropertyKey> keys, List<T> children) {
-    // TODO Auto-generated method stub
-    return null;
+    boolean[] results = new boolean[keys.size()];
+    for (int i = 0; i < keys.size(); i++) {
+      results[i] = setProperty(keys.get(i), children.get(i));
+    }
+    return results;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java
deleted file mode 100644
index 0f72091..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java
+++ /dev/null
@@ -1,177 +0,0 @@
-package org.apache.helix.integration.Spectator;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.model.BuiltInStateModelDefinitions;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
-import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
-import org.mockito.internal.util.collections.Sets;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestRoutingTableProvider extends ZkIntegrationTestBase {
-
-  static final String STATE_MODEL = BuiltInStateModelDefinitions.MasterSlave.name();
-  static final String TEST_DB = "TestDB";
-  static final String CLASS_NAME = TestRoutingTableProvider.class.getSimpleName();
-  static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
-  static final int PARTICIPANT_NUMBER = 3;
-  static final int PARTICIPANT_START_PORT = 12918;
-
-  static final int PARTITION_NUMBER = 20;
-  static final int REPLICA_NUMBER = 3;
-
-  private HelixManager _spectator;
-  private List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
-  private List<String> _instances = new ArrayList<>();
-  private ClusterControllerManager _controller;
-  private HelixClusterVerifier _clusterVerifier;
-  private RoutingTableProvider _routingTableProvider;
-  private RoutingTableProvider _routingTableProvider2;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    System.out.println(
-        "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
-
-    // setup storage cluster
-    _gSetupTool.addCluster(CLUSTER_NAME, true);
-
-    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
-      String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i);
-      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
-      _instances.add(instance);
-    }
-
-    // start dummy participants
-    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
-      MockParticipantManager participant =
-          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
-      participant.syncStart();
-      _participants.add(participant);
-    }
-
-    createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, _instances,
-        STATE_MODEL, PARTITION_NUMBER, REPLICA_NUMBER);
-
-    // start controller
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-
-    // start speculator
-    _routingTableProvider = new RoutingTableProvider();
-    _spectator = HelixManagerFactory
-        .getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, ZK_ADDR);
-    _spectator.connect();
-    _spectator.addExternalViewChangeListener(_routingTableProvider);
-    _spectator.addLiveInstanceChangeListener(_routingTableProvider);
-    _spectator.addInstanceConfigChangeListener(_routingTableProvider);
-
-    _routingTableProvider2 = new RoutingTableProvider(_spectator);
-
-    _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
-    Assert.assertTrue(_clusterVerifier.verify());
-  }
-
-  @AfterClass
-  public void afterClass() {
-    // stop participants
-    for (MockParticipantManager p : _participants) {
-      p.syncStop();
-    }
-    _controller.syncStop();
-    _spectator.disconnect();
-    _gSetupTool.deleteCluster(CLUSTER_NAME);
-  }
-
-  @Test
-  public void testRoutingTable() {
-    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size());
-    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size());
-
-    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size());
-    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size());
-
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
-        Sets.newSet(_instances.get(1), _instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
-        Sets.newSet(_instances.get(1), _instances.get(2)));
-
-    Collection<String> databases = _routingTableProvider.getResources();
-    Assert.assertEquals(databases.size(), 1);
-  }
-
-  @Test(dependsOnMethods = { "testRoutingTable" })
-  public void testDisableInstance() throws InterruptedException {
-    // disable the master instance
-    String prevMasterInstance = _instances.get(0);
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false);
-    Assert.assertTrue(_clusterVerifier.verify());
-
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(1)),
-        Sets.newSet(_instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(1)),
-        Sets.newSet(_instances.get(2)));
-  }
-
-  @Test(dependsOnMethods = { "testDisableInstance" })
-  public void testShutdownInstance() throws InterruptedException {
-    // reenable the first instance
-    String prevMasterInstance = _instances.get(0);
-    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
-
-    // shutdown second instance.
-    _participants.get(1).syncStop();
-
-    Assert.assertTrue(_clusterVerifier.verify());
-
-    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size() - 1);
-    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size());
-
-    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size() - 1);
-    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size());
-
-    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
-        Sets.newSet(_instances.get(2)));
-    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
-        Sets.newSet(_instances.get(2)));
-  }
-
-  private void validateRoutingTable(RoutingTableProvider routingTableProvider,
-      Set<String> masterNodes, Set<String> slaveNodes) {
-    IdealState is =
-        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
-    for (String p : is.getPartitionSet()) {
-      Set<String> masterInstances = new HashSet<>();
-      for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "MASTER")) {
-        masterInstances.add(config.getInstanceName());
-      }
-
-      Set<String> slaveInstances = new HashSet<>();
-      for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "SLAVE")) {
-        slaveInstances.add(config.getInstanceName());
-      }
-
-      Assert.assertEquals(masterInstances, masterNodes);
-      Assert.assertEquals(slaveInstances, slaveNodes);
-    }
-  }
-}
-

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
deleted file mode 100644
index ddd27e3..0000000
--- a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
+++ /dev/null
@@ -1,152 +0,0 @@
-package org.apache.helix.integration.Spectator;
-
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-import org.apache.helix.ConfigAccessor;
-import org.apache.helix.HelixException;
-import org.apache.helix.HelixManager;
-import org.apache.helix.HelixManagerFactory;
-import org.apache.helix.InstanceType;
-import org.apache.helix.PropertyType;
-import org.apache.helix.integration.common.ZkIntegrationTestBase;
-import org.apache.helix.integration.manager.ClusterControllerManager;
-import org.apache.helix.integration.manager.MockParticipantManager;
-import org.apache.helix.integration.task.WorkflowGenerator;
-import org.apache.helix.mock.participant.MockDelayMSStateModelFactory;
-import org.apache.helix.model.ClusterConfig;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.InstanceConfig;
-import org.apache.helix.participant.StateMachineEngine;
-import org.apache.helix.spectator.RoutingTableProvider;
-import org.apache.helix.tools.ClusterSetup;
-import org.testng.Assert;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Test;
-
-public class TestRoutingTableProviderWithSourceType extends ZkIntegrationTestBase {
-  private HelixManager _manager;
-  private ClusterSetup _setupTool;
-  private final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
-  private final int NUM_NODES = 10;
-  protected int NUM_PARTITIONS = 20;
-  protected int NUM_REPLICAS = 3;
-  private final int START_PORT = 12918;
-  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
-  private MockParticipantManager[] _participants;
-  private ClusterControllerManager _controller;
-  private ConfigAccessor _configAccessor;
-
-  @BeforeClass
-  public void beforeClass() throws Exception {
-    String namespace = "/" + CLUSTER_NAME;
-    _participants =  new MockParticipantManager[NUM_NODES];
-    if (_gZkClient.exists(namespace)) {
-      _gZkClient.deleteRecursively(namespace);
-    }
-
-    _setupTool = new ClusterSetup(ZK_ADDR);
-    _setupTool.addCluster(CLUSTER_NAME, true);
-
-    _participants = new MockParticipantManager[NUM_NODES];
-    for (int i = 0; i < NUM_NODES; i++) {
-      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
-    }
-
-    _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
-        MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
-
-    _setupTool
-        .rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
-
-    for (int i = 0; i < NUM_NODES; i++) {
-      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
-      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
-
-      // add a delayed state model
-      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
-      MockDelayMSStateModelFactory delayFactory =
-          new MockDelayMSStateModelFactory().setDelay(-300000L);
-      stateMachine.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory);
-      _participants[i].syncStart();
-    }
-
-    _manager = HelixManagerFactory
-        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
-    _manager.connect();
-
-    String controllerName = CONTROLLER_PREFIX + "_0";
-    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
-    _controller.syncStart();
-    _configAccessor = new ConfigAccessor(_gZkClient);
-  }
-
-  @AfterClass
-  public void afterClass() throws Exception {
-    _manager.disconnect();
-    for (int i = 0; i < NUM_NODES; i++) {
-      if (_participants[i] != null && _participants[i].isConnected()) {
-        _participants[i].reset();
-      }
-    }
-  }
-
-  @Test (expectedExceptions = HelixException.class)
-  public void testTargetExternalViewWithoutEnable() {
-    new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
-  }
-
-  @Test
-  public void testExternalViewDoesNotExist() {
-    String resourceName = WorkflowGenerator.DEFAULT_TGT_DB + 1;
-    RoutingTableProvider externalViewProvider =
-        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
-    Assert.assertEquals(externalViewProvider.getInstancesForResource(resourceName, "SLAVE").size(),
-        0);
-  }
-
-  @Test (dependsOnMethods = "testTargetExternalViewWithoutEnable")
-  public void testExternalViewDiffFromTargetExternalView() throws InterruptedException {
-    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
-    clusterConfig.enableTargetExternalView(true);
-    clusterConfig.setPersistBestPossibleAssignment(true);
-    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
-    Thread.sleep(2000);
-
-    RoutingTableProvider externalViewProvider =
-        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
-    RoutingTableProvider targetExternalViewProvider =
-        new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
-
-    // ExternalView should not contain any MASTERS
-    // TargetExternalView should contain MASTERS same as the partition number
-    Set<InstanceConfig> externalViewMasters =
-        externalViewProvider.getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
-    Assert.assertEquals(externalViewMasters.size(), 0);
-    Set<InstanceConfig> targetExternalViewMasters = targetExternalViewProvider
-        .getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
-    Assert.assertEquals(targetExternalViewMasters.size(), NUM_NODES);
-
-    // TargetExternalView MASTERS mapping should exactly match IdealState MASTERS mapping
-    Map<String, Map<String, String>> stateMap = _setupTool.getClusterManagementTool()
-        .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB).getRecord()
-        .getMapFields();
-
-    Set<String> idealMasters = new HashSet<>();
-    Set<String> targetMasters = new HashSet<>();
-    for (Map<String, String> instanceMap : stateMap.values()) {
-      for (String instance : instanceMap.keySet()) {
-        if (instanceMap.get(instance).equals("MASTER")) {
-          idealMasters.add(instance);
-        }
-      }
-    }
-
-    for (InstanceConfig instanceConfig : targetExternalViewMasters) {
-      targetMasters.add(instanceConfig.getInstanceName());
-    }
-    Assert.assertTrue(idealMasters.equals(targetMasters));
-  }
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
new file mode 100644
index 0000000..eb8f4b1
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProvider.java
@@ -0,0 +1,176 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.mockito.internal.util.collections.Sets;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRoutingTableProvider extends ZkIntegrationTestBase {
+
+  static final String STATE_MODEL = BuiltInStateModelDefinitions.MasterSlave.name();
+  static final String TEST_DB = "TestDB";
+  static final String CLASS_NAME = TestRoutingTableProvider.class.getSimpleName();
+  static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  static final int PARTICIPANT_NUMBER = 3;
+  static final int PARTICIPANT_START_PORT = 12918;
+
+  static final int PARTITION_NUMBER = 20;
+  static final int REPLICA_NUMBER = 3;
+
+  private HelixManager _spectator;
+  private List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>();
+  private List<String> _instances = new ArrayList<>();
+  private ClusterControllerManager _controller;
+  private HelixClusterVerifier _clusterVerifier;
+  private RoutingTableProvider _routingTableProvider;
+  private RoutingTableProvider _routingTableProvider2;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out.println(
+        "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis()));
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+      _instances.add(instance);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, _instances,
+        STATE_MODEL, PARTITION_NUMBER, REPLICA_NUMBER);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // start spectator
+    _routingTableProvider = new RoutingTableProvider();
+    _spectator = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, ZK_ADDR);
+    _spectator.connect();
+    _spectator.addExternalViewChangeListener(_routingTableProvider);
+    _spectator.addLiveInstanceChangeListener(_routingTableProvider);
+    _spectator.addInstanceConfigChangeListener(_routingTableProvider);
+
+    _routingTableProvider2 = new RoutingTableProvider(_spectator);
+
+    _clusterVerifier = new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(_clusterVerifier.verify());
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // stop participants
+    for (MockParticipantManager p : _participants) {
+      p.syncStop();
+    }
+    _controller.syncStop();
+    _spectator.disconnect();
+    _gSetupTool.deleteCluster(CLUSTER_NAME);
+  }
+
+  @Test
+  public void testRoutingTable() {
+    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size());
+
+    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size());
+    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size());
+
+    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(1), _instances.get(2)));
+    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(1), _instances.get(2)));
+
+    Collection<String> databases = _routingTableProvider.getResources();
+    Assert.assertEquals(databases.size(), 1);
+  }
+
+  @Test(dependsOnMethods = { "testRoutingTable" })
+  public void testDisableInstance() throws InterruptedException {
+    // disable the master instance
+    String prevMasterInstance = _instances.get(0);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false);
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(1)),
+        Sets.newSet(_instances.get(2)));
+    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(1)),
+        Sets.newSet(_instances.get(2)));
+  }
+
+  @Test(dependsOnMethods = { "testDisableInstance" })
+  public void testShutdownInstance() throws InterruptedException {
+    // reenable the first instance
+    String prevMasterInstance = _instances.get(0);
+    _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true);
+
+    // shutdown second instance.
+    _participants.get(1).syncStop();
+
+    Assert.assertTrue(_clusterVerifier.verify());
+
+    Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size() - 1);
+    Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size());
+
+    Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size() - 1);
+    Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size());
+
+    validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(2)));
+    validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)),
+        Sets.newSet(_instances.get(2)));
+  }
+
+  private void validateRoutingTable(RoutingTableProvider routingTableProvider,
+      Set<String> masterNodes, Set<String> slaveNodes) {
+    IdealState is =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB);
+    for (String p : is.getPartitionSet()) {
+      Set<String> masterInstances = new HashSet<>();
+      for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "MASTER")) {
+        masterInstances.add(config.getInstanceName());
+      }
+
+      Set<String> slaveInstances = new HashSet<>();
+      for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "SLAVE")) {
+        slaveInstances.add(config.getInstanceName());
+      }
+
+      Assert.assertEquals(masterInstances, masterNodes);
+      Assert.assertEquals(slaveInstances, slaveNodes);
+    }
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
new file mode 100644
index 0000000..6187cff
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromCurrentStates.java
@@ -0,0 +1,162 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRoutingTableProviderFromCurrentStates extends ZkIntegrationTestBase {
+  private HelixManager _manager;
+  private ClusterSetup _setupTool;
+  private final int NUM_NODES = 10;
+  protected int NUM_PARTITIONS = 20;
+  protected int NUM_REPLICAS = 3;
+  private final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private MockParticipantManager[] _participants;
+  private ClusterControllerManager _controller;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    _participants =  new MockParticipantManager[NUM_NODES];
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    _participants = new MockParticipantManager[NUM_NODES];
+    for (int i = 0; i < NUM_NODES; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    for (int i = 0; i < NUM_NODES; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    ConfigAccessor _configAccessor = _manager.getConfigAccessor();
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.enableTargetExternalView(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+    for (int i = 0; i < NUM_NODES; i++) {
+      if (_participants[i] != null && _participants[i].isConnected()) {
+        _participants[i].reset();
+      }
+    }
+  }
+
+  @Test
+  public void testRoutingTableWithCurrentStates() throws InterruptedException {
+    RoutingTableProvider routingTableEV =
+        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+    RoutingTableProvider routingTableCurrentStates = new RoutingTableProvider(_manager, PropertyType.CURRENTSTATES);
+
+    String db1 = "TestDB-1";
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db1, NUM_PARTITIONS, "MasterSlave",
+        IdealState.RebalanceMode.FULL_AUTO.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db1, NUM_REPLICAS);
+
+    Thread.sleep(200);
+    HelixClusterVerifier clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(clusterVerifier.verify());
+
+    IdealState idealState1 = _setupTool.getClusterManagementTool().getResourceIdealState(
+        CLUSTER_NAME, db1);
+    validate(idealState1, routingTableEV, routingTableCurrentStates);
+
+    // add new DB
+    String db2 = "TestDB-2";
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db2, NUM_PARTITIONS, "MasterSlave",
+        IdealState.RebalanceMode.FULL_AUTO.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, NUM_REPLICAS);
+
+    Thread.sleep(200);
+    Assert.assertTrue(clusterVerifier.verify());
+
+    IdealState idealState2 = _setupTool.getClusterManagementTool().getResourceIdealState(
+        CLUSTER_NAME, db2);
+    validate(idealState2, routingTableEV, routingTableCurrentStates);
+
+    // shutdown an instance
+    _participants[0].syncStop();
+    Thread.sleep(200);
+    Assert.assertTrue(clusterVerifier.verify());
+    validate(idealState1, routingTableEV, routingTableCurrentStates);
+    validate(idealState2, routingTableEV, routingTableCurrentStates);
+  }
+
+  @Test (dependsOnMethods = {"testRoutingTableWithCurrentStates"})
+  public void testWithSupportSourceDataType() {
+    new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+    new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
+    new RoutingTableProvider(_manager, PropertyType.CURRENTSTATES);
+
+    try {
+      new RoutingTableProvider(_manager, PropertyType.IDEALSTATES);
+      Assert.fail();
+    } catch (HelixException ex) {
+      Assert.assertTrue(ex.getMessage().contains("Unsupported source data type"));
+    }
+  }
+
+  private void validate(IdealState idealState, RoutingTableProvider routingTableEV,
+      RoutingTableProvider routingTableCurrentStates) {
+    String db = idealState.getResourceName();
+    Set<String> partitions = idealState.getPartitionSet();
+    for (String partition : partitions) {
+      List<InstanceConfig> masterInsEv =
+          routingTableEV.getInstancesForResource(db, partition, "MASTER");
+      List<InstanceConfig> masterInsCs =
+          routingTableCurrentStates.getInstancesForResource(db, partition, "MASTER");
+      Assert.assertEquals(masterInsEv.size(), 1);
+      Assert.assertEquals(masterInsCs.size(), 1);
+      Assert.assertEquals(masterInsCs, masterInsEv);
+
+      List<InstanceConfig> slaveInsEv =
+          routingTableEV.getInstancesForResource(db, partition, "SLAVE");
+      List<InstanceConfig> slaveInsCs =
+          routingTableCurrentStates.getInstancesForResource(db, partition, "SLAVE");
+      Assert.assertEquals(slaveInsEv.size(), 2);
+      Assert.assertEquals(slaveInsCs.size(), 2);
+      Assert.assertEquals(new HashSet(slaveInsCs), new HashSet(slaveInsEv));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/e5728469/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java
new file mode 100644
index 0000000..fbf17cf
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderFromTargetEV.java
@@ -0,0 +1,152 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.mock.participant.MockDelayMSStateModelFactory;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRoutingTableProviderFromTargetEV extends ZkIntegrationTestBase {
+  private HelixManager _manager;
+  private ClusterSetup _setupTool;
+  private final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private final int NUM_NODES = 10;
+  protected int NUM_PARTITIONS = 20;
+  protected int NUM_REPLICAS = 3;
+  private final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private MockParticipantManager[] _participants;
+  private ClusterControllerManager _controller;
+  private ConfigAccessor _configAccessor;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    _participants =  new MockParticipantManager[NUM_NODES];
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    _participants = new MockParticipantManager[NUM_NODES];
+    for (int i = 0; i < NUM_NODES; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
+        MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
+
+    _setupTool
+        .rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
+
+    for (int i = 0; i < NUM_NODES; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+      // add a delayed state model
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      MockDelayMSStateModelFactory delayFactory =
+          new MockDelayMSStateModelFactory().setDelay(-300000L);
+      stateMachine.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory);
+      _participants[i].syncStart();
+    }
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+    _configAccessor = new ConfigAccessor(_gZkClient);
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+    for (int i = 0; i < NUM_NODES; i++) {
+      if (_participants[i] != null && _participants[i].isConnected()) {
+        _participants[i].reset();
+      }
+    }
+  }
+
+  @Test (expectedExceptions = HelixException.class)
+  public void testTargetExternalViewWithoutEnable() {
+    new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
+  }
+
+  @Test
+  public void testExternalViewDoesNotExist() {
+    String resourceName = WorkflowGenerator.DEFAULT_TGT_DB + 1;
+    RoutingTableProvider externalViewProvider =
+        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+    Assert.assertEquals(externalViewProvider.getInstancesForResource(resourceName, "SLAVE").size(),
+        0);
+  }
+
+  @Test (dependsOnMethods = "testTargetExternalViewWithoutEnable")
+  public void testExternalViewDiffFromTargetExternalView() throws InterruptedException {
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.enableTargetExternalView(true);
+    clusterConfig.setPersistBestPossibleAssignment(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    Thread.sleep(2000);
+
+    RoutingTableProvider externalViewProvider =
+        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+    RoutingTableProvider targetExternalViewProvider =
+        new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
+
+    // ExternalView should not contain any MASTERS
+    // TargetExternalView should list every node (NUM_NODES instances) as hosting a MASTER
+    Set<InstanceConfig> externalViewMasters =
+        externalViewProvider.getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
+    Assert.assertEquals(externalViewMasters.size(), 0);
+    Set<InstanceConfig> targetExternalViewMasters = targetExternalViewProvider
+        .getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
+    Assert.assertEquals(targetExternalViewMasters.size(), NUM_NODES);
+
+    // TargetExternalView MASTERS mapping should exactly match IdealState MASTERS mapping
+    Map<String, Map<String, String>> stateMap = _setupTool.getClusterManagementTool()
+        .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB).getRecord()
+        .getMapFields();
+
+    Set<String> idealMasters = new HashSet<>();
+    Set<String> targetMasters = new HashSet<>();
+    for (Map<String, String> instanceMap : stateMap.values()) {
+      for (String instance : instanceMap.keySet()) {
+        if (instanceMap.get(instance).equals("MASTER")) {
+          idealMasters.add(instance);
+        }
+      }
+    }
+
+    for (InstanceConfig instanceConfig : targetExternalViewMasters) {
+      targetMasters.add(instanceConfig.getInstanceName());
+    }
+    Assert.assertTrue(idealMasters.equals(targetMasters));
+  }
+}


[6/6] helix git commit: Add RoutingTableSnapshot class to hold a snapshot of routing table information and provide API to return RoutingTableSnapshot from RoutingTableProvider.

Posted by lx...@apache.org.
Add RoutingTableSnapshot class to hold a snapshot of routing table information and provide API to return RoutingTableSnapshot from RoutingTableProvider.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/73d243fd
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/73d243fd
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/73d243fd

Branch: refs/heads/master
Commit: 73d243fd1fd3846731aa02fb9d29ed09af73a378
Parents: e572846
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Feb 15 16:12:22 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 23 12:12:44 2018 -0700

----------------------------------------------------------------------
 .../helix/spectator/RoutingDataCache.java       |   2 +-
 .../helix/spectator/RoutingTableProvider.java   |  10 ++
 .../helix/spectator/RoutingTableSnapshot.java   | 151 +++++++++++++++++++
 .../spectator/TestRoutingTableSnapshot.java     | 140 +++++++++++++++++
 4 files changed, 302 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index ffa2fe7..17c83b3 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Cache the cluster data that are needed by RoutingTableProvider.
  */
-public class RoutingDataCache extends BasicClusterDataCache {
+class RoutingDataCache extends BasicClusterDataCache {
   private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());
 
   private final PropertyType _sourceDataType;

http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index f539ac9..3e01ea4 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -152,6 +152,16 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   }
 
   /**
+   * Get a snapshot of the current RoutingTable information. The snapshot is immutable; it reflects
+   * the routing table information at the time this method is called.
+   *
+   * @return snapshot of current routing table.
+   */
+  public RoutingTableSnapshot getRoutingTableSnapshot() {
+    return new RoutingTableSnapshot(_routingTableRef.get());
+  }
+
+  /**
    * returns the instances for {resource,partition} pair that are in a specific
    * {state}
    *

http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
new file mode 100644
index 0000000..1592fbe
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
@@ -0,0 +1,151 @@
+package org.apache.helix.spectator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+
+/**
+ * A snapshot of RoutingTable information. It is immutable and reflects the routing table
+ * information at the time it is generated.
+ */
+public class RoutingTableSnapshot {
+  private final RoutingTable _routingTable;
+
+  public RoutingTableSnapshot(RoutingTable routingTable) {
+    _routingTable = routingTable;
+  }
+
+  /**
+   * returns all instances for {resource} that are in a specific {state}.
+   *
+   * @param resourceName
+   * @param state
+   *
+   * @return empty set if there is no instance in a given state
+   */
+  public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) {
+    return _routingTable.getInstancesForResource(resourceName, state);
+  }
+
+  /**
+   * returns the instances for {resource,partition} pair that are in a specific {state}
+   *
+   * @param resourceName
+   * @param partitionName
+   * @param state
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName,
+      String state) {
+    return _routingTable.getInstancesForResource(resourceName, partitionName, state);
+  }
+
+  /**
+   * returns all instances that are in a specific {state} for the resources in {resource group}
+   * that have any of the given tags
+   *
+   * @param resourceGroupName
+   * @param state
+   *
+   * @return empty set if there is no instance in a given state
+   */
+  public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
+      List<String> resourceTags) {
+    return _routingTable.getInstancesForResourceGroup(resourceGroupName, state, resourceTags);
+  }
+
+  /**
+   * returns all instances for all resources in {resource group} that are in a specific {state}
+   *
+   * @param resourceGroupName
+   * @param state
+   *
+   * @return empty set if there is no instance in a given state
+   */
+  public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
+    return _routingTable.getInstancesForResourceGroup(resourceGroupName, state);
+  }
+
+  /**
+   * returns the instances for {resource group,partition} pair in all resources belonging to the
+   * given resource group that are in a specific {state}.
+   * The return results aggregate all partition states from all the resources in the given resource
+   * group.
+   *
+   * @param resourceGroupName
+   * @param partitionName
+   * @param state
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
+      String partitionName, String state) {
+    return _routingTable.getInstancesForResourceGroup(resourceGroupName, partitionName, state);
+  }
+
+  /**
+   * returns the instances that are in a specific {state} for the {resource group,partition} pair,
+   * restricted to resources that have any of the given tags.
+   * Finds all resources belonging to the given resource group that have any of the given resource
+   * tags and returns the aggregated partition states from all these resources.
+   *
+   * @param resourceGroupName
+   * @param partitionName
+   * @param state
+   * @param resourceTags
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
+      String partitionName, String state, List<String> resourceTags) {
+    return _routingTable
+        .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags);
+  }
+
+  /**
+   * Return all liveInstances in the cluster now.
+   *
+   * @return
+   */
+  public Collection<LiveInstance> getLiveInstances() {
+    return _routingTable.getLiveInstances();
+  }
+
+  /**
+   * Return the configs of all instances in this cluster.
+   *
+   * @return
+   */
+  public Collection<InstanceConfig> getInstanceConfigs() {
+    return _routingTable.getInstanceConfigs();
+  }
+
+  /**
+   * Return names of all resources (shown in ExternalView) in this cluster.
+   */
+  public Collection<String> getResources() {
+    return _routingTable.getResources();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
new file mode 100644
index 0000000..d556b7a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
@@ -0,0 +1,140 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.spectator.RoutingTableSnapshot;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestRoutingTableSnapshot extends ZkIntegrationTestBase {
+  private HelixManager _manager;
+  private ClusterSetup _setupTool;
+  private final int NUM_NODES = 10;
+  protected int NUM_PARTITIONS = 20;
+  protected int NUM_REPLICAS = 3;
+  private final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private MockParticipantManager[] _participants;
+  private ClusterControllerManager _controller;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    _participants =  new MockParticipantManager[NUM_NODES];
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    _participants = new MockParticipantManager[NUM_NODES];
+    for (int i = 0; i < NUM_NODES; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    for (int i = 0; i < NUM_NODES; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+    for (int i = 0; i < NUM_NODES; i++) {
+      if (_participants[i] != null && _participants[i].isConnected()) {
+        _participants[i].reset();
+      }
+    }
+  }
+
+  @Test
+  public void testRoutingTableSnapshot() throws InterruptedException {
+    RoutingTableProvider routingTableProvider =
+        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+
+    String db1 = "TestDB-1";
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db1, NUM_PARTITIONS, "MasterSlave",
+        IdealState.RebalanceMode.FULL_AUTO.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db1, NUM_REPLICAS);
+
+    Thread.sleep(200);
+    HelixClusterVerifier clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(clusterVerifier.verify());
+
+    IdealState idealState1 = _setupTool.getClusterManagementTool().getResourceIdealState(
+        CLUSTER_NAME, db1);
+
+    RoutingTableSnapshot routingTableSnapshot = routingTableProvider.getRoutingTableSnapshot();
+    validateMapping(idealState1, routingTableSnapshot);
+
+    Assert.assertEquals(routingTableSnapshot.getInstanceConfigs().size(), NUM_NODES);
+    Assert.assertEquals(routingTableSnapshot.getResources().size(), 1);
+    Assert.assertEquals(routingTableSnapshot.getLiveInstances().size(), NUM_NODES);
+
+    // add new DB and shutdown an instance
+    String db2 = "TestDB-2";
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db2, NUM_PARTITIONS, "MasterSlave",
+        IdealState.RebalanceMode.FULL_AUTO.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, NUM_REPLICAS);
+
+    // shutdown an instance
+    _participants[0].syncStop();
+    Thread.sleep(200);
+    Assert.assertTrue(clusterVerifier.verify());
+
+    // the original snapshot should not change
+    Assert.assertEquals(routingTableSnapshot.getInstanceConfigs().size(), NUM_NODES);
+    Assert.assertEquals(routingTableSnapshot.getResources().size(), 1);
+    Assert.assertEquals(routingTableSnapshot.getLiveInstances().size(), NUM_NODES);
+
+    RoutingTableSnapshot newRoutingTableSnapshot = routingTableProvider.getRoutingTableSnapshot();
+
+    Assert.assertEquals(newRoutingTableSnapshot.getInstanceConfigs().size(), NUM_NODES);
+    Assert.assertEquals(newRoutingTableSnapshot.getResources().size(), 2);
+    Assert.assertEquals(newRoutingTableSnapshot.getLiveInstances().size(), NUM_NODES - 1);
+  }
+
+  private void validateMapping(IdealState idealState, RoutingTableSnapshot routingTableSnapshot) {
+    String db = idealState.getResourceName();
+    Set<String> partitions = idealState.getPartitionSet();
+    for (String partition : partitions) {
+      List<InstanceConfig> masterInsEv =
+          routingTableSnapshot.getInstancesForResource(db, partition, "MASTER");
+      Assert.assertEquals(masterInsEv.size(), 1);
+
+      List<InstanceConfig> slaveInsEv =
+          routingTableSnapshot.getInstancesForResource(db, partition, "SLAVE");
+      Assert.assertEquals(slaveInsEv.size(), 2);
+    }
+  }
+}
+


[5/6] helix git commit: Wrap debug log statement in CallbackHandler and ZkClient to reduce GC pressure.

Posted by lx...@apache.org.
Wrap debug log statement in CallbackHandler and ZkClient to reduce GC pressure.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/90730bff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/90730bff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/90730bff

Branch: refs/heads/master
Commit: 90730bff49120fc1ca0103e8be2cbab90e6ceced
Parents: 1c04f2f
Author: Lei Xia <lx...@linkedin.com>
Authored: Fri Mar 23 10:35:29 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 23 12:12:44 2018 -0700

----------------------------------------------------------------------
 .../helix/manager/zk/CallbackHandler.java       | 36 ++++++++++++++------
 .../helix/manager/zk/zookeeper/ZkClient.java    | 20 +++++++----
 2 files changed, 40 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/90730bff/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
index c5a035f..663fa32 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java
@@ -276,7 +276,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
       throws Exception {
     //async mode only applicable to CALLBACK from ZK, During INIT and FINALIZE invoke the callback's immediately.
     if (_batchModeEnabled && changeContext.getType() == NotificationContext.Type.CALLBACK) {
-      logger.info("Enqueuing callback");
+      logger.debug("Enqueuing callback");
       _queue.put(changeContext);
     } else {
       invoke(changeContext);
@@ -442,15 +442,23 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
     if (_eventTypes.contains(EventType.NodeDataChanged) || _eventTypes
         .contains(EventType.NodeCreated) || _eventTypes.contains(EventType.NodeDeleted)) {
-      logger.debug("Subscribing data change listener to path:" + path);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Subscribing data change listener to path:" + path);
+      }
       subscribeDataChange(path, context);
     }
 
     if (_eventTypes.contains(EventType.NodeChildrenChanged)) {
-      logger.debug("Subscribing child change listener to path:" + path);
+      if (logger.isDebugEnabled()) {
+        logger.debug("Subscribing child change listener to path:" + path);
+      }
+
       subscribeChildChange(path, context);
       if (watchChild) {
-        logger.debug("Subscribing data change listener to all children for path:" + path);
+        if (logger.isDebugEnabled()) {
+          logger.debug("Subscribing data change listener to all children for path:" + path);
+        }
+
         try {
           switch (_changeType) {
           case CURRENT_STATE:
@@ -504,7 +512,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     }
 
     long end = System.currentTimeMillis();
-    logger.info("Subcribing to path:" + path + " took:" + (end - start));
+    logger.info("Subscribing to path:" + path + " took:" + (end - start));
   }
 
   public EventType[] getEventTypes() {
@@ -530,7 +538,9 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
   @Override
   public void handleDataChange(String dataPath, Object data) {
-    logger.info("Data change callback: paths changed: " + dataPath);
+    if (logger.isDebugEnabled()) {
+      logger.debug("Data change callback: paths changed: " + dataPath);
+    }
 
     try {
       updateNotificationTime(System.nanoTime());
@@ -547,8 +557,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
     }
   }
 
-  @Override public void handleDataDeleted(String dataPath) {
-    logger.info("Data change callback: path deleted: " + dataPath);
+  @Override
+  public void handleDataDeleted(String dataPath) {
+    if (logger.isDebugEnabled()) {
+      logger.debug("Data change callback: path deleted: " + dataPath);
+    }
 
     try {
       updateNotificationTime(System.nanoTime());
@@ -573,8 +586,11 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener {
 
   @Override
   public void handleChildChange(String parentPath, List<String> currentChilds) {
-    logger.info("Data change callback: child changed, path: " + parentPath + ", current child count: "
-        + currentChilds.size());
+    if (logger.isDebugEnabled()) {
+      logger.debug(
+          "Data change callback: child changed, path: " + parentPath + ", current child count: "
+              + currentChilds.size());
+    }
 
     try {
       updateNotificationTime(System.nanoTime());

http://git-wip-us.apache.org/repos/asf/helix/blob/90730bff/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
index 45c397a..2ae7f63 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/zookeeper/ZkClient.java
@@ -142,7 +142,9 @@ public class ZkClient implements Watcher {
       listeners.add(listener);
     }
     watchForData(path);
-    LOG.debug("Subscribed data changes for " + path);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Subscribed data changes for " + path);
+    }
   }
 
   public void unsubscribeDataChanges(String path, IZkDataListener dataListener) {
@@ -525,7 +527,9 @@ public class ZkClient implements Watcher {
 
   @Override
   public void process(WatchedEvent event) {
-    LOG.debug("Received event: " + event);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Received event: " + event);
+    }
     _zookeeperEventThread = Thread.currentThread();
 
     boolean stateChanged = event.getPath() == null;
@@ -540,8 +544,10 @@ public class ZkClient implements Watcher {
 
       // We might have to install child change event listener if a new node was created
       if (getShutdownTrigger()) {
-        LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath()
-            + "}' since shutdown triggered");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("ignoring event '{" + event.getType() + " | " + event.getPath()
+              + "}' since shutdown triggered");
+        }
         return;
       }
       if (stateChanged) {
@@ -692,7 +698,7 @@ public class ZkClient implements Watcher {
         reconnect();
         fireNewSessionEvents();
       } catch (final Exception e) {
-        LOG.info(
+        LOG.warn(
             "Unable to re-establish connection. Notifying consumer of the following exception: ",
             e);
         fireSessionEstablishmentError(e);
@@ -855,7 +861,9 @@ public class ZkClient implements Watcher {
   public boolean waitUntilExists(String path, TimeUnit timeUnit, long time)
       throws ZkInterruptedException {
     Date timeout = new Date(System.currentTimeMillis() + timeUnit.toMillis(time));
-    LOG.debug("Waiting until znode '" + path + "' becomes available.");
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Waiting until znode '" + path + "' becomes available.");
+    }
     if (exists(path)) {
       return true;
     }