You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/15 19:00:18 UTC

helix git commit: RoutingTableProvider for TargetExternalView

Repository: helix
Updated Branches:
  refs/heads/master baf8b830e -> 07e05a4c1


RoutingTableProvider for TargetExternalView

Implement the RoutingTableProvider for TargetExternalView subscription. Write several tests for it.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/07e05a4c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/07e05a4c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/07e05a4c

Branch: refs/heads/master
Commit: 07e05a4c1c456b7e09c7750955fa46c34e1a288a
Parents: baf8b83
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Feb 5 17:09:28 2018 -0800
Committer: Lei Xia <lx...@linkedin.com>
Committed: Thu Mar 15 11:03:11 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/HelixConstants.java   |  36 +++--
 .../helix/common/BasicClusterDataCache.java     |  20 ++-
 .../helix/spectator/RoutingDataCache.java       |   6 +-
 .../helix/spectator/RoutingTableProvider.java   |  44 +++++-
 .../TestRoutingTableProviderWithSourceType.java | 152 +++++++++++++++++++
 5 files changed, 233 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/HelixConstants.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
index 6935a23..f84173b 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java
@@ -26,20 +26,30 @@ public interface HelixConstants {
   // TODO: ChangeType and PropertyType are duplicated, consider unifying
   enum ChangeType {
     // @formatter:off
-    IDEAL_STATE,
-    CONFIG,
-    INSTANCE_CONFIG,
-    RESOURCE_CONFIG,
-    CLUSTER_CONFIG,
-    LIVE_INSTANCE,
-    CURRENT_STATE,
-    MESSAGE,
-    EXTERNAL_VIEW,
-    TARGET_EXTERNAL_VIEW,
-    CONTROLLER,
-    MESSAGES_CONTROLLER,
-    HEALTH
+    IDEAL_STATE (PropertyType.IDEALSTATES),
+    CONFIG (PropertyType.CONFIGS),
+    INSTANCE_CONFIG (PropertyType.CONFIGS),
+    RESOURCE_CONFIG (PropertyType.CONFIGS),
+    CLUSTER_CONFIG (PropertyType.CONFIGS),
+    LIVE_INSTANCE (PropertyType.LIVEINSTANCES),
+    CURRENT_STATE (PropertyType.CURRENTSTATES),
+    MESSAGE (PropertyType.MESSAGES),
+    EXTERNAL_VIEW (PropertyType.EXTERNALVIEW),
+    TARGET_EXTERNAL_VIEW (PropertyType.TARGETEXTERNALVIEW),
+    CONTROLLER (PropertyType.CONTROLLER),
+    MESSAGES_CONTROLLER (PropertyType.MESSAGES_CONTROLLER),
+    HEALTH (PropertyType.HEALTHREPORT);
     // @formatter:on
+
+    private final PropertyType _propertyType;
+
+    ChangeType(PropertyType propertyType) {
+      _propertyType = propertyType;
+    }
+
+    public PropertyType getPropertyType() {
+      return _propertyType;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
index 994ebfb..d48cfbb 100644
--- a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java
@@ -25,7 +25,9 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixException;
 import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyType;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
@@ -41,16 +43,23 @@ public class BasicClusterDataCache {
   private Map<String, LiveInstance> _liveInstanceMap;
   private Map<String, InstanceConfig> _instanceConfigMap;
   private Map<String, ExternalView> _externalViewMap;
+  private final PropertyType _sourceDataType;
+
   protected String _clusterName;
 
   protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap;
 
   public BasicClusterDataCache(String clusterName) {
+    this(clusterName, PropertyType.EXTERNALVIEW);
+  }
+
+  public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) {
     _propertyDataChangedMap = new ConcurrentHashMap<>();
     _liveInstanceMap = new HashMap<>();
     _instanceConfigMap = new HashMap<>();
     _externalViewMap = new HashMap<>();
     _clusterName = clusterName;
+    _sourceDataType = sourceDataType;
   }
 
   /**
@@ -68,7 +77,16 @@ public class BasicClusterDataCache {
     if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
       long start = System.currentTimeMillis();
       _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false));
-      _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
+      switch (_sourceDataType) {
+        case EXTERNALVIEW:
+          _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
+          break;
+        case TARGETEXTERNALVIEW:
+          _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews());
+          break;
+        default:
+          break;
+      }
       if (LOG.isDebugEnabled()) {
         LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
             System.currentTimeMillis() - start) + " ms");

http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
index a754f55..5602333 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java
@@ -20,15 +20,15 @@ package org.apache.helix.spectator;
  */
 
 import org.apache.helix.HelixConstants;
+import org.apache.helix.PropertyType;
 import org.apache.helix.common.BasicClusterDataCache;
 
 /**
  * Cache the cluster data that are needed by RoutingTableProvider.
  */
 public class RoutingDataCache extends BasicClusterDataCache {
-
-  public RoutingDataCache(String clusterName) {
-    super(clusterName);
+  public RoutingDataCache(String clusterName, PropertyType sourceDataType) {
+    super(clusterName, sourceDataType);
     requireFullRefresh();
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index a89636b..cd4e3d2 100644
--- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import org.apache.helix.HelixConstants;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyType;
 import org.apache.helix.api.listeners.ConfigChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
@@ -51,19 +52,41 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   private final AtomicReference<RoutingTable> _routingTableRef;
   private final HelixManager _helixManager;
   private final RouterUpdater _routerUpdater;
+  private final PropertyType _sourceDataType;
 
   public RoutingTableProvider() {
     this(null);
   }
 
   public RoutingTableProvider(HelixManager helixManager) throws HelixException {
+    this(helixManager, PropertyType.EXTERNALVIEW);
+  }
+
+  public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType) throws HelixException {
     _routingTableRef = new AtomicReference<>(new RoutingTable());
     _helixManager = helixManager;
-    String clusterName = null;
+    _sourceDataType = sourceDataType;
+    String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
+    _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
+    _routerUpdater.start();
     if (_helixManager != null) {
-      clusterName = _helixManager.getClusterName();
       try {
-        _helixManager.addExternalViewChangeListener(this);
+        switch (_sourceDataType) {
+        case EXTERNALVIEW:
+          _helixManager.addExternalViewChangeListener(this);
+          break;
+        case TARGETEXTERNALVIEW:
+          // Check whether target external has been enabled or not
+          if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
+              _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
+              0)) {
+            throw new HelixException("Target External View is not enabled!");
+          }
+          _helixManager.addTargetExternalViewChangeListener(this);
+          break;
+        default:
+          throw new HelixException("Unsupported source data type: " + sourceDataType);
+        }
         _helixManager.addInstanceConfigChangeListener(this);
         _helixManager.addLiveInstanceChangeListener(this);
       } catch (Exception e) {
@@ -71,8 +94,6 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
         throw new HelixException("Failed to attach listeners to HelixManager!", e);
       }
     }
-    _routerUpdater = new RouterUpdater(clusterName);
-    _routerUpdater.start();
   }
 
   /**
@@ -188,7 +209,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
       List<String> resourceTags) {
-    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state, resourceTags);
+    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state,
+        resourceTags);
   }
 
   /**
@@ -211,6 +233,12 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   @PreFetch(enabled = false)
   public void onExternalViewChange(List<ExternalView> externalViewList,
       NotificationContext changeContext) {
+    HelixConstants.ChangeType changeType = changeContext.getChangeType();
+    if (changeType != null && !changeType.getPropertyType().equals(_sourceDataType)) {
+      logger.warn("onExternalViewChange called with dis-matched change types. Source data type "
+          + _sourceDataType + ", changed data type: " + changeType);
+      return;
+    }
     // Refresh with full list of external view.
     // keep this here for back-compatibility
     if (externalViewList != null && externalViewList.size() > 0) {
@@ -268,9 +296,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc
   private class RouterUpdater extends ClusterEventProcessor {
     private final RoutingDataCache _dataCache;
 
-    public RouterUpdater(String clusterName) {
+    public RouterUpdater(String clusterName, PropertyType sourceDataType) {
       super("Helix-RouterUpdater-event_process");
-      _dataCache = new RoutingDataCache(clusterName);
+      _dataCache = new RoutingDataCache(clusterName, sourceDataType);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
new file mode 100644
index 0000000..ddd27e3
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java
@@ -0,0 +1,152 @@
+package org.apache.helix.integration.Spectator;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixException;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.integration.task.WorkflowGenerator;
+import org.apache.helix.mock.participant.MockDelayMSStateModelFactory;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.participant.StateMachineEngine;
+import org.apache.helix.spectator.RoutingTableProvider;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/** Integration test for RoutingTableProvider reading EXTERNALVIEW or TARGETEXTERNALVIEW data. */
+public class TestRoutingTableProviderWithSourceType extends ZkIntegrationTestBase {
+  private HelixManager _manager;
+  private ClusterSetup _setupTool;
+  private final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
+  private final int NUM_NODES = 10;
+  protected int NUM_PARTITIONS = 20;
+  protected int NUM_REPLICAS = 3;
+  private final int START_PORT = 12918;
+  private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
+  private MockParticipantManager[] _participants;
+  private ClusterControllerManager _controller;
+  private ConfigAccessor _configAccessor;
+
+  /** Builds the cluster, then starts the participants, an admin manager and the controller. */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+
+    _participants = new MockParticipantManager[NUM_NODES];
+    for (int i = 0; i < NUM_NODES; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
+    }
+
+    _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
+        MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name());
+
+    _setupTool
+        .rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
+
+    for (int i = 0; i < NUM_NODES; i++) {
+      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName);
+      // Register a delayed state model (presumably so the ExternalView lags the target view)
+      StateMachineEngine stateMachine = _participants[i].getStateMachineEngine();
+      MockDelayMSStateModelFactory delayFactory =
+          new MockDelayMSStateModelFactory().setDelay(-300000L);
+      stateMachine.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory);
+      _participants[i].syncStart();
+    }
+
+    _manager = HelixManagerFactory
+        .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+    _manager.connect();
+
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+    _configAccessor = new ConfigAccessor(_gZkClient);
+  }
+
+  /** Disconnects the admin manager and resets every participant that is still connected. */
+  @AfterClass
+  public void afterClass() throws Exception {
+    _manager.disconnect();
+    for (MockParticipantManager participant : _participants) {
+      if (participant != null && participant.isConnected()) {
+        participant.reset();
+      }
+    }
+  }
+
+  /** TARGETEXTERNALVIEW must be rejected while target external view is not enabled. */
+  @Test (expectedExceptions = HelixException.class)
+  public void testTargetExternalViewWithoutEnable() {
+    new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
+  }
+
+  /** A resource without an external view yields an empty instance set. */
+  @Test
+  public void testExternalViewDoesNotExist() {
+    String resourceName = WorkflowGenerator.DEFAULT_TGT_DB + 1;
+    RoutingTableProvider provider = new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+    Assert.assertEquals(provider.getInstancesForResource(resourceName, "SLAVE").size(), 0);
+  }
+
+  /** Once enabled, the target external view should show masters the external view lacks. */
+  @Test (dependsOnMethods = "testTargetExternalViewWithoutEnable")
+  public void testExternalViewDiffFromTargetExternalView() throws InterruptedException {
+    ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME);
+    clusterConfig.enableTargetExternalView(true);
+    clusterConfig.setPersistBestPossibleAssignment(true);
+    _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig);
+    Thread.sleep(2000);
+
+    RoutingTableProvider externalViewProvider =
+        new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW);
+    RoutingTableProvider targetExternalViewProvider =
+        new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW);
+
+    // ExternalView should not contain any MASTERS
+    // TargetExternalView should list all NUM_NODES instances as holding a MASTER replica
+    Set<InstanceConfig> externalViewMasters =
+        externalViewProvider.getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
+    Assert.assertEquals(externalViewMasters.size(), 0);
+    Set<InstanceConfig> targetExternalViewMasters = targetExternalViewProvider
+        .getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER");
+    Assert.assertEquals(targetExternalViewMasters.size(), NUM_NODES);
+
+    // TargetExternalView MASTERS mapping should exactly match IdealState MASTERS mapping
+    Map<String, Map<String, String>> stateMap = _setupTool.getClusterManagementTool()
+        .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB).getRecord()
+        .getMapFields();
+    Set<String> idealMasters = new HashSet<>();
+    for (Map<String, String> instanceMap : stateMap.values()) {
+      for (Map.Entry<String, String> stateEntry : instanceMap.entrySet()) {
+        if ("MASTER".equals(stateEntry.getValue())) {
+          idealMasters.add(stateEntry.getKey());
+        }
+      }
+    }
+    Set<String> targetMasters = new HashSet<>();
+    for (InstanceConfig instanceConfig : targetExternalViewMasters) {
+      targetMasters.add(instanceConfig.getInstanceName());
+    }
+    Assert.assertTrue(idealMasters.equals(targetMasters), idealMasters + " vs " + targetMasters);
+  }
+}