You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/15 19:00:18 UTC
helix git commit: RoutingTableProvider for TargetExternalView
Repository: helix
Updated Branches:
refs/heads/master baf8b830e -> 07e05a4c1
RoutingTableProvider for TargetExternalView
Implement the RoutingTableProvide for TargetExternalView subscription. Write several tests for it.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/07e05a4c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/07e05a4c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/07e05a4c
Branch: refs/heads/master
Commit: 07e05a4c1c456b7e09c7750955fa46c34e1a288a
Parents: baf8b83
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Feb 5 17:09:28 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Thu Mar 15 11:03:11 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/HelixConstants.java | 36 +++--
.../helix/common/BasicClusterDataCache.java | 20 ++-
.../helix/spectator/RoutingDataCache.java | 6 +-
.../helix/spectator/RoutingTableProvider.java | 44 +++++-
.../TestRoutingTableProviderWithSourceType.java | 152 +++++++++++++++++++
5 files changed, 233 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/HelixConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index 6935a23..f84173b 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -26,20 +26,30 @@ public interface HelixConstants {
// TODO: ChangeType and PropertyType are duplicated, consider unifying
enum ChangeType {
// @formatter:off
- IDEAL_STATE,
- CONFIG,
- INSTANCE_CONFIG,
- RESOURCE_CONFIG,
- CLUSTER_CONFIG,
- LIVE_INSTANCE,
- CURRENT_STATE,
- MESSAGE,
- EXTERNAL_VIEW,
- TARGET_EXTERNAL_VIEW,
- CONTROLLER,
- MESSAGES_CONTROLLER,
- HEALTH
+ IDEAL_STATE (PropertyType.IDEALSTATES),
+ CONFIG (PropertyType.CONFIGS),
+ INSTANCE_CONFIG (PropertyType.CONFIGS),
+ RESOURCE_CONFIG (PropertyType.CONFIGS),
+ CLUSTER_CONFIG (PropertyType.CONFIGS),
+ LIVE_INSTANCE (PropertyType.LIVEINSTANCES),
+ CURRENT_STATE (PropertyType.CURRENTSTATES),
+ MESSAGE (PropertyType.MESSAGES),
+ EXTERNAL_VIEW (PropertyType.EXTERNALVIEW),
+ TARGET_EXTERNAL_VIEW (PropertyType.TARGETEXTERNALVIEW),
+ CONTROLLER (PropertyType.CONTROLLER),
+ MESSAGES_CONTROLLER (PropertyType.MESSAGES_CONTROLLER),
+ HEALTH (PropertyType.HEALTHREPORT);
// @formatter:on
+
+ private final PropertyType _propertyType;
+
+ ChangeType(PropertyType propertyType) {
+ _propertyType = propertyType;
+ }
+
+ public PropertyType getPropertyType() {
+ return _propertyType;
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
index 994ebfb..d48cfbb 100644
--- a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
@@ -25,7 +25,9 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
@@ -41,16 +43,23 @@ public class BasicClusterDataCache {
private Map<String, LiveInstance> _liveInstanceMap;
private Map<String, InstanceConfig> _instanceConfigMap;
private Map<String, ExternalView> _externalViewMap;
+ private final PropertyType _sourceDataType;
+
protected String _clusterName;
protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
public BasicClusterDataCache(String clusterName) {
+ this(clusterName, PropertyType.EXTERNALVIEW);
+ }
+
+ public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) {
_propertyDataChangedMap = new ConcurrentHashMap<>();
_liveInstanceMap = new HashMap<>();
_instanceConfigMap = new HashMap<>();
_externalViewMap = new HashMap<>();
_clusterName = clusterName;
+ _sourceDataType = sourceDataType;
}
/**
@@ -68,7 +77,16 @@ public class BasicClusterDataCache {
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
long start = System.currentTimeMillis();
_propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false));
- _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
+ switch (_sourceDataType) {
+ case EXTERNALVIEW:
+ _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
+ break;
+ case TARGETEXTERNALVIEW:
+ _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews());
+ break;
+ default:
+ break;
+ }
if (LOG.isDebugEnabled()) {
LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
System.currentTimeMillis() - start) + " ms");
http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index a754f55..5602333 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -20,15 +20,15 @@ package org.apache.helix.spectator;
*/
import org.apache.helix.HelixConstants;
+import org.apache.helix.PropertyType;
import org.apache.helix.common.BasicClusterDataCache;
/**
* Cache the cluster data that are needed by RoutingTableProvider.
*/
public class RoutingDataCache extends BasicClusterDataCache {
-
- public RoutingDataCache(String clusterName) {
- super(clusterName);
+ public RoutingDataCache(String clusterName, PropertyType sourceDataType) {
+ super(clusterName, sourceDataType);
requireFullRefresh();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index a89636b..cd4e3d2 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyType;
import org.apache.helix.api.listeners.ConfigChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
@@ -51,19 +52,41 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
private final AtomicReference<RoutingTable> _routingTableRef;
private final HelixManager _helixManager;
private final RouterUpdater _routerUpdater;
+ private final PropertyType _sourceDataType;
public RoutingTableProvider() {
this(null);
}
public RoutingTableProvider(HelixManager helixManager) throws HelixException {
+ this(helixManager, PropertyType.EXTERNALVIEW);
+ }
+
+ public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType) throws HelixException {
_routingTableRef = new AtomicReference<>(new RoutingTable());
_helixManager = helixManager;
- String clusterName = null;
+ _sourceDataType = sourceDataType;
+ String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
+ _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
+ _routerUpdater.start();
if (_helixManager != null) {
- clusterName = _helixManager.getClusterName();
try {
- _helixManager.addExternalViewChangeListener(this);
+ switch (_sourceDataType) {
+ case EXTERNALVIEW:
+ _helixManager.addExternalViewChangeListener(this);
+ break;
+ case TARGETEXTERNALVIEW:
+ // Check whether target external has been enabled or not
+ if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
+ _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
+ 0)) {
+ throw new HelixException("Target External View is not enabled!");
+ }
+ _helixManager.addTargetExternalViewChangeListener(this);
+ break;
+ default:
+ throw new HelixException("Unsupported source data type: " + sourceDataType);
+ }
_helixManager.addInstanceConfigChangeListener(this);
_helixManager.addLiveInstanceChangeListener(this);
} catch (Exception e) {
@@ -71,8 +94,6 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
throw new HelixException("Failed to attach listeners to HelixManager!", e);
}
}
- _routerUpdater = new RouterUpdater(clusterName);
- _routerUpdater.start();
}
/**
@@ -188,7 +209,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
*/
public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
List<String> resourceTags) {
- return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state, resourceTags);
+ return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state,
+ resourceTags);
}
/**
@@ -211,6 +233,12 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
@PreFetch(enabled = false)
public void onExternalViewChange(List<ExternalView> externalViewList,
NotificationContext changeContext) {
+ HelixConstants.ChangeType changeType = changeContext.getChangeType();
+ if (changeType != null && !changeType.getPropertyType().equals(_sourceDataType)) {
+ logger.warn("onExternalViewChange called with dis-matched change types. Source data type "
+ + _sourceDataType + ", changed data type: " + changeType);
+ return;
+ }
// Refresh with full list of external view.
// keep this here for back-compatibility
if (externalViewList != null && externalViewList.size() > 0) {
@@ -268,9 +296,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
private class RouterUpdater extends ClusterEventProcessor {
private final RoutingDataCache _dataCache;
- public RouterUpdater(String clusterName) {
+ public RouterUpdater(String clusterName, PropertyType sourceDataType) {
super("Helix-RouterUpdater-event_process");
- _dataCache = new RoutingDataCache(clusterName);
+ _dataCache = new RoutingDataCache(clusterName, sourceDataType);
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
new file mode 100644
index 0000000..ddd27e3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
@@ -0,0 +1,152 @@
+package org.apache.helix.integration.Spectator;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.mock.participant.MockDelayMSStateModelFactory;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRoutingTableProviderWithSourceType extends ZkIntegrationTestBase {
+ private HelixManager _manager;
+ private ClusterSetup _setupTool;
+ private final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+ private final int NUM_NODES = 10;
+ protected int NUM_PARTITIONS = 20;
+ protected int NUM_REPLICAS = 3;
+ private final int START_PORT = 12918;
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+ private MockParticipantManager[] _participants;
+ private ClusterControllerManager _controller;
+ private ConfigAccessor _configAccessor;
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ String namespace = "/" + CLUSTER_NAME;
+ _participants = new MockParticipantManager[NUM_NODES];
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursively(namespace);
+ }
+
+ _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+
+ _participants = new MockParticipantManager[NUM_NODES];
+ for (int i = 0; i < NUM_NODES; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+ }
+
+ _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
+ MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
+
+ _setupTool
+ .rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
+
+ for (int i = 0; i < NUM_NODES; i++) {
+ String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+
+ // add a delayed state model
+ StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+ MockDelayMSStateModelFactory delayFactory =
+ new MockDelayMSStateModelFactory().setDelay(-300000L);
+ stateMachine.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory);
+ _participants[i].syncStart();
+ }
+
+ _manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager.connect();
+
+ String controllerName = CONTROLLER_PREFIX + "_0";
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
+ _configAccessor = new ConfigAccessor(_gZkClient);
+ }
+
+ @AfterClass
+ public void afterClass() throws Exception {
+ _manager.disconnect();
+ for (int i = 0; i < NUM_NODES; i++) {
+ if (_participants[i] != null && _participants[i].isConnected()) {
+ _participants[i].reset();
+ }
+ }
+ }
+
+ @Test (expectedExceptions = HelixException.class)
+ public void testTargetExternalViewWithoutEnable() {
+ new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
+ }
+
+ @Test
+ public void testExternalViewDoesNotExist() {
+ String resourceName = WorkflowGenerator.DEFAULT_TGT_DB + 1;
+ RoutingTableProvider externalViewProvider =
+ new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+ Assert.assertEquals(externalViewProvider.getInstancesForResource(resourceName, "SLAVE").size(),
+ 0);
+ }
+
+ @Test (dependsOnMethods = "testTargetExternalViewWithoutEnable")
+ public void testExternalViewDiffFromTargetExternalView() throws InterruptedException {
+ ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+ clusterConfig.enableTargetExternalView(true);
+ clusterConfig.setPersistBestPossibleAssignment(true);
+ _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+ Thread.sleep(2000);
+
+ RoutingTableProvider externalViewProvider =
+ new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+ RoutingTableProvider targetExternalViewProvider =
+ new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
+
+ // ExternalView should not contain any MASTERS
+ // TargetExternalView should contain MASTERS same as the partition number
+ Set<InstanceConfig> externalViewMasters =
+ externalViewProvider.getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
+ Assert.assertEquals(externalViewMasters.size(), 0);
+ Set<InstanceConfig> targetExternalViewMasters = targetExternalViewProvider
+ .getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
+ Assert.assertEquals(targetExternalViewMasters.size(), NUM_NODES);
+
+ // TargetExternalView MASTERS mapping should exactly match IdealState MASTERS mapping
+ Map<String, Map<String, String>> stateMap = _setupTool.getClusterManagementTool()
+ .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB).getRecord()
+ .getMapFields();
+
+ Set<String> idealMasters = new HashSet<>();
+ Set<String> targetMasters = new HashSet<>();
+ for (Map<String, String> instanceMap : stateMap.values()) {
+ for (String instance : instanceMap.keySet()) {
+ if (instanceMap.get(instance).equals("MASTER")) {
+ idealMasters.add(instance);
+ }
+ }
+ }
+
+ for (InstanceConfig instanceConfig : targetExternalViewMasters) {
+ targetMasters.add(instanceConfig.getInstanceName());
+ }
+ Assert.assertTrue(idealMasters.equals(targetMasters));
+ }
+}