You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2017/02/09 21:24:28 UTC
helix git commit: [HELIX-653] Fix enable/disable partition in
instances for resource specific
Repository: helix
Updated Branches:
refs/heads/helix-0.6.x 118691e0b -> befcc65c2
[HELIX-653] Fix enable/disable partition in instances for resource specific
Helix currently enable/disable partition in instances across all the resources if partition is same. Fix it with resource associated partition enable/disable.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/befcc65c
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/befcc65c
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/befcc65c
Branch: refs/heads/helix-0.6.x
Commit: befcc65c2e67df9767ab5d2eca837339894c5581
Parents: 118691e
Author: Junkai Xue <jx...@linkedin.com>
Authored: Thu Feb 9 11:36:32 2017 -0800
Committer: Junkai Xue <jx...@linkedin.com>
Committed: Thu Feb 9 11:36:32 2017 -0800
----------------------------------------------------------------------
.../rebalancer/AbstractRebalancer.java | 2 +-
.../controller/rebalancer/CustomRebalancer.java | 2 +-
.../rebalancer/DelayedAutoRebalancer.java | 2 +-
.../controller/stages/ClusterDataCache.java | 4 +-
.../controller/stages/ReadClusterDataStage.java | 8 +-
.../apache/helix/manager/zk/ZKHelixAdmin.java | 21 +--
.../org/apache/helix/model/InstanceConfig.java | 128 ++++++++++++++++---
.../monitoring/mbeans/ClusterStatusMonitor.java | 16 ++-
.../monitoring/mbeans/InstanceMonitor.java | 27 ++--
.../monitoring/mbeans/InstanceMonitorMBean.java | 6 +
.../java/org/apache/helix/util/HelixUtil.java | 14 ++
.../helix/manager/zk/TestZkHelixAdmin.java | 54 ++++++++
12 files changed, 228 insertions(+), 56 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
index 7bf2153..9cd2f96 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/AbstractRebalancer.java
@@ -88,7 +88,7 @@ public abstract class AbstractRebalancer implements Rebalancer, MappingCalculato
Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
Set<String> disabledInstancesForPartition =
- cache.getDisabledInstancesForPartition(partition.toString());
+ cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
List<String> preferenceList = ConstraintBasedAssignment
.getPreferenceList(partition, idealState,
Collections.unmodifiableSet(cache.getLiveInstances().keySet()));
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
index a4f5e83..fede2b3 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/CustomRebalancer.java
@@ -65,7 +65,7 @@ public class CustomRebalancer extends AbstractRebalancer {
Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
Set<String> disabledInstancesForPartition =
- cache.getDisabledInstancesForPartition(partition.toString());
+ cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
Map<String, String> idealStateMap =
idealState.getInstanceStateMap(partition.getPartitionName());
Map<String, String> bestStateForPartition =
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
index 1f1d94f..9f52d4f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/DelayedAutoRebalancer.java
@@ -318,7 +318,7 @@ public class DelayedAutoRebalancer extends AbstractRebalancer {
Map<String, String> currentStateMap =
currentStateOutput.getCurrentStateMap(resource.getResourceName(), partition);
Set<String> disabledInstancesForPartition =
- cache.getDisabledInstancesForPartition(partition.toString());
+ cache.getDisabledInstancesForPartition(resource.getResourceName(), partition.toString());
List<String> preferenceList =
ConstraintBasedAssignment.getPreferenceList(partition, idealState, activeNodes);
Map<String, String> bestStateForPartition = ConstraintBasedAssignment
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index 9cccc64..e72354f 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -487,12 +487,12 @@ public class ClusterDataCache {
* @param partition
* @return
*/
- public Set<String> getDisabledInstancesForPartition(String partition) {
+ public Set<String> getDisabledInstancesForPartition(String resource, String partition) {
Set<String> disabledInstancesSet = new HashSet<String>();
for (String instance : _instanceConfigMap.keySet()) {
InstanceConfig config = _instanceConfigMap.get(instance);
if (config.getInstanceEnabled() == false
- || config.getInstanceEnabledForPartition(partition) == false) {
+ || config.getInstanceEnabledForPartition(resource, partition) == false) {
disabledInstancesSet.add(instance);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
index abce878..521d315 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ReadClusterDataStage.java
@@ -67,7 +67,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
Set<String> instanceSet = Sets.newHashSet();
Set<String> liveInstanceSet = Sets.newHashSet();
Set<String> disabledInstanceSet = Sets.newHashSet();
- Map<String, Set<String>> disabledPartitions = Maps.newHashMap();
+ Map<String, Map<String, String>> disabledPartitions = Maps.newHashMap();
Map<String, Set<String>> tags = Maps.newHashMap();
Map<String, LiveInstance> liveInstanceMap = _cache.getLiveInstances();
for (Map.Entry<String, InstanceConfig> e : _cache.getInstanceConfigMap().entrySet()) {
@@ -80,11 +80,7 @@ public class ReadClusterDataStage extends AbstractBaseStage {
if (!config.getInstanceEnabled()) {
disabledInstanceSet.add(instanceName);
}
- List<String> disabledPartitionsList = config.getDisabledPartitions();
- Set<String> partitionNames =
- disabledPartitionsList != null ? new HashSet<String>(config.getDisabledPartitions())
- : new HashSet<String>();
- disabledPartitions.put(instanceName, partitionNames);
+ disabledPartitions.put(instanceName, config.getDisabledPartitionsMap());
Set<String> instanceTags = Sets.newHashSet(config.getTags());
tags.put(instanceName, instanceTags);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
index b1ce406..378777f 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java
@@ -70,6 +70,7 @@ import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.DefaultIdealStateCalculator;
+import org.apache.helix.util.HelixUtil;
import org.apache.helix.util.RebalanceUtil;
import org.apache.log4j.Logger;
@@ -300,24 +301,12 @@ public class ZKHelixAdmin implements HelixAdmin {
+ ", participant config is null");
}
- // TODO: merge with InstanceConfig.setInstanceEnabledForPartition
- List<String> list =
- currentData.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
- Set<String> disabledPartitions = new HashSet<String>();
- if (list != null) {
- disabledPartitions.addAll(list);
+ InstanceConfig instanceConfig = new InstanceConfig(currentData);
+ for (String partitionName : partitionNames) {
+ instanceConfig.setInstanceEnabledForPartition(resourceName, partitionName, enabled);
}
- if (enabled) {
- disabledPartitions.removeAll(partitionNames);
- } else {
- disabledPartitions.addAll(partitionNames);
- }
-
- list = new ArrayList<String>(disabledPartitions);
- Collections.sort(list);
- currentData.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(), list);
- return currentData;
+ return instanceConfig.getRecord();
}
}, AccessOption.PERSISTENT);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 52edaa7..0db18fd 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -21,12 +21,15 @@ package org.apache.helix.model;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
+import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
/**
@@ -223,12 +226,15 @@ public class InstanceConfig extends HelixProperty {
* @param partition the partition name to check
* @return true if the instance is enabled for the partition, false otherwise
*/
- public boolean getInstanceEnabledForPartition(String partition) {
- // Map<String, String> disabledPartitionMap =
- // _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
- List<String> disabledPartitions =
- _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
- if (disabledPartitions != null && disabledPartitions.contains(partition)) {
+ public boolean getInstanceEnabledForPartition(String resource, String partition) {
+ // TODO: Remove this old partition list check once old get API removed.
+ List<String> oldDisabledPartition =
+ _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+ Map<String, String> disabledPartitionsMap =
+ _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+ if ((disabledPartitionsMap != null && disabledPartitionsMap.containsKey(resource) && HelixUtil
+ .deserializeByComma(disabledPartitionsMap.get(resource)).contains(partition))
+ || oldDisabledPartition != null && oldDisabledPartition.contains(partition)) {
return false;
} else {
return true;
@@ -237,34 +243,126 @@ public class InstanceConfig extends HelixProperty {
/**
* Get the partitions disabled by this instance
+ * This method will be deprecated since we persist disabled partitions
+ * based on instance and resource. The result will not be accurate as we
+ * union all the partitions disabled.
+ *
* @return a list of partition names
*/
+ @Deprecated
public List<String> getDisabledPartitions() {
- return _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+ List<String> oldDisabled =
+ _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+ if (!_record.getMapFields().containsKey(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())
+ && oldDisabled == null) {
+ return null;
+ }
+
+ Set<String> disabledPartitions = new HashSet<String>();
+ if (oldDisabled != null) {
+ disabledPartitions.addAll(oldDisabled);
+ }
+
+ for (String perResource : _record
+ .getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).values()) {
+ disabledPartitions.addAll(HelixUtil.deserializeByComma(perResource));
+ }
+
+ return new ArrayList<String>(disabledPartitions);
+ }
+
+ /**
+ * Get the partitions disabled by resource on this instance
+ * @param resourceName The resource of disabled partitions
+ * @return A list of partition names if exists, otherwise will be null
+ */
+ public List<String> getDisabledPartitions(String resourceName) {
+ // TODO: Remove this logic getting data from list field when getDisabledParition() removed.
+ List<String> oldDisabled =
+ _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+ if ((!_record.getMapFields().containsKey(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())
+ || !_record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())
+ .containsKey(resourceName)) && oldDisabled == null) {
+ return null;
+ }
+
+ Set<String> disabledPartitions = new HashSet<String>();
+ if (oldDisabled != null) {
+ disabledPartitions.addAll(oldDisabled);
+ }
+
+ disabledPartitions.addAll(HelixUtil.deserializeByComma(
+ _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name())
+ .get(resourceName)));
+
+ return new ArrayList<String>(disabledPartitions);
}
/**
- * Set the enabled state for a partition on this instance
+ * Get a map that mapping resource name to disabled partitions
+ * @return A map of resource name mapping to disabled partitions
+ */
+ public Map<String, String> getDisabledPartitionsMap() {
+ return _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+ }
+
+ /**
+ * Set the enabled state for a partition on this instance across all the resources
+ *
* @param partitionName the partition to set
* @param enabled true to enable, false to disable
*/
+ @Deprecated
public void setInstanceEnabledForPartition(String partitionName, boolean enabled) {
- List<String> list =
- _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+ Map<String, String> disabledPartitionMap =
+ _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+ for (String resourceName : disabledPartitionMap.keySet()) {
+ setInstanceEnabledForPartition(resourceName, partitionName, enabled);
+ }
+ }
+
+ public void setInstanceEnabledForPartition(String resourceName, String partitionName,
+ boolean enabled) {
+ // Get old disabled partitions if exists
+ // TODO: Remove this when getDisabledParition() removed.
+ List<String> oldDisabledPartitions =
+ _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
+
+ Map<String, String> currentDisabled =
+ _record.getMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name());
Set<String> disabledPartitions = new HashSet<String>();
- if (list != null) {
- disabledPartitions.addAll(list);
+
+ if (currentDisabled != null && currentDisabled.containsKey(resourceName)) {
+ disabledPartitions.addAll(HelixUtil.deserializeByComma(currentDisabled.get(resourceName)));
}
if (enabled) {
disabledPartitions.remove(partitionName);
+ if (oldDisabledPartitions != null && oldDisabledPartitions.contains(partitionName)) {
+ oldDisabledPartitions.remove(partitionName);
+ }
} else {
disabledPartitions.add(partitionName);
}
- list = new ArrayList<String>(disabledPartitions);
- Collections.sort(list);
- _record.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString(), list);
+ List<String> disabledPartitionList = new ArrayList<String>(disabledPartitions);
+ Collections.sort(disabledPartitionList);
+ if (currentDisabled == null) {
+ currentDisabled = new HashMap<String, String>();
+ }
+
+ if (disabledPartitionList != null) {
+ currentDisabled.put(resourceName, HelixUtil.serializeByComma(disabledPartitionList));
+ }
+
+ if (!currentDisabled.isEmpty()) {
+ _record.setMapField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name(), currentDisabled);
+ }
+
+ if (oldDisabledPartitions != null && !oldDisabledPartitions.isEmpty()) {
+ _record.setListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.name(),
+ oldDisabledPartitions);
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
index 90cdc5a..22d7209 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/ClusterStatusMonitor.java
@@ -46,6 +46,7 @@ import org.apache.helix.task.TaskState;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
+import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
@@ -74,7 +75,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
private Set<String> _liveInstances = Collections.emptySet();
private Set<String> _instances = Collections.emptySet();
private Set<String> _disabledInstances = Collections.emptySet();
- private Map<String, Set<String>> _disabledPartitions = Collections.emptyMap();
+ private Map<String, Map<String, String>> _disabledPartitions = Collections.emptyMap();
private Map<String, Long> _instanceMsgQueueSizes = Maps.newConcurrentMap();
private final ConcurrentHashMap<String, ResourceMonitor> _resourceMbeanMap =
@@ -130,11 +131,14 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
return _disabledInstances.size();
}
- @Override
- public long getDisabledPartitionsGauge() {
+ @Override public long getDisabledPartitionsGauge() {
int numDisabled = 0;
- for (String instance : _disabledPartitions.keySet()) {
- numDisabled += _disabledPartitions.get(instance).size();
+ for (Map<String, String> perInstance : _disabledPartitions.values()) {
+ for (String partitions : perInstance.values()) {
+ if (partitions != null) {
+ numDisabled += HelixUtil.deserializeByComma(partitions).size();
+ }
+ }
}
return numDisabled;
}
@@ -196,7 +200,7 @@ public class ClusterStatusMonitor implements ClusterStatusMonitorMBean {
* @param tags a map of instance name to the set of tags on it
*/
public void setClusterInstanceStatus(Set<String> liveInstanceSet, Set<String> instanceSet,
- Set<String> disabledInstanceSet, Map<String, Set<String>> disabledPartitions,
+ Set<String> disabledInstanceSet, Map<String, Map<String, String>> disabledPartitions,
Map<String, Set<String>> tags) {
// Unregister beans for instances that are no longer configured
Set<String> toUnregister = Sets.newHashSet(_instanceMbeanMap.keySet());
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
index dc4a0a5..46d8a96 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitor.java
@@ -21,8 +21,12 @@ package org.apache.helix.monitoring.mbeans;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Set;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.util.HelixUtil;
+
import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
@@ -34,7 +38,7 @@ public class InstanceMonitor implements InstanceMonitorMBean {
private final String _clusterName;
private final String _participantName;
private List<String> _tags;
- private List<String> _disabledPartitions;
+ private long _disabledPartitions;
private boolean _isUp;
private boolean _isEnabled;
private long _totalMessageReceived;
@@ -48,7 +52,7 @@ public class InstanceMonitor implements InstanceMonitorMBean {
_clusterName = clusterName;
_participantName = participantName;
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
- _disabledPartitions = Collections.emptyList();
+ _disabledPartitions = 0L;
_isUp = false;
_isEnabled = false;
_totalMessageReceived = 0;
@@ -75,6 +79,11 @@ public class InstanceMonitor implements InstanceMonitorMBean {
return _totalMessageReceived;
}
+ @Override
+ public long getDisabledPartitions() {
+ return _disabledPartitions;
+ }
+
/**
* Get all the tags currently on this instance
* @return list of tags
@@ -110,7 +119,7 @@ public class InstanceMonitor implements InstanceMonitorMBean {
* @param isLive true if running, false otherwise
* @param isEnabled true if enabled, false if disabled
*/
- public synchronized void updateInstance(Set<String> tags, Set<String> disabledPartitions,
+ public synchronized void updateInstance(Set<String> tags, Map<String, String> disabledPartitions,
boolean isLive, boolean isEnabled) {
if (tags == null || tags.isEmpty()) {
_tags = ImmutableList.of(ClusterStatusMonitor.DEFAULT_TAG);
@@ -118,11 +127,13 @@ public class InstanceMonitor implements InstanceMonitorMBean {
_tags = Lists.newArrayList(tags);
Collections.sort(_tags);
}
- if (disabledPartitions == null) {
- _disabledPartitions = Collections.emptyList();
- } else {
- _disabledPartitions = Lists.newArrayList(disabledPartitions);
- Collections.sort(_disabledPartitions);
+ _disabledPartitions = 0L;
+ if (disabledPartitions != null) {
+ for (String partitions : disabledPartitions.values()) {
+ if (partitions != null) {
+ _disabledPartitions += HelixUtil.deserializeByComma(partitions).size();
+ }
+ }
}
_isUp = isLive;
_isEnabled = isEnabled;
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
index 4d949b1..a3221d8 100644
--- a/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
+++ b/helix-core/src/main/java/org/apache/helix/monitoring/mbeans/InstanceMonitorMBean.java
@@ -42,4 +42,10 @@ public interface InstanceMonitorMBean extends SensorNameProvider {
* @return The total number of messages sent to this instance
*/
public long getTotalMessageReceived();
+
+ /**
+ * Get the total disabled partitions number for this instance
+ * @return The total number of disabled partitions
+ */
+ public long getDisabledPartitions();
}
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
index 15d2f7b..4adf8ab 100644
--- a/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/HelixUtil.java
@@ -19,6 +19,9 @@ package org.apache.helix.util;
* under the License.
*/
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@@ -68,6 +71,17 @@ public final class HelixUtil {
return path.substring(path.lastIndexOf('/') + 1);
}
+ public static String serializeByComma(List<String> objects) {
+ return String.join(",", objects);
+ }
+
+ public static List<String> deserializeByComma(String object) {
+ if (object.length() == 0) {
+ return Collections.EMPTY_LIST;
+ }
+ return Arrays.asList(object.split(","));
+ }
+
/**
* parse a csv-formated key-value pairs
* @param keyValuePairs : csv-formatted key-value pairs. e.g. k1=v1,k2=v2,...
http://git-wip-us.apache.org/repos/asf/helix/blob/befcc65c/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
index e8f8f56..bb1b079 100644
--- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
+++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkHelixAdmin.java
@@ -20,6 +20,7 @@ package org.apache.helix.manager.zk;
*/
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
@@ -407,4 +408,57 @@ public class TestZkHelixAdmin extends ZkUnitTestBase {
AssertJUnit.assertEquals(allResources.size(), 4);
AssertJUnit.assertEquals(resourcesWithTag.size(), 2);
}
+ @Test
+ public void testEnableDisablePartitions() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ String instanceName = "TestInstance";
+ String testResourcePrefix = "TestResource";
+ System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis()));
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ admin.addCluster(clusterName, true);
+ admin.addInstance(clusterName, new InstanceConfig(instanceName));
+
+ // Test disable instances with resources
+ admin.enablePartition(false, clusterName, instanceName, testResourcePrefix + "0",
+ Arrays.asList(new String[]{"1", "2"}));
+ admin.enablePartition(false, clusterName, instanceName, testResourcePrefix + "1",
+ Arrays.asList(new String[]{"2", "3", "4"}));
+ InstanceConfig instanceConfig = admin.getInstanceConfig(clusterName, instanceName);
+ Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "0").size(), 2);
+ Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "1").size(), 3);
+
+ // Test enable partition across resources
+ instanceConfig.setInstanceEnabledForPartition("2", true);
+ Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "0").size(), 1);
+ Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "1").size(), 2);
+
+ // Test disable partition across resources
+ instanceConfig.setInstanceEnabledForPartition("10", false);
+ Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "0").size(), 2);
+ Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix + "1").size(), 3);
+ }
+
+ @Test
+ public void testLegacyEnableDisablePartition() {
+ String instanceName = "TestInstanceLegacy";
+ String testResourcePrefix = "TestResourceLegacy";
+ ZNRecord record = new ZNRecord(instanceName);
+ List<String> disabledPartitions =
+ new ArrayList<String>(Arrays.asList(new String[] { "1", "2", "3" }));
+ record.setListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name(),
+ disabledPartitions);
+ InstanceConfig instanceConfig = new InstanceConfig(record);
+ instanceConfig.setInstanceEnabledForPartition(testResourcePrefix, "2", false);
+ Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix).size(), 3);
+ Assert.assertEquals(instanceConfig.getRecord()
+ .getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).size(),
+ 3);
+ instanceConfig.setInstanceEnabledForPartition(testResourcePrefix, "2", true);
+ Assert.assertEquals(instanceConfig.getDisabledPartitions(testResourcePrefix).size(), 2);
+ Assert.assertEquals(instanceConfig.getRecord()
+ .getListField(InstanceConfig.InstanceConfigProperty.HELIX_DISABLED_PARTITION.name()).size(),
+ 2);
+ }
}