You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 17:59:43 UTC

[08/38] helix git commit: Refactor: put all cluster verifiers into a sub-module of tools.

Refactor: put all cluster verifiers into a sub-module of tools.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/695228e0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/695228e0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/695228e0

Branch: refs/heads/helix-0.6.x
Commit: 695228e0ec6fd5fc48d5408633205f8ab935838d
Parents: 0f7c3e4
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Jul 18 16:06:22 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Sun Feb 5 18:55:18 2017 -0800

----------------------------------------------------------------------
 .../webapp/TestHelixAdminScenariosRest.java     |   6 +-
 .../apache/helix/webapp/TestResetInstance.java  |   2 +-
 .../helix/webapp/TestResetPartitionState.java   |   2 +-
 .../apache/helix/webapp/TestResetResource.java  |   2 +-
 .../webapp/resources/TestJobQueuesResource.java |   2 +-
 .../org/apache/helix/agent/TestHelixAgent.java  |   4 +-
 .../tools/ClusterExternalViewVerifier.java      | 170 -----
 .../helix/tools/ClusterLiveNodesVerifier.java   |  45 --
 .../helix/tools/ClusterStateVerifier.java       | 749 -------------------
 .../ClusterExternalViewVerifier.java            | 171 +++++
 .../ClusterLiveNodesVerifier.java               |  45 ++
 .../ClusterStateVerifier.java                   | 734 ++++++++++++++++++
 .../ClusterStateVerifier/ClusterVerifier.java   | 147 ++++
 .../org/apache/helix/tools/ClusterVerifier.java | 147 ----
 .../apache/helix/tools/IntegrationTestUtil.java |   2 +
 .../java/org/apache/helix/ZkUnitTestBase.java   |   3 +-
 .../helix/integration/TestAddClusterV2.java     |   2 +-
 .../TestAddNodeAfterControllerStart.java        |   3 +-
 .../TestAddStateModelFactoryAfterConnect.java   |   4 +-
 .../integration/TestAutoIsWithEmptyMap.java     |   5 +-
 .../helix/integration/TestAutoRebalance.java    |   4 +-
 .../TestAutoRebalancePartitionLimit.java        |   4 +-
 .../TestAutoRebalanceWithDisabledInstance.java  |   2 +-
 .../helix/integration/TestBasicSpectator.java   |   2 +-
 .../helix/integration/TestBatchMessage.java     |   5 +-
 .../integration/TestBatchMessageWrapper.java    |   4 +-
 .../integration/TestBucketizedResource.java     |   6 +-
 .../integration/TestCarryOverBadCurState.java   |   7 +-
 .../integration/TestCleanupExternalView.java    |   2 +-
 .../integration/TestControllerLiveLock.java     |   4 +-
 .../TestCorrectnessOnConnectivityLoss.java      |   4 +-
 .../integration/TestCrushAutoRebalance.java     |   2 +-
 .../TestCustomizedIdealStateRebalancer.java     |   4 +-
 .../apache/helix/integration/TestDisable.java   |   4 +-
 .../TestDisableCustomCodeRunner.java            |   2 +-
 .../integration/TestDisableExternalView.java    |   4 +-
 .../helix/integration/TestDisableNode.java      |   2 +-
 .../helix/integration/TestDisablePartition.java |   2 +-
 .../helix/integration/TestDisableResource.java  |   2 +-
 .../integration/TestDistributedCMMain.java      |   4 +-
 .../TestDistributedClusterController.java       |   4 +-
 .../apache/helix/integration/TestDriver.java    |   3 +-
 .../org/apache/helix/integration/TestDrop.java  |   4 +-
 .../helix/integration/TestDropResource.java     |   2 +-
 .../integration/TestEnableCompression.java      |   5 +-
 .../TestEnablePartitionDuringDisable.java       |   4 +-
 .../integration/TestEntropyFreeNodeBounce.java  |   6 +-
 .../helix/integration/TestErrorPartition.java   |   2 +-
 .../integration/TestExternalViewUpdates.java    |   6 +-
 .../integration/TestFullAutoNodeTagging.java    |   6 +-
 .../integration/TestHelixCustomCodeRunner.java  |   2 +-
 .../helix/integration/TestHelixInstanceTag.java |   2 +-
 .../integration/TestInvalidAutoIdealState.java  |   5 +-
 .../TestInvalidResourceRebalance.java           |   2 +-
 .../helix/integration/TestMessageThrottle.java  |   7 +-
 .../helix/integration/TestMessageThrottle2.java |   4 +-
 .../integration/TestNonOfflineInitState.java    |   4 +-
 .../helix/integration/TestNullReplica.java      |   5 +-
 .../TestParticipantErrorMessage.java            |   4 +-
 .../TestPartitionLevelTransitionConstraint.java |   2 +-
 .../helix/integration/TestPauseSignal.java      |   4 +-
 .../TestRebalancerPersistAssignments.java       |   2 +-
 .../TestReelectedPipelineCorrectness.java       |   4 +-
 .../helix/integration/TestRenamePartition.java  |   2 +-
 .../helix/integration/TestResetInstance.java    |   2 +-
 .../integration/TestResetPartitionState.java    |   2 +-
 .../helix/integration/TestResetResource.java    |   2 +-
 .../integration/TestResourceGroupEndtoEnd.java  |  11 +-
 .../TestResourceWithSamePartitionKey.java       |   2 +-
 .../integration/TestRestartParticipant.java     |   4 +-
 .../helix/integration/TestSchemataSM.java       |   4 +-
 .../TestSessionExpiryInTransition.java          |   4 +-
 .../helix/integration/TestStandAloneCMMain.java |   2 +-
 .../TestStandAloneCMSessionExpiry.java          |   2 +-
 .../integration/TestStateTransitionTimeout.java |   4 +-
 .../helix/integration/TestSwapInstance.java     |   2 +-
 .../integration/TestZkCallbackHandlerLeak.java  |   2 +-
 .../helix/integration/TestZkReconnect.java      |   4 +-
 .../helix/integration/TestZkSessionExpiry.java  |   2 +-
 .../integration/ZkStandAloneCMTestBase.java     |   6 +-
 .../manager/TestConsecutiveZkSessionExpiry.java |   4 +-
 .../manager/TestControllerManager.java          |   4 +-
 .../TestDistributedControllerManager.java       |   4 +-
 .../manager/TestParticipantManager.java         |   5 +-
 .../integration/manager/TestStateModelLeak.java |   4 +-
 .../manager/TestZkCallbackHandlerLeak.java      |   2 +-
 .../helix/integration/task/TaskTestBase.java    |   2 +-
 .../task/TestTaskRebalancerStopResume.java      |   2 +-
 .../manager/zk/TestLiveInstanceBounce.java      |   2 +-
 .../handling/TestResourceThreadpoolSize.java    |   3 +-
 .../TestClusterStatusMonitorLifecycle.java      |   4 +-
 .../mbeans/TestResetClusterMetrics.java         |   2 +-
 .../helix/tools/TestClusterStateVerifier.java   |   7 +-
 .../apache/helix/tools/TestHelixAdminCli.java   |   5 +-
 94 files changed, 1238 insertions(+), 1273 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
index 0b49096..4e55c0c 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
@@ -41,9 +41,9 @@ import org.apache.helix.model.IdealState.IdealStateProperty;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier.MasterNbInExtViewVerifier;
 import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
 import org.apache.helix.webapp.resources.InstancesResource.ListInstancesWrapper;
 import org.apache.helix.webapp.resources.JsonParameters;

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
index fdcb1e5..ce84c23 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
@@ -29,7 +29,7 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
 import org.apache.helix.webapp.resources.JsonParameters;
 import org.testng.Assert;
 import org.testng.annotations.Test;

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
index ddf2ec1..380713a 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
@@ -37,7 +37,7 @@ import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
 import org.apache.helix.webapp.resources.JsonParameters;
 import org.apache.log4j.Logger;
 import org.testng.Assert;

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
index 64ed249..26e8219 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
@@ -29,7 +29,7 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.mock.participant.ErrTransition;
 import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
 import org.apache.helix.webapp.resources.JsonParameters;
 import org.testng.Assert;
 import org.testng.annotations.Test;

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
index 5d8a93b..712925a 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
@@ -39,7 +39,7 @@ import org.apache.helix.task.TaskFactory;
 import org.apache.helix.task.TaskStateModelFactory;
 import org.apache.helix.task.beans.JobBean;
 import org.apache.helix.task.beans.WorkflowBean;
-import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
 import org.apache.helix.webapp.AdminTestBase;
 import org.apache.helix.webapp.AdminTestHelper;
 import org.apache.log4j.Logger;

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java b/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
index 27b4d36..9d447d7 100644
--- a/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
+++ b/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
@@ -34,8 +34,8 @@ import org.apache.helix.model.HelixConfigScope;
 import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
 import org.apache.helix.model.builder.HelixConfigScopeBuilder;
 import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
 import org.apache.log4j.Logger;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
deleted file mode 100644
index c483fbb..0000000
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package org.apache.helix.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.Partition;
-import org.apache.log4j.Logger;
-
-/**
- * given zk, cluster, and a list of expected live-instances
- * check whether cluster's external-view reaches best-possible states
- */
-public class ClusterExternalViewVerifier extends ClusterVerifier {
-  private static Logger LOG = Logger.getLogger(ClusterExternalViewVerifier.class);
-
-  final List<String> _expectSortedLiveNodes; // always sorted
-
-  public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName,
-      List<String> expectLiveNodes) {
-    super(zkclient, clusterName);
-    _expectSortedLiveNodes = expectLiveNodes;
-    Collections.sort(_expectSortedLiveNodes);
-  }
-
-  boolean verifyLiveNodes(List<String> actualLiveNodes) {
-    Collections.sort(actualLiveNodes);
-    return _expectSortedLiveNodes.equals(actualLiveNodes);
-  }
-
-  /**
-   * @param externalView
-   * @param bestPossibleState map of partition to map of instance to state
-   * @return
-   */
-  boolean verifyExternalView(ExternalView externalView,
-      Map<Partition, Map<String, String>> bestPossibleState) {
-    Map<String, Map<String, String>> bestPossibleStateMap =
-        convertBestPossibleState(bestPossibleState);
-    // trimBestPossibleState(bestPossibleStateMap);
-
-    Map<String, Map<String, String>> externalViewMap = externalView.getRecord().getMapFields();
-    return externalViewMap.equals(bestPossibleStateMap);
-  }
-
-  static void runStage(ClusterEvent event, Stage stage) throws Exception {
-    StageContext context = new StageContext();
-    stage.init(context);
-    stage.preProcess();
-    stage.process(event);
-    stage.postProcess();
-  }
-
-  BestPossibleStateOutput calculateBestPossibleState(ClusterDataCache cache) throws Exception {
-    ClusterEvent event = new ClusterEvent("event");
-    event.addAttribute("ClusterDataCache", cache);
-
-    List<Stage> stages = new ArrayList<Stage>();
-    stages.add(new ResourceComputationStage());
-    stages.add(new CurrentStateComputationStage());
-    stages.add(new BestPossibleStateCalcStage());
-
-    for (Stage stage : stages) {
-      runStage(event, stage);
-    }
-
-    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-  }
-
-  /**
-   * remove empty map and DROPPED state from best possible state
-   * @param bestPossibleState
-   */
-  // static void trimBestPossibleState(Map<String, Map<String, String>> bestPossibleState) {
-  // Iterator<Entry<String, Map<String, String>>> iter = bestPossibleState.entrySet().iterator();
-  // while (iter.hasNext()) {
-  // Map.Entry<String, Map<String, String>> entry = iter.next();
-  // Map<String, String> instanceStateMap = entry.getValue();
-  // if (instanceStateMap.isEmpty()) {
-  // iter.remove();
-  // } else {
-  // // remove instances with DROPPED state
-  // Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
-  // while (insIter.hasNext()) {
-  // Map.Entry<String, String> insEntry = insIter.next();
-  // String state = insEntry.getValue();
-  // if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
-  // insIter.remove();
-  // }
-  // }
-  // }
-  // }
-  // }
-
-  static Map<String, Map<String, String>> convertBestPossibleState(
-      Map<Partition, Map<String, String>> bestPossibleState) {
-    Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
-    for (Partition partition : bestPossibleState.keySet()) {
-      result.put(partition.getPartitionName(), bestPossibleState.get(partition));
-    }
-    return result;
-  }
-
-  @Override
-  public boolean verify() throws Exception {
-    ClusterDataCache cache = new ClusterDataCache();
-    cache.refresh(_accessor);
-
-    List<String> liveInstances = new ArrayList<String>();
-    liveInstances.addAll(cache.getLiveInstances().keySet());
-    boolean success = verifyLiveNodes(liveInstances);
-    if (!success) {
-      LOG.info("liveNodes not match, expect: " + _expectSortedLiveNodes + ", actual: "
-          + liveInstances);
-      return false;
-    }
-
-    BestPossibleStateOutput bestPossbileStates = calculateBestPossibleState(cache);
-    Map<String, ExternalView> externalViews =
-        _accessor.getChildValuesMap(_keyBuilder.externalViews());
-
-    // TODO all ideal-states should be included in external-views
-
-    for (String resourceName : externalViews.keySet()) {
-      ExternalView externalView = externalViews.get(resourceName);
-      Map<Partition, Map<String, String>> bestPossbileState =
-          bestPossbileStates.getResourceMap(resourceName);
-      success = verifyExternalView(externalView, bestPossbileState);
-      if (!success) {
-        LOG.info("external-view for resource: " + resourceName + " not match");
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
deleted file mode 100644
index b9de466..0000000
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.helix.manager.zk.ZkClient;
-
-public class ClusterLiveNodesVerifier extends ClusterVerifier {
-
-  final List<String> _expectSortedLiveNodes; // always sorted
-
-  public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
-      List<String> expectLiveNodes) {
-    super(zkclient, clusterName);
-    _expectSortedLiveNodes = expectLiveNodes;
-    Collections.sort(_expectSortedLiveNodes);
-  }
-
-  @Override
-  public boolean verify() throws Exception {
-    List<String> actualLiveNodes = _accessor.getChildNames(_keyBuilder.liveInstances());
-    Collections.sort(actualLiveNodes);
-    return _expectSortedLiveNodes.equals(actualLiveNodes);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
deleted file mode 100644
index 4d4aaf4..0000000
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ /dev/null
@@ -1,749 +0,0 @@
-package org.apache.helix.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.task.TaskConstants;
-import org.apache.helix.util.ZKClientPool;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Sets;
-
-public class ClusterStateVerifier {
-  public static String cluster = "cluster";
-  public static String zkServerAddress = "zkSvr";
-  public static String help = "help";
-  public static String timeout = "timeout";
-  public static String period = "period";
-  public static String resources = "resources";
-
-  private static Logger LOG = Logger.getLogger(ClusterStateVerifier.class);
-
-  public interface Verifier {
-    boolean verify();
-  }
-
-  public interface ZkVerifier extends Verifier {
-    ZkClient getZkClient();
-
-    String getClusterName();
-  }
-
-  static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener {
-    final CountDownLatch _countDown;
-    final ZkClient _zkClient;
-    final Verifier _verifier;
-
-    public ExtViewVeriferZkListener(CountDownLatch countDown, ZkClient zkClient, ZkVerifier verifier) {
-      _countDown = countDown;
-      _zkClient = zkClient;
-      _verifier = verifier;
-    }
-
-    @Override
-    public void handleDataChange(String dataPath, Object data) throws Exception {
-      boolean result = _verifier.verify();
-      if (result == true) {
-        _countDown.countDown();
-      }
-    }
-
-    @Override
-    public void handleDataDeleted(String dataPath) throws Exception {
-      // TODO Auto-generated method stub
-
-    }
-
-    @Override
-    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
-      for (String child : currentChilds) {
-        String childPath = parentPath.equals("/") ? parentPath + child : parentPath + "/" + child;
-        _zkClient.subscribeDataChanges(childPath, this);
-      }
-
-      boolean result = _verifier.verify();
-      if (result == true) {
-        _countDown.countDown();
-      }
-    }
-
-  }
-
-  private static ZkClient validateAndGetClient(String zkAddr, String clusterName) {
-    if (zkAddr == null || clusterName == null) {
-      throw new IllegalArgumentException("requires zkAddr|clusterName");
-    }
-    return ZKClientPool.getZkClient(zkAddr);
-  }
-
-  /**
-   * verifier that verifies best possible state and external view
-   */
-  public static class BestPossAndExtViewZkVerifier implements ZkVerifier {
-    private final String clusterName;
-    private final Map<String, Map<String, String>> errStates;
-    private final ZkClient zkClient;
-    private final Set<String> resources;
-
-    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) {
-      this(zkAddr, clusterName, null);
-    }
-
-    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
-        Map<String, Map<String, String>> errStates) {
-      this(zkAddr, clusterName, errStates, null);
-    }
-
-    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
-        Map<String, Map<String, String>> errStates, Set<String> resources) {
-      this(validateAndGetClient(zkAddr, clusterName), clusterName, errStates, resources);
-    }
-
-    public BestPossAndExtViewZkVerifier(ZkClient zkClient, String clusterName,
-        Map<String, Map<String, String>> errStates, Set<String> resources) {
-      if (zkClient == null || clusterName == null) {
-        throw new IllegalArgumentException("requires zkClient|clusterName");
-      }
-      this.clusterName = clusterName;
-      this.errStates = errStates;
-      this.zkClient = zkClient;
-      this.resources = resources;
-    }
-
-    @Override
-    public boolean verify() {
-      try {
-        HelixDataAccessor accessor =
-            new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
-
-        return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName,
-            resources);
-      } catch (Exception e) {
-        LOG.error("exception in verification", e);
-      }
-      return false;
-    }
-
-    @Override
-    public ZkClient getZkClient() {
-      return zkClient;
-    }
-
-    @Override
-    public String getClusterName() {
-      return clusterName;
-    }
-
-    @Override
-    public String toString() {
-      String verifierName = getClass().getName();
-      verifierName =
-          verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
-      return verifierName + "(" + clusterName + "@" + zkClient.getServers() + ")";
-    }
-  }
-
-  public static class MasterNbInExtViewVerifier implements ZkVerifier {
-    private final String clusterName;
-    private final ZkClient zkClient;
-
-    public MasterNbInExtViewVerifier(String zkAddr, String clusterName) {
-      this(validateAndGetClient(zkAddr, clusterName), clusterName);
-    }
-
-    public MasterNbInExtViewVerifier(ZkClient zkClient, String clusterName) {
-      if (zkClient == null || clusterName == null) {
-        throw new IllegalArgumentException("requires zkClient|clusterName");
-      }
-      this.clusterName = clusterName;
-      this.zkClient = zkClient;
-    }
-
-    @Override
-    public boolean verify() {
-      try {
-        ZKHelixDataAccessor accessor =
-            new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
-
-        return ClusterStateVerifier.verifyMasterNbInExtView(accessor);
-      } catch (Exception e) {
-        LOG.error("exception in verification", e);
-      }
-      return false;
-    }
-
-    @Override
-    public ZkClient getZkClient() {
-      return zkClient;
-    }
-
-    @Override
-    public String getClusterName() {
-      return clusterName;
-    }
-
-  }
-
-  static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
-      Map<String, Map<String, String>> errStates, String clusterName) {
-    return verifyBestPossAndExtView(accessor, errStates, clusterName, null);
-  }
-
-  static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
-      Map<String, Map<String, String>> errStates, String clusterName, Set<String> resources) {
-    try {
-      Builder keyBuilder = accessor.keyBuilder();
-      // read cluster once and do verification
-      ClusterDataCache cache = new ClusterDataCache();
-      cache.refresh(accessor);
-
-      Map<String, IdealState> idealStates = cache.getIdealStates();
-      if (idealStates == null) {
-        // ideal state is null because ideal state is dropped
-        idealStates = Collections.emptyMap();
-      }
-
-      // filter out all resources that use Task state model
-      Iterator<Map.Entry<String, IdealState>> it = idealStates.entrySet().iterator();
-      while (it.hasNext()) {
-        Map.Entry<String, IdealState> pair = it.next();
-        if (pair.getValue().getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
-          it.remove();
-        }
-      }
-
-      Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
-      if (extViews == null) {
-        extViews = Collections.emptyMap();
-      }
-
-      // Filter resources if requested
-      if (resources != null && !resources.isEmpty()) {
-        idealStates.keySet().retainAll(resources);
-        extViews.keySet().retainAll(resources);
-      }
-
-      // if externalView is not empty and idealState doesn't exist
-      // add empty idealState for the resource
-      for (String resource : extViews.keySet()) {
-        if (!idealStates.containsKey(resource)) {
-          idealStates.put(resource, new IdealState(resource));
-        }
-      }
-
-      // calculate best possible state
-      BestPossibleStateOutput bestPossOutput =
-          ClusterStateVerifier.calcBestPossState(cache, resources);
-      Map<String, Map<Partition, Map<String, String>>> bestPossStateMap =
-          bestPossOutput.getStateMap();
-
-      // set error states
-      if (errStates != null) {
-        for (String resourceName : errStates.keySet()) {
-          Map<String, String> partErrStates = errStates.get(resourceName);
-          for (String partitionName : partErrStates.keySet()) {
-            String instanceName = partErrStates.get(partitionName);
-
-            if (!bestPossStateMap.containsKey(resourceName)) {
-              bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
-            }
-            Partition partition = new Partition(partitionName);
-            if (!bestPossStateMap.get(resourceName).containsKey(partition)) {
-              bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
-            }
-            bestPossStateMap.get(resourceName).get(partition)
-                .put(instanceName, HelixDefinedState.ERROR.toString());
-          }
-        }
-      }
-
-      // System.out.println("stateMap: " + bestPossStateMap);
-
-      for (String resourceName : idealStates.keySet()) {
-        ExternalView extView = extViews.get(resourceName);
-        if (extView == null) {
-          IdealState is = idealStates.get(resourceName);
-          if (is.isExternalViewDisabled()) {
-            continue;
-          } else {
-            LOG.info("externalView for " + resourceName + " is not available");
-            return false;
-          }
-        }
-
-        // step 0: remove empty map and DROPPED state from best possible state
-        Map<Partition, Map<String, String>> bpStateMap =
-            bestPossOutput.getResourceMap(resourceName);
-        Iterator<Entry<Partition, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
-        while (iter.hasNext()) {
-          Map.Entry<Partition, Map<String, String>> entry = iter.next();
-          Map<String, String> instanceStateMap = entry.getValue();
-          if (instanceStateMap.isEmpty()) {
-            iter.remove();
-          } else {
-            // remove instances with DROPPED state
-            Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
-            while (insIter.hasNext()) {
-              Map.Entry<String, String> insEntry = insIter.next();
-              String state = insEntry.getValue();
-              if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
-                insIter.remove();
-              }
-            }
-          }
-        }
-
-        // System.err.println("resource: " + resourceName + ", bpStateMap: " + bpStateMap);
-
-        // step 1: externalView and bestPossibleState has equal size
-        int extViewSize = extView.getRecord().getMapFields().size();
-        int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
-        if (extViewSize != bestPossStateSize) {
-          LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size ("
-              + bestPossStateSize + ") for resource: " + resourceName);
-
-          // System.err.println("exterView size (" + extViewSize
-          // + ") is different from bestPossState size (" + bestPossStateSize
-          // + ") for resource: " + resourceName);
-          // System.out.println("extView: " + extView.getRecord().getMapFields());
-          // System.out.println("bestPossState: " +
-          // bestPossOutput.getResourceMap(resourceName));
-          return false;
-        }
-
-        // step 2: every entry in external view is contained in best possible state
-        for (String partition : extView.getRecord().getMapFields().keySet()) {
-          Map<String, String> evInstanceStateMap = extView.getRecord().getMapField(partition);
-          Map<String, String> bpInstanceStateMap =
-              bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
-
-          boolean result =
-              ClusterStateVerifier.<String, String> compareMap(evInstanceStateMap,
-                  bpInstanceStateMap);
-          if (result == false) {
-            LOG.info("externalView is different from bestPossibleState for partition:" + partition);
-
-            // System.err.println("externalView is different from bestPossibleState for partition: "
-            // + partition + ", actual: " + evInstanceStateMap + ", bestPoss: " +
-            // bpInstanceStateMap);
-            return false;
-          }
-        }
-      }
-      return true;
-    } catch (Exception e) {
-      LOG.error("exception in verification", e);
-      return false;
-    }
-
-  }
-
-  static boolean verifyMasterNbInExtView(HelixDataAccessor accessor) {
-    Builder keyBuilder = accessor.keyBuilder();
-
-    Map<String, IdealState> idealStates = accessor.getChildValuesMap(keyBuilder.idealStates());
-    if (idealStates == null || idealStates.size() == 0) {
-      LOG.info("No resource idealState");
-      return true;
-    }
-
-    Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
-    if (extViews == null || extViews.size() < idealStates.size()) {
-      LOG.info("No externalViews | externalView.size() < idealState.size()");
-      return false;
-    }
-
-    for (String resource : extViews.keySet()) {
-      int partitions = idealStates.get(resource).getNumPartitions();
-      Map<String, Map<String, String>> instanceStateMap =
-          extViews.get(resource).getRecord().getMapFields();
-      if (instanceStateMap.size() < partitions) {
-        LOG.info("Number of externalViews (" + instanceStateMap.size() + ") < partitions ("
-            + partitions + ")");
-        return false;
-      }
-
-      for (String partition : instanceStateMap.keySet()) {
-        boolean foundMaster = false;
-        for (String instance : instanceStateMap.get(partition).keySet()) {
-          if (instanceStateMap.get(partition).get(instance).equalsIgnoreCase("MASTER")) {
-            foundMaster = true;
-            break;
-          }
-        }
-        if (!foundMaster) {
-          LOG.info("No MASTER for partition: " + partition);
-          return false;
-        }
-      }
-    }
-    return true;
-  }
-
-  static void runStage(ClusterEvent event, Stage stage) throws Exception {
-    StageContext context = new StageContext();
-    stage.init(context);
-    stage.preProcess();
-    stage.process(event);
-    stage.postProcess();
-  }
-
-  /**
-   * calculate the best possible state note that DROPPED states are not checked since when
-   * kick off the BestPossibleStateCalcStage we are providing an empty current state map
-   * @param cache
-   * @return
-   * @throws Exception
-   */
-  static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception {
-    return calcBestPossState(cache, null);
-  }
-
-  static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache, Set<String> resources)
-      throws Exception {
-    ClusterEvent event = new ClusterEvent("sampleEvent");
-    event.addAttribute("ClusterDataCache", cache);
-
-    ResourceComputationStage rcState = new ResourceComputationStage();
-    CurrentStateComputationStage csStage = new CurrentStateComputationStage();
-    BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
-
-    runStage(event, rcState);
-
-    // Filter resources if specified
-    if (resources != null) {
-      Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
-      resourceMap.keySet().retainAll(resources);
-    }
-
-    runStage(event, csStage);
-    runStage(event, bpStage);
-
-    BestPossibleStateOutput output =
-        event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-
-    // System.out.println("output:" + output);
-    return output;
-  }
-
-  public static <K, V> boolean compareMap(Map<K, V> map1, Map<K, V> map2) {
-    boolean isEqual = true;
-    if (map1 == null && map2 == null) {
-      // OK
-    } else if (map1 == null && map2 != null) {
-      if (!map2.isEmpty()) {
-        isEqual = false;
-      }
-    } else if (map1 != null && map2 == null) {
-      if (!map1.isEmpty()) {
-        isEqual = false;
-      }
-    } else {
-      // verify size
-      if (map1.size() != map2.size()) {
-        isEqual = false;
-      }
-      // verify each <key, value> in map1 is contained in map2
-      for (K key : map1.keySet()) {
-        if (!map1.get(key).equals(map2.get(key))) {
-          LOG.debug("different value for key: " + key + "(map1: " + map1.get(key) + ", map2: "
-              + map2.get(key) + ")");
-          isEqual = false;
-          break;
-        }
-      }
-    }
-    return isEqual;
-  }
-
-  public static boolean verifyByPolling(Verifier verifier) {
-    return verifyByPolling(verifier, 30 * 1000);
-  }
-
-  public static boolean verifyByPolling(Verifier verifier, long timeout) {
-    return verifyByPolling(verifier, timeout, 1000);
-  }
-
-  public static boolean verifyByPolling(Verifier verifier, long timeout, long period) {
-    long startTime = System.currentTimeMillis();
-    boolean result = false;
-    try {
-      long curTime;
-      do {
-        Thread.sleep(period);
-        result = verifier.verify();
-        if (result == true) {
-          break;
-        }
-        curTime = System.currentTimeMillis();
-      } while (curTime <= startTime + timeout);
-      return result;
-    } catch (Exception e) {
-      // TODO Auto-generated catch block
-      e.printStackTrace();
-    } finally {
-      long endTime = System.currentTimeMillis();
-
-      // debug
-      System.err.println(result + ": " + verifier + ": wait " + (endTime - startTime)
-          + "ms to verify");
-
-    }
-    return false;
-  }
-
-  public static boolean verifyByZkCallback(ZkVerifier verifier) {
-    return verifyByZkCallback(verifier, 30000);
-  }
-
-  /**
-   * This function should be always single threaded
-   *
-   * @param verifier
-   * @param timeout
-   * @return
-   */
-  public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout) {
-    long startTime = System.currentTimeMillis();
-    CountDownLatch countDown = new CountDownLatch(1);
-    ZkClient zkClient = verifier.getZkClient();
-    String clusterName = verifier.getClusterName();
-
-    // add an ephemeral node to /{clusterName}/CONFIGS/CLUSTER/verify
-    // so when analyze zk log, we know when a test ends
-    try {
-      zkClient.createEphemeral("/" + clusterName + "/CONFIGS/CLUSTER/verify");
-    } catch (ZkNodeExistsException ex) {
-      LOG.error("There is already a verification in progress", ex);
-      throw ex;
-    }
-
-    ExtViewVeriferZkListener listener = new ExtViewVeriferZkListener(countDown, zkClient, verifier);
-
-    String extViewPath = PropertyPathBuilder.getPath(PropertyType.EXTERNALVIEW, clusterName);
-    zkClient.subscribeChildChanges(extViewPath, listener);
-    for (String child : zkClient.getChildren(extViewPath)) {
-      String childPath = extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
-      zkClient.subscribeDataChanges(childPath, listener);
-    }
-
-    // do initial verify
-    boolean result = verifier.verify();
-    if (result == false) {
-      try {
-        result = countDown.await(timeout, TimeUnit.MILLISECONDS);
-        if (result == false) {
-          // make a final try if timeout
-          result = verifier.verify();
-        }
-      } catch (Exception e) {
-        // TODO Auto-generated catch block
-        e.printStackTrace();
-      }
-    }
-
-    // clean up
-    zkClient.unsubscribeChildChanges(extViewPath, listener);
-    for (String child : zkClient.getChildren(extViewPath)) {
-      String childPath = extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
-      zkClient.unsubscribeDataChanges(childPath, listener);
-    }
-
-    long endTime = System.currentTimeMillis();
-
-    zkClient.delete("/" + clusterName + "/CONFIGS/CLUSTER/verify");
-    // debug
-    System.err.println(result + ": wait " + (endTime - startTime) + "ms, " + verifier);
-
-    return result;
-  }
-
-  @SuppressWarnings("static-access")
-  private static Options constructCommandLineOptions() {
-    Option helpOption =
-        OptionBuilder.withLongOpt(help).withDescription("Prints command-line options info")
-            .create();
-
-    Option zkServerOption =
-        OptionBuilder.withLongOpt(zkServerAddress).withDescription("Provide zookeeper address")
-            .create();
-    zkServerOption.setArgs(1);
-    zkServerOption.setRequired(true);
-    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
-
-    Option clusterOption =
-        OptionBuilder.withLongOpt(cluster).withDescription("Provide cluster name").create();
-    clusterOption.setArgs(1);
-    clusterOption.setRequired(true);
-    clusterOption.setArgName("Cluster name (Required)");
-
-    Option timeoutOption =
-        OptionBuilder.withLongOpt(timeout).withDescription("Timeout value for verification")
-            .create();
-    timeoutOption.setArgs(1);
-    timeoutOption.setArgName("Timeout value (Optional), default=30s");
-
-    Option sleepIntervalOption =
-        OptionBuilder.withLongOpt(period).withDescription("Polling period for verification")
-            .create();
-    sleepIntervalOption.setArgs(1);
-    sleepIntervalOption.setArgName("Polling period value (Optional), default=1s");
-
-    Option resourcesOption =
-        OptionBuilder.withLongOpt(resources).withDescription("Specific set of resources to verify")
-            .create();
-    resourcesOption.setArgs(1);
-    resourcesOption.setArgName("Comma-separated resource names, default is all resources");
-
-    Options options = new Options();
-    options.addOption(helpOption);
-    options.addOption(zkServerOption);
-    options.addOption(clusterOption);
-    options.addOption(timeoutOption);
-    options.addOption(sleepIntervalOption);
-    options.addOption(resourcesOption);
-
-    return options;
-  }
-
-  public static void printUsage(Options cliOptions) {
-    HelpFormatter helpFormatter = new HelpFormatter();
-    helpFormatter.setWidth(1000);
-    helpFormatter.printHelp("java " + ClusterSetup.class.getName(), cliOptions);
-  }
-
-  public static CommandLine processCommandLineArgs(String[] cliArgs) {
-    CommandLineParser cliParser = new GnuParser();
-    Options cliOptions = constructCommandLineOptions();
-    // CommandLine cmd = null;
-
-    try {
-      return cliParser.parse(cliOptions, cliArgs);
-    } catch (ParseException pe) {
-      System.err.println("CommandLineClient: failed to parse command-line options: "
-          + pe.toString());
-      printUsage(cliOptions);
-      System.exit(1);
-    }
-    return null;
-  }
-
-  public static boolean verifyState(String[] args) {
-    // TODO Auto-generated method stub
-    String clusterName = "storage-cluster";
-    String zkServer = "localhost:2181";
-    long timeoutValue = 0;
-    long periodValue = 1000;
-
-    Set<String> resourceSet = null;
-    if (args.length > 0) {
-      CommandLine cmd = processCommandLineArgs(args);
-      zkServer = cmd.getOptionValue(zkServerAddress);
-      clusterName = cmd.getOptionValue(cluster);
-      String timeoutStr = cmd.getOptionValue(timeout);
-      String periodStr = cmd.getOptionValue(period);
-      String resourceStr = cmd.getOptionValue(resources);
-
-      if (timeoutStr != null) {
-        try {
-          timeoutValue = Long.parseLong(timeoutStr);
-        } catch (Exception e) {
-          System.err.println("Exception in converting " + timeoutStr + " to long. Use default (0)");
-        }
-      }
-
-      if (periodStr != null) {
-        try {
-          periodValue = Long.parseLong(periodStr);
-        } catch (Exception e) {
-          System.err.println("Exception in converting " + periodStr
-              + " to long. Use default (1000)");
-        }
-      }
-
-      // Allow specifying resources explicitly
-      if (resourceStr != null) {
-        String[] resources = resourceStr.split("[\\s,]");
-        resourceSet = Sets.newHashSet(resources);
-      }
-
-    }
-    // return verifyByPolling(new BestPossAndExtViewZkVerifier(zkServer, clusterName),
-    // timeoutValue,
-    // periodValue);
-
-    ZkVerifier verifier;
-    if (resourceSet == null) {
-      verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName);
-    } else {
-      verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName, null, resourceSet);
-    }
-    return verifyByZkCallback(verifier, timeoutValue);
-  }
-
-  public static void main(String[] args) {
-    boolean result = verifyState(args);
-    System.out.println(result ? "Successful" : "failed");
-    System.exit(1);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
new file mode 100644
index 0000000..4ec882e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
@@ -0,0 +1,204 @@
+package org.apache.helix.tools.ClusterStateVerifier;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Partition;
+import org.apache.log4j.Logger;
+
+/**
+ * Given a zk client, a cluster name, and a list of expected live instances, checks that the
+ * cluster's live instances match the expected ones and that every external view equals the
+ * best-possible state computed for its resource.
+ */
+public class ClusterExternalViewVerifier extends ClusterVerifier {
+  private static final Logger LOG = Logger.getLogger(ClusterExternalViewVerifier.class);
+
+  final List<String> _expectSortedLiveNodes; // always sorted
+
+  /**
+   * @param zkclient client passed to the {@link ClusterVerifier} base class
+   * @param clusterName name of the cluster passed to the base class
+   * @param expectLiveNodes expected live instance names in any order; the list is copied, so
+   *          the caller's list is neither reordered nor retained
+   */
+  public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName,
+      List<String> expectLiveNodes) {
+    super(zkclient, clusterName);
+    // Sort a private copy: sorting the argument in place would reorder the caller's list and
+    // would fail outright for an unmodifiable list.
+    _expectSortedLiveNodes = new ArrayList<String>(expectLiveNodes);
+    Collections.sort(_expectSortedLiveNodes);
+  }
+
+  /**
+   * Compares the actual live instances with the expected ones.
+   * @param actualLiveNodes actual live instance names; sorted in place by this call
+   * @return true if both lists contain the same names
+   */
+  boolean verifyLiveNodes(List<String> actualLiveNodes) {
+    Collections.sort(actualLiveNodes);
+    return _expectSortedLiveNodes.equals(actualLiveNodes);
+  }
+
+  /**
+   * Checks that an external view's map fields equal the best-possible state of its resource.
+   * @param externalView external view of one resource
+   * @param bestPossibleState map of partition to map of instance to state
+   * @return true if the two partition-to-instance-state mappings are equal
+   */
+  boolean verifyExternalView(ExternalView externalView,
+      Map<Partition, Map<String, String>> bestPossibleState) {
+    Map<String, Map<String, String>> bestPossibleStateMap =
+        convertBestPossibleState(bestPossibleState);
+    // trimBestPossibleState(bestPossibleStateMap);
+
+    Map<String, Map<String, String>> externalViewMap = externalView.getRecord().getMapFields();
+    return externalViewMap.equals(bestPossibleStateMap);
+  }
+
+  /**
+   * Runs a single pipeline stage (init, preProcess, process, postProcess) on the event with a
+   * fresh stage context.
+   */
+  static void runStage(ClusterEvent event, Stage stage) throws Exception {
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    stage.process(event);
+    stage.postProcess();
+  }
+
+  /**
+   * Computes the best-possible state by running the resource-computation, current-state
+   * computation, and best-possible-state calculation stages over the given cache.
+   * @param cache cluster data the stages read from
+   * @return the BEST_POSSIBLE_STATE attribute left on the event by the stages
+   */
+  BestPossibleStateOutput calculateBestPossibleState(ClusterDataCache cache) throws Exception {
+    ClusterEvent event = new ClusterEvent("event");
+    event.addAttribute("ClusterDataCache", cache);
+
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(new ResourceComputationStage());
+    stages.add(new CurrentStateComputationStage());
+    stages.add(new BestPossibleStateCalcStage());
+
+    for (Stage stage : stages) {
+      runStage(event, stage);
+    }
+
+    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+  }
+
+  // Disabled helper, kept for reference: removes empty maps and DROPPED states from the
+  // best possible state. Its description is a plain comment (not Javadoc) so that it cannot
+  // be mistaken for the documentation of convertBestPossibleState below.
+  //
+  // static void trimBestPossibleState(Map<String, Map<String, String>> bestPossibleState) {
+  // Iterator<Entry<String, Map<String, String>>> iter = bestPossibleState.entrySet().iterator();
+  // while (iter.hasNext()) {
+  // Map.Entry<String, Map<String, String>> entry = iter.next();
+  // Map<String, String> instanceStateMap = entry.getValue();
+  // if (instanceStateMap.isEmpty()) {
+  // iter.remove();
+  // } else {
+  // // remove instances with DROPPED state
+  // Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
+  // while (insIter.hasNext()) {
+  // Map.Entry<String, String> insEntry = insIter.next();
+  // String state = insEntry.getValue();
+  // if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
+  // insIter.remove();
+  // }
+  // }
+  // }
+  // }
+  // }
+
+  /**
+   * Re-keys a best-possible state map by partition name.
+   * @param bestPossibleState map of partition to map of instance to state
+   * @return new map of partition name to the same instance-state maps (values are not copied)
+   */
+  static Map<String, Map<String, String>> convertBestPossibleState(
+      Map<Partition, Map<String, String>> bestPossibleState) {
+    Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
+    for (Map.Entry<Partition, Map<String, String>> entry : bestPossibleState.entrySet()) {
+      result.put(entry.getKey().getPartitionName(), entry.getValue());
+    }
+    return result;
+  }
+
+  /**
+   * Verifies the live instances first, then compares each external view read through the
+   * accessor with the best-possible state calculated from a freshly refreshed cache.
+   * @return true if the live instances match and every external view matches
+   */
+  @Override
+  public boolean verify() throws Exception {
+    ClusterDataCache cache = new ClusterDataCache();
+    cache.refresh(_accessor);
+
+    List<String> liveInstances = new ArrayList<String>(cache.getLiveInstances().keySet());
+    boolean success = verifyLiveNodes(liveInstances);
+    if (!success) {
+      LOG.info("liveNodes not match, expect: " + _expectSortedLiveNodes + ", actual: "
+          + liveInstances);
+      return false;
+    }
+
+    BestPossibleStateOutput bestPossibleStates = calculateBestPossibleState(cache);
+    Map<String, ExternalView> externalViews =
+        _accessor.getChildValuesMap(_keyBuilder.externalViews());
+
+    // TODO all ideal-states should be included in external-views
+
+    for (Map.Entry<String, ExternalView> entry : externalViews.entrySet()) {
+      String resourceName = entry.getKey();
+      Map<Partition, Map<String, String>> bestPossibleState =
+          bestPossibleStates.getResourceMap(resourceName);
+      success = verifyExternalView(entry.getValue(), bestPossibleState);
+      if (!success) {
+        LOG.info("external-view for resource: " + resourceName + " not match");
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
new file mode 100644
index 0000000..5c502e0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
@@ -0,0 +1,63 @@
+package org.apache.helix.tools.ClusterStateVerifier;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.manager.zk.ZkClient;
+
+/**
+ * Verifies that the live instances registered for a cluster are exactly an expected set of
+ * instance names.
+ */
+public class ClusterLiveNodesVerifier extends ClusterVerifier {
+
+  final List<String> _expectSortedLiveNodes; // always sorted
+
+  /**
+   * @param zkclient client passed to the {@link ClusterVerifier} base class
+   * @param clusterName name of the cluster passed to the base class
+   * @param expectLiveNodes expected live instance names in any order; the list is copied, so
+   *          the caller's list is neither reordered nor retained
+   */
+  public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
+      List<String> expectLiveNodes) {
+    super(zkclient, clusterName);
+    // Sort a private copy: sorting the argument in place would reorder the caller's list and
+    // would fail outright for an unmodifiable list.
+    _expectSortedLiveNodes = new ArrayList<String>(expectLiveNodes);
+    Collections.sort(_expectSortedLiveNodes);
+  }
+
+  /**
+   * Reads the current live-instance names through the accessor and compares them, after
+   * sorting, with the expected names.
+   * @return true if the actual live instances are exactly the expected ones
+   */
+  @Override
+  public boolean verify() throws Exception {
+    List<String> actualLiveNodes = _accessor.getChildNames(_keyBuilder.liveInstances());
+    Collections.sort(actualLiveNodes);
+    return _expectSortedLiveNodes.equals(actualLiveNodes);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
new file mode 100644
index 0000000..f399f1f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
@@ -0,0 +1,734 @@
+package org.apache.helix.tools.ClusterStateVerifier;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Sets;
+
+public class ClusterStateVerifier {
+  // Long names of the command-line options accepted by this tool. They are constants, so they
+  // are declared final to rule out accidental reassignment by other classes.
+  public static final String cluster = "cluster";
+  public static final String zkServerAddress = "zkSvr";
+  public static final String help = "help";
+  public static final String timeout = "timeout";
+  public static final String period = "period";
+  public static final String resources = "resources";
+
+  private static final Logger LOG = Logger.getLogger(ClusterStateVerifier.class);
+
+  /**
+   * A condition that can be checked repeatedly, e.g. by polling, until it holds.
+   */
+  public interface Verifier {
+    /**
+     * @return true if the verified condition currently holds, false otherwise
+     */
+    boolean verify();
+  }
+
+  /**
+   * A {@link Verifier} that also exposes the ZooKeeper client and cluster name it works with,
+   * so that callers can register ZooKeeper listeners for that cluster.
+   */
+  public interface ZkVerifier extends Verifier {
+    /**
+     * @return the ZooKeeper client used by this verifier
+     */
+    ZkClient getZkClient();
+
+    /**
+     * @return the name of the cluster being verified
+     */
+    String getClusterName();
+  }
+
+  /**
+   * ZooKeeper listener that re-runs a verifier on every child or data change notification and
+   * counts down the given latch as soon as the verifier succeeds.
+   */
+  static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener {
+    final CountDownLatch _countDown;
+    final ZkClient _zkClient;
+    final Verifier _verifier;
+
+    /**
+     * @param countDown latch counted down once verification succeeds
+     * @param zkClient client used to subscribe to data changes of newly reported children
+     * @param verifier verifier to run on every notification
+     */
+    public ExtViewVeriferZkListener(CountDownLatch countDown, ZkClient zkClient, ZkVerifier verifier) {
+      _countDown = countDown;
+      _zkClient = zkClient;
+      _verifier = verifier;
+    }
+
+    @Override
+    public void handleDataChange(String dataPath, Object data) throws Exception {
+      verifyAndSignal();
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath) throws Exception {
+      // No-op: a deleted node does not trigger a re-verification.
+    }
+
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+      // zkclient reports a deleted parent by passing null instead of a child list; in that case
+      // there is nothing to subscribe to, but the verifier is still re-run.
+      if (currentChilds != null) {
+        for (String child : currentChilds) {
+          String childPath = parentPath.equals("/") ? parentPath + child : parentPath + "/" + child;
+          _zkClient.subscribeDataChanges(childPath, this);
+        }
+      }
+
+      verifyAndSignal();
+    }
+
+    /** Runs the verifier and releases the waiting thread if verification succeeded. */
+    private void verifyAndSignal() {
+      if (_verifier.verify()) {
+        _countDown.countDown();
+      }
+    }
+  }
+
+  /**
+   * Checks that both arguments are present and looks up the ZooKeeper client for the address.
+   *
+   * @param zkAddr ZooKeeper address, must not be null
+   * @param clusterName cluster name, must not be null (only validated here)
+   * @return the client obtained from {@code ZKClientPool} for {@code zkAddr}
+   * @throws IllegalArgumentException if either argument is null
+   */
+  private static ZkClient validateAndGetClient(String zkAddr, String clusterName) {
+    boolean haveBothArguments = (zkAddr != null) && (clusterName != null);
+    if (!haveBothArguments) {
+      throw new IllegalArgumentException("requires zkAddr|clusterName");
+    }
+    ZkClient pooledClient = ZKClientPool.getZkClient(zkAddr);
+    return pooledClient;
+  }
+
+  public static class BestPossAndExtViewZkVerifier implements ZkVerifier {
+    private final String clusterName;
+    private final Map<String, Map<String, String>> errStates;
+    private final ZkClient zkClient;
+    private final Set<String> resources;
+
+    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) {
+      this(zkAddr, clusterName, null);
+    }
+
+    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
+        Map<String, Map<String, String>> errStates) {
+      this(zkAddr, clusterName, errStates, null);
+    }
+
+    public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
+        Map<String, Map<String, String>> errStates, Set<String> resources) {
+      this(validateAndGetClient(zkAddr, clusterName), clusterName, errStates, resources);
+    }
+
+    public BestPossAndExtViewZkVerifier(ZkClient zkClient, String clusterName,
+        Map<String, Map<String, String>> errStates, Set<String> resources) {
+      if (zkClient == null || clusterName == null) {
+        throw new IllegalArgumentException("requires zkClient|clusterName");
+      }
+      this.clusterName = clusterName;
+      this.errStates = errStates;
+      this.zkClient = zkClient;
+      this.resources = resources;
+    }
+
+    /**
+     * Runs one verification pass against the current cluster data.
+     *
+     * @return the verification result; false if the pass threw an exception (which is logged)
+     */
+    @Override
+    public boolean verify() {
+      boolean verified;
+      try {
+        ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+        HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
+        verified = verifyBestPossAndExtView(accessor, errStates, clusterName, resources);
+      } catch (Exception e) {
+        LOG.error("exception in verification", e);
+        verified = false;
+      }
+      return verified;
+    }
+
+    /**
+     * Checks that, for every resource under verification, the external view matches the best
+     * possible state computed from a freshly read snapshot of the cluster.
+     *
+     * @param accessor accessor used to read the cluster data
+     * @param errStates replicas expected to be in ERROR state (resource -> partition ->
+     *          instance), or null
+     * @param clusterName name of the cluster (not used by the comparison itself)
+     * @param resources resource names to restrict the check to; null or empty means all
+     * @return true if every checked external view equals its best possible state; false on any
+     *         mismatch, on a missing external view, or if an exception occurs (it is logged)
+     */
+    private boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
+        Map<String, Map<String, String>> errStates, String clusterName, Set<String> resources) {
+      try {
+        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+        // read cluster once and do verification
+        ClusterDataCache cache = new ClusterDataCache();
+        cache.refresh(accessor);
+
+        // NOTE(review): the map returned by the cache is modified in place below; the stage
+        // computation that follows reads the same cache and may depend on that - confirm before
+        // switching to a copy.
+        Map<String, IdealState> idealStates = cache.getIdealStates();
+        if (idealStates == null) {
+          // ideal state is null because ideal state is dropped. The replacement must be
+          // mutable: placeholder ideal states are added to it below, which an immutable
+          // Collections.emptyMap() would reject.
+          idealStates = new HashMap<String, IdealState>();
+        }
+
+        // filter out all resources that use Task state model
+        Iterator<Map.Entry<String, IdealState>> it = idealStates.entrySet().iterator();
+        while (it.hasNext()) {
+          IdealState idealState = it.next().getValue();
+          // constant-first equals tolerates an ideal state without a state model reference
+          if (TaskConstants.STATE_MODEL_NAME.equals(idealState.getStateModelDefRef())) {
+            it.remove();
+          }
+        }
+
+        Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
+        if (extViews == null) {
+          extViews = Collections.emptyMap();
+        }
+
+        // Filter resources if requested
+        if (resources != null && !resources.isEmpty()) {
+          idealStates.keySet().retainAll(resources);
+          extViews.keySet().retainAll(resources);
+        }
+
+        // if externalView is not empty and idealState doesn't exist
+        // add empty idealState for the resource
+        for (String resource : extViews.keySet()) {
+          if (!idealStates.containsKey(resource)) {
+            idealStates.put(resource, new IdealState(resource));
+          }
+        }
+
+        // calculate best possible state
+        BestPossibleStateOutput bestPossOutput = calcBestPossState(cache, resources);
+
+        // set error states
+        applyExpectedErrorStates(bestPossOutput.getStateMap(), errStates);
+
+        for (Map.Entry<String, IdealState> idealStateEntry : idealStates.entrySet()) {
+          String resourceName = idealStateEntry.getKey();
+          ExternalView extView = extViews.get(resourceName);
+          if (extView == null) {
+            if (idealStateEntry.getValue().isExternalViewDisabled()) {
+              // no external view is expected for this resource
+              continue;
+            }
+            LOG.info("externalView for " + resourceName + " is not available");
+            return false;
+          }
+
+          // step 0: remove empty map and DROPPED state from best possible state
+          pruneEmptyAndDropped(bestPossOutput.getResourceMap(resourceName));
+
+          // step 1: externalView and bestPossibleState has equal size
+          Map<String, Map<String, String>> extViewStateMap = extView.getRecord().getMapFields();
+          int extViewSize = extViewStateMap.size();
+          int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
+          if (extViewSize != bestPossStateSize) {
+            LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size ("
+                + bestPossStateSize + ") for resource: " + resourceName);
+            return false;
+          }
+
+          // step 2: every entry in external view is contained in best possible state
+          for (String partition : extViewStateMap.keySet()) {
+            Map<String, String> evInstanceStateMap = extView.getRecord().getMapField(partition);
+            Map<String, String> bpInstanceStateMap =
+                bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
+
+            if (!compareMap(evInstanceStateMap, bpInstanceStateMap)) {
+              LOG.info("externalView is different from bestPossibleState for partition:" + partition);
+              return false;
+            }
+          }
+        }
+        return true;
+      } catch (Exception e) {
+        LOG.error("exception in verification", e);
+        return false;
+      }
+    }
+
+    /**
+     * Overwrites the best possible state of every expected error replica with ERROR, creating
+     * the resource and partition entries when they are missing.
+     */
+    private void applyExpectedErrorStates(
+        Map<String, Map<Partition, Map<String, String>>> bestPossStateMap,
+        Map<String, Map<String, String>> errStates) {
+      if (errStates == null) {
+        return;
+      }
+      for (Map.Entry<String, Map<String, String>> resourceEntry : errStates.entrySet()) {
+        String resourceName = resourceEntry.getKey();
+        for (Map.Entry<String, String> partitionEntry : resourceEntry.getValue().entrySet()) {
+          Partition partition = new Partition(partitionEntry.getKey());
+          String instanceName = partitionEntry.getValue();
+
+          Map<Partition, Map<String, String>> resourceStates = bestPossStateMap.get(resourceName);
+          if (resourceStates == null) {
+            resourceStates = new HashMap<Partition, Map<String, String>>();
+            bestPossStateMap.put(resourceName, resourceStates);
+          }
+          Map<String, String> instanceStates = resourceStates.get(partition);
+          if (instanceStates == null) {
+            instanceStates = new HashMap<String, String>();
+            resourceStates.put(partition, instanceStates);
+          }
+          instanceStates.put(instanceName, HelixDefinedState.ERROR.toString());
+        }
+      }
+    }
+
+    /**
+     * Removes partitions without any replica and, from the remaining partitions, the replicas
+     * whose state is DROPPED. The given map is modified in place.
+     */
+    private void pruneEmptyAndDropped(Map<Partition, Map<String, String>> bpStateMap) {
+      Iterator<Map.Entry<Partition, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
+      while (iter.hasNext()) {
+        Map<String, String> instanceStateMap = iter.next().getValue();
+        if (instanceStateMap.isEmpty()) {
+          iter.remove();
+          continue;
+        }
+        // remove instances with DROPPED state
+        Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
+        while (insIter.hasNext()) {
+          String state = insIter.next().getValue();
+          if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
+            insIter.remove();
+          }
+        }
+      }
+    }
+
+    /**
+     * Calculates the best possible state by running the resource computation, current state
+     * computation and best possible state calculation stages against the given cache.
+     * Per the original author's note, DROPPED states are not checked since when kicking off the
+     * BestPossibleStateCalcStage an empty current state map is provided.
+     *
+     * @param cache cluster data snapshot the stages operate on
+     * @param resources resource names to restrict the computation to; null or empty means all
+     *          resources, matching how the caller filters ideal states and external views
+     * @return the output of the best possible state calculation stage
+     * @throws Exception if any stage fails
+     */
+    private BestPossibleStateOutput calcBestPossState(ClusterDataCache cache, Set<String> resources)
+        throws Exception {
+      ClusterEvent event = new ClusterEvent("sampleEvent");
+      event.addAttribute("ClusterDataCache", cache);
+
+      ResourceComputationStage rcStage = new ResourceComputationStage();
+      CurrentStateComputationStage csStage = new CurrentStateComputationStage();
+      BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
+
+      runStage(event, rcStage);
+
+      // Filter resources if specified. An empty set is treated as "no filter": otherwise every
+      // resource would be dropped here while the caller still verifies all of them.
+      if (resources != null && !resources.isEmpty()) {
+        Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+        resourceMap.keySet().retainAll(resources);
+      }
+
+      runStage(event, csStage);
+      runStage(event, bpStage);
+
+      BestPossibleStateOutput output =
+          event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+      return output;
+    }
+
+    /**
+     * Drives a single pipeline stage through its lifecycle for the given event: init with a
+     * fresh context, then preProcess, process and postProcess, in that order.
+     *
+     * @param event event handed to the stage's process step
+     * @param stage stage to run
+     * @throws Exception if any lifecycle step fails
+     */
+    private void runStage(ClusterEvent event, Stage stage) throws Exception {
+      stage.init(new StageContext());
+      stage.preProcess();
+      stage.process(event);
+      stage.postProcess();
+    }
+
+    /**
+     * Compares two maps for equality, treating a null map as equivalent to an empty one.
+     *
+     * @param map1 first map, may be null
+     * @param map2 second map, may be null
+     * @return true if both maps hold the same keys with equal values (null values included)
+     */
+    private <K, V> boolean compareMap(Map<K, V> map1, Map<K, V> map2) {
+      // a missing map only matches a missing or empty counterpart
+      if (map1 == null || map2 == null) {
+        Map<K, V> other = (map1 == null) ? map2 : map1;
+        return other == null || other.isEmpty();
+      }
+
+      // verify size
+      if (map1.size() != map2.size()) {
+        return false;
+      }
+
+      // verify each <key, value> in map1 is contained in map2
+      for (Map.Entry<K, V> entry : map1.entrySet()) {
+        K key = entry.getKey();
+        V value1 = entry.getValue();
+        V value2 = map2.get(key);
+        // null-safe comparison; a null value only matches a key that is really present in map2
+        boolean sameValue =
+            (value1 == null) ? (value2 == null && map2.containsKey(key)) : value1.equals(value2);
+        if (!sameValue) {
+          LOG.debug(
+              "different value for key: " + key + "(map1: " + value1 + ", map2: " + value2 + ")");
+          return false;
+        }
+      }
+      return true;
+    }
+
+    /**
+     * @return the ZooKeeper client this verifier reads cluster data with
+     */
+    @Override
+    public ZkClient getZkClient() {
+      return this.zkClient;
+    }
+
+    /**
+     * @return the name of the cluster this verifier checks
+     */
+    @Override
+    public String getClusterName() {
+      return this.clusterName;
+    }
+
+    @Override
+    public String toString() {
+      String verifierName = getClass().getName();
+      verifierName = verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
+      return verifierName + "(" + clusterName + "@" + zkClient.getServers() + ")";
+    }
+  }
+
+
+  /**
+   * Verifier that succeeds when every partition listed in every external view has at least one
+   * replica in MASTER state, and each external view lists at least as many partitions as its
+   * ideal state declares.
+   */
+  public static class MasterNbInExtViewVerifier implements ZkVerifier {
+    private final String clusterName;
+    private final ZkClient zkClient;
+
+    /**
+     * @param zkAddr ZooKeeper address used to look up the client, must not be null
+     * @param clusterName cluster to verify, must not be null
+     * @throws IllegalArgumentException if either argument is null
+     */
+    public MasterNbInExtViewVerifier(String zkAddr, String clusterName) {
+      this(validateAndGetClient(zkAddr, clusterName), clusterName);
+    }
+
+    /**
+     * @param zkClient client used to read cluster data, must not be null
+     * @param clusterName cluster to verify, must not be null
+     * @throws IllegalArgumentException if either argument is null
+     */
+    public MasterNbInExtViewVerifier(ZkClient zkClient, String clusterName) {
+      if (zkClient == null || clusterName == null) {
+        throw new IllegalArgumentException("requires zkClient|clusterName");
+      }
+      this.clusterName = clusterName;
+      this.zkClient = zkClient;
+    }
+
+    /**
+     * @return the verification result; false if verification threw an exception (it is logged)
+     */
+    @Override
+    public boolean verify() {
+      try {
+        ZKHelixDataAccessor accessor =
+            new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+
+        return verifyMasterNbInExtView(accessor);
+      } catch (Exception e) {
+        LOG.error("exception in verification", e);
+      }
+      return false;
+    }
+
+    @Override
+    public ZkClient getZkClient() {
+      return zkClient;
+    }
+
+    @Override
+    public String getClusterName() {
+      return clusterName;
+    }
+
+    /**
+     * @param accessor accessor used to read ideal states and external views
+     * @return true if there are no ideal states at all, or if every external view is complete
+     *         and has a MASTER replica for each of its partitions
+     */
+    private boolean verifyMasterNbInExtView(HelixDataAccessor accessor) {
+      Builder keyBuilder = accessor.keyBuilder();
+
+      Map<String, IdealState> idealStates = accessor.getChildValuesMap(keyBuilder.idealStates());
+      if (idealStates == null || idealStates.size() == 0) {
+        LOG.info("No resource idealState");
+        return true;
+      }
+
+      Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
+      if (extViews == null || extViews.size() < idealStates.size()) {
+        LOG.info("No externalViews | externalView.size() < idealState.size()");
+        return false;
+      }
+
+      for (Map.Entry<String, ExternalView> extViewEntry : extViews.entrySet()) {
+        String resource = extViewEntry.getKey();
+        IdealState idealState = idealStates.get(resource);
+        if (idealState == null) {
+          // An external view without an ideal state used to surface as a NullPointerException
+          // that verify() turned into false; fail explicitly with a readable message instead.
+          LOG.info("No idealState for externalView of resource: " + resource);
+          return false;
+        }
+
+        int partitions = idealState.getNumPartitions();
+        Map<String, Map<String, String>> instanceStateMap =
+            extViewEntry.getValue().getRecord().getMapFields();
+        if (instanceStateMap.size() < partitions) {
+          LOG.info("Number of externalViews (" + instanceStateMap.size() + ") < partitions ("
+              + partitions + ")");
+          return false;
+        }
+
+        for (Map.Entry<String, Map<String, String>> partitionEntry : instanceStateMap.entrySet()) {
+          boolean foundMaster = false;
+          for (String state : partitionEntry.getValue().values()) {
+            if (state.equalsIgnoreCase("MASTER")) {
+              foundMaster = true;
+              break;
+            }
+          }
+          if (!foundMaster) {
+            LOG.info("No MASTER for partition: " + partitionEntry.getKey());
+            return false;
+          }
+        }
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Polls the verifier with a timeout of 30 seconds.
+   *
+   * @param verifier verifier to poll
+   * @return true if the verifier succeeded before the timeout
+   */
+  public static boolean verifyByPolling(Verifier verifier) {
+    final long defaultTimeoutMs = 30 * 1000;
+    return verifyByPolling(verifier, defaultTimeoutMs);
+  }
+
+  /**
+   * Polls the verifier once per second until it succeeds or the timeout elapses.
+   *
+   * @param verifier verifier to poll
+   * @param timeout maximum time to keep polling, in milliseconds
+   * @return true if the verifier succeeded before the timeout
+   */
+  public static boolean verifyByPolling(Verifier verifier, long timeout) {
+    final long defaultPeriodMs = 1000;
+    return verifyByPolling(verifier, timeout, defaultPeriodMs);
+  }
+
+  public static boolean verifyByPolling(Verifier verifier, long timeout, long period) {
+    long startTime = System.currentTimeMillis();
+    boolean result = false;
+    try {
+      long curTime;
+      do {
+        Thread.sleep(period);
+        result = verifier.verify();
+        if (result == true) {
+          break;
+        }
+        curTime = System.currentTimeMillis();
+      } while (curTime <= startTime + timeout);
+      return result;
+    } catch (Exception e) {
+      // TODO Auto-generated catch block
+      e.printStackTrace();
+    } finally {
+      long endTime = System.currentTimeMillis();
+
+      // debug
+      System.err.println(result + ": " + verifier + ": wait " + (endTime - startTime)
+          + "ms to verify");
+
+    }
+    return false;
+  }
+
+  /**
+   * Verifies via ZooKeeper callbacks with a timeout of 30000 milliseconds.
+   *
+   * @param verifier verifier to run
+   * @return true if the verifier succeeded before the timeout
+   */
+  public static boolean verifyByZkCallback(ZkVerifier verifier) {
+    final long defaultTimeoutMs = 30000;
+    return verifyByZkCallback(verifier, defaultTimeoutMs);
+  }
+
+  /**
+   * This function should be always single threaded
+   *
+   * @param verifier
+   * @param timeout
+   * @return
+   */
+  public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout) {
+    long startTime = System.currentTimeMillis();
+    CountDownLatch countDown = new CountDownLatch(1);
+    ZkClient zkClient = verifier.getZkClient();
+    String clusterName = verifier.getClusterName();
+
+    // add an ephemeral node to /{clusterName}/CONFIGS/CLUSTER/verify
+    // so when analyze zk log, we know when a test ends
+    try {
+      zkClient.createEphemeral("/" + clusterName + "/CONFIGS/CLUSTER/verify");
+    } catch (ZkNodeExistsException ex) {
+      LOG.error("There is already a verification in progress", ex);
+      throw ex;
+    }
+
+    ExtViewVeriferZkListener listener = new ExtViewVeriferZkListener(countDown, zkClient, verifier);
+
+    String extViewPath = PropertyPathBuilder.getPath(PropertyType.EXTERNALVIEW, clusterName);
+    zkClient.subscribeChildChanges(extViewPath, listener);
+    for (String child : zkClient.getChildren(extViewPath)) {
+      String childPath = extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
+      zkClient.subscribeDataChanges(childPath, listener);
+    }
+
+    // do initial verify
+    boolean result = verifier.verify();
+    if (result == false) {
+      try {
+        result = countDown.await(timeout, TimeUnit.MILLISECONDS);
+        if (result == false) {
+          // make a final try if timeout
+          result = verifier.verify();
+        }
+      } catch (Exception e) {
+        // TODO Auto-generated catch block
+        e.printStackTrace();
+      }
+    }
+
+    // clean up
+    zkClient.unsubscribeChildChanges(extViewPath, listener);
+    for (String child : zkClient.getChildren(extViewPath)) {
+      String childPath = extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
+      zkClient.unsubscribeDataChanges(childPath, listener);
+    }
+
+    long endTime = System.currentTimeMillis();
+
+    zkClient.delete("/" + clusterName + "/CONFIGS/CLUSTER/verify");
+    // debug
+    System.err.println(result + ": wait " + (endTime - startTime) + "ms, " + verifier);
+
+    return result;
+  }
+
+  /**
+   * Builds the command-line options of this tool: help, ZooKeeper address (required), cluster
+   * name (required), timeout, polling period and the resources to verify.
+   *
+   * @return the options, in the order listed above
+   */
+  private static Options constructCommandLineOptions() {
+    Option helpOption = newLongOption(help, "Prints command-line options info");
+
+    Option zkServerOption = newLongOption(zkServerAddress, "Provide zookeeper address");
+    zkServerOption.setArgs(1);
+    zkServerOption.setRequired(true);
+    zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+    Option clusterOption = newLongOption(cluster, "Provide cluster name");
+    clusterOption.setArgs(1);
+    clusterOption.setRequired(true);
+    clusterOption.setArgName("Cluster name (Required)");
+
+    Option timeoutOption = newLongOption(timeout, "Timeout value for verification");
+    timeoutOption.setArgs(1);
+    timeoutOption.setArgName("Timeout value (Optional), default=30s");
+
+    Option sleepIntervalOption = newLongOption(period, "Polling period for verification");
+    sleepIntervalOption.setArgs(1);
+    sleepIntervalOption.setArgName("Polling period value (Optional), default=1s");
+
+    Option resourcesOption = newLongOption(resources, "Specific set of resources to verify");
+    resourcesOption.setArgs(1);
+    resourcesOption.setArgName("Comma-separated resource names, default is all resources");
+
+    Options options = new Options();
+    Option[] allOptions = {
+        helpOption, zkServerOption, clusterOption, timeoutOption, sleepIntervalOption,
+        resourcesOption
+    };
+    for (Option option : allOptions) {
+      options.addOption(option);
+    }
+    return options;
+  }
+
+  /** Creates an option that has only a long name and a description. */
+  @SuppressWarnings("static-access")
+  private static Option newLongOption(String longOpt, String description) {
+    return OptionBuilder.withLongOpt(longOpt).withDescription(description).create();
+  }
+
+  /**
+   * Prints the usage of this tool for the given options.
+   *
+   * @param cliOptions options to describe
+   */
+  public static void printUsage(Options cliOptions) {
+    HelpFormatter helpFormatter = new HelpFormatter();
+    helpFormatter.setWidth(1000);
+    // name this tool in the usage line; it previously printed the ClusterSetup class name
+    helpFormatter.printHelp("java " + ClusterStateVerifier.class.getName(), cliOptions);
+  }
+
+  /**
+   * Parses the command-line arguments of this tool. If they cannot be parsed, the error and
+   * the usage are printed and the JVM is terminated with exit status 1.
+   *
+   * @param cliArgs raw command-line arguments
+   * @return the parsed command line
+   */
+  public static CommandLine processCommandLineArgs(String[] cliArgs) {
+    CommandLineParser cliParser = new GnuParser();
+    Options cliOptions = constructCommandLineOptions();
+
+    CommandLine parsed = null;
+    try {
+      parsed = cliParser.parse(cliOptions, cliArgs);
+    } catch (ParseException pe) {
+      System.err.println("CommandLineClient: failed to parse command-line options: "
+          + pe.toString());
+      printUsage(cliOptions);
+      System.exit(1);
+    }
+    return parsed;
+  }
+
+  /**
+   * Verifies that the external views of a cluster match the best possible state, using the
+   * ZooKeeper-callback based verification.
+   *
+   * @param args command-line arguments; when empty, the built-in defaults below are used
+   * @return true if the verification succeeded
+   */
+  public static boolean verifyState(String[] args) {
+    // defaults used when no arguments are given
+    String zkServer = "localhost:2181";
+    String clusterName = "storage-cluster";
+    // NOTE(review): the timeout option text advertises "default=30s" while the default used
+    // here is 0 - confirm which one is intended.
+    long timeoutValue = 0;
+    // the period is parsed for completeness but is not used by the callback-based verification
+    long periodValue = 1000;
+    Set<String> resourceSet = null;
+
+    if (args.length > 0) {
+      CommandLine cmd = processCommandLineArgs(args);
+      zkServer = cmd.getOptionValue(zkServerAddress);
+      clusterName = cmd.getOptionValue(cluster);
+
+      String timeoutStr = cmd.getOptionValue(timeout);
+      if (timeoutStr != null) {
+        try {
+          timeoutValue = Long.parseLong(timeoutStr);
+        } catch (Exception e) {
+          System.err.println("Exception in converting " + timeoutStr + " to long. Use default (0)");
+        }
+      }
+
+      String periodStr = cmd.getOptionValue(period);
+      if (periodStr != null) {
+        try {
+          periodValue = Long.parseLong(periodStr);
+        } catch (Exception e) {
+          System.err.println("Exception in converting " + periodStr
+              + " to long. Use default (1000)");
+        }
+      }
+
+      // Allow specifying resources explicitly
+      String resourceStr = cmd.getOptionValue(resources);
+      if (resourceStr != null) {
+        String[] resourceNames = resourceStr.split("[\\s,]");
+        resourceSet = Sets.newHashSet(resourceNames);
+      }
+    }
+
+    ZkVerifier verifier = (resourceSet == null)
+        ? new BestPossAndExtViewZkVerifier(zkServer, clusterName)
+        : new BestPossAndExtViewZkVerifier(zkServer, clusterName, null, resourceSet);
+    return verifyByZkCallback(verifier, timeoutValue);
+  }
+
+  /**
+   * Command-line entry point: runs the verification, prints the outcome and terminates the JVM.
+   *
+   * @param args command-line arguments, see verifyState(String[])
+   */
+  public static void main(String[] args) {
+    boolean result = verifyState(args);
+    System.out.println(result ? "Successful" : "failed");
+    // report the outcome through the exit status: 0 on success, 1 on failure
+    // (the status used to be 1 in both cases)
+    System.exit(result ? 0 : 1);
+  }
+
+}