You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/03/26 22:41:56 UTC
helix git commit: Fail rebalance pipeline and retry if the data load
from zookeeper fails in any read/batch-read calls.
Repository: helix
Updated Branches:
refs/heads/master c78f84284 -> 149831c8f
Fail rebalance pipeline and retry if the data load from zookeeper fails in any read/batch-read calls.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/149831c8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/149831c8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/149831c8
Branch: refs/heads/master
Commit: 149831c8f49ee893f4966c2d01c21c005cca9d0a
Parents: c78f842
Author: Lei Xia <lx...@linkedin.com>
Authored: Sat Mar 24 20:29:36 2018 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Mon Mar 26 14:50:31 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/HelixProperty.java | 2 +
.../HelixMetaDataAccessException.java | 41 +++++
.../common/caches/BasicClusterDataCache.java | 10 +-
.../helix/common/caches/CurrentStateCache.java | 4 +-
.../common/caches/InstanceMessagesCache.java | 4 +-
.../controller/GenericHelixController.java | 154 ++++++++++++++-----
.../controller/rebalancer/CustomRebalancer.java | 2 +-
.../controller/stages/ClusterDataCache.java | 14 +-
.../controller/stages/ClusterEventType.java | 1 +
.../helix/manager/zk/ZKHelixDataAccessor.java | 6 +-
.../helix/manager/zk/ZkBaseDataAccessor.java | 11 +-
.../manager/zk/ZkCacheBaseDataAccessor.java | 3 -
.../monitoring/mbeans/ClusterStatusMonitor.java | 20 ++-
.../mbeans/ClusterStatusMonitorMBean.java | 6 +
.../java/org/apache/helix/MockAccessor.java | 11 +-
.../TestClusterDataCacheSelectiveUpdate.java | 16 ++
.../manager/TestHelixDataAccessor.java | 52 +++++--
.../org/apache/helix/mock/MockZkClient.java | 92 ++---------
18 files changed, 284 insertions(+), 165 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index f3805fe..2f3e68d 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -31,6 +31,8 @@ import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
+
/**
* A wrapper class for ZNRecord. Used as a base class for IdealState, CurrentState, etc.
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixMetaDataAccessException.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixMetaDataAccessException.java b/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixMetaDataAccessException.java
new file mode 100644
index 0000000..e8acf0c
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/exceptions/HelixMetaDataAccessException.java
@@ -0,0 +1,41 @@
+package org.apache.helix.api.exceptions;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixException;
+
+/**
+ * Class for an exception thrown by Helix due to Helix's failure to read or write some metadata from zookeeper.
+ */
+public class HelixMetaDataAccessException extends HelixException {
+ private static final long serialVersionUID = 6558251214364526258L;
+
+ public HelixMetaDataAccessException(String message) {
+ super(message);
+ }
+
+ public HelixMetaDataAccessException(Throwable cause) {
+ super(cause);
+ }
+
+ public HelixMetaDataAccessException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index e3acf14..117d18a 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -62,7 +62,7 @@ public class BasicClusterDataCache {
*
* @return
*/
- public synchronized void refresh(HelixDataAccessor accessor) {
+ public void refresh(HelixDataAccessor accessor) {
LOG.info("START: BasicClusterDataCache.refresh() for cluster " + _clusterName);
long startTime = System.currentTimeMillis();
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
@@ -70,7 +70,7 @@ public class BasicClusterDataCache {
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) {
long start = System.currentTimeMillis();
_propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false));
- _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews());
+ _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews(), true);
LOG.info("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + (
System.currentTimeMillis() - start) + " ms");
}
@@ -78,7 +78,7 @@ public class BasicClusterDataCache {
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) {
long start = System.currentTimeMillis();
_propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
- _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+ _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
LOG.info("Reload LiveInstances: " + _liveInstanceMap.keySet() + ". Takes " + (
System.currentTimeMillis() - start) + " ms");
}
@@ -87,7 +87,7 @@ public class BasicClusterDataCache {
long start = System.currentTimeMillis();
_propertyDataChangedMap
.put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
- _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+ _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
LOG.info("Reload InstanceConfig: " + _instanceConfigMap.keySet() + ". Takes " + (
System.currentTimeMillis() - start) + " ms");
}
@@ -173,7 +173,7 @@ public class BasicClusterDataCache {
/**
* Indicate that a full read should be done on the next refresh
*/
- public synchronized void requireFullRefresh() {
+ public void requireFullRefresh() {
for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
_propertyDataChangedMap.put(type, Boolean.valueOf(true));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
index 5e69c1d..711a6d6 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java
@@ -59,7 +59,7 @@ public class CurrentStateCache {
*
* @return
*/
- public synchronized boolean refresh(HelixDataAccessor accessor,
+ public boolean refresh(HelixDataAccessor accessor,
Map<String, LiveInstance> liveInstanceMap) {
LOG.info("START: CurrentStateCache.refresh()");
long startTime = System.currentTimeMillis();
@@ -143,7 +143,7 @@ public class CurrentStateCache {
}
}
- List<CurrentState> currentStates = accessor.getProperty(reloadKeys);
+ List<CurrentState> currentStates = accessor.getProperty(reloadKeys, true);
Iterator<PropertyKey> csKeyIter = reloadKeys.iterator();
for (CurrentState currentState : currentStates) {
PropertyKey key = csKeyIter.next();
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
index 438e3a6..69094d3 100644
--- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
+++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java
@@ -64,7 +64,7 @@ public class InstanceMessagesCache {
*
* @return
*/
- public synchronized boolean refresh(HelixDataAccessor accessor,
+ public boolean refresh(HelixDataAccessor accessor,
Map<String, LiveInstance> liveInstanceMap) {
LOG.info("START: InstanceMessagesCache.refresh()");
long startTime = System.currentTimeMillis();
@@ -108,7 +108,7 @@ public class InstanceMessagesCache {
// get the new messages
if (newMessageKeys.size() > 0) {
- List<Message> newMessages = accessor.getProperty(newMessageKeys);
+ List<Message> newMessages = accessor.getProperty(newMessageKeys, true);
for (Message message : newMessages) {
if (message != null) {
Map<String, Message> cachedMap = _messageCache.get(message.getTgtName());
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index df5c86b..fd14e65 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -19,13 +19,15 @@ package org.apache.helix.controller;
* under the License.
*/
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
import org.I0Itec.zkclient.exception.ZkInterruptedException;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.NotificationContext.Type;
import org.apache.helix.PropertyKey.Builder;
-import org.apache.helix.ZNRecord;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
import org.apache.helix.api.listeners.*;
import org.apache.helix.common.ClusterEventBlockingQueue;
import org.apache.helix.controller.pipeline.Pipeline;
@@ -39,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.*;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -89,6 +90,8 @@ public class GenericHelixController implements IdealStateChangeListener,
private final ClusterEventBlockingQueue _taskEventQueue;
private final ClusterEventProcessor _taskEventThread;
+ private long _continousRebalanceFailureCount = 0;
+
/**
* The _paused flag is checked by function handleEvent(), while if the flag is set handleEvent()
* will be no-op. Other event handling logic keeps the same when the flag is set.
@@ -100,7 +103,13 @@ public class GenericHelixController implements IdealStateChangeListener,
* The timer that can periodically run the rebalancing pipeline. The timer will start if there is
* one resource group has the config to use the timer.
*/
- Timer _rebalanceTimer = null;
+ Timer _periodicalRebalanceTimer = null;
+
+ /**
+ * The timer to schedule ad hoc forced rebalance or retry rebalance event.
+ */
+ Timer _forceRebalanceTimer = null;
+
long _timerPeriod = Long.MAX_VALUE;
/**
@@ -108,7 +117,7 @@ public class GenericHelixController implements IdealStateChangeListener,
*/
private ClusterDataCache _cache;
private ClusterDataCache _taskCache;
- private ExecutorService _asyncTasksThreadPool;
+ private ScheduledExecutorService _asyncTasksThreadPool;
private String _clusterName;
@@ -134,41 +143,53 @@ public class GenericHelixController implements IdealStateChangeListener,
class RebalanceTask extends TimerTask {
HelixManager _manager;
+ ClusterEventType _clusterEventType;
- public RebalanceTask(HelixManager manager) {
+ public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType) {
_manager = manager;
+ _clusterEventType = clusterEventType;
}
@Override
public void run() {
- //TODO: this is the temporary workaround
- _cache.requireFullRefresh();
- _taskCache.requireFullRefresh();
- _cache.refresh(_manager.getHelixDataAccessor());
- _taskCache.refresh(_manager.getHelixDataAccessor());
- if (_cache.getLiveInstances() != null) {
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- synchronized (_manager) {
- checkLiveInstancesObservation(new ArrayList<>(_cache.getLiveInstances().values()),
- changeContext);
+ try {
+ if (_clusterEventType.equals(ClusterEventType.PeriodicalRebalance)) {
+ _cache.requireFullRefresh();
+ _taskCache.requireFullRefresh();
+ _cache.refresh(_manager.getHelixDataAccessor());
+ if (_cache.getLiveInstances() != null) {
+ NotificationContext changeContext = new NotificationContext(_manager);
+ changeContext.setType(NotificationContext.Type.CALLBACK);
+ synchronized (_manager) {
+ checkLiveInstancesObservation(new ArrayList<>(_cache.getLiveInstances().values()),
+ changeContext);
+ }
+ }
}
- }
- NotificationContext changeContext = new NotificationContext(_manager);
- changeContext.setType(NotificationContext.Type.CALLBACK);
- ClusterEvent event = new ClusterEvent(_clusterName, ClusterEventType.PeriodicalRebalance);
- event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
- event.addAttribute(AttributeName.changeContext.name(), changeContext);
- List<ZNRecord> dummy = new ArrayList<>();
- event.addAttribute(AttributeName.eventData.name(), dummy);
- // Should be able to process
- _eventQueue.put(event);
- _taskEventQueue.put(event.clone());
- logger.info("Controller periodicalRebalance event triggered!");
+ forceRebalance(_manager, _clusterEventType);
+ } catch (Throwable ex) {
+ logger.error("Time task failed. Rebalance task type: " + _clusterEventType + ", cluster: "
+ + _clusterName);
+ }
}
}
+ /* Trigger a rebalance pipeline */
+ private void forceRebalance(HelixManager manager, ClusterEventType eventType) {
+ NotificationContext changeContext = new NotificationContext(manager);
+ changeContext.setType(NotificationContext.Type.CALLBACK);
+ ClusterEvent event = new ClusterEvent(_clusterName, eventType);
+ event.addAttribute(AttributeName.helixmanager.name(), changeContext.getManager());
+ event.addAttribute(AttributeName.changeContext.name(), changeContext);
+ event.addAttribute(AttributeName.eventData.name(), new ArrayList<>());
+
+ _taskEventQueue.put(event);
+ _eventQueue.put(event);
+
+ logger.info("Controller rebalance event triggered with event type: " + eventType);
+ }
+
// TODO who should stop this timer
/**
* Starts the rebalancing timer with the specified period. Start the timer if necessary; If the
@@ -177,13 +198,14 @@ public class GenericHelixController implements IdealStateChangeListener,
void startRebalancingTimer(long period, HelixManager manager) {
if (period != _timerPeriod) {
logger.info("Controller starting timer at period " + period);
- if (_rebalanceTimer != null) {
- _rebalanceTimer.cancel();
+ if (_periodicalRebalanceTimer != null) {
+ _periodicalRebalanceTimer.cancel();
}
- _rebalanceTimer = new Timer(true);
+ _periodicalRebalanceTimer = new Timer(true);
_timerPeriod = period;
- _rebalanceTimer
- .scheduleAtFixedRate(new RebalanceTask(manager), _timerPeriod, _timerPeriod);
+ _periodicalRebalanceTimer
+ .scheduleAtFixedRate(new RebalanceTask(manager, ClusterEventType.PeriodicalRebalance),
+ _timerPeriod, _timerPeriod);
} else {
logger.info("Controller already has timer at period " + _timerPeriod);
}
@@ -192,10 +214,10 @@ public class GenericHelixController implements IdealStateChangeListener,
/**
* Stops the rebalancing timer
*/
- void stopRebalancingTimer() {
- if (_rebalanceTimer != null) {
- _rebalanceTimer.cancel();
- _rebalanceTimer = null;
+ void stopRebalancingTimers() {
+ if (_periodicalRebalanceTimer != null) {
+ _periodicalRebalanceTimer.cancel();
+ _periodicalRebalanceTimer = null;
}
_timerPeriod = Integer.MAX_VALUE;
}
@@ -260,7 +282,7 @@ public class GenericHelixController implements IdealStateChangeListener,
_lastSeenSessions = new AtomicReference<>();
_clusterName = clusterName;
_asyncTasksThreadPool =
- Executors.newFixedThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() {
+ Executors.newScheduledThreadPool(ASYNC_TASKS_THREADPOOL_SIZE, new ThreadFactory() {
@Override public Thread newThread(Runnable r) {
return new Thread(r, "GerenricHelixController-async_task_thread");
}
@@ -274,12 +296,22 @@ public class GenericHelixController implements IdealStateChangeListener,
_eventThread = new ClusterEventProcessor(_cache, _eventQueue);
_taskEventThread = new ClusterEventProcessor(_taskCache, _taskEventQueue);
+ _forceRebalanceTimer = new Timer();
+
initPipelines(_eventThread, _cache, false);
initPipelines(_taskEventThread, _taskCache, true);
_clusterStatusMonitor = new ClusterStatusMonitor(_clusterName);
}
+ private boolean isEventQueueEmpty(boolean taskQueue) {
+ if (taskQueue) {
+ return _taskEventQueue.isEmpty();
+ } else {
+ return _eventQueue.isEmpty();
+ }
+ }
+
/**
* lock-always: caller always needs to obtain an external lock before call, calls to handleEvent()
* should be serialized
@@ -314,7 +346,7 @@ public class GenericHelixController implements IdealStateChangeListener,
if (context != null) {
if (context.getType() == Type.FINALIZE) {
- stopRebalancingTimer();
+ stopRebalancingTimers();
logger.info("Get FINALIZE notification, skip the pipeline. Event :" + event.getEventType());
return;
} else {
@@ -345,6 +377,7 @@ public class GenericHelixController implements IdealStateChangeListener,
logger.info(String.format("START: Invoking %s controller pipeline for cluster %s event: %s",
manager.getClusterName(), getPipelineType(cache.isTaskCache()), event.getEventType()));
long startTime = System.currentTimeMillis();
+ boolean rebalanceFail = false;
for (Pipeline pipeline : pipelines) {
try {
pipeline.handle(event);
@@ -352,10 +385,36 @@ public class GenericHelixController implements IdealStateChangeListener,
} catch (Exception e) {
logger.error(
"Exception while executing " + getPipelineType(cache.isTaskCache()) + "pipeline: "
- + pipeline + ". Will not continue to next pipeline", e);
+ + pipeline + "for cluster ." + _clusterName
+ + ". Will not continue to next pipeline", e);
+
+ if (e instanceof HelixMetaDataAccessException) {
+ rebalanceFail = true;
+ // If the pipeline failed because a read from or write to zookeeper failed, retry the pipeline.
+ cache.requireFullRefresh();
+ logger.warn("Rebalance pipeline failed due to read failure from zookeeper, cluster: " + _clusterName);
+
+ // only push a retry event when there is no pending event in the corresponding event queue.
+ if (isEventQueueEmpty(cache.isTaskCache())) {
+ _continousRebalanceFailureCount ++;
+ long delay = getRetryDelay(_continousRebalanceFailureCount);
+ if (delay == 0) {
+ forceRebalance(manager, ClusterEventType.RetryRebalance);
+ } else {
+ _asyncTasksThreadPool
+ .schedule(new RebalanceTask(manager, ClusterEventType.RetryRebalance), delay,
+ TimeUnit.MILLISECONDS);
+ }
+ logger.info("Retry rebalance pipeline with delay " + delay + "ms for cluster: " + _clusterName);
+ }
+ }
+ _clusterStatusMonitor.reportRebalanceFailure();
break;
}
}
+ if (!rebalanceFail) {
+ _continousRebalanceFailureCount = 0;
+ }
long endTime = System.currentTimeMillis();
logger.info(
"END: Invoking " + getPipelineType(cache.isTaskCache()) + " controller pipeline for event: "
@@ -409,6 +468,19 @@ public class GenericHelixController implements IdealStateChangeListener,
resetClusterStatusMonitor();
}
+ /**
+ * Get the delay before the next retry rebalance after a zk read failure. The first 5 failures
+ * retry with no delay; after that a simple exponential backoff is used, from 20ms up to 1000ms.
+ */
+ private long getRetryDelay(long failCount) {
+ int lowLimit = 5;
+ if (failCount <= lowLimit) {
+ return 0;
+ }
+ long backoff = (long) (Math.pow(2, failCount - lowLimit) * 10);
+ return Math.min(backoff, 1000);
+ }
+
@Override
@PreFetch(enabled = false)
public void onStateChange(String instanceName, List<CurrentState> statesInfo,
@@ -692,7 +764,7 @@ public class GenericHelixController implements IdealStateChangeListener,
}
public void shutdown() throws InterruptedException {
- stopRebalancingTimer();
+ stopRebalancingTimers();
terminateEventThread(_eventThread);
terminateEventThread(_taskEventThread);
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index dae9f57..1f4adca 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -65,7 +65,7 @@ public class CustomRebalancer extends AbstractRebalancer {
return partitionMapping;
}
- LOG.info("Computing BestPossibleMapping for " + resource);
+ LOG.info("Computing BestPossibleMapping for " + resource.getResourceName());
String stateModelDefName = idealState.getStateModelDefRef();
StateModelDefinition stateModelDef = cache.getStateModelDef(stateModelDefName);
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 2317197..3056531 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -141,7 +141,7 @@ public class ClusterDataCache {
long start = System.currentTimeMillis();
_propertyDataChangedMap.put(ChangeType.IDEAL_STATE, Boolean.valueOf(false));
clearCachedResourceAssignments();
- _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates());
+ _idealStateCacheMap = accessor.getChildValuesMap(keyBuilder.idealStates(), true);
if (LOG.isDebugEnabled()) {
LOG.debug("Reload IdealStates: " + _idealStateCacheMap.keySet() + ". Takes " + (
System.currentTimeMillis() - start) + " ms");
@@ -151,7 +151,7 @@ public class ClusterDataCache {
if (_propertyDataChangedMap.get(ChangeType.LIVE_INSTANCE)) {
_propertyDataChangedMap.put(ChangeType.LIVE_INSTANCE, Boolean.valueOf(false));
clearCachedResourceAssignments();
- _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances());
+ _liveInstanceCacheMap = accessor.getChildValuesMap(keyBuilder.liveInstances(), true);
_updateInstanceOfflineTime = true;
LOG.debug("Reload LiveInstances: " + _liveInstanceCacheMap.keySet());
}
@@ -159,14 +159,14 @@ public class ClusterDataCache {
if (_propertyDataChangedMap.get(ChangeType.INSTANCE_CONFIG)) {
_propertyDataChangedMap.put(ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false));
clearCachedResourceAssignments();
- _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs());
+ _instanceConfigCacheMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs(), true);
LOG.debug("Reload InstanceConfig: " + _instanceConfigCacheMap.keySet());
}
if (_propertyDataChangedMap.get(ChangeType.RESOURCE_CONFIG)) {
_propertyDataChangedMap.put(ChangeType.RESOURCE_CONFIG, Boolean.valueOf(false));
clearCachedResourceAssignments();
- _resourceConfigCacheMap = accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs());
+ _resourceConfigCacheMap = accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs(), true);
LOG.debug("Reload ResourceConfigs: " + _resourceConfigCacheMap.size());
}
@@ -184,9 +184,9 @@ public class ClusterDataCache {
}
Map<String, StateModelDefinition> stateDefMap =
- accessor.getChildValuesMap(keyBuilder.stateModelDefs());
+ accessor.getChildValuesMap(keyBuilder.stateModelDefs(), true);
_stateModelDefMap = new ConcurrentHashMap<>(stateDefMap);
- _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints());
+ _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints(), true);
_clusterConfig = accessor.getProperty(keyBuilder.clusterConfig());
@@ -793,7 +793,7 @@ public class ClusterDataCache {
}
- public void clearCachedResourceAssignments() {
+ protected void clearCachedResourceAssignments() {
_resourceAssignmentCache.clear();
_idealMappingCache.clear();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
index d19c7e8..5312c35 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java
@@ -32,6 +32,7 @@ public enum ClusterEventType {
TargetExternalViewChange,
Resume,
PeriodicalRebalance,
+ RetryRebalance,
StateVerifier,
Unknown
}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
index a0c06f7..4c25741 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java
@@ -31,7 +31,6 @@ import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.GroupCommit;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey;
@@ -42,6 +41,7 @@ import org.apache.helix.ZNRecord;
import org.apache.helix.ZNRecordAssembler;
import org.apache.helix.ZNRecordBucketizer;
import org.apache.helix.ZNRecordUpdater;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.MaintenanceSignal;
import org.apache.helix.model.Message;
@@ -124,7 +124,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
public <T extends HelixProperty> boolean setProperty(PropertyKey key, T value) {
PropertyType type = key.getType();
if (!value.isValid()) {
- throw new HelixException("The ZNRecord for " + type + " is not valid.");
+ throw new HelixMetaDataAccessException("The ZNRecord for " + type + " is not valid.");
}
String path = key.getPath();
@@ -201,7 +201,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor {
@Override
public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys,
- boolean throwException) throws HelixException {
+ boolean throwException) throws HelixMetaDataAccessException {
if (keys == null || keys.size() == 0) {
return Collections.emptyList();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
index c83d9d5..6ee64a4 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java
@@ -37,6 +37,7 @@ import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.DeleteCallbackHandler;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.ExistsCallbackHandler;
@@ -400,7 +401,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
stats.set(i, cb._stat);
}
} else if (Code.get(cb.getRc()) != Code.NONODE && throwException) {
- throw new HelixException(String.format("Failed to read node %s", paths.get(i)));
+ throw new HelixMetaDataAccessException(String.format("Failed to read node %s", paths.get(i)));
} else {
pathFailToRead.put(paths.get(i), cb.getRc());
}
@@ -410,7 +411,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
}
return records;
} catch (Exception e) {
- throw new HelixException(String.format("Fail to read nodes for %s", paths));
+ throw new HelixMetaDataAccessException(String.format("Fail to read nodes for %s", paths));
} finally {
long endT = System.nanoTime();
if (LOG.isTraceEnabled()) {
@@ -440,14 +441,14 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> {
readCount--;
List<T> records = getChildren(parentPath, stats, options, true);
return records;
- } catch (HelixException e) {
+ } catch (HelixMetaDataAccessException e) {
if (readCount == 0) {
- throw new HelixException(String.format("Failed to get full list of %s", parentPath), e);
+ throw new HelixMetaDataAccessException(String.format("Failed to get full list of %s", parentPath), e);
}
try {
Thread.sleep(retryInterval);
} catch (InterruptedException interruptedException) {
- throw new HelixException("Fail to interrupt the sleep", interruptedException);
+ throw new HelixMetaDataAccessException("Fail to interrupt the sleep", interruptedException);
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
index 4c0fcf2..73cd2ae 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java
@@ -32,13 +32,10 @@ import java.util.concurrent.locks.ReentrantLock;
import org.I0Itec.zkclient.DataUpdater;
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
-import org.I0Itec.zkclient.exception.ZkBadVersionException;
import org.I0Itec.zkclient.exception.ZkNoNodeException;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.helix.AccessOption;
-import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixException;
-import org.apache.helix.InstanceType;
import org.apache.helix.manager.zk.ZkAsyncCallbacks.CreateCallbackHandler;
import org.apache.helix.manager.zk.ZkBaseDataAccessor.RetCode;
import org.apache.helix.store.HelixPropertyListener;
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 40801b1..c7e0fdb 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -21,6 +21,7 @@ package org.apache.helix.monitoring.mbeans;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.model.*;
import org.apache.helix.task.*;
@@ -66,6 +67,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private Map<String, List<String>> _oldDisabledPartitions = Collections.emptyMap();
private Map<String, Long> _instanceMsgQueueSizes = Maps.newConcurrentMap();
private boolean _rebalanceFailure = false;
+ private AtomicLong _rebalanceFailureCount = new AtomicLong(0L);
+
private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap = new ConcurrentHashMap<>();
@@ -441,8 +444,9 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
ResourceMonitor resourceMonitor = getOrCreateResourceMonitor(resourceName);
if (resourceMonitor != null) {
- resourceMonitor.updateRebalancerStat(numPendingRecoveryRebalancePartitions, numPendingLoadRebalancePartitions,
- numRecoveryRebalanceThrottledPartitions, numLoadRebalanceThrottledPartitions);
+ resourceMonitor.updateRebalancerStat(numPendingRecoveryRebalancePartitions,
+ numPendingLoadRebalancePartitions, numRecoveryRebalanceThrottledPartitions,
+ numLoadRebalanceThrottledPartitions);
}
}
@@ -655,7 +659,8 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
String resourceName = monitor.getResourceName();
String beanName = getPerInstanceResourceBeanName(instanceName, resourceName);
register(monitor, getObjectName(beanName));
- _perInstanceResourceMap.put(new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
+ _perInstanceResourceMap.put(
+ new PerInstanceResourceMonitor.BeanName(instanceName, resourceName), monitor);
}
}
@@ -785,4 +790,13 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
public void setEnabled(boolean enabled) {
this._enabled = enabled;
}
+
+ public void reportRebalanceFailure() {
+ _rebalanceFailureCount.incrementAndGet();
+ }
+
+ @Override
+ public long getRebalanceFailureCounter() {
+ return _rebalanceFailureCount.get();
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
index 4cdd357..49d316e 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitorMBean.java
@@ -61,4 +61,10 @@ public interface ClusterStatusMonitorMBean extends SensorNameProvider {
* @return 1 if cluster is paused, otherwise 0
*/
public long getPaused();
+
+ /**
+ * The number of failures encountered during the rebalance pipeline.
+ * @return the cumulative count of rebalance pipeline failures
+ */
+ long getRebalanceFailureCounter();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/test/java/org/apache/helix/MockAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/MockAccessor.java b/helix-core/src/test/java/org/apache/helix/MockAccessor.java
index e3189f8..8bcfe19 100644
--- a/helix-core/src/test/java/org/apache/helix/MockAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/MockAccessor.java
@@ -192,7 +192,7 @@ public class MockAccessor implements HelixDataAccessor {
@Override public <T extends HelixProperty> List<T> getChildValues(PropertyKey key,
boolean throwException) {
- return null;
+ return getChildValues(key);
}
@Override
@@ -203,14 +203,13 @@ public class MockAccessor implements HelixDataAccessor {
@Override public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key,
boolean throwException) {
- return null;
+ return getChildValuesMap(key);
}
@Override
public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys,
List<T> children) {
- // TODO Auto-generated method stub
- return null;
+ throw new HelixException("Method not implemented!");
}
@Override
@@ -236,7 +235,7 @@ public class MockAccessor implements HelixDataAccessor {
public <T extends HelixProperty> boolean[] updateChildren(List<String> paths,
List<DataUpdater<ZNRecord>> updaters, int options) {
// TODO Auto-generated method stub
- return null;
+ throw new HelixException("Method not implemented!");
}
@Override
@@ -252,6 +251,6 @@ public class MockAccessor implements HelixDataAccessor {
@Override public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys,
boolean throwException) {
- return null;
+ return getProperty(keys);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
index fee1ee3..de4e9d1 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestClusterDataCacheSelectiveUpdate.java
@@ -139,17 +139,33 @@ public class TestClusterDataCacheSelectiveUpdate extends ZkStandAloneCMTestBase
}
@Override
+ public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys, boolean throwException) {
+ for (PropertyKey key : keys) {
+ addCount(key);
+ }
+ return super.getProperty(keys, throwException);
+ }
+
+ @Override
public <T extends HelixProperty> T getProperty(PropertyKey key) {
addCount(key);
return super.getProperty(key);
}
+ @Override
public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key) {
Map<String, T> map = super.getChildValuesMap(key);
addCount(key, map.keySet().size());
return map;
}
+ @Override
+ public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key, boolean throwException) {
+ Map<String, T> map = super.getChildValuesMap(key, throwException);
+ addCount(key, map.keySet().size());
+ return map;
+ }
+
private void addCount(PropertyKey key) {
addCount(key, 1);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixDataAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixDataAccessor.java b/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixDataAccessor.java
index ec0275a..39fdb2a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixDataAccessor.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/manager/TestHelixDataAccessor.java
@@ -10,6 +10,10 @@ import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.exceptions.HelixMetaDataAccessException;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterDataCache;
+import org.apache.helix.controller.stages.ReadClusterDataStage;
import org.apache.helix.integration.common.ZkIntegrationTestBase;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -18,21 +22,21 @@ import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-public class TestHelixDataAccessor extends ZkIntegrationTestBase{
+public class TestHelixDataAccessor extends ZkIntegrationTestBase {
private MockZkClient _zkClient;
+ BaseDataAccessor<ZNRecord> baseDataAccessor;
+ HelixDataAccessor accessor;
+ List<PropertyKey> propertyKeys;
@BeforeClass
public void beforeClass() {
_zkClient = new MockZkClient(ZK_ADDR);
- }
- @Test(expectedExceptions = HelixException.class)
- public void testHelixDataAccessorReadData() {
- BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
- HelixDataAccessor accessor = new ZKHelixDataAccessor("HELIX", baseDataAccessor);
+ baseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
+ accessor = new ZKHelixDataAccessor("HELIX", baseDataAccessor);
Map<String, HelixProperty> paths = new TreeMap<>();
- List<PropertyKey> propertyKeys = new ArrayList<>();
+ propertyKeys = new ArrayList<>();
for (int i = 0; i < 5; i++) {
PropertyKey key = accessor.keyBuilder().idealStates("RESOURCE" + i);
propertyKeys.add(key);
@@ -40,12 +44,42 @@ public class TestHelixDataAccessor extends ZkIntegrationTestBase{
accessor.setProperty(key, paths.get(key.getPath()));
}
- List<HelixProperty> data = accessor.getProperty(new ArrayList<>(propertyKeys));
+ List<HelixProperty> data = accessor.getProperty(new ArrayList<>(propertyKeys), true);
Assert.assertEquals(data.size(), 5);
PropertyKey key = accessor.keyBuilder().idealStates("RESOURCE6");
propertyKeys.add(key);
_zkClient.putData(key.getPath(), null);
- accessor.getProperty(new ArrayList<>(propertyKeys), true);
+ }
+
+ @Test
+ public void testHelixDataAccessorReadData() {
+ accessor.getProperty(new ArrayList<>(propertyKeys), false);
+ try {
+ accessor.getProperty(new ArrayList<>(propertyKeys), true);
+ Assert.fail();
+ } catch (HelixMetaDataAccessException ex) {
+ }
+
+ PropertyKey idealStates = accessor.keyBuilder().idealStates();
+ accessor.getChildValues(idealStates, false);
+ try {
+ accessor.getChildValues(idealStates, true);
+ Assert.fail();
+ } catch (HelixMetaDataAccessException ex) {
+ }
+
+ accessor.getChildValuesMap(idealStates, false);
+ try {
+ accessor.getChildValuesMap(idealStates, true);
+ Assert.fail();
+ } catch (HelixMetaDataAccessException ex) {
+ }
+ }
+
+ @Test (expectedExceptions = {HelixMetaDataAccessException.class})
+ public void testClusterDataCache() {
+ ClusterDataCache cache = new ClusterDataCache("MyCluster");
+ cache.refresh(accessor);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/149831c8/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java b/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
index 05d7894..5026999 100644
--- a/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
+++ b/helix-core/src/test/java/org/apache/helix/mock/MockZkClient.java
@@ -1,14 +1,11 @@
package org.apache.helix.mock;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
-import org.I0Itec.zkclient.IZkConnection;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
-import org.apache.helix.manager.zk.PathBasedZkSerializer;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkAsyncCallbacks;
import org.apache.helix.manager.zk.ZkClient;
-import org.apache.zookeeper.KeeperException;
public class MockZkClient extends ZkClient {
Map<String, byte[]> _dataMap;
@@ -18,79 +15,6 @@ public class MockZkClient extends ZkClient {
_dataMap = new HashMap<>();
setZkSerializer(new ZNRecordSerializer());
}
- public MockZkClient(IZkConnection zkConnection, int connectionTimeout, long operationRetryTimeout,
- PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
- String monitorInstanceName, boolean monitorRootPathOnly) {
- super(zkConnection, connectionTimeout, operationRetryTimeout, zkSerializer, monitorType,
- monitorKey, monitorInstanceName, monitorRootPathOnly);
- }
-
- public MockZkClient(IZkConnection connection, int connectionTimeout,
- PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey,
- long operationRetryTimeout) {
- super(connection, connectionTimeout, zkSerializer, monitorType, monitorKey,
- operationRetryTimeout);
- }
-
- public MockZkClient(IZkConnection connection, int connectionTimeout,
- PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey) {
- super(connection, connectionTimeout, zkSerializer, monitorType, monitorKey);
- }
-
- public MockZkClient(String zkServers, String monitorType, String monitorKey) {
- super(zkServers, monitorType, monitorKey);
- }
-
- public MockZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
- PathBasedZkSerializer zkSerializer, String monitorType, String monitorKey) {
- super(zkServers, sessionTimeout, connectionTimeout, zkSerializer, monitorType, monitorKey);
- }
-
- public MockZkClient(IZkConnection connection, int connectionTimeout,
- PathBasedZkSerializer zkSerializer) {
- super(connection, connectionTimeout, zkSerializer);
- }
-
- public MockZkClient(IZkConnection connection, int connectionTimeout, ZkSerializer zkSerializer) {
- super(connection, connectionTimeout, zkSerializer);
- }
-
- public MockZkClient(IZkConnection connection, int connectionTimeout) {
- super(connection, connectionTimeout);
- }
-
- public MockZkClient(IZkConnection connection) {
- super(connection);
- }
-
- public MockZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
- ZkSerializer zkSerializer) {
- super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
- }
-
- public MockZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
- PathBasedZkSerializer zkSerializer) {
- super(zkServers, sessionTimeout, connectionTimeout, zkSerializer);
- }
-
- public MockZkClient(String zkServers, int sessionTimeout, int connectionTimeout) {
- super(zkServers, sessionTimeout, connectionTimeout);
- }
-
- public MockZkClient(String zkServers, int connectionTimeout) {
- super(zkServers, connectionTimeout);
- }
-
- public MockZkClient(String zkServers, int sessionTimeout, int connectionTimeout,
- ZkSerializer zkSerializer, long operationRetryTimeout) {
- super(zkServers, sessionTimeout, connectionTimeout, zkSerializer, operationRetryTimeout);
- }
-
- public MockZkClient(IZkConnection zkConnection, int connectionTimeout, ZkSerializer zkSerializer,
- long operationRetryTimeout) {
- super(zkConnection, connectionTimeout, zkSerializer, operationRetryTimeout);
- }
-
@Override
public void asyncGetData(final String path,
@@ -102,8 +26,20 @@ public class MockZkClient extends ZkClient {
cb.processResult(0, path, null, _dataMap.get(path), null);
}
} else {
- cb.processResult(KeeperException.Code.NONODE.intValue(), path, null, null, null);
+ super.asyncGetData(path, cb);
+ }
+ }
+
+ public List<String> getChildren(final String path) {
+ List<String> children = super.getChildren(path);
+ for (String p : _dataMap.keySet()) {
+ if (p.contains(path)) {
+ String[] paths = p.split("/");
+ children.add(paths[paths.length-1]);
+ }
}
+
+ return children;
}
public void putData(String path, byte[] data) {