You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/08 17:59:43 UTC
[08/38] helix git commit: Refactor: put all cluster verifiers into a sub-module of tools.
Refactor: put all cluster verifiers into a sub-module of tools.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/695228e0
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/695228e0
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/695228e0
Branch: refs/heads/helix-0.6.x
Commit: 695228e0ec6fd5fc48d5408633205f8ab935838d
Parents: 0f7c3e4
Author: Lei Xia <lx...@linkedin.com>
Authored: Mon Jul 18 16:06:22 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Sun Feb 5 18:55:18 2017 -0800
----------------------------------------------------------------------
.../webapp/TestHelixAdminScenariosRest.java | 6 +-
.../apache/helix/webapp/TestResetInstance.java | 2 +-
.../helix/webapp/TestResetPartitionState.java | 2 +-
.../apache/helix/webapp/TestResetResource.java | 2 +-
.../webapp/resources/TestJobQueuesResource.java | 2 +-
.../org/apache/helix/agent/TestHelixAgent.java | 4 +-
.../tools/ClusterExternalViewVerifier.java | 170 -----
.../helix/tools/ClusterLiveNodesVerifier.java | 45 --
.../helix/tools/ClusterStateVerifier.java | 749 -------------------
.../ClusterExternalViewVerifier.java | 171 +++++
.../ClusterLiveNodesVerifier.java | 45 ++
.../ClusterStateVerifier.java | 734 ++++++++++++++++++
.../ClusterStateVerifier/ClusterVerifier.java | 147 ++++
.../org/apache/helix/tools/ClusterVerifier.java | 147 ----
.../apache/helix/tools/IntegrationTestUtil.java | 2 +
.../java/org/apache/helix/ZkUnitTestBase.java | 3 +-
.../helix/integration/TestAddClusterV2.java | 2 +-
.../TestAddNodeAfterControllerStart.java | 3 +-
.../TestAddStateModelFactoryAfterConnect.java | 4 +-
.../integration/TestAutoIsWithEmptyMap.java | 5 +-
.../helix/integration/TestAutoRebalance.java | 4 +-
.../TestAutoRebalancePartitionLimit.java | 4 +-
.../TestAutoRebalanceWithDisabledInstance.java | 2 +-
.../helix/integration/TestBasicSpectator.java | 2 +-
.../helix/integration/TestBatchMessage.java | 5 +-
.../integration/TestBatchMessageWrapper.java | 4 +-
.../integration/TestBucketizedResource.java | 6 +-
.../integration/TestCarryOverBadCurState.java | 7 +-
.../integration/TestCleanupExternalView.java | 2 +-
.../integration/TestControllerLiveLock.java | 4 +-
.../TestCorrectnessOnConnectivityLoss.java | 4 +-
.../integration/TestCrushAutoRebalance.java | 2 +-
.../TestCustomizedIdealStateRebalancer.java | 4 +-
.../apache/helix/integration/TestDisable.java | 4 +-
.../TestDisableCustomCodeRunner.java | 2 +-
.../integration/TestDisableExternalView.java | 4 +-
.../helix/integration/TestDisableNode.java | 2 +-
.../helix/integration/TestDisablePartition.java | 2 +-
.../helix/integration/TestDisableResource.java | 2 +-
.../integration/TestDistributedCMMain.java | 4 +-
.../TestDistributedClusterController.java | 4 +-
.../apache/helix/integration/TestDriver.java | 3 +-
.../org/apache/helix/integration/TestDrop.java | 4 +-
.../helix/integration/TestDropResource.java | 2 +-
.../integration/TestEnableCompression.java | 5 +-
.../TestEnablePartitionDuringDisable.java | 4 +-
.../integration/TestEntropyFreeNodeBounce.java | 6 +-
.../helix/integration/TestErrorPartition.java | 2 +-
.../integration/TestExternalViewUpdates.java | 6 +-
.../integration/TestFullAutoNodeTagging.java | 6 +-
.../integration/TestHelixCustomCodeRunner.java | 2 +-
.../helix/integration/TestHelixInstanceTag.java | 2 +-
.../integration/TestInvalidAutoIdealState.java | 5 +-
.../TestInvalidResourceRebalance.java | 2 +-
.../helix/integration/TestMessageThrottle.java | 7 +-
.../helix/integration/TestMessageThrottle2.java | 4 +-
.../integration/TestNonOfflineInitState.java | 4 +-
.../helix/integration/TestNullReplica.java | 5 +-
.../TestParticipantErrorMessage.java | 4 +-
.../TestPartitionLevelTransitionConstraint.java | 2 +-
.../helix/integration/TestPauseSignal.java | 4 +-
.../TestRebalancerPersistAssignments.java | 2 +-
.../TestReelectedPipelineCorrectness.java | 4 +-
.../helix/integration/TestRenamePartition.java | 2 +-
.../helix/integration/TestResetInstance.java | 2 +-
.../integration/TestResetPartitionState.java | 2 +-
.../helix/integration/TestResetResource.java | 2 +-
.../integration/TestResourceGroupEndtoEnd.java | 11 +-
.../TestResourceWithSamePartitionKey.java | 2 +-
.../integration/TestRestartParticipant.java | 4 +-
.../helix/integration/TestSchemataSM.java | 4 +-
.../TestSessionExpiryInTransition.java | 4 +-
.../helix/integration/TestStandAloneCMMain.java | 2 +-
.../TestStandAloneCMSessionExpiry.java | 2 +-
.../integration/TestStateTransitionTimeout.java | 4 +-
.../helix/integration/TestSwapInstance.java | 2 +-
.../integration/TestZkCallbackHandlerLeak.java | 2 +-
.../helix/integration/TestZkReconnect.java | 4 +-
.../helix/integration/TestZkSessionExpiry.java | 2 +-
.../integration/ZkStandAloneCMTestBase.java | 6 +-
.../manager/TestConsecutiveZkSessionExpiry.java | 4 +-
.../manager/TestControllerManager.java | 4 +-
.../TestDistributedControllerManager.java | 4 +-
.../manager/TestParticipantManager.java | 5 +-
.../integration/manager/TestStateModelLeak.java | 4 +-
.../manager/TestZkCallbackHandlerLeak.java | 2 +-
.../helix/integration/task/TaskTestBase.java | 2 +-
.../task/TestTaskRebalancerStopResume.java | 2 +-
.../manager/zk/TestLiveInstanceBounce.java | 2 +-
.../handling/TestResourceThreadpoolSize.java | 3 +-
.../TestClusterStatusMonitorLifecycle.java | 4 +-
.../mbeans/TestResetClusterMetrics.java | 2 +-
.../helix/tools/TestClusterStateVerifier.java | 7 +-
.../apache/helix/tools/TestHelixAdminCli.java | 5 +-
94 files changed, 1238 insertions(+), 1273 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
index 0b49096..4e55c0c 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestHelixAdminScenariosRest.java
@@ -41,9 +41,9 @@ import org.apache.helix.model.IdealState.IdealStateProperty;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.MasterNbInExtViewVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier.MasterNbInExtViewVerifier;
import org.apache.helix.webapp.resources.ClusterRepresentationUtil;
import org.apache.helix.webapp.resources.InstancesResource.ListInstancesWrapper;
import org.apache.helix.webapp.resources.JsonParameters;
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
index fdcb1e5..ce84c23 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetInstance.java
@@ -29,7 +29,7 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
import org.apache.helix.webapp.resources.JsonParameters;
import org.testng.Assert;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
index ddf2ec1..380713a 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetPartitionState.java
@@ -37,7 +37,7 @@ import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
import org.apache.helix.webapp.resources.JsonParameters;
import org.apache.log4j.Logger;
import org.testng.Assert;
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
index 64ed249..26e8219 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/TestResetResource.java
@@ -29,7 +29,7 @@ import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.mock.participant.ErrTransition;
import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
import org.apache.helix.webapp.resources.JsonParameters;
import org.testng.Assert;
import org.testng.annotations.Test;
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
----------------------------------------------------------------------
diff --git a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
index 5d8a93b..712925a 100644
--- a/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
+++ b/helix-admin-webapp/src/test/java/org/apache/helix/webapp/resources/TestJobQueuesResource.java
@@ -39,7 +39,7 @@ import org.apache.helix.task.TaskFactory;
import org.apache.helix.task.TaskStateModelFactory;
import org.apache.helix.task.beans.JobBean;
import org.apache.helix.task.beans.WorkflowBean;
-import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
import org.apache.helix.webapp.AdminTestBase;
import org.apache.helix.webapp.AdminTestHelper;
import org.apache.log4j.Logger;
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
----------------------------------------------------------------------
diff --git a/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java b/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
index 27b4d36..9d447d7 100644
--- a/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
+++ b/helix-agent/src/test/java/org/apache/helix/agent/TestHelixAgent.java
@@ -34,8 +34,8 @@ import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.helix.tools.ClusterSetup;
-import org.apache.helix.tools.ClusterStateVerifier;
-import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
deleted file mode 100644
index c483fbb..0000000
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java
+++ /dev/null
@@ -1,170 +0,0 @@
-package org.apache.helix.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.Partition;
-import org.apache.log4j.Logger;
-
-/**
- * given zk, cluster, and a list of expected live-instances
- * check whether cluster's external-view reaches best-possible states
- */
-public class ClusterExternalViewVerifier extends ClusterVerifier {
- private static Logger LOG = Logger.getLogger(ClusterExternalViewVerifier.class);
-
- final List<String> _expectSortedLiveNodes; // always sorted
-
- public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName,
- List<String> expectLiveNodes) {
- super(zkclient, clusterName);
- _expectSortedLiveNodes = expectLiveNodes;
- Collections.sort(_expectSortedLiveNodes);
- }
-
- boolean verifyLiveNodes(List<String> actualLiveNodes) {
- Collections.sort(actualLiveNodes);
- return _expectSortedLiveNodes.equals(actualLiveNodes);
- }
-
- /**
- * @param externalView
- * @param bestPossibleState map of partition to map of instance to state
- * @return
- */
- boolean verifyExternalView(ExternalView externalView,
- Map<Partition, Map<String, String>> bestPossibleState) {
- Map<String, Map<String, String>> bestPossibleStateMap =
- convertBestPossibleState(bestPossibleState);
- // trimBestPossibleState(bestPossibleStateMap);
-
- Map<String, Map<String, String>> externalViewMap = externalView.getRecord().getMapFields();
- return externalViewMap.equals(bestPossibleStateMap);
- }
-
- static void runStage(ClusterEvent event, Stage stage) throws Exception {
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- stage.process(event);
- stage.postProcess();
- }
-
- BestPossibleStateOutput calculateBestPossibleState(ClusterDataCache cache) throws Exception {
- ClusterEvent event = new ClusterEvent("event");
- event.addAttribute("ClusterDataCache", cache);
-
- List<Stage> stages = new ArrayList<Stage>();
- stages.add(new ResourceComputationStage());
- stages.add(new CurrentStateComputationStage());
- stages.add(new BestPossibleStateCalcStage());
-
- for (Stage stage : stages) {
- runStage(event, stage);
- }
-
- return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
- }
-
- /**
- * remove empty map and DROPPED state from best possible state
- * @param bestPossibleState
- */
- // static void trimBestPossibleState(Map<String, Map<String, String>> bestPossibleState) {
- // Iterator<Entry<String, Map<String, String>>> iter = bestPossibleState.entrySet().iterator();
- // while (iter.hasNext()) {
- // Map.Entry<String, Map<String, String>> entry = iter.next();
- // Map<String, String> instanceStateMap = entry.getValue();
- // if (instanceStateMap.isEmpty()) {
- // iter.remove();
- // } else {
- // // remove instances with DROPPED state
- // Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
- // while (insIter.hasNext()) {
- // Map.Entry<String, String> insEntry = insIter.next();
- // String state = insEntry.getValue();
- // if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
- // insIter.remove();
- // }
- // }
- // }
- // }
- // }
-
- static Map<String, Map<String, String>> convertBestPossibleState(
- Map<Partition, Map<String, String>> bestPossibleState) {
- Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
- for (Partition partition : bestPossibleState.keySet()) {
- result.put(partition.getPartitionName(), bestPossibleState.get(partition));
- }
- return result;
- }
-
- @Override
- public boolean verify() throws Exception {
- ClusterDataCache cache = new ClusterDataCache();
- cache.refresh(_accessor);
-
- List<String> liveInstances = new ArrayList<String>();
- liveInstances.addAll(cache.getLiveInstances().keySet());
- boolean success = verifyLiveNodes(liveInstances);
- if (!success) {
- LOG.info("liveNodes not match, expect: " + _expectSortedLiveNodes + ", actual: "
- + liveInstances);
- return false;
- }
-
- BestPossibleStateOutput bestPossbileStates = calculateBestPossibleState(cache);
- Map<String, ExternalView> externalViews =
- _accessor.getChildValuesMap(_keyBuilder.externalViews());
-
- // TODO all ideal-states should be included in external-views
-
- for (String resourceName : externalViews.keySet()) {
- ExternalView externalView = externalViews.get(resourceName);
- Map<Partition, Map<String, String>> bestPossbileState =
- bestPossbileStates.getResourceMap(resourceName);
- success = verifyExternalView(externalView, bestPossbileState);
- if (!success) {
- LOG.info("external-view for resource: " + resourceName + " not match");
- return false;
- }
- }
-
- return true;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
deleted file mode 100644
index b9de466..0000000
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterLiveNodesVerifier.java
+++ /dev/null
@@ -1,45 +0,0 @@
-package org.apache.helix.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.helix.manager.zk.ZkClient;
-
-public class ClusterLiveNodesVerifier extends ClusterVerifier {
-
- final List<String> _expectSortedLiveNodes; // always sorted
-
- public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
- List<String> expectLiveNodes) {
- super(zkclient, clusterName);
- _expectSortedLiveNodes = expectLiveNodes;
- Collections.sort(_expectSortedLiveNodes);
- }
-
- @Override
- public boolean verify() throws Exception {
- List<String> actualLiveNodes = _accessor.getChildNames(_keyBuilder.liveInstances());
- Collections.sort(actualLiveNodes);
- return _expectSortedLiveNodes.equals(actualLiveNodes);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
deleted file mode 100644
index 4d4aaf4..0000000
--- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java
+++ /dev/null
@@ -1,749 +0,0 @@
-package org.apache.helix.tools;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.I0Itec.zkclient.IZkChildListener;
-import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkNodeExistsException;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.CommandLineParser;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Option;
-import org.apache.commons.cli.OptionBuilder;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixDefinedState;
-import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.PropertyPathBuilder;
-import org.apache.helix.PropertyType;
-import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.pipeline.Stage;
-import org.apache.helix.controller.pipeline.StageContext;
-import org.apache.helix.controller.stages.AttributeName;
-import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
-import org.apache.helix.controller.stages.BestPossibleStateOutput;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.ClusterEvent;
-import org.apache.helix.controller.stages.CurrentStateComputationStage;
-import org.apache.helix.controller.stages.ResourceComputationStage;
-import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
-import org.apache.helix.manager.zk.ZkClient;
-import org.apache.helix.model.ExternalView;
-import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
-import org.apache.helix.task.TaskConstants;
-import org.apache.helix.util.ZKClientPool;
-import org.apache.log4j.Logger;
-
-import com.google.common.collect.Sets;
-
-public class ClusterStateVerifier {
- public static String cluster = "cluster";
- public static String zkServerAddress = "zkSvr";
- public static String help = "help";
- public static String timeout = "timeout";
- public static String period = "period";
- public static String resources = "resources";
-
- private static Logger LOG = Logger.getLogger(ClusterStateVerifier.class);
-
- public interface Verifier {
- boolean verify();
- }
-
- public interface ZkVerifier extends Verifier {
- ZkClient getZkClient();
-
- String getClusterName();
- }
-
- static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener {
- final CountDownLatch _countDown;
- final ZkClient _zkClient;
- final Verifier _verifier;
-
- public ExtViewVeriferZkListener(CountDownLatch countDown, ZkClient zkClient, ZkVerifier verifier) {
- _countDown = countDown;
- _zkClient = zkClient;
- _verifier = verifier;
- }
-
- @Override
- public void handleDataChange(String dataPath, Object data) throws Exception {
- boolean result = _verifier.verify();
- if (result == true) {
- _countDown.countDown();
- }
- }
-
- @Override
- public void handleDataDeleted(String dataPath) throws Exception {
- // TODO Auto-generated method stub
-
- }
-
- @Override
- public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
- for (String child : currentChilds) {
- String childPath = parentPath.equals("/") ? parentPath + child : parentPath + "/" + child;
- _zkClient.subscribeDataChanges(childPath, this);
- }
-
- boolean result = _verifier.verify();
- if (result == true) {
- _countDown.countDown();
- }
- }
-
- }
-
- private static ZkClient validateAndGetClient(String zkAddr, String clusterName) {
- if (zkAddr == null || clusterName == null) {
- throw new IllegalArgumentException("requires zkAddr|clusterName");
- }
- return ZKClientPool.getZkClient(zkAddr);
- }
-
- /**
- * verifier that verifies best possible state and external view
- */
- public static class BestPossAndExtViewZkVerifier implements ZkVerifier {
- private final String clusterName;
- private final Map<String, Map<String, String>> errStates;
- private final ZkClient zkClient;
- private final Set<String> resources;
-
- public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) {
- this(zkAddr, clusterName, null);
- }
-
- public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
- Map<String, Map<String, String>> errStates) {
- this(zkAddr, clusterName, errStates, null);
- }
-
- public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
- Map<String, Map<String, String>> errStates, Set<String> resources) {
- this(validateAndGetClient(zkAddr, clusterName), clusterName, errStates, resources);
- }
-
- public BestPossAndExtViewZkVerifier(ZkClient zkClient, String clusterName,
- Map<String, Map<String, String>> errStates, Set<String> resources) {
- if (zkClient == null || clusterName == null) {
- throw new IllegalArgumentException("requires zkClient|clusterName");
- }
- this.clusterName = clusterName;
- this.errStates = errStates;
- this.zkClient = zkClient;
- this.resources = resources;
- }
-
- @Override
- public boolean verify() {
- try {
- HelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
-
- return ClusterStateVerifier.verifyBestPossAndExtView(accessor, errStates, clusterName,
- resources);
- } catch (Exception e) {
- LOG.error("exception in verification", e);
- }
- return false;
- }
-
- @Override
- public ZkClient getZkClient() {
- return zkClient;
- }
-
- @Override
- public String getClusterName() {
- return clusterName;
- }
-
- @Override
- public String toString() {
- String verifierName = getClass().getName();
- verifierName =
- verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
- return verifierName + "(" + clusterName + "@" + zkClient.getServers() + ")";
- }
- }
-
- public static class MasterNbInExtViewVerifier implements ZkVerifier {
- private final String clusterName;
- private final ZkClient zkClient;
-
- public MasterNbInExtViewVerifier(String zkAddr, String clusterName) {
- this(validateAndGetClient(zkAddr, clusterName), clusterName);
- }
-
- public MasterNbInExtViewVerifier(ZkClient zkClient, String clusterName) {
- if (zkClient == null || clusterName == null) {
- throw new IllegalArgumentException("requires zkClient|clusterName");
- }
- this.clusterName = clusterName;
- this.zkClient = zkClient;
- }
-
- @Override
- public boolean verify() {
- try {
- ZKHelixDataAccessor accessor =
- new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
-
- return ClusterStateVerifier.verifyMasterNbInExtView(accessor);
- } catch (Exception e) {
- LOG.error("exception in verification", e);
- }
- return false;
- }
-
- @Override
- public ZkClient getZkClient() {
- return zkClient;
- }
-
- @Override
- public String getClusterName() {
- return clusterName;
- }
-
- }
-
- static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
- Map<String, Map<String, String>> errStates, String clusterName) {
- return verifyBestPossAndExtView(accessor, errStates, clusterName, null);
- }
-
- static boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
- Map<String, Map<String, String>> errStates, String clusterName, Set<String> resources) {
- try {
- Builder keyBuilder = accessor.keyBuilder();
- // read cluster once and do verification
- ClusterDataCache cache = new ClusterDataCache();
- cache.refresh(accessor);
-
- Map<String, IdealState> idealStates = cache.getIdealStates();
- if (idealStates == null) {
- // ideal state is null because ideal state is dropped
- idealStates = Collections.emptyMap();
- }
-
- // filter out all resources that use Task state model
- Iterator<Map.Entry<String, IdealState>> it = idealStates.entrySet().iterator();
- while (it.hasNext()) {
- Map.Entry<String, IdealState> pair = it.next();
- if (pair.getValue().getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
- it.remove();
- }
- }
-
- Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
- if (extViews == null) {
- extViews = Collections.emptyMap();
- }
-
- // Filter resources if requested
- if (resources != null && !resources.isEmpty()) {
- idealStates.keySet().retainAll(resources);
- extViews.keySet().retainAll(resources);
- }
-
- // if externalView is not empty and idealState doesn't exist
- // add empty idealState for the resource
- for (String resource : extViews.keySet()) {
- if (!idealStates.containsKey(resource)) {
- idealStates.put(resource, new IdealState(resource));
- }
- }
-
- // calculate best possible state
- BestPossibleStateOutput bestPossOutput =
- ClusterStateVerifier.calcBestPossState(cache, resources);
- Map<String, Map<Partition, Map<String, String>>> bestPossStateMap =
- bestPossOutput.getStateMap();
-
- // set error states
- if (errStates != null) {
- for (String resourceName : errStates.keySet()) {
- Map<String, String> partErrStates = errStates.get(resourceName);
- for (String partitionName : partErrStates.keySet()) {
- String instanceName = partErrStates.get(partitionName);
-
- if (!bestPossStateMap.containsKey(resourceName)) {
- bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
- }
- Partition partition = new Partition(partitionName);
- if (!bestPossStateMap.get(resourceName).containsKey(partition)) {
- bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
- }
- bestPossStateMap.get(resourceName).get(partition)
- .put(instanceName, HelixDefinedState.ERROR.toString());
- }
- }
- }
-
- // System.out.println("stateMap: " + bestPossStateMap);
-
- for (String resourceName : idealStates.keySet()) {
- ExternalView extView = extViews.get(resourceName);
- if (extView == null) {
- IdealState is = idealStates.get(resourceName);
- if (is.isExternalViewDisabled()) {
- continue;
- } else {
- LOG.info("externalView for " + resourceName + " is not available");
- return false;
- }
- }
-
- // step 0: remove empty map and DROPPED state from best possible state
- Map<Partition, Map<String, String>> bpStateMap =
- bestPossOutput.getResourceMap(resourceName);
- Iterator<Entry<Partition, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
- while (iter.hasNext()) {
- Map.Entry<Partition, Map<String, String>> entry = iter.next();
- Map<String, String> instanceStateMap = entry.getValue();
- if (instanceStateMap.isEmpty()) {
- iter.remove();
- } else {
- // remove instances with DROPPED state
- Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
- while (insIter.hasNext()) {
- Map.Entry<String, String> insEntry = insIter.next();
- String state = insEntry.getValue();
- if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
- insIter.remove();
- }
- }
- }
- }
-
- // System.err.println("resource: " + resourceName + ", bpStateMap: " + bpStateMap);
-
- // step 1: externalView and bestPossibleState has equal size
- int extViewSize = extView.getRecord().getMapFields().size();
- int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
- if (extViewSize != bestPossStateSize) {
- LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size ("
- + bestPossStateSize + ") for resource: " + resourceName);
-
- // System.err.println("exterView size (" + extViewSize
- // + ") is different from bestPossState size (" + bestPossStateSize
- // + ") for resource: " + resourceName);
- // System.out.println("extView: " + extView.getRecord().getMapFields());
- // System.out.println("bestPossState: " +
- // bestPossOutput.getResourceMap(resourceName));
- return false;
- }
-
- // step 2: every entry in external view is contained in best possible state
- for (String partition : extView.getRecord().getMapFields().keySet()) {
- Map<String, String> evInstanceStateMap = extView.getRecord().getMapField(partition);
- Map<String, String> bpInstanceStateMap =
- bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
-
- boolean result =
- ClusterStateVerifier.<String, String> compareMap(evInstanceStateMap,
- bpInstanceStateMap);
- if (result == false) {
- LOG.info("externalView is different from bestPossibleState for partition:" + partition);
-
- // System.err.println("externalView is different from bestPossibleState for partition: "
- // + partition + ", actual: " + evInstanceStateMap + ", bestPoss: " +
- // bpInstanceStateMap);
- return false;
- }
- }
- }
- return true;
- } catch (Exception e) {
- LOG.error("exception in verification", e);
- return false;
- }
-
- }
-
- static boolean verifyMasterNbInExtView(HelixDataAccessor accessor) {
- Builder keyBuilder = accessor.keyBuilder();
-
- Map<String, IdealState> idealStates = accessor.getChildValuesMap(keyBuilder.idealStates());
- if (idealStates == null || idealStates.size() == 0) {
- LOG.info("No resource idealState");
- return true;
- }
-
- Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
- if (extViews == null || extViews.size() < idealStates.size()) {
- LOG.info("No externalViews | externalView.size() < idealState.size()");
- return false;
- }
-
- for (String resource : extViews.keySet()) {
- int partitions = idealStates.get(resource).getNumPartitions();
- Map<String, Map<String, String>> instanceStateMap =
- extViews.get(resource).getRecord().getMapFields();
- if (instanceStateMap.size() < partitions) {
- LOG.info("Number of externalViews (" + instanceStateMap.size() + ") < partitions ("
- + partitions + ")");
- return false;
- }
-
- for (String partition : instanceStateMap.keySet()) {
- boolean foundMaster = false;
- for (String instance : instanceStateMap.get(partition).keySet()) {
- if (instanceStateMap.get(partition).get(instance).equalsIgnoreCase("MASTER")) {
- foundMaster = true;
- break;
- }
- }
- if (!foundMaster) {
- LOG.info("No MASTER for partition: " + partition);
- return false;
- }
- }
- }
- return true;
- }
-
- static void runStage(ClusterEvent event, Stage stage) throws Exception {
- StageContext context = new StageContext();
- stage.init(context);
- stage.preProcess();
- stage.process(event);
- stage.postProcess();
- }
-
- /**
- * calculate the best possible state note that DROPPED states are not checked since when
- * kick off the BestPossibleStateCalcStage we are providing an empty current state map
- * @param cache
- * @return
- * @throws Exception
- */
- static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache) throws Exception {
- return calcBestPossState(cache, null);
- }
-
- static BestPossibleStateOutput calcBestPossState(ClusterDataCache cache, Set<String> resources)
- throws Exception {
- ClusterEvent event = new ClusterEvent("sampleEvent");
- event.addAttribute("ClusterDataCache", cache);
-
- ResourceComputationStage rcState = new ResourceComputationStage();
- CurrentStateComputationStage csStage = new CurrentStateComputationStage();
- BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
-
- runStage(event, rcState);
-
- // Filter resources if specified
- if (resources != null) {
- Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
- resourceMap.keySet().retainAll(resources);
- }
-
- runStage(event, csStage);
- runStage(event, bpStage);
-
- BestPossibleStateOutput output =
- event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
-
- // System.out.println("output:" + output);
- return output;
- }
-
- public static <K, V> boolean compareMap(Map<K, V> map1, Map<K, V> map2) {
- boolean isEqual = true;
- if (map1 == null && map2 == null) {
- // OK
- } else if (map1 == null && map2 != null) {
- if (!map2.isEmpty()) {
- isEqual = false;
- }
- } else if (map1 != null && map2 == null) {
- if (!map1.isEmpty()) {
- isEqual = false;
- }
- } else {
- // verify size
- if (map1.size() != map2.size()) {
- isEqual = false;
- }
- // verify each <key, value> in map1 is contained in map2
- for (K key : map1.keySet()) {
- if (!map1.get(key).equals(map2.get(key))) {
- LOG.debug("different value for key: " + key + "(map1: " + map1.get(key) + ", map2: "
- + map2.get(key) + ")");
- isEqual = false;
- break;
- }
- }
- }
- return isEqual;
- }
-
- public static boolean verifyByPolling(Verifier verifier) {
- return verifyByPolling(verifier, 30 * 1000);
- }
-
- public static boolean verifyByPolling(Verifier verifier, long timeout) {
- return verifyByPolling(verifier, timeout, 1000);
- }
-
- public static boolean verifyByPolling(Verifier verifier, long timeout, long period) {
- long startTime = System.currentTimeMillis();
- boolean result = false;
- try {
- long curTime;
- do {
- Thread.sleep(period);
- result = verifier.verify();
- if (result == true) {
- break;
- }
- curTime = System.currentTimeMillis();
- } while (curTime <= startTime + timeout);
- return result;
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } finally {
- long endTime = System.currentTimeMillis();
-
- // debug
- System.err.println(result + ": " + verifier + ": wait " + (endTime - startTime)
- + "ms to verify");
-
- }
- return false;
- }
-
- public static boolean verifyByZkCallback(ZkVerifier verifier) {
- return verifyByZkCallback(verifier, 30000);
- }
-
- /**
- * This function should be always single threaded
- *
- * @param verifier
- * @param timeout
- * @return
- */
- public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout) {
- long startTime = System.currentTimeMillis();
- CountDownLatch countDown = new CountDownLatch(1);
- ZkClient zkClient = verifier.getZkClient();
- String clusterName = verifier.getClusterName();
-
- // add an ephemeral node to /{clusterName}/CONFIGS/CLUSTER/verify
- // so when analyze zk log, we know when a test ends
- try {
- zkClient.createEphemeral("/" + clusterName + "/CONFIGS/CLUSTER/verify");
- } catch (ZkNodeExistsException ex) {
- LOG.error("There is already a verification in progress", ex);
- throw ex;
- }
-
- ExtViewVeriferZkListener listener = new ExtViewVeriferZkListener(countDown, zkClient, verifier);
-
- String extViewPath = PropertyPathBuilder.getPath(PropertyType.EXTERNALVIEW, clusterName);
- zkClient.subscribeChildChanges(extViewPath, listener);
- for (String child : zkClient.getChildren(extViewPath)) {
- String childPath = extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
- zkClient.subscribeDataChanges(childPath, listener);
- }
-
- // do initial verify
- boolean result = verifier.verify();
- if (result == false) {
- try {
- result = countDown.await(timeout, TimeUnit.MILLISECONDS);
- if (result == false) {
- // make a final try if timeout
- result = verifier.verify();
- }
- } catch (Exception e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
- }
-
- // clean up
- zkClient.unsubscribeChildChanges(extViewPath, listener);
- for (String child : zkClient.getChildren(extViewPath)) {
- String childPath = extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
- zkClient.unsubscribeDataChanges(childPath, listener);
- }
-
- long endTime = System.currentTimeMillis();
-
- zkClient.delete("/" + clusterName + "/CONFIGS/CLUSTER/verify");
- // debug
- System.err.println(result + ": wait " + (endTime - startTime) + "ms, " + verifier);
-
- return result;
- }
-
- @SuppressWarnings("static-access")
- private static Options constructCommandLineOptions() {
- Option helpOption =
- OptionBuilder.withLongOpt(help).withDescription("Prints command-line options info")
- .create();
-
- Option zkServerOption =
- OptionBuilder.withLongOpt(zkServerAddress).withDescription("Provide zookeeper address")
- .create();
- zkServerOption.setArgs(1);
- zkServerOption.setRequired(true);
- zkServerOption.setArgName("ZookeeperServerAddress(Required)");
-
- Option clusterOption =
- OptionBuilder.withLongOpt(cluster).withDescription("Provide cluster name").create();
- clusterOption.setArgs(1);
- clusterOption.setRequired(true);
- clusterOption.setArgName("Cluster name (Required)");
-
- Option timeoutOption =
- OptionBuilder.withLongOpt(timeout).withDescription("Timeout value for verification")
- .create();
- timeoutOption.setArgs(1);
- timeoutOption.setArgName("Timeout value (Optional), default=30s");
-
- Option sleepIntervalOption =
- OptionBuilder.withLongOpt(period).withDescription("Polling period for verification")
- .create();
- sleepIntervalOption.setArgs(1);
- sleepIntervalOption.setArgName("Polling period value (Optional), default=1s");
-
- Option resourcesOption =
- OptionBuilder.withLongOpt(resources).withDescription("Specific set of resources to verify")
- .create();
- resourcesOption.setArgs(1);
- resourcesOption.setArgName("Comma-separated resource names, default is all resources");
-
- Options options = new Options();
- options.addOption(helpOption);
- options.addOption(zkServerOption);
- options.addOption(clusterOption);
- options.addOption(timeoutOption);
- options.addOption(sleepIntervalOption);
- options.addOption(resourcesOption);
-
- return options;
- }
-
- public static void printUsage(Options cliOptions) {
- HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.setWidth(1000);
- helpFormatter.printHelp("java " + ClusterSetup.class.getName(), cliOptions);
- }
-
- public static CommandLine processCommandLineArgs(String[] cliArgs) {
- CommandLineParser cliParser = new GnuParser();
- Options cliOptions = constructCommandLineOptions();
- // CommandLine cmd = null;
-
- try {
- return cliParser.parse(cliOptions, cliArgs);
- } catch (ParseException pe) {
- System.err.println("CommandLineClient: failed to parse command-line options: "
- + pe.toString());
- printUsage(cliOptions);
- System.exit(1);
- }
- return null;
- }
-
- public static boolean verifyState(String[] args) {
- // TODO Auto-generated method stub
- String clusterName = "storage-cluster";
- String zkServer = "localhost:2181";
- long timeoutValue = 0;
- long periodValue = 1000;
-
- Set<String> resourceSet = null;
- if (args.length > 0) {
- CommandLine cmd = processCommandLineArgs(args);
- zkServer = cmd.getOptionValue(zkServerAddress);
- clusterName = cmd.getOptionValue(cluster);
- String timeoutStr = cmd.getOptionValue(timeout);
- String periodStr = cmd.getOptionValue(period);
- String resourceStr = cmd.getOptionValue(resources);
-
- if (timeoutStr != null) {
- try {
- timeoutValue = Long.parseLong(timeoutStr);
- } catch (Exception e) {
- System.err.println("Exception in converting " + timeoutStr + " to long. Use default (0)");
- }
- }
-
- if (periodStr != null) {
- try {
- periodValue = Long.parseLong(periodStr);
- } catch (Exception e) {
- System.err.println("Exception in converting " + periodStr
- + " to long. Use default (1000)");
- }
- }
-
- // Allow specifying resources explicitly
- if (resourceStr != null) {
- String[] resources = resourceStr.split("[\\s,]");
- resourceSet = Sets.newHashSet(resources);
- }
-
- }
- // return verifyByPolling(new BestPossAndExtViewZkVerifier(zkServer, clusterName),
- // timeoutValue,
- // periodValue);
-
- ZkVerifier verifier;
- if (resourceSet == null) {
- verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName);
- } else {
- verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName, null, resourceSet);
- }
- return verifyByZkCallback(verifier, timeoutValue);
- }
-
- public static void main(String[] args) {
- boolean result = verifyState(args);
- System.out.println(result ? "Successful" : "failed");
- System.exit(1);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
new file mode 100644
index 0000000..4ec882e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterExternalViewVerifier.java
@@ -0,0 +1,171 @@
+package org.apache.helix.tools.ClusterStateVerifier;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.Partition;
+import org.apache.log4j.Logger;
+
+/**
+ * Given a zookeeper client, a cluster and a list of expected live instances, checks that the
+ * cluster's live instances match the expected list and that every resource's external view
+ * equals its calculated best possible state.
+ */
+public class ClusterExternalViewVerifier extends ClusterVerifier {
+  private static final Logger LOG = Logger.getLogger(ClusterExternalViewVerifier.class);
+
+  /** Expected live instance names: a private copy of the constructor argument, kept sorted. */
+  final List<String> _expectSortedLiveNodes; // always sorted
+
+  /**
+   * @param zkclient zookeeper client handed to the base verifier
+   * @param clusterName name of the cluster to verify
+   * @param expectLiveNodes instance names expected to be live; the list is copied and never
+   *          modified, so unmodifiable lists are accepted
+   */
+  public ClusterExternalViewVerifier(ZkClient zkclient, String clusterName,
+      List<String> expectLiveNodes) {
+    super(zkclient, clusterName);
+    // copy before sorting so the caller's list is neither reordered nor required to be mutable
+    _expectSortedLiveNodes = new ArrayList<String>(expectLiveNodes);
+    Collections.sort(_expectSortedLiveNodes);
+  }
+
+  /**
+   * Compare the actual live instances with the expected ones, ignoring order.
+   * @param actualLiveNodes names of the currently live instances; not modified
+   * @return true if the sorted actual names equal the sorted expected names
+   */
+  boolean verifyLiveNodes(List<String> actualLiveNodes) {
+    List<String> sortedActual = new ArrayList<String>(actualLiveNodes);
+    Collections.sort(sortedActual);
+    return _expectSortedLiveNodes.equals(sortedActual);
+  }
+
+  /**
+   * Compare one resource's external view with its best possible state. The maps are compared
+   * as-is: empty partition maps and DROPPED states are not trimmed from the best possible state.
+   * @param externalView external view of the resource
+   * @param bestPossibleState map of partition to map of instance to state; null is treated as
+   *          an empty map
+   * @return true if the external view's map fields equal the best possible state
+   */
+  boolean verifyExternalView(ExternalView externalView,
+      Map<Partition, Map<String, String>> bestPossibleState) {
+    Map<String, Map<String, String>> bestPossibleStateMap =
+        convertBestPossibleState(bestPossibleState);
+    Map<String, Map<String, String>> externalViewMap = externalView.getRecord().getMapFields();
+    return externalViewMap.equals(bestPossibleStateMap);
+  }
+
+  /**
+   * Initialize a pipeline stage with a fresh context and run its pre-process, process and
+   * post-process steps against the event.
+   */
+  static void runStage(ClusterEvent event, Stage stage) throws Exception {
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+    stage.process(event);
+    stage.postProcess();
+  }
+
+  /**
+   * Run resource computation, current-state computation and best-possible-state calculation
+   * over the given cache.
+   * @param cache refreshed cluster data
+   * @return the BEST_POSSIBLE_STATE attribute left on the event by the last stage
+   */
+  BestPossibleStateOutput calculateBestPossibleState(ClusterDataCache cache) throws Exception {
+    ClusterEvent event = new ClusterEvent("event");
+    event.addAttribute("ClusterDataCache", cache);
+
+    List<Stage> stages = new ArrayList<Stage>();
+    stages.add(new ResourceComputationStage());
+    stages.add(new CurrentStateComputationStage());
+    stages.add(new BestPossibleStateCalcStage());
+
+    for (Stage stage : stages) {
+      runStage(event, stage);
+    }
+
+    return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+  }
+
+  /**
+   * Re-key a best possible state map from {@link Partition} to partition name so it can be
+   * compared with an external view's map fields. The inner instance-to-state maps are shared,
+   * not copied.
+   * @param bestPossibleState map of partition to map of instance to state; may be null
+   * @return a new map of partition name to map of instance to state (empty for null input)
+   */
+  static Map<String, Map<String, String>> convertBestPossibleState(
+      Map<Partition, Map<String, String>> bestPossibleState) {
+    Map<String, Map<String, String>> result = new HashMap<String, Map<String, String>>();
+    if (bestPossibleState == null) {
+      return result;
+    }
+    for (Map.Entry<Partition, Map<String, String>> entry : bestPossibleState.entrySet()) {
+      result.put(entry.getKey().getPartitionName(), entry.getValue());
+    }
+    return result;
+  }
+
+  /**
+   * @return true if the live instances match the expected list and every external view equals
+   *         the freshly calculated best possible state of its resource
+   */
+  @Override
+  public boolean verify() throws Exception {
+    ClusterDataCache cache = new ClusterDataCache();
+    cache.refresh(_accessor);
+
+    List<String> liveInstances = new ArrayList<String>(cache.getLiveInstances().keySet());
+    if (!verifyLiveNodes(liveInstances)) {
+      LOG.info("liveNodes not match, expect: " + _expectSortedLiveNodes + ", actual: "
+          + liveInstances);
+      return false;
+    }
+
+    BestPossibleStateOutput bestPossibleStates = calculateBestPossibleState(cache);
+    Map<String, ExternalView> externalViews =
+        _accessor.getChildValuesMap(_keyBuilder.externalViews());
+    if (externalViews == null) {
+      externalViews = Collections.emptyMap();
+    }
+
+    // TODO all ideal-states should be included in external-views
+
+    for (Map.Entry<String, ExternalView> entry : externalViews.entrySet()) {
+      String resourceName = entry.getKey();
+      // a resource with no best possible entry is compared against an empty map rather than NPE
+      Map<Partition, Map<String, String>> bestPossibleState =
+          bestPossibleStates.getResourceMap(resourceName);
+      if (!verifyExternalView(entry.getValue(), bestPossibleState)) {
+        LOG.info("external-view for resource: " + resourceName + " not match");
+        return false;
+      }
+    }
+
+    return true;
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
new file mode 100644
index 0000000..5c502e0
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterLiveNodesVerifier.java
@@ -0,0 +1,45 @@
+package org.apache.helix.tools.ClusterStateVerifier;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.helix.manager.zk.ZkClient;
+
+/**
+ * Verifies that the set of live instances of a cluster equals an expected list of names.
+ */
+public class ClusterLiveNodesVerifier extends ClusterVerifier {
+
+  /** Expected live instance names: a private copy of the constructor argument, kept sorted. */
+  final List<String> _expectSortedLiveNodes; // always sorted
+
+  /**
+   * @param zkclient zookeeper client handed to the base verifier
+   * @param clusterName name of the cluster to verify
+   * @param expectLiveNodes instance names expected to be live; the list is copied and never
+   *          modified, so unmodifiable lists are accepted
+   */
+  public ClusterLiveNodesVerifier(ZkClient zkclient, String clusterName,
+      List<String> expectLiveNodes) {
+    super(zkclient, clusterName);
+    // copy before sorting so the caller's list is neither reordered nor required to be mutable
+    _expectSortedLiveNodes = new ArrayList<String>(expectLiveNodes);
+    Collections.sort(_expectSortedLiveNodes);
+  }
+
+  /**
+   * @return true if the names under the cluster's live-instances path, sorted, equal the
+   *         sorted expected names
+   */
+  @Override
+  public boolean verify() throws Exception {
+    List<String> childNames = _accessor.getChildNames(_keyBuilder.liveInstances());
+    // sort a private copy (never the accessor's list); a null result is treated as no instances
+    List<String> actualLiveNodes =
+        childNames == null ? new ArrayList<String>() : new ArrayList<String>(childNames);
+    Collections.sort(actualLiveNodes);
+    return _expectSortedLiveNodes.equals(actualLiveNodes);
+  }
+
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/695228e0/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
new file mode 100644
index 0000000..f399f1f
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier/ClusterStateVerifier.java
@@ -0,0 +1,734 @@
+package org.apache.helix.tools.ClusterStateVerifier;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.I0Itec.zkclient.IZkChildListener;
+import org.I0Itec.zkclient.IZkDataListener;
+import org.I0Itec.zkclient.exception.ZkNodeExistsException;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.GnuParser;
+import org.apache.commons.cli.HelpFormatter;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixDefinedState;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.PropertyType;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
+import org.apache.helix.controller.stages.BestPossibleStateOutput;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.controller.stages.CurrentStateComputationStage;
+import org.apache.helix.controller.stages.ResourceComputationStage;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.Resource;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Sets;
+
+public class ClusterStateVerifier {
+ public static String cluster = "cluster";
+ public static String zkServerAddress = "zkSvr";
+ public static String help = "help";
+ public static String timeout = "timeout";
+ public static String period = "period";
+ public static String resources = "resources";
+
+ private static Logger LOG = Logger.getLogger(ClusterStateVerifier.class);
+
+ /**
+ * A check over cluster state that can be re-evaluated on demand, e.g. repeatedly while
+ * polling or from a zookeeper callback, until it succeeds.
+ */
+ public interface Verifier {
+ /**
+ * @return true if the checked condition holds at the time of the call
+ */
+ boolean verify();
+ }
+
+ /**
+ * A {@link Verifier} that reads cluster state through a zookeeper client. It exposes the client
+ * and the cluster name so that callback-based verification can subscribe to changes under the
+ * cluster's paths.
+ */
+ public interface ZkVerifier extends Verifier {
+ /** @return the zookeeper client used to read cluster data */
+ ZkClient getZkClient();
+
+ /** @return the name of the cluster being verified */
+ String getClusterName();
+ }
+
+  /**
+   * Zookeeper listener that re-runs a verifier whenever a watched path changes and counts down a
+   * latch each time the verifier succeeds. On child changes it also subscribes itself to data
+   * changes of every current child.
+   */
+  static class ExtViewVeriferZkListener implements IZkChildListener, IZkDataListener {
+    final CountDownLatch _countDown;
+    final ZkClient _zkClient;
+    final Verifier _verifier;
+
+    /**
+     * @param countDown latch released when the verifier succeeds
+     * @param zkClient client used to subscribe to child data changes
+     * @param verifier check re-run on every notification
+     */
+    public ExtViewVeriferZkListener(CountDownLatch countDown, ZkClient zkClient, ZkVerifier verifier) {
+      _countDown = countDown;
+      _zkClient = zkClient;
+      _verifier = verifier;
+    }
+
+    @Override
+    public void handleDataChange(String dataPath, Object data) throws Exception {
+      verifyAndSignal();
+    }
+
+    @Override
+    public void handleDataDeleted(String dataPath) throws Exception {
+      // intentionally a no-op; NOTE(review): assumes the parent path is also watched through
+      // handleChildChange, which re-runs the verifier when a child is removed — confirm
+    }
+
+    @Override
+    public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
+      // ZkClient reports null children when the parent path itself has been deleted
+      if (currentChilds != null) {
+        for (String child : currentChilds) {
+          String childPath = parentPath.equals("/") ? parentPath + child : parentPath + "/" + child;
+          _zkClient.subscribeDataChanges(childPath, this);
+        }
+      }
+      verifyAndSignal();
+    }
+
+    /** Run the verifier and release the latch if it reports success. */
+    private void verifyAndSignal() {
+      if (_verifier.verify()) {
+        _countDown.countDown();
+      }
+    }
+  }
+
+  /**
+   * Check that both connection arguments are present, then obtain a zookeeper client for the
+   * address from ZKClientPool.
+   * @throws IllegalArgumentException if zkAddr or clusterName is null
+   */
+  private static ZkClient validateAndGetClient(String zkAddr, String clusterName) {
+    boolean hasRequiredArgs = zkAddr != null && clusterName != null;
+    if (hasRequiredArgs) {
+      return ZKClientPool.getZkClient(zkAddr);
+    }
+    throw new IllegalArgumentException("requires zkAddr|clusterName");
+  }
+
+ public static class BestPossAndExtViewZkVerifier implements ZkVerifier {
+ private final String clusterName;
+ private final Map<String, Map<String, String>> errStates;
+ private final ZkClient zkClient;
+ private final Set<String> resources;
+
+ public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName) {
+ this(zkAddr, clusterName, null);
+ }
+
+ public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
+ Map<String, Map<String, String>> errStates) {
+ this(zkAddr, clusterName, errStates, null);
+ }
+
+ public BestPossAndExtViewZkVerifier(String zkAddr, String clusterName,
+ Map<String, Map<String, String>> errStates, Set<String> resources) {
+ this(validateAndGetClient(zkAddr, clusterName), clusterName, errStates, resources);
+ }
+
+ public BestPossAndExtViewZkVerifier(ZkClient zkClient, String clusterName,
+ Map<String, Map<String, String>> errStates, Set<String> resources) {
+ if (zkClient == null || clusterName == null) {
+ throw new IllegalArgumentException("requires zkClient|clusterName");
+ }
+ this.clusterName = clusterName;
+ this.errStates = errStates;
+ this.zkClient = zkClient;
+ this.resources = resources;
+ }
+
+ @Override
+ public boolean verify() {
+ try {
+ HelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+
+ return verifyBestPossAndExtView(accessor, errStates, clusterName, resources);
+ } catch (Exception e) {
+ LOG.error("exception in verification", e);
+ }
+ return false;
+ }
+
+ private boolean verifyBestPossAndExtView(HelixDataAccessor accessor,
+ Map<String, Map<String, String>> errStates, String clusterName, Set<String> resources) {
+ try {
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ // read cluster once and do verification
+ ClusterDataCache cache = new ClusterDataCache();
+ cache.refresh(accessor);
+
+ Map<String, IdealState> idealStates = cache.getIdealStates();
+ if (idealStates == null) {
+ // ideal state is null because ideal state is dropped
+ idealStates = Collections.emptyMap();
+ }
+
+ // filter out all resources that use Task state model
+ Iterator<Map.Entry<String, IdealState>> it = idealStates.entrySet().iterator();
+ while (it.hasNext()) {
+ Map.Entry<String, IdealState> pair = it.next();
+ if (pair.getValue().getStateModelDefRef().equals(TaskConstants.STATE_MODEL_NAME)) {
+ it.remove();
+ }
+ }
+
+ Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
+ if (extViews == null) {
+ extViews = Collections.emptyMap();
+ }
+
+ // Filter resources if requested
+ if (resources != null && !resources.isEmpty()) {
+ idealStates.keySet().retainAll(resources);
+ extViews.keySet().retainAll(resources);
+ }
+
+ // if externalView is not empty and idealState doesn't exist
+ // add empty idealState for the resource
+ for (String resource : extViews.keySet()) {
+ if (!idealStates.containsKey(resource)) {
+ idealStates.put(resource, new IdealState(resource));
+ }
+ }
+
+ // calculate best possible state
+ BestPossibleStateOutput bestPossOutput = calcBestPossState(cache, resources);
+ Map<String, Map<Partition, Map<String, String>>> bestPossStateMap =
+ bestPossOutput.getStateMap();
+
+ // set error states
+ if (errStates != null) {
+ for (String resourceName : errStates.keySet()) {
+ Map<String, String> partErrStates = errStates.get(resourceName);
+ for (String partitionName : partErrStates.keySet()) {
+ String instanceName = partErrStates.get(partitionName);
+
+ if (!bestPossStateMap.containsKey(resourceName)) {
+ bestPossStateMap.put(resourceName, new HashMap<Partition, Map<String, String>>());
+ }
+ Partition partition = new Partition(partitionName);
+ if (!bestPossStateMap.get(resourceName).containsKey(partition)) {
+ bestPossStateMap.get(resourceName).put(partition, new HashMap<String, String>());
+ }
+ bestPossStateMap.get(resourceName).get(partition)
+ .put(instanceName, HelixDefinedState.ERROR.toString());
+ }
+ }
+ }
+
+ // System.out.println("stateMap: " + bestPossStateMap);
+
+ for (String resourceName : idealStates.keySet()) {
+ ExternalView extView = extViews.get(resourceName);
+ if (extView == null) {
+ IdealState is = idealStates.get(resourceName);
+ if (is.isExternalViewDisabled()) {
+ continue;
+ } else {
+ LOG.info("externalView for " + resourceName + " is not available");
+ return false;
+ }
+ }
+
+ // step 0: remove empty map and DROPPED state from best possible state
+ Map<Partition, Map<String, String>> bpStateMap =
+ bestPossOutput.getResourceMap(resourceName);
+ Iterator<Map.Entry<Partition, Map<String, String>>> iter = bpStateMap.entrySet().iterator();
+ while (iter.hasNext()) {
+ Map.Entry<Partition, Map<String, String>> entry = iter.next();
+ Map<String, String> instanceStateMap = entry.getValue();
+ if (instanceStateMap.isEmpty()) {
+ iter.remove();
+ } else {
+ // remove instances with DROPPED state
+ Iterator<Map.Entry<String, String>> insIter = instanceStateMap.entrySet().iterator();
+ while (insIter.hasNext()) {
+ Map.Entry<String, String> insEntry = insIter.next();
+ String state = insEntry.getValue();
+ if (state.equalsIgnoreCase(HelixDefinedState.DROPPED.toString())) {
+ insIter.remove();
+ }
+ }
+ }
+ }
+
+ // System.err.println("resource: " + resourceName + ", bpStateMap: " + bpStateMap);
+
+ // step 1: externalView and bestPossibleState has equal size
+ int extViewSize = extView.getRecord().getMapFields().size();
+ int bestPossStateSize = bestPossOutput.getResourceMap(resourceName).size();
+ if (extViewSize != bestPossStateSize) {
+ LOG.info("exterView size (" + extViewSize + ") is different from bestPossState size ("
+ + bestPossStateSize + ") for resource: " + resourceName);
+
+ // System.err.println("exterView size (" + extViewSize
+ // + ") is different from bestPossState size (" + bestPossStateSize
+ // + ") for resource: " + resourceName);
+ // System.out.println("extView: " + extView.getRecord().getMapFields());
+ // System.out.println("bestPossState: " +
+ // bestPossOutput.getResourceMap(resourceName));
+ return false;
+ }
+
+ // step 2: every entry in external view is contained in best possible state
+ for (String partition : extView.getRecord().getMapFields().keySet()) {
+ Map<String, String> evInstanceStateMap = extView.getRecord().getMapField(partition);
+ Map<String, String> bpInstanceStateMap =
+ bestPossOutput.getInstanceStateMap(resourceName, new Partition(partition));
+
+ boolean result = compareMap(evInstanceStateMap, bpInstanceStateMap);
+ if (result == false) {
+ LOG.info("externalView is different from bestPossibleState for partition:" + partition);
+
+ // System.err.println("externalView is different from bestPossibleState for partition: "
+ // + partition + ", actual: " + evInstanceStateMap + ", bestPoss: " +
+ // bpInstanceStateMap);
+ return false;
+ }
+ }
+ }
+ return true;
+ } catch (Exception e) {
+ LOG.error("exception in verification", e);
+ return false;
+ }
+ }
+
+ /**
+ * calculate the best possible state note that DROPPED states are not checked since when
+ * kick off the BestPossibleStateCalcStage we are providing an empty current state map
+ *
+ * @param cache
+ * @return
+ * @throws Exception
+ */
+ private BestPossibleStateOutput calcBestPossState(ClusterDataCache cache, Set<String> resources)
+ throws Exception {
+ ClusterEvent event = new ClusterEvent("sampleEvent");
+ event.addAttribute("ClusterDataCache", cache);
+
+ ResourceComputationStage rcState = new ResourceComputationStage();
+ CurrentStateComputationStage csStage = new CurrentStateComputationStage();
+ BestPossibleStateCalcStage bpStage = new BestPossibleStateCalcStage();
+
+ runStage(event, rcState);
+
+ // Filter resources if specified
+ if (resources != null) {
+ Map<String, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ resourceMap.keySet().retainAll(resources);
+ }
+
+ runStage(event, csStage);
+ runStage(event, bpStage);
+
+ BestPossibleStateOutput output =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+
+ // System.out.println("output:" + output);
+ return output;
+ }
+
+ private void runStage(ClusterEvent event, Stage stage) throws Exception {
+ StageContext context = new StageContext();
+ stage.init(context);
+ stage.preProcess();
+ stage.process(event);
+ stage.postProcess();
+ }
+
+ private <K, V> boolean compareMap(Map<K, V> map1, Map<K, V> map2) {
+ boolean isEqual = true;
+ if (map1 == null && map2 == null) {
+ // OK
+ } else if (map1 == null && map2 != null) {
+ if (!map2.isEmpty()) {
+ isEqual = false;
+ }
+ } else if (map1 != null && map2 == null) {
+ if (!map1.isEmpty()) {
+ isEqual = false;
+ }
+ } else {
+ // verify size
+ if (map1.size() != map2.size()) {
+ isEqual = false;
+ }
+ // verify each <key, value> in map1 is contained in map2
+ for (K key : map1.keySet()) {
+ if (!map1.get(key).equals(map2.get(key))) {
+ LOG.debug(
+ "different value for key: " + key + "(map1: " + map1.get(key) + ", map2: " + map2
+ .get(key) + ")");
+ isEqual = false;
+ break;
+ }
+ }
+ }
+ return isEqual;
+ }
+
+ @Override
+ public ZkClient getZkClient() {
+ return zkClient;
+ }
+
+ @Override
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ @Override
+ public String toString() {
+ String verifierName = getClass().getName();
+ verifierName = verifierName.substring(verifierName.lastIndexOf('.') + 1, verifierName.length());
+ return verifierName + "(" + clusterName + "@" + zkClient.getServers() + ")";
+ }
+ }
+
+
+ public static class MasterNbInExtViewVerifier implements ZkVerifier {
+ private final String clusterName;
+ private final ZkClient zkClient;
+
+ public MasterNbInExtViewVerifier(String zkAddr, String clusterName) {
+ this(validateAndGetClient(zkAddr, clusterName), clusterName);
+ }
+
+ public MasterNbInExtViewVerifier(ZkClient zkClient, String clusterName) {
+ if (zkClient == null || clusterName == null) {
+ throw new IllegalArgumentException("requires zkClient|clusterName");
+ }
+ this.clusterName = clusterName;
+ this.zkClient = zkClient;
+ }
+
+ @Override
+ public boolean verify() {
+ try {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+
+ return verifyMasterNbInExtView(accessor);
+ } catch (Exception e) {
+ LOG.error("exception in verification", e);
+ }
+ return false;
+ }
+
+ @Override
+ public ZkClient getZkClient() {
+ return zkClient;
+ }
+
+ @Override
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ private boolean verifyMasterNbInExtView(HelixDataAccessor accessor) {
+ Builder keyBuilder = accessor.keyBuilder();
+
+ Map<String, IdealState> idealStates = accessor.getChildValuesMap(keyBuilder.idealStates());
+ if (idealStates == null || idealStates.size() == 0) {
+ LOG.info("No resource idealState");
+ return true;
+ }
+
+ Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews());
+ if (extViews == null || extViews.size() < idealStates.size()) {
+ LOG.info("No externalViews | externalView.size() < idealState.size()");
+ return false;
+ }
+
+ for (String resource : extViews.keySet()) {
+ int partitions = idealStates.get(resource).getNumPartitions();
+ Map<String, Map<String, String>> instanceStateMap =
+ extViews.get(resource).getRecord().getMapFields();
+ if (instanceStateMap.size() < partitions) {
+ LOG.info("Number of externalViews (" + instanceStateMap.size() + ") < partitions ("
+ + partitions + ")");
+ return false;
+ }
+
+ for (String partition : instanceStateMap.keySet()) {
+ boolean foundMaster = false;
+ for (String instance : instanceStateMap.get(partition).keySet()) {
+ if (instanceStateMap.get(partition).get(instance).equalsIgnoreCase("MASTER")) {
+ foundMaster = true;
+ break;
+ }
+ }
+ if (!foundMaster) {
+ LOG.info("No MASTER for partition: " + partition);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+ }
+
+ public static boolean verifyByPolling(Verifier verifier) {
+ return verifyByPolling(verifier, 30 * 1000);
+ }
+
+ public static boolean verifyByPolling(Verifier verifier, long timeout) {
+ return verifyByPolling(verifier, timeout, 1000);
+ }
+
+ public static boolean verifyByPolling(Verifier verifier, long timeout, long period) {
+ long startTime = System.currentTimeMillis();
+ boolean result = false;
+ try {
+ long curTime;
+ do {
+ Thread.sleep(period);
+ result = verifier.verify();
+ if (result == true) {
+ break;
+ }
+ curTime = System.currentTimeMillis();
+ } while (curTime <= startTime + timeout);
+ return result;
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ } finally {
+ long endTime = System.currentTimeMillis();
+
+ // debug
+ System.err.println(result + ": " + verifier + ": wait " + (endTime - startTime)
+ + "ms to verify");
+
+ }
+ return false;
+ }
+
+ public static boolean verifyByZkCallback(ZkVerifier verifier) {
+ return verifyByZkCallback(verifier, 30000);
+ }
+
+ /**
+ * This function should be always single threaded
+ *
+ * @param verifier
+ * @param timeout
+ * @return
+ */
+ public static boolean verifyByZkCallback(ZkVerifier verifier, long timeout) {
+ long startTime = System.currentTimeMillis();
+ CountDownLatch countDown = new CountDownLatch(1);
+ ZkClient zkClient = verifier.getZkClient();
+ String clusterName = verifier.getClusterName();
+
+ // add an ephemeral node to /{clusterName}/CONFIGS/CLUSTER/verify
+ // so when analyze zk log, we know when a test ends
+ try {
+ zkClient.createEphemeral("/" + clusterName + "/CONFIGS/CLUSTER/verify");
+ } catch (ZkNodeExistsException ex) {
+ LOG.error("There is already a verification in progress", ex);
+ throw ex;
+ }
+
+ ExtViewVeriferZkListener listener = new ExtViewVeriferZkListener(countDown, zkClient, verifier);
+
+ String extViewPath = PropertyPathBuilder.getPath(PropertyType.EXTERNALVIEW, clusterName);
+ zkClient.subscribeChildChanges(extViewPath, listener);
+ for (String child : zkClient.getChildren(extViewPath)) {
+ String childPath = extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
+ zkClient.subscribeDataChanges(childPath, listener);
+ }
+
+ // do initial verify
+ boolean result = verifier.verify();
+ if (result == false) {
+ try {
+ result = countDown.await(timeout, TimeUnit.MILLISECONDS);
+ if (result == false) {
+ // make a final try if timeout
+ result = verifier.verify();
+ }
+ } catch (Exception e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+ // clean up
+ zkClient.unsubscribeChildChanges(extViewPath, listener);
+ for (String child : zkClient.getChildren(extViewPath)) {
+ String childPath = extViewPath.equals("/") ? extViewPath + child : extViewPath + "/" + child;
+ zkClient.unsubscribeDataChanges(childPath, listener);
+ }
+
+ long endTime = System.currentTimeMillis();
+
+ zkClient.delete("/" + clusterName + "/CONFIGS/CLUSTER/verify");
+ // debug
+ System.err.println(result + ": wait " + (endTime - startTime) + "ms, " + verifier);
+
+ return result;
+ }
+
+ @SuppressWarnings("static-access")
+ private static Options constructCommandLineOptions() {
+ Option helpOption =
+ OptionBuilder.withLongOpt(help).withDescription("Prints command-line options info")
+ .create();
+
+ Option zkServerOption =
+ OptionBuilder.withLongOpt(zkServerAddress).withDescription("Provide zookeeper address")
+ .create();
+ zkServerOption.setArgs(1);
+ zkServerOption.setRequired(true);
+ zkServerOption.setArgName("ZookeeperServerAddress(Required)");
+
+ Option clusterOption =
+ OptionBuilder.withLongOpt(cluster).withDescription("Provide cluster name").create();
+ clusterOption.setArgs(1);
+ clusterOption.setRequired(true);
+ clusterOption.setArgName("Cluster name (Required)");
+
+ Option timeoutOption =
+ OptionBuilder.withLongOpt(timeout).withDescription("Timeout value for verification")
+ .create();
+ timeoutOption.setArgs(1);
+ timeoutOption.setArgName("Timeout value (Optional), default=30s");
+
+ Option sleepIntervalOption =
+ OptionBuilder.withLongOpt(period).withDescription("Polling period for verification")
+ .create();
+ sleepIntervalOption.setArgs(1);
+ sleepIntervalOption.setArgName("Polling period value (Optional), default=1s");
+
+ Option resourcesOption =
+ OptionBuilder.withLongOpt(resources).withDescription("Specific set of resources to verify")
+ .create();
+ resourcesOption.setArgs(1);
+ resourcesOption.setArgName("Comma-separated resource names, default is all resources");
+
+ Options options = new Options();
+ options.addOption(helpOption);
+ options.addOption(zkServerOption);
+ options.addOption(clusterOption);
+ options.addOption(timeoutOption);
+ options.addOption(sleepIntervalOption);
+ options.addOption(resourcesOption);
+
+ return options;
+ }
+
+ public static void printUsage(Options cliOptions) {
+ HelpFormatter helpFormatter = new HelpFormatter();
+ helpFormatter.setWidth(1000);
+ helpFormatter.printHelp("java " + ClusterSetup.class.getName(), cliOptions);
+ }
+
+ public static CommandLine processCommandLineArgs(String[] cliArgs) {
+ CommandLineParser cliParser = new GnuParser();
+ Options cliOptions = constructCommandLineOptions();
+ // CommandLine cmd = null;
+
+ try {
+ return cliParser.parse(cliOptions, cliArgs);
+ } catch (ParseException pe) {
+ System.err.println("CommandLineClient: failed to parse command-line options: "
+ + pe.toString());
+ printUsage(cliOptions);
+ System.exit(1);
+ }
+ return null;
+ }
+
+ public static boolean verifyState(String[] args) {
+ // TODO Auto-generated method stub
+ String clusterName = "storage-cluster";
+ String zkServer = "localhost:2181";
+ long timeoutValue = 0;
+ long periodValue = 1000;
+
+ Set<String> resourceSet = null;
+ if (args.length > 0) {
+ CommandLine cmd = processCommandLineArgs(args);
+ zkServer = cmd.getOptionValue(zkServerAddress);
+ clusterName = cmd.getOptionValue(cluster);
+ String timeoutStr = cmd.getOptionValue(timeout);
+ String periodStr = cmd.getOptionValue(period);
+ String resourceStr = cmd.getOptionValue(resources);
+
+ if (timeoutStr != null) {
+ try {
+ timeoutValue = Long.parseLong(timeoutStr);
+ } catch (Exception e) {
+ System.err.println("Exception in converting " + timeoutStr + " to long. Use default (0)");
+ }
+ }
+
+ if (periodStr != null) {
+ try {
+ periodValue = Long.parseLong(periodStr);
+ } catch (Exception e) {
+ System.err.println("Exception in converting " + periodStr
+ + " to long. Use default (1000)");
+ }
+ }
+
+ // Allow specifying resources explicitly
+ if (resourceStr != null) {
+ String[] resources = resourceStr.split("[\\s,]");
+ resourceSet = Sets.newHashSet(resources);
+ }
+
+ }
+ // return verifyByPolling(new BestPossAndExtViewZkVerifier(zkServer, clusterName),
+ // timeoutValue,
+ // periodValue);
+
+ ZkVerifier verifier;
+ if (resourceSet == null) {
+ verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName);
+ } else {
+ verifier = new BestPossAndExtViewZkVerifier(zkServer, clusterName, null, resourceSet);
+ }
+ return verifyByZkCallback(verifier, timeoutValue);
+ }
+
+ public static void main(String[] args) {
+ boolean result = verifyState(args);
+ System.out.println(result ? "Successful" : "failed");
+ System.exit(1);
+ }
+
+}