You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2018/03/23 19:29:27 UTC
[6/6] helix git commit: Add RoutingTableSnapshot class to hold a
snapshot of routing table information and provide API to return
RoutingTableSnapshot from RoutingTableProvider.
Add RoutingTableSnapshot class to hold a snapshot of routing table information and provide API to return RoutingTableSnapshot from RoutingTableProvider.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/73d243fd
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/73d243fd
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/73d243fd
Branch: refs/heads/master
Commit: 73d243fd1fd3846731aa02fb9d29ed09af73a378
Parents: e572846
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Feb 15 16:12:22 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Fri Mar 23 12:12:44 2018 -0700
----------------------------------------------------------------------
.../helix/spectator/RoutingDataCache.java | 2 +-
.../helix/spectator/RoutingTableProvider.java | 10 ++
.../helix/spectator/RoutingTableSnapshot.java | 151 +++++++++++++++++++
.../spectator/TestRoutingTableSnapshot.java | 140 +++++++++++++++++
4 files changed, 302 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index ffa2fe7..17c83b3 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -36,7 +36,7 @@ import org.slf4j.LoggerFactory;
/**
* Cache the cluster data that are needed by RoutingTableProvider.
*/
-public class RoutingDataCache extends BasicClusterDataCache {
+class RoutingDataCache extends BasicClusterDataCache {
private static Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());
private final PropertyType _sourceDataType;
http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index f539ac9..3e01ea4 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -152,6 +152,16 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
}
/**
+ * Get a snapshot of the current RoutingTable information. The snapshot is immutable; it reflects
+ * the routing table information at the time this method is called.
+ *
+ * @return snapshot of current routing table.
+ */
+ public RoutingTableSnapshot getRoutingTableSnapshot() {
+ return new RoutingTableSnapshot(_routingTableRef.get());
+ }
+
+ /**
* returns the instances for {resource,partition} pair that are in a specific
* {state}
*
http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
new file mode 100644
index 0000000..1592fbe
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableSnapshot.java
@@ -0,0 +1,151 @@
+package org.apache.helix.spectator;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+
+/**
+ * The snapshot of RoutingTable information. It is immutable; it reflects the routing table
+ * information at the time it is generated. Every query delegates to the wrapped RoutingTable.
+ */
+public class RoutingTableSnapshot {
+ private final RoutingTable _routingTable;
+
+ public RoutingTableSnapshot(RoutingTable routingTable) {
+ _routingTable = routingTable;
+ }
+
+ /**
+ * Returns all instances for {resource} that are in a specific {state}.
+ *
+ * @param resourceName name of the resource to look up
+ * @param state state the returned instances must be in
+ *
+ * @return empty set if there is no instance in a given state
+ */
+ public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) {
+ return _routingTable.getInstancesForResource(resourceName, state);
+ }
+
+ /**
+ * Returns the instances for {resource,partition} pair that are in a specific {state}.
+ *
+ * @param resourceName name of the resource to look up
+ * @param partitionName name of the partition within the resource
+ * @param state state the returned instances must be in
+ *
+ * @return empty list if there is no instance in a given state
+ */
+ public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName,
+ String state) {
+ return _routingTable.getInstancesForResource(resourceName, partitionName, state);
+ }
+
+ /**
+ * Returns all instances for resources that contain any of the given tags in {resource group}
+ * and that are in a specific {state}.
+ *
+ * @param resourceGroupName name of the resource group to look up
+ * @param state state the returned instances must be in
+ * @param resourceTags tags to match; resources containing any of them are considered
+ * @return empty set if there is no instance in a given state
+ */
+ public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
+ List<String> resourceTags) {
+ return _routingTable.getInstancesForResourceGroup(resourceGroupName, state, resourceTags);
+ }
+
+ /**
+ * Returns all instances for all resources in {resource group} that are in a specific {state}.
+ *
+ * @param resourceGroupName name of the resource group to look up
+ * @param state state the returned instances must be in
+ *
+ * @return empty set if there is no instance in a given state
+ */
+ public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
+ return _routingTable.getInstancesForResourceGroup(resourceGroupName, state);
+ }
+
+ /**
+ * Returns the instances for {resource group,partition} pair in all resources belonging to the
+ * given resource group that are in a specific {state}.
+ * The returned results aggregate all partition states from all the resources in the given
+ * resource group.
+ *
+ * @param resourceGroupName name of the resource group to look up
+ * @param partitionName name of the partition to look up
+ * @param state state the returned instances must be in
+ *
+ * @return empty list if there is no instance in a given state
+ */
+ public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
+ String partitionName, String state) {
+ return _routingTable.getInstancesForResourceGroup(resourceGroupName, partitionName, state);
+ }
+
+ /**
+ * Returns the instances for {resource group,partition} pair containing any of the given tags
+ * that are in a specific {state}.
+ * Finds all resources belonging to the given resource group that have any of the given resource
+ * tags and returns the aggregated partition states from all these resources.
+ *
+ * @param resourceGroupName name of the resource group to look up
+ * @param partitionName name of the partition to look up
+ * @param state state the returned instances must be in
+ * @param resourceTags tags to match; resources having any of them are considered
+ *
+ * @return empty list if there is no instance in a given state
+ */
+ public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
+ String partitionName, String state, List<String> resourceTags) {
+ return _routingTable
+ .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags);
+ }
+
+ /**
+ * Return all live instances held by the wrapped routing table.
+ *
+ * @return the live instances reported by the wrapped routing table
+ */
+ public Collection<LiveInstance> getLiveInstances() {
+ return _routingTable.getLiveInstances();
+ }
+
+ /**
+ * Return the configs of all instances held by the wrapped routing table.
+ *
+ * @return the instance configs reported by the wrapped routing table
+ */
+ public Collection<InstanceConfig> getInstanceConfigs() {
+ return _routingTable.getInstanceConfigs();
+ }
+
+ /**
+ * Return names of all resources (shown in ExternalView) in this cluster.
+ */
+ public Collection<String> getResources() {
+ return _routingTable.getResources();
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/73d243fd/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
new file mode 100644
index 0000000..d556b7a
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableSnapshot.java
@@ -0,0 +1,140 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.List;
+import java.util.Set;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.spectator.RoutingTableSnapshot;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestRoutingTableSnapshot extends ZkIntegrationTestBase {
+ private HelixManager _manager;
+ private ClusterSetup _setupTool;
+ private final int NUM_NODES = 10;
+ protected int NUM_PARTITIONS = 20;
+ protected int NUM_REPLICAS = 3;
+ private final int START_PORT = 12918;
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+ private MockParticipantManager[] _participants;
+ private ClusterControllerManager _controller;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursively(namespace);
+ }
+
+ _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+
+ _participants = new MockParticipantManager[NUM_NODES];
+ for (int i = 0; i < NUM_NODES; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+
+ for (int i = 0; i < NUM_NODES; i++) {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+ _participants[i].syncStart();
+ }
+
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ _controller.syncStop(); // stop the controller started in beforeClass so it does not outlive this test class
+ _manager.disconnect();
+ for (int i = 0; i < NUM_NODES; i++) {
+ if (_participants[i] != null && _participants[i].isConnected()) {
+ _participants[i].reset();
+ }
+ }
+ }
+
+ @Test
+ public void testRoutingTableSnapshot() throws InterruptedException {
+ RoutingTableProvider routingTableProvider =
+ new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+
+ String db1 = "TestDB-1";
+ _setupTool.addResourceToCluster(CLUSTER_NAME, db1, NUM_PARTITIONS, "MasterSlave",
+ IdealState.RebalanceMode.FULL_AUTO.name());
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db1, NUM_REPLICAS);
+
+ Thread.sleep(200);
+ HelixClusterVerifier clusterVerifier =
+ new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkAddr(ZK_ADDR).build();
+ Assert.assertTrue(clusterVerifier.verify());
+
+ IdealState idealState1 = _setupTool.getClusterManagementTool().getResourceIdealState(
+ CLUSTER_NAME, db1);
+
+ RoutingTableSnapshot routingTableSnapshot = routingTableProvider.getRoutingTableSnapshot();
+ validateMapping(idealState1, routingTableSnapshot);
+
+ Assert.assertEquals(routingTableSnapshot.getInstanceConfigs().size(), NUM_NODES);
+ Assert.assertEquals(routingTableSnapshot.getResources().size(), 1);
+ Assert.assertEquals(routingTableSnapshot.getLiveInstances().size(), NUM_NODES);
+
+ // add new DB and shutdown an instance
+ String db2 = "TestDB-2";
+ _setupTool.addResourceToCluster(CLUSTER_NAME, db2, NUM_PARTITIONS, "MasterSlave",
+ IdealState.RebalanceMode.FULL_AUTO.name());
+ _setupTool.rebalanceStorageCluster(CLUSTER_NAME, db2, NUM_REPLICAS);
+
+ // shutdown an instance
+ _participants[0].syncStop();
+ Thread.sleep(200);
+ Assert.assertTrue(clusterVerifier.verify());
+
+ // the original snapshot should not change
+ Assert.assertEquals(routingTableSnapshot.getInstanceConfigs().size(), NUM_NODES);
+ Assert.assertEquals(routingTableSnapshot.getResources().size(), 1);
+ Assert.assertEquals(routingTableSnapshot.getLiveInstances().size(), NUM_NODES);
+
+ RoutingTableSnapshot newRoutingTableSnapshot = routingTableProvider.getRoutingTableSnapshot();
+
+ Assert.assertEquals(newRoutingTableSnapshot.getInstanceConfigs().size(), NUM_NODES);
+ Assert.assertEquals(newRoutingTableSnapshot.getResources().size(), 2);
+ Assert.assertEquals(newRoutingTableSnapshot.getLiveInstances().size(), NUM_NODES - 1);
+ }
+
+ private void validateMapping(IdealState idealState, RoutingTableSnapshot routingTableSnapshot) {
+ String db = idealState.getResourceName();
+ Set<String> partitions = idealState.getPartitionSet();
+ for (String partition : partitions) {
+ List<InstanceConfig> masterInsEv =
+ routingTableSnapshot.getInstancesForResource(db, partition, "MASTER");
+ Assert.assertEquals(masterInsEv.size(), 1); // each partition is expected to have exactly one MASTER
+
+ List<InstanceConfig> slaveInsEv =
+ routingTableSnapshot.getInstancesForResource(db, partition, "SLAVE");
+ Assert.assertEquals(slaveInsEv.size(), 2); // remaining replicas (NUM_REPLICAS - 1) are SLAVEs
+ }
+ }
+}
+