You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 17:59:45 UTC

[10/38] helix git commit: Add StrictMatchExternalViewVerifier that verifies whether the ExternalViews of given resources (or all resources in the cluster) match exactly as its ideal mapping (in idealstate).

Add StrictMatchExternalViewVerifier that verifies whether the ExternalViews of given resources (or all resources in the cluster) match exactly as its ideal mapping (in idealstate).


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/981d0e29
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/981d0e29
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/981d0e29

Branch: refs/heads/helix-0.6.x
Commit: 981d0e295e01cd0839c8d5d0e54350f794ad52f7
Parents: 04495e7
Author: Lei Xia <lx...@linkedin.com>
Authored: Thu Jul 21 11:29:02 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Sun Feb 5 18:57:09 2017 -0800

----------------------------------------------------------------------
 .../BestPossibleExternalViewVerifier.java       |  18 +-
 .../StrictMatchExternalViewVerifier.java        | 331 +++++++++++++++++++
 .../apache/helix/tools/TestClusterVerifier.java |  82 +++--
 3 files changed, 391 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/981d0e29/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
index b350d91..295f5f8 100644
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/BestPossibleExternalViewVerifier.java
@@ -82,6 +82,10 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
     private String _zkAddr;
     private ZkClient _zkClient;
 
+    public Builder(String clusterName) {
+      _clusterName = clusterName;
+    }
+
     public BestPossibleExternalViewVerifier build() {
       if (_clusterName == null || (_zkAddr == null && _zkClient == null)) {
         throw new IllegalArgumentException("Cluster name or zookeeper info is missing!");
@@ -99,11 +103,6 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
       return _clusterName;
     }
 
-    public Builder setClusterName(String clusterName) {
-      _clusterName = clusterName;
-      return this;
-    }
-
     public Map<String, Map<String, String>> getErrStates() {
       return _errStates;
     }
@@ -333,6 +332,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
 
     runStage(event, new ResourceComputationStage());
     runStage(event, new CurrentStateComputationStage());
+
+    // TODO: be cautious here, this should be handled statelessly.
     runStage(event, new BestPossibleStateCalcStage());
 
     BestPossibleStateOutput output =
@@ -351,9 +352,8 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier {
 
   @Override
   public String toString() {
-    String verifierName = getClass().getName();
-    verifierName = verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
-    return verifierName + "(" + _clusterName + "@" + _zkClient.getServers() + "@resources["
-        + _resources != null ? Arrays.toString(_resources.toArray()) : "" + "])";
+    // the conditional must be parenthesized: '+' binds tighter than '!=' and '?:'
+    return getClass().getSimpleName() + "(" + _clusterName + "@" + _zkClient + "@resources["
+        + (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
   }
 }

http://git-wip-us.apache.org/repos/asf/helix/blob/981d0e29/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/StrictMatchExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/StrictMatchExternalViewVerifier.java
new file mode 100644
index 0000000..8b5bb77
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/StrictMatchExternalViewVerifier.java
@@ -0,0 +1,331 @@
+package org.apache.helix.tools.ClusterStateVerifier;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.PropertyKey;
+import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.TaskConstants;
+import org.apache.log4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Verifier that checks whether the ExternalViews of the given resources (or of all resources in the
+ * cluster) exactly match their ideal mapping (in the IdealState).
+ */
+public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier {
+  private static final Logger LOG = Logger.getLogger(StrictMatchExternalViewVerifier.class);
+  // resources to verify and live instances to expect; null or empty means "no restriction"
+  private final Set<String> _resources;
+  private final Set<String> _expectLiveInstances;
+
+  public StrictMatchExternalViewVerifier(String zkAddr, String clusterName, Set<String> resources,
+      Set<String> expectLiveInstances) {
+    super(zkAddr, clusterName);
+    this._expectLiveInstances = expectLiveInstances;
+    this._resources = resources;
+  }
+
+  public StrictMatchExternalViewVerifier(ZkClient zkClient, String clusterName,
+      Set<String> resources, Set<String> expectLiveInstances) {
+    super(zkClient, clusterName);
+    this._expectLiveInstances = expectLiveInstances;
+    this._resources = resources;
+  }
+
+  public static class Builder {
+    private String _clusterName;
+    private String _zkAddr;
+    private ZkClient _zkClient;
+    private Set<String> _resources;
+    private Set<String> _expectLiveInstances;
+
+    public Builder(String clusterName) {
+      this._clusterName = clusterName;
+    }
+
+    public StrictMatchExternalViewVerifier build() {
+      if ((_zkClient == null && _zkAddr == null) || _clusterName == null) {
+        throw new IllegalArgumentException("Cluster name or zookeeper info is missing!");
+      }
+      // an explicitly supplied ZkClient takes precedence over a zookeeper address
+      if (_zkClient == null) {
+        return new StrictMatchExternalViewVerifier(_zkAddr, _clusterName, _resources,
+            _expectLiveInstances);
+      }
+      return new StrictMatchExternalViewVerifier(_zkClient, _clusterName, _resources,
+          _expectLiveInstances);
+    }
+
+    public String getClusterName() {
+      return this._clusterName;
+    }
+
+    public Set<String> getResources() {
+      return this._resources;
+    }
+
+    public Builder setResources(Set<String> resources) {
+      this._resources = resources;
+      return this;
+    }
+
+    public Set<String> getExpectLiveInstances() {
+      return this._expectLiveInstances;
+    }
+
+    public Builder setExpectLiveInstances(Set<String> expectLiveInstances) {
+      this._expectLiveInstances = expectLiveInstances;
+      return this;
+    }
+
+    public String getZkAddr() {
+      return this._zkAddr;
+    }
+
+    public Builder setZkAddr(String zkAddr) {
+      this._zkAddr = zkAddr;
+      return this;
+    }
+
+    public ZkClient getZkClient() {
+      return this._zkClient;
+    }
+
+    public Builder setZkClient(ZkClient zkClient) {
+      this._zkClient = zkClient;
+      return this;
+    }
+  }
+
+  @Override
+  public boolean verify(long timeout) {
+    return verifyByZkCallback(timeout); // delegates to the callback-based verification
+  }
+
+  @Override
+  public boolean verifyByZkCallback(long timeout) {
+    List<ClusterVerifyTrigger> watchList = new ArrayList<ClusterVerifyTrigger>();
+
+    if (_resources == null || _resources.isEmpty()) {
+      // no resource filter: trigger on the cluster-wide idealStates/externalViews keys
+      watchList.add(new ClusterVerifyTrigger(_keyBuilder.idealStates(), false, true, true));
+      watchList.add(new ClusterVerifyTrigger(_keyBuilder.externalViews(), false, true, true));
+    } else {
+      // trigger on the idealState and externalView key of each requested resource
+      for (String name : _resources) {
+        watchList.add(
+            new ClusterVerifyTrigger(_keyBuilder.idealStates(name), true, false, false));
+        watchList.add(
+            new ClusterVerifyTrigger(_keyBuilder.externalView(name), true, false, false));
+      }
+    }
+
+    return verifyByCallback(timeout, watchList);
+  }
+
+  /** Reads the cluster once and checks that each ExternalView equals its ideal mapping. */
+  @Override
+  protected boolean verifyState() {
+    try {
+      PropertyKey.Builder keyBuilder = _accessor.keyBuilder();
+      // read cluster once and do verification
+      ClusterDataCache cache = new ClusterDataCache();
+      cache.refresh(_accessor);
+      // work on a mutable copy so entries can be removed/added without touching the cache
+      Map<String, IdealState> idealStates = new HashMap<String, IdealState>();
+      if (cache.getIdealStates() != null) {
+        idealStates.putAll(cache.getIdealStates());
+      }
+
+      // filter out all resources that use Task state model
+      Iterator<Map.Entry<String, IdealState>> it = idealStates.entrySet().iterator();
+      while (it.hasNext()) {
+        Map.Entry<String, IdealState> pair = it.next();
+        if (TaskConstants.STATE_MODEL_NAME.equals(pair.getValue().getStateModelDefRef())) {
+          it.remove();
+        }
+      }
+
+      // verify live instances.
+      if (_expectLiveInstances != null && !_expectLiveInstances.isEmpty()) {
+        Set<String> actualLiveNodes = cache.getLiveInstances().keySet();
+        if (!_expectLiveInstances.equals(actualLiveNodes)) {
+          return false;
+        }
+      }
+
+      Map<String, ExternalView> extViews = _accessor.getChildValuesMap(keyBuilder.externalViews());
+      if (extViews == null) {
+        extViews = Collections.emptyMap();
+      }
+
+      // Filter resources if requested
+      if (_resources != null && !_resources.isEmpty()) {
+        idealStates.keySet().retainAll(_resources);
+        extViews.keySet().retainAll(_resources);
+      }
+
+      // if externalView is not empty and idealState doesn't exist
+      // add empty idealState for the resource
+      for (String resource : extViews.keySet()) {
+        if (!idealStates.containsKey(resource)) {
+          idealStates.put(resource, new IdealState(resource));
+        }
+      }
+
+      for (String resourceName : idealStates.keySet()) {
+        ExternalView extView = extViews.get(resourceName);
+        IdealState idealState = idealStates.get(resourceName);
+        if (extView == null) {
+          if (idealState.isExternalViewDisabled()) {
+            continue;
+          } else {
+            LOG.debug("externalView for " + resourceName + " is not available");
+            return false;
+          }
+        }
+
+        boolean result = verifyExternalView(cache, extView, idealState);
+        if (!result) {
+          return false;
+        }
+      }
+      return true;
+    } catch (Exception e) {
+      LOG.error("exception in verification", e);
+      return false;
+    }
+  }
+
+  private boolean verifyExternalView(ClusterDataCache dataCache, ExternalView externalView,
+      IdealState idealState) {
+    Map<String, Map<String, String>> actual = externalView.getRecord().getMapFields();
+    Map<String, Map<String, String>> expected;
+
+    switch (idealState.getRebalanceMode()) {
+    case CUSTOMIZED:
+      // the map fields of the idealstate are used as the expected mapping as-is
+      expected = idealState.getRecord().getMapFields();
+      break;
+    case FULL_AUTO:
+    case SEMI_AUTO:
+    case USER_DEFINED:
+      expected = computeIdealPartitionState(dataCache, idealState);
+      break;
+    default:
+      // TASK (jobs) and any other mode are not verified
+      return true;
+    }
+
+    return actual.equals(expected);
+  }
+
+  /**
+   * Builds the expected partition -> (instance -> state) mapping of a resource from the
+   * preference list of each of its partitions.
+   */
+  private Map<String, Map<String, String>> computeIdealPartitionState(ClusterDataCache cache,
+      IdealState idealState) {
+    StateModelDefinition modelDef = cache.getStateModelDef(idealState.getStateModelDefRef());
+
+    // instances that are live and not disabled
+    Set<String> usableInstances = new HashSet<String>(cache.getLiveInstances().keySet());
+    usableInstances.removeAll(cache.getDisabledInstances());
+
+    Map<String, Map<String, String>> expected = new HashMap<String, Map<String, String>>();
+    for (String partitionName : idealState.getPartitionSet()) {
+      List<String> prefList = ConstraintBasedAssignment
+          .getPreferenceList(cache, new Partition(partitionName), idealState, modelDef);
+      expected.put(partitionName, computeIdealMapping(prefList, modelDef, usableInstances));
+    }
+
+    return expected;
+  }
+
+  /**
+   * Computes the expected instance -> state mapping of one partition from its preference list:
+   * states are handed out in priority order to the not yet assigned instances of that list.
+   */
+  private Map<String, String> computeIdealMapping(List<String> instancePreferenceList,
+      StateModelDefinition stateModelDef, Set<String> liveEnabledInstances) {
+    Map<String, String> mapping = new HashMap<String, String>();
+    if (instancePreferenceList == null) {
+      return mapping;
+    }
+
+    int listSize = instancePreferenceList.size();
+    boolean[] taken = new boolean[listSize];
+
+    for (String state : stateModelDef.getStatesPriorityList()) {
+      // resolve the configured count: "N" = live enabled instances, "R" = preference list size
+      String quota = stateModelDef.getNumInstancesPerState(state);
+      int limit = -1;
+      if ("N".equals(quota)) {
+        limit = liveEnabledInstances.size();
+      } else if ("R".equals(quota)) {
+        limit = listSize;
+      } else {
+        try {
+          limit = Integer.parseInt(quota);
+        } catch (Exception e) {
+          LOG.error("Invalid count for state:" + state + " ,count=" + quota);
+        }
+      }
+      if (limit <= 0) {
+        continue;
+      }
+
+      // give this state to the first unassigned instances, at most 'limit' of them
+      int given = 0;
+      for (int idx = 0; idx < listSize && given < limit; idx++) {
+        if (taken[idx]) {
+          continue;
+        }
+        mapping.put(instancePreferenceList.get(idx), state);
+        taken[idx] = true;
+        given++;
+      }
+    }
+
+    return mapping;
+  }
+
+  @Override
+  public String toString() {
+    // the conditional must be parenthesized: '+' binds tighter than '!=' and '?:'
+    return getClass().getSimpleName() + "(" + _clusterName + "@" + _zkClient.getServers()
+        + "@resources[" + (_resources != null ? Arrays.toString(_resources.toArray()) : "") + "])";
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/981d0e29/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
index 92d8640..5e251c8 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestClusterVerifier.java
@@ -30,6 +30,7 @@ import org.apache.helix.model.IdealState;
 import org.apache.helix.model.IdealState.RebalanceMode;
 import org.apache.helix.tools.ClusterStateVerifier.BestPossibleExternalViewVerifier;
 import org.apache.helix.tools.ClusterStateVerifier.HelixClusterVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.StrictMatchExternalViewVerifier;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -39,48 +40,50 @@ import java.util.Arrays;
 
 public class TestClusterVerifier extends ZkUnitTestBase {
   final String[] RESOURCES = {
-      "resource0", "resource1"
+      "resource0", "resource1", "resource2", "resource3"
   };
   private HelixAdmin _admin;
   private MockParticipantManager[] _participants;
   private ClusterControllerManager _controller;
   private String _clusterName;
+  private ClusterSetup _setupTool;
 
   @BeforeMethod
   public void beforeMethod() throws InterruptedException {
-    final int NUM_PARTITIONS = 1;
-    final int NUM_REPLICAS = 1;
+    final int NUM_PARTITIONS = 10;
+    final int NUM_REPLICAS = 3;
 
     // Cluster and resource setup
     String className = TestHelper.getTestClassName();
     String methodName = TestHelper.getTestMethodName();
     _clusterName = className + "_" + methodName;
-    ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
-    _admin = setupTool.getClusterManagementTool();
-    setupTool.addCluster(_clusterName, true);
-    setupTool.addResourceToCluster(_clusterName, RESOURCES[0], NUM_PARTITIONS,
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _admin = _setupTool.getClusterManagementTool();
+    _setupTool.addCluster(_clusterName, true);
+    _setupTool.addResourceToCluster(_clusterName, RESOURCES[0], NUM_PARTITIONS,
         BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.SEMI_AUTO.toString());
-    setupTool.addResourceToCluster(_clusterName, RESOURCES[1], NUM_PARTITIONS,
+    _setupTool.addResourceToCluster(_clusterName, RESOURCES[1], NUM_PARTITIONS,
         BuiltInStateModelDefinitions.OnlineOffline.name(), RebalanceMode.SEMI_AUTO.toString());
 
+    _setupTool.addResourceToCluster(_clusterName, RESOURCES[2], NUM_PARTITIONS,
+        BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.FULL_AUTO.toString());
+    _setupTool.addResourceToCluster(_clusterName, RESOURCES[3], NUM_PARTITIONS,
+        BuiltInStateModelDefinitions.OnlineOffline.name(), RebalanceMode.FULL_AUTO.toString());
+
     // Configure and start the participants
     _participants = new MockParticipantManager[RESOURCES.length];
     for (int i = 0; i < _participants.length; i++) {
       String host = "localhost";
       int port = 12918 + i;
       String id = host + '_' + port;
-      setupTool.addInstanceToCluster(_clusterName, id);
+      _setupTool.addInstanceToCluster(_clusterName, id);
       _participants[i] = new MockParticipantManager(ZK_ADDR, _clusterName, id);
       _participants[i].syncStart();
     }
 
     // Rebalance the resources
     for (int i = 0; i < RESOURCES.length; i++) {
-      IdealState idealState = _admin.getResourceIdealState(_clusterName, RESOURCES[i]);
-      idealState.setReplicas(Integer.toString(NUM_REPLICAS));
-      idealState.getRecord().setListField(RESOURCES[i] + "_0",
-          Arrays.asList(_participants[i].getInstanceName()));
-      _admin.setResourceIdealState(_clusterName, RESOURCES[i], idealState);
+      _setupTool.rebalanceResource(_clusterName, RESOURCES[i], NUM_REPLICAS);
     }
 
     // Start the controller
@@ -99,41 +102,58 @@ public class TestClusterVerifier extends ZkUnitTestBase {
     _admin.dropCluster(_clusterName);
   }
 
-  @Test public void testEntireCluster() {
+  @Test
+  public void testEntireCluster() throws InterruptedException {
     // Just ensure that the entire cluster passes
     // ensure that the external view coalesces
-    HelixClusterVerifier verifier =
-        new BestPossibleExternalViewVerifier.Builder().setClusterName(_clusterName)
-            .setZkClient(_gZkClient).build();
+    HelixClusterVerifier bestPossibleVerifier =
+        new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build();
+    Assert.assertTrue(bestPossibleVerifier.verify(10000));
 
-    boolean result = verifier.verify(10000);
-    Assert.assertTrue(result);
+    HelixClusterVerifier strictMatchVerifier =
+        new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build();
+    Assert.assertTrue(strictMatchVerifier.verify(10000));
+
+    _participants[0].syncStop();
+    Thread.sleep(1000);
+    Assert.assertFalse(strictMatchVerifier.verify(10000));
   }
 
-  @Test
-  public void testResourceSubset() throws InterruptedException {
+  @Test public void testResourceSubset() throws InterruptedException {
+    String testDB = "resource-testDB";
+    _setupTool.addResourceToCluster(_clusterName, testDB, 1,
+        BuiltInStateModelDefinitions.MasterSlave.name(), RebalanceMode.SEMI_AUTO.toString());
+
+    IdealState idealState = _admin.getResourceIdealState(_clusterName, testDB);
+    idealState.setReplicas(Integer.toString(2));
+    idealState.getRecord().setListField(testDB + "_0",
+        Arrays.asList(_participants[1].getInstanceName(), _participants[2].getInstanceName()));
+    _admin.setResourceIdealState(_clusterName, testDB, idealState);
+
     // Ensure that this passes even when one resource is down
     _admin.enableInstance(_clusterName, "localhost_12918", false);
     Thread.sleep(1000);
     _admin.enableCluster(_clusterName, false);
     _admin.enableInstance(_clusterName, "localhost_12918", true);
 
-
     HelixClusterVerifier verifier =
-        new BestPossibleExternalViewVerifier.Builder().setClusterName(_clusterName)
-            .setZkClient(_gZkClient).setResources(Sets.newHashSet(RESOURCES[1])).build();
+        new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
+            .setResources(Sets.newHashSet(testDB)).build();
+    Assert.assertTrue(verifier.verify());
 
-    boolean result = verifier.verify();
-    Assert.assertTrue(result);
+    verifier = new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient)
+        .setResources(Sets.newHashSet(testDB)).build();
+    Assert.assertTrue(verifier.verify());
 
     // But the full cluster verification should fail
+    verifier =
+        new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build();
+    Assert.assertFalse(verifier.verify());
 
     verifier =
-        new BestPossibleExternalViewVerifier.Builder().setClusterName(_clusterName)
-            .setZkClient(_gZkClient).build();
+        new StrictMatchExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).build();
+    Assert.assertFalse(verifier.verify());
 
-    result = verifier.verify();
-    Assert.assertFalse(result);
     _admin.enableCluster(_clusterName, true);
   }
 }