You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/03/23 19:29:27 UTC

[6/6] helix git commit: Add RoutingTableSnapshot class to hold a snapshot of routing table information and provide API to return RoutingTableSnapshot from RoutingTableProvider.

Add RoutingTableSnapshot class to hold a snapshot of routing table information and provide API to return RoutingTableSnapshot from RoutingTableProvider.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/73d243fd
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/73d243fd
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/73d243fd

Branch: refs/heads/master
Commit: 73d243fd1fd3846731aa02fb9d29ed09af73a378
Parents: e572846
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Feb 15 16:12:22 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 23 12:12:44 2018 -0700

----------------------------------------------------------------------
 .../helix/spectator/RoutingDataCache.java       |   2 +-
 .../helix/spectator/RoutingTableProvider.java   |  10 ++
 .../helix/spectator/RoutingTableSnapshot.java   | 151 +++++++++++++++++++
 .../spectator/TestRoutingTableSnapshot.java     | 140 +++++++++++++++++
 4 files changed, 302 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index ffa2fe7..17c83b3 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
 /**
  * Cache the cluster data that are needed by RoutingTableProvider.
  */
-public class RoutingDataCache extends BasicClusterDataCache {
+class RoutingDataCache extends BasicClusterDataCache {
   private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());
 
   private final PropertyType _sourceDataType;

http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index f539ac9..3e01ea4 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -152,6 +152,16 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   }
 
   /**
+   * Get an snapshot of current RoutingTable information. The snapshot is immutable, it reflects the
+   * routing table information at the time this method is called.
+   *
+   * @return snapshot of current routing table.
+   */
+  public RoutingTableSnapshot getRoutingTableSnapshot() {
+    return new RoutingTableSnapshot(_routingTableRef.get());
+  }
+
+  /**
    * returns the instances for {resource,partition} pair that are in a specific
    * {state}
    *

http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
new file mode 100644
index 0000000..1592fbe
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
@@ -0,0 +1,151 @@
+package org.apache.helix.spectator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+
+/**
+ * The snapshot of RoutingTable information.  It is immutable, it reflects the routing table
+ * information at the time it is generated.
+ */
+public class RoutingTableSnapshot {
+  private final RoutingTable _routingTable;
+
+  public RoutingTableSnapshot(RoutingTable routingTable) {
+    _routingTable = routingTable;
+  }
+
+  /**
+   * returns all instances for {resource} that are in a specific {state}.
+   *
+   * @param resourceName
+   * @param state
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) {
+    return _routingTable.getInstancesForResource(resourceName, state);
+  }
+
+  /**
+   * returns the instances for {resource,partition} pair that are in a specific {state}
+   *
+   * @param resourceName
+   * @param partitionName
+   * @param state
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName,
+      String state) {
+    return _routingTable.getInstancesForResource(resourceName, partitionName, state);
+  }
+
+  /**
+   * returns all instances for resources contains any given tags in {resource group} that are in a
+   * specific {state}
+   *
+   * @param resourceGroupName
+   * @param state
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
+      List<String> resourceTags) {
+    return _routingTable.getInstancesForResourceGroup(resourceGroupName, state, resourceTags);
+  }
+
+  /**
+   * returns all instances for all resources in {resource group} that are in a specific {state}
+   *
+   * @param resourceGroupName
+   * @param state
+   *
+   * @return empty set if there is no instance in a given state
+   */
+  public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
+    return _routingTable.getInstancesForResourceGroup(resourceGroupName, state);
+  }
+
+  /**
+   * returns the instances for {resource group,partition} pair in all resources belongs to the given
+   * resource group that are in a specific {state}.
+   * The return results aggregate all partition states from all the resources in the given resource
+   * group.
+   *
+   * @param resourceGroupName
+   * @param partitionName
+   * @param state
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
+      String partitionName, String state) {
+    return _routingTable.getInstancesForResourceGroup(resourceGroupName, partitionName, state);
+  }
+
+  /**
+   * returns the instances for {resource group,partition} pair contains any of the given tags that
+   * are in a specific {state}.
+   * Find all resources belongs to the given resource group that have any of the given resource tags
+   * and return the aggregated partition states from all these resources.
+   *
+   * @param resourceGroupName
+   * @param partitionName
+   * @param state
+   * @param resourceTags
+   *
+   * @return empty list if there is no instance in a given state
+   */
+  public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
+      String partitionName, String state, List<String> resourceTags) {
+    return _routingTable
+        .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags);
+  }
+
+  /**
+   * Return all liveInstances in the cluster now.
+   *
+   * @return
+   */
+  public Collection<LiveInstance> getLiveInstances() {
+    return _routingTable.getLiveInstances();
+  }
+
+  /**
+   * Return all instance's config in this cluster.
+   *
+   * @return
+   */
+  public Collection<InstanceConfig> getInstanceConfigs() {
+    return _routingTable.getInstanceConfigs();
+  }
+
+  /**
+   * Return names of all resources (shown in ExternalView) in this cluster.
+   */
+  public Collection<String> getResources() {
+    return _routingTable.getResources();
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
new file mode 100644
index 0000000..d556b7a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
@@ -0,0 +1,140 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.spectator.RoutingTableSnapshot;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestRoutingTableSnapshot extends ZkIntegrationTestBase {
+  private HelixManager _manager;
+  private ClusterSetup _setupTool;
+  private final int NUM_NODES = 10;
+  protected int NUM_PARTITIONS = 20;
+  protected int NUM_REPLICAS = 3;
+  private final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private MockParticipantManager[] _participants;
+  private ClusterControllerManager _controller;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    _participants =  new MockParticipantManager[NUM_NODES];
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    _participants = new MockParticipantManager[NUM_NODES];
+    for (int i = 0; i < NUM_NODES; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    for (int i = 0; i < NUM_NODES; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      _participants[i].syncStart();
+    }
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+  }
+
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+    for (int i = 0; i < NUM_NODES; i++) {
+      if (_participants[i] != null && _participants[i].isConnected()) {
+        _participants[i].reset();
+      }
+    }
+  }
+
+  @Test
+  public void testRoutingTableSnapshot() throws InterruptedException {
+    RoutingTableProvider routingTableProvider =
+        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+
+    String db1 = "TestDB-1";
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db1, NUM_PARTITIONS, "MasterSlave",
+        IdealState.RebalanceMode.FULL_AUTO.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db1, NUM_REPLICAS);
+
+    Thread.sleep(200);
+    HelixClusterVerifier clusterVerifier =
+        new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+    Assert.assertTrue(clusterVerifier.verify());
+
+    IdealState idealState1 = _setupTool.getClusterManagementTool().getResourceIdealState(
+        CLUSTER_NAME, db1);
+
+    RoutingTableSnapshot routingTableSnapshot = routingTableProvider.getRoutingTableSnapshot();
+    validateMapping(idealState1, routingTableSnapshot);
+
+    Assert.assertEquals(routingTableSnapshot.getInstanceConfigs().size(), NUM_NODES);
+    Assert.assertEquals(routingTableSnapshot.getResources().size(), 1);
+    Assert.assertEquals(routingTableSnapshot.getLiveInstances().size(), NUM_NODES);
+
+    // add new DB and shutdown an instance
+    String db2 = "TestDB-2";
+    _setupTool.addResourceToCluster(CLUSTER_NAME, db2, NUM_PARTITIONS, "MasterSlave",
+        IdealState.RebalanceMode.FULL_AUTO.name());
+    _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, NUM_REPLICAS);
+
+    // shutdown an instance
+    _participants[0].syncStop();
+    Thread.sleep(200);
+    Assert.assertTrue(clusterVerifier.verify());
+
+    // the original snapshot should not change
+    Assert.assertEquals(routingTableSnapshot.getInstanceConfigs().size(), NUM_NODES);
+    Assert.assertEquals(routingTableSnapshot.getResources().size(), 1);
+    Assert.assertEquals(routingTableSnapshot.getLiveInstances().size(), NUM_NODES);
+
+    RoutingTableSnapshot newRoutingTableSnapshot = routingTableProvider.getRoutingTableSnapshot();
+
+    Assert.assertEquals(newRoutingTableSnapshot.getInstanceConfigs().size(), NUM_NODES);
+    Assert.assertEquals(newRoutingTableSnapshot.getResources().size(), 2);
+    Assert.assertEquals(newRoutingTableSnapshot.getLiveInstances().size(), NUM_NODES - 1);
+  }
+
+  private void validateMapping(IdealState idealState, RoutingTableSnapshot routingTableSnapshot) {
+    String db = idealState.getResourceName();
+    Set<String> partitions = idealState.getPartitionSet();
+    for (String partition : partitions) {
+      List<InstanceConfig> masterInsEv =
+          routingTableSnapshot.getInstancesForResource(db, partition, "MASTER");
+      Assert.assertEquals(masterInsEv.size(), 1);
+
+      List<InstanceConfig> slaveInsEv =
+          routingTableSnapshot.getInstancesForResource(db, partition, "SLAVE");
+      Assert.assertEquals(slaveInsEv.size(), 2);
+    }
+  }
+}
+