You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/09/12 23:13:54 UTC
[1/2] [HELIX-100] improve helix config api, namespaced user configs
Updated Branches:
refs/heads/helix-logical-model 7de6a7fe8 -> d8ef5a2d7
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
index 9fb7ba9..bd6cd13 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestAutoRebalanceStrategy.java
@@ -37,6 +37,8 @@ import org.apache.helix.HelixDefinedState;
import org.apache.helix.Mocks.MockAccessor;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.State;
import org.apache.helix.controller.rebalancer.util.ConstraintBasedAssignment;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
@@ -122,13 +124,14 @@ public class TestAutoRebalanceStrategy {
*/
private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState,
LinkedHashMap<String, Integer> states) {
- StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName);
- builder.initialState(initialState);
- int i = states.size();
+ StateModelDefinition.Builder builder =
+ new StateModelDefinition.Builder(Id.stateModelDef(modelName));
+ builder.initialState(State.from(initialState));
+ int i = 0;
for (String state : states.keySet()) {
- builder.addState(state, i);
- builder.upperBound(state, states.get(state));
- i--;
+ builder.addState(State.from(state), i);
+ builder.upperBound(State.from(state), states.get(state));
+ i++;
}
return builder.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index d220db8..6db3e6c 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -42,6 +42,7 @@ import org.apache.helix.api.ParticipantId;
import org.apache.helix.api.PartitionId;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
+import org.apache.helix.api.UserConfig;
import org.apache.helix.controller.rebalancer.util.NewConstraintBasedAssignment;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy.ReplicaPlacementScheme;
import org.apache.helix.model.CurrentState;
@@ -131,13 +132,14 @@ public class TestNewAutoRebalanceStrategy {
*/
private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState,
LinkedHashMap<String, Integer> states) {
- StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName);
- builder.initialState(initialState);
- int i = states.size();
+ StateModelDefinition.Builder builder =
+ new StateModelDefinition.Builder(Id.stateModelDef(modelName));
+ builder.initialState(State.from(initialState));
+ int i = 0;
for (String state : states.keySet()) {
- builder.addState(state, i);
- builder.upperBound(state, states.get(state));
- i--;
+ builder.addState(State.from(state), i);
+ builder.upperBound(State.from(state), states.get(state));
+ i++;
}
return builder.build();
}
@@ -231,7 +233,7 @@ public class TestNewAutoRebalanceStrategy {
ParticipantId participantId = Id.participant(nodeName);
Participant participant =
new Participant(participantId, "hostname", 0, true, disabledPartitionIdSet, tags,
- null, currentStateMap, messageMap);
+ null, currentStateMap, messageMap, new UserConfig(participantId));
liveParticipantMap.put(participantId, participant);
}
List<ParticipantId> participantPreferenceList =
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
new file mode 100644
index 0000000..642a8a3
--- /dev/null
+++ b/helix-examples/src/main/java/org/apache/helix/examples/NewModelExample.java
@@ -0,0 +1,113 @@
+package org.apache.helix.examples;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.ClusterAccessor;
+import org.apache.helix.api.ClusterConfig;
+import org.apache.helix.api.ClusterId;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantConfig;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.ResourceConfig;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelDefinitionAccessor;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.manager.zk.ZkClient;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class NewModelExample {
+ private static final Logger LOG = Logger.getLogger(NewModelExample.class);
+
+ public static void main(String[] args) {
+ if (args.length < 1) {
+ LOG.error("USAGE: NewModelExample zkAddress");
+ System.exit(1);
+ }
+ StateModelDefinition lockUnlock = getLockUnlockModel();
+ ResourceConfig resource = getResource(lockUnlock);
+ ParticipantConfig participant = getParticipant();
+ ClusterId clusterId = Id.cluster("exampleCluster");
+ ClusterConfig cluster =
+ new ClusterConfig.Builder(clusterId).addResource(resource).addParticipant(participant)
+ .build();
+ int timeOutInSec = Integer.parseInt(System.getProperty(ZKHelixAdmin.CONNECTION_TIMEOUT, "30"));
+ ZkClient zkClient = new ZkClient(args[0], timeOutInSec * 1000);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ zkClient.waitUntilConnected(timeOutInSec, TimeUnit.SECONDS);
+ BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(clusterId.stringify(), baseDataAccessor);
+ persistStateModel(lockUnlock, accessor);
+ createCluster(cluster, accessor);
+ }
+
+ private static void persistStateModel(StateModelDefinition stateModelDef,
+ HelixDataAccessor helixAccessor) {
+ StateModelDefinitionAccessor accessor = new StateModelDefinitionAccessor(helixAccessor);
+ accessor.addStateModelDefinition(stateModelDef);
+ }
+
+ private static void createCluster(ClusterConfig cluster, HelixDataAccessor helixAccessor) {
+ ClusterAccessor accessor = new ClusterAccessor(cluster.getId(), helixAccessor);
+ accessor.createCluster(cluster);
+ }
+
+ private static ParticipantConfig getParticipant() {
+ ParticipantId participantId = Id.participant("localhost_0");
+ ParticipantConfig.Builder participantBuilder =
+ new ParticipantConfig.Builder(participantId).hostName("localhost").port(0);
+ return participantBuilder.build();
+ }
+
+ private static ResourceConfig getResource(StateModelDefinition stateModelDef) {
+ ResourceId resourceId = Id.resource("exampleResource");
+ RebalancerConfig.Builder rebalanceConfigBuilder =
+ new RebalancerConfig.Builder(resourceId).rebalancerMode(RebalanceMode.FULL_AUTO)
+ .replicaCount(3).addPartitions(5).stateModelDef(stateModelDef.getStateModelDefId());
+ ResourceConfig.Builder resourceBuilder =
+ new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalanceConfigBuilder.build());
+ return resourceBuilder.build();
+ }
+
+ private static StateModelDefinition getLockUnlockModel() {
+ final State LOCKED = State.from("LOCKED");
+ final State RELEASED = State.from("RELEASED");
+ final State DROPPED = State.from("DROPPED");
+ StateModelDefId stateModelId = Id.stateModelDef("LockUnlock");
+ StateModelDefinition.Builder stateModelBuilder =
+ new StateModelDefinition.Builder(stateModelId).addState(LOCKED, 0).addState(RELEASED, 1)
+ .addState(DROPPED, 2).addTransition(RELEASED, LOCKED, 0)
+ .addTransition(LOCKED, RELEASED, 1).upperBound(LOCKED, 1).upperBound(RELEASED, -1)
+ .upperBound(DROPPED, -1).initialState(RELEASED);
+ return stateModelBuilder.build();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-examples/src/main/java/org/apache/helix/examples/Quickstart.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/Quickstart.java b/helix-examples/src/main/java/org/apache/helix/examples/Quickstart.java
index 0e12fc8..2ff7c2c 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/Quickstart.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/Quickstart.java
@@ -32,9 +32,11 @@ import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.manager.zk.ZKHelixAdmin;
-import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.StateModelDefinition;
@@ -49,13 +51,13 @@ public class Quickstart {
private static final int NUM_PARTITIONS = 6;
private static final int NUM_REPLICAS = 2;
- private static final String STATE_MODEL_NAME = "MyStateModel";
+ private static final StateModelDefId STATE_MODEL_NAME = Id.stateModelDef("MyStateModel");
// states
- private static final String SLAVE = "SLAVE";
- private static final String OFFLINE = "OFFLINE";
- private static final String MASTER = "MASTER";
- private static final String DROPPED = "DROPPED";
+ private static final State SLAVE = State.from("SLAVE");
+ private static final State OFFLINE = State.from("OFFLINE");
+ private static final State MASTER = State.from("MASTER");
+ private static final State DROPPED = State.from("DROPPED");
private static List<InstanceConfig> INSTANCE_CONFIG_LIST;
private static List<MyProcess> PROCESS_LIST;
@@ -90,11 +92,11 @@ public class Quickstart {
// Add a state model
StateModelDefinition myStateModel = defineStateModel();
echo("Configuring StateModel: " + "MyStateModel with 1 Master and 1 Slave");
- admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME, myStateModel);
+ admin.addStateModelDef(CLUSTER_NAME, STATE_MODEL_NAME.stringify(), myStateModel);
// Add a resource with 6 partitions and 2 replicas
echo("Adding a resource MyResource: " + "with 6 partitions and 2 replicas");
- admin.addResource(CLUSTER_NAME, RESOURCE_NAME, NUM_PARTITIONS, STATE_MODEL_NAME, "AUTO");
+ admin.addResource(CLUSTER_NAME, RESOURCE_NAME, NUM_PARTITIONS, STATE_MODEL_NAME.stringify(), "AUTO");
// this will set up the ideal state, it calculates the preference list for
// each partition similar to consistent hashing
admin.rebalance(CLUSTER_NAME, RESOURCE_NAME, NUM_REPLICAS);
@@ -250,7 +252,7 @@ public class Quickstart {
new MasterSlaveStateModelFactory(instanceName);
StateMachineEngine stateMach = manager.getStateMachineEngine();
- stateMach.registerStateModelFactory(STATE_MODEL_NAME, stateModelFactory);
+ stateMach.registerStateModelFactory(STATE_MODEL_NAME.stringify(), stateModelFactory);
manager.connect();
}
[2/2] git commit: [HELIX-100] improve helix config api, namespaced user configs
Posted by zz...@apache.org.
[HELIX-100] improve helix config api, namespaced user configs
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/d8ef5a2d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/d8ef5a2d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/d8ef5a2d
Branch: refs/heads/helix-logical-model
Commit: d8ef5a2d74771073b11bc2fc0b008c8357e8bde5
Parents: 7de6a7f
Author: zzhang <zz...@apache.org>
Authored: Thu Sep 12 14:13:37 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Thu Sep 12 14:13:37 2013 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/HelixProperty.java | 10 +
.../main/java/org/apache/helix/PropertyKey.java | 31 ++-
.../org/apache/helix/PropertyPathConfig.java | 3 +-
.../main/java/org/apache/helix/ZNRecord.java | 9 +
.../main/java/org/apache/helix/api/Cluster.java | 14 +-
.../org/apache/helix/api/ClusterAccessor.java | 65 ++++++-
.../org/apache/helix/api/ClusterConfig.java | 37 +++-
.../org/apache/helix/api/NamespacedConfig.java | 195 +++++++++++++++++++
.../java/org/apache/helix/api/Participant.java | 15 +-
.../apache/helix/api/ParticipantAccessor.java | 10 +-
.../org/apache/helix/api/ParticipantConfig.java | 27 ++-
.../java/org/apache/helix/api/Partition.java | 20 ++
.../org/apache/helix/api/RebalancerConfig.java | 111 ++++++++---
.../java/org/apache/helix/api/Resource.java | 21 +-
.../org/apache/helix/api/ResourceConfig.java | 82 +++-----
.../java/org/apache/helix/api/UserConfig.java | 52 +++++
.../stages/NewResourceComputationStage.java | 15 +-
.../helix/model/ClusterConfiguration.java | 23 ++-
.../java/org/apache/helix/model/IdealState.java | 8 +-
.../helix/model/PartitionConfiguration.java | 59 ++++++
.../helix/model/ResourceConfiguration.java | 60 ++++++
.../helix/model/StateModelDefinition.java | 38 ++--
.../org/apache/helix/api/TestUserConfig.java | 86 ++++++++
.../helix/controller/stages/BaseStageTest.java | 25 +--
.../TestBestPossibleCalcStageCompatibility.java | 10 +-
.../stages/TestBestPossibleStateCalcStage.java | 6 +-
.../TestCurrentStateComputationStage.java | 15 +-
.../stages/TestMsgSelectionStage.java | 11 +-
.../strategy/TestAutoRebalanceStrategy.java | 15 +-
.../strategy/TestNewAutoRebalanceStrategy.java | 16 +-
.../apache/helix/examples/NewModelExample.java | 113 +++++++++++
.../org/apache/helix/examples/Quickstart.java | 20 +-
32 files changed, 1031 insertions(+), 191 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/HelixProperty.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/HelixProperty.java b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
index 9d39400..9f1195f 100644
--- a/helix-core/src/main/java/org/apache/helix/HelixProperty.java
+++ b/helix-core/src/main/java/org/apache/helix/HelixProperty.java
@@ -27,6 +27,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.helix.api.UserConfig;
+
/**
* A wrapper class for ZNRecord. Used as a base class for IdealState, CurrentState, etc.
*/
@@ -226,6 +228,14 @@ public class HelixProperty {
}
/**
+ * Add user-defined configuration properties to this property
+ * @param userConfig UserConfig properties
+ */
+ public void addUserConfig(UserConfig userConfig) {
+ UserConfig.addConfigToProperty(this, userConfig);
+ }
+
+ /**
* Get property validity
* @return true if valid, false if invalid
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/PropertyKey.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyKey.java b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
index d77209e..1733d39 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyKey.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyKey.java
@@ -47,6 +47,7 @@ import java.util.Arrays;
import org.apache.helix.model.AlertHistory;
import org.apache.helix.model.AlertStatus;
import org.apache.helix.model.Alerts;
+import org.apache.helix.model.ClusterConfiguration;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Error;
@@ -58,8 +59,10 @@ import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LeaderHistory;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
+import org.apache.helix.model.PartitionConfiguration;
import org.apache.helix.model.PauseSignal;
import org.apache.helix.model.PersistentStats;
+import org.apache.helix.model.ResourceConfiguration;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.model.StatusUpdate;
import org.apache.log4j.Logger;
@@ -193,7 +196,7 @@ public class PropertyKey {
*/
public PropertyKey clusterConfigs() {
- return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, HelixProperty.class,
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, ClusterConfiguration.class,
_clusterName, ConfigScopeProperty.CLUSTER.toString());
}
@@ -202,7 +205,7 @@ public class PropertyKey {
* @return {@link PropertyKey}
*/
public PropertyKey clusterConfig() {
- return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, HelixProperty.class,
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.CLUSTER, ClusterConfiguration.class,
_clusterName, ConfigScopeProperty.CLUSTER.toString(), _clusterName);
}
@@ -230,7 +233,7 @@ public class PropertyKey {
* @return {@link PropertyKey}
*/
public PropertyKey resourceConfigs() {
- return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, HelixProperty.class,
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, ResourceConfiguration.class,
_clusterName, ConfigScopeProperty.RESOURCE.toString());
}
@@ -240,19 +243,29 @@ public class PropertyKey {
* @return {@link PropertyKey}
*/
public PropertyKey resourceConfig(String resourceName) {
- return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, HelixProperty.class,
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, ResourceConfiguration.class,
_clusterName, ConfigScopeProperty.RESOURCE.toString(), resourceName);
}
/**
- * Get a property key associated with a partition
+ * Get a property key associated with all partition configurations
+ * @param resourceName
+ * @return {@link PropertyKey}
+ */
+ public PropertyKey partitionConfigs(String resourceName) {
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, PartitionConfiguration.class,
+ _clusterName, ConfigScopeProperty.RESOURCE.toString(), resourceName);
+ }
+
+ /**
+ * Get a property key associated with a partition configuration
* @param resourceName
* @param partitionName
* @return {@link PropertyKey}
*/
public PropertyKey partitionConfig(String resourceName, String partitionName) {
- return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, HelixProperty.class,
- _clusterName, ConfigScopeProperty.RESOURCE.toString(), resourceName);
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, PartitionConfiguration.class,
+ _clusterName, ConfigScopeProperty.RESOURCE.toString(), resourceName, partitionName);
}
/**
@@ -264,8 +277,8 @@ public class PropertyKey {
*/
public PropertyKey partitionConfig(String instanceName, String resourceName,
String partitionName) {
- return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, HelixProperty.class,
- _clusterName, ConfigScopeProperty.RESOURCE.toString(), resourceName);
+ return new PropertyKey(CONFIGS, ConfigScopeProperty.RESOURCE, PartitionConfiguration.class,
+ _clusterName, ConfigScopeProperty.RESOURCE.toString(), resourceName, partitionName);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
index ada24c7..96497e0 100644
--- a/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/PropertyPathConfig.java
@@ -80,7 +80,7 @@ public class PropertyPathConfig {
typeToClassMapping.put(PAUSE, PauseSignal.class);
// @formatter:off
- addEntry(PropertyType.CLUSTER, 0, "/{clusterName}");
+ addEntry(PropertyType.CLUSTER, 1, "/{clusterName}");
addEntry(PropertyType.CONFIGS, 1, "/{clusterName}/CONFIGS");
addEntry(PropertyType.CONFIGS, 2, "/{clusterName}/CONFIGS/{scope}");
addEntry(PropertyType.CONFIGS, 3, "/{clusterName}/CONFIGS/{scope}/{scopeKey}");
@@ -173,6 +173,7 @@ public class PropertyPathConfig {
String template = null;
if (templateMap.containsKey(type)) {
// keys.length+1 since we add clusterName
+ Map<Integer, String> fullTemplate = templateMap.get(type);
template = templateMap.get(type).get(keys.length + 1);
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/ZNRecord.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/ZNRecord.java b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
index 56a6cf2..37cd5eb 100644
--- a/helix-core/src/main/java/org/apache/helix/ZNRecord.java
+++ b/helix-core/src/main/java/org/apache/helix/ZNRecord.java
@@ -121,6 +121,15 @@ public class ZNRecord {
}
/**
+ * Get the {@link PayloadSerializer} that will serialize/deserialize the payload
+ * @return serializer
+ */
+ @JsonIgnore(true)
+ public PayloadSerializer getPayloadSerializer() {
+ return _serializer;
+ }
+
+ /**
* Set the list of updates to this ZNRecord
* @param deltaList
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index ce2318a..ab95936 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -73,7 +73,8 @@ public class Cluster {
*/
public Cluster(ClusterId id, Map<ResourceId, Resource> resourceMap,
Map<ParticipantId, Participant> participantMap, Map<ControllerId, Controller> controllerMap,
- ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap, boolean isPaused) {
+ ControllerId leaderId, Map<ConstraintType, ClusterConstraints> constraintMap,
+ UserConfig userConfig, boolean isPaused) {
// build the config
// Guava's transform and "copy" functions really return views so the maps all reflect each other
@@ -92,7 +93,8 @@ public class Cluster {
}
});
_config =
- new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap, isPaused);
+ new ClusterConfig(id, resourceConfigMap, participantConfigMap, constraintMap, userConfig,
+ isPaused);
_resourceMap = ImmutableMap.copyOf(resourceMap);
@@ -189,6 +191,14 @@ public class Cluster {
}
/**
+ * Get user-specified configuration properties of this cluster
+ * @return UserConfig properties
+ */
+ public UserConfig getUserConfig() {
+ return _config.getUserConfig();
+ }
+
+ /**
* Get a cluster constraint
* @param type the type of constrant to query
* @return cluster constraints, or null if none
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
index 7fcbc37..59da8b7 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterAccessor.java
@@ -38,7 +38,9 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
+import org.apache.helix.model.PartitionConfiguration;
import org.apache.helix.model.PauseSignal;
+import org.apache.helix.model.ResourceConfiguration;
import org.apache.log4j.Logger;
public class ClusterAccessor {
@@ -62,7 +64,7 @@ public class ClusterAccessor {
boolean created = _accessor.createProperty(_keyBuilder.cluster(), null);
if (!created) {
LOG.warn("Cluster already created. Aborting.");
- return false;
+ // return false;
}
Map<ResourceId, ResourceConfig> resources = cluster.getResourceMap();
for (ResourceConfig resource : resources.values()) {
@@ -77,8 +79,8 @@ public class ClusterAccessor {
_accessor.createProperty(_keyBuilder.constraint(constraints.getType().toString()),
constraints);
}
- _accessor
- .createProperty(_keyBuilder.clusterConfig(), new ClusterConfiguration(cluster.getId()));
+ _accessor.createProperty(_keyBuilder.clusterConfig(),
+ ClusterConfiguration.from(cluster.getUserConfig()));
if (cluster.isPaused()) {
pauseCluster();
}
@@ -169,26 +171,50 @@ public class ClusterAccessor {
Map<String, ExternalView> externalViewMap =
_accessor.getChildValuesMap(_keyBuilder.externalViews());
+ /**
+ * Map of resource id to user configuration
+ */
+ Map<String, ResourceConfiguration> resourceConfigMap =
+ _accessor.getChildValuesMap(_keyBuilder.resourceConfigs());
+
Map<ResourceId, Resource> resourceMap = new HashMap<ResourceId, Resource>();
for (String resourceName : idealStateMap.keySet()) {
IdealState idealState = idealStateMap.get(resourceName);
// TODO pass resource assignment
ResourceId resourceId = Id.resource(resourceName);
- resourceMap.put(resourceId,
- new Resource(resourceId, idealState, null, externalViewMap.get(resourceName),
- liveInstanceMap.size()));
+ UserConfig userConfig;
+ if (resourceConfigMap != null && resourceConfigMap.containsKey(resourceName)) {
+ userConfig = new UserConfig(resourceConfigMap.get(resourceName));
+ } else {
+ userConfig = new UserConfig(resourceId);
+ }
+
+ Map<String, PartitionConfiguration> partitionConfigMap =
+ _accessor.getChildValuesMap(_keyBuilder.partitionConfigs(resourceName));
+ if (partitionConfigMap != null) {
+ Map<PartitionId, UserConfig> partitionUserConfigs = new HashMap<PartitionId, UserConfig>();
+ for (String partitionName : partitionConfigMap.keySet()) {
+ partitionUserConfigs.put(Id.partition(partitionName),
+ UserConfig.from(partitionConfigMap.get(partitionName)));
+ }
+ resourceMap.put(resourceId,
+ new Resource(resourceId, idealState, null, externalViewMap.get(resourceName),
+ userConfig, partitionUserConfigs, liveInstanceMap.size()));
+ }
}
Map<ParticipantId, Participant> participantMap = new HashMap<ParticipantId, Participant>();
for (String participantName : instanceConfigMap.keySet()) {
InstanceConfig instanceConfig = instanceConfigMap.get(participantName);
+ UserConfig userConfig = UserConfig.from(instanceConfig);
LiveInstance liveInstance = liveInstanceMap.get(participantName);
Map<String, Message> instanceMsgMap = messageMap.get(participantName);
ParticipantId participantId = Id.participant(participantName);
participantMap.put(participantId, ParticipantAccessor.createParticipant(participantId,
- instanceConfig, liveInstance, instanceMsgMap, currentStateMap.get(participantName)));
+ instanceConfig, userConfig, liveInstance, instanceMsgMap,
+ currentStateMap.get(participantName)));
}
Map<ControllerId, Controller> controllerMap = new HashMap<ControllerId, Controller>();
@@ -208,8 +234,15 @@ public class ClusterAccessor {
PauseSignal pauseSignal = _accessor.getProperty(_keyBuilder.pause());
boolean isPaused = pauseSignal != null;
+ ClusterConfiguration clusterUserConfig = _accessor.getProperty(_keyBuilder.clusterConfig());
+ UserConfig userConfig;
+ if (clusterUserConfig != null) {
+ userConfig = new UserConfig(clusterUserConfig);
+ } else {
+ userConfig = new UserConfig(_clusterId);
+ }
return new Cluster(_clusterId, resourceMap, participantMap, controllerMap, leaderId,
- clusterConstraintMap, isPaused);
+ clusterConstraintMap, userConfig, isPaused);
}
/**
@@ -243,6 +276,12 @@ public class ClusterAccessor {
+ ", because resource ideal state already exists in cluster: " + _clusterId);
}
+ // Add resource user config
+ if (resource.getUserConfig() != null) {
+ ResourceConfiguration configuration = ResourceConfiguration.from(resource.getUserConfig());
+ _accessor.setProperty(_keyBuilder.resourceConfig(resourceId.stringify()), configuration);
+ }
+
// Create an IdealState from a RebalancerConfig
RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
IdealState idealState = new IdealState(resourceId);
@@ -263,6 +302,14 @@ public class ClusterAccessor {
if (preferenceMap != null) {
idealState.setParticipantStateMap(partitionId, preferenceMap);
}
+ Partition partition = resource.getPartition(partitionId);
+ if (partition.getUserConfig() != null) {
+ PartitionConfiguration partitionConfig =
+ PartitionConfiguration.from(partition.getUserConfig());
+ _accessor.setProperty(
+ _keyBuilder.partitionConfig(resourceId.stringify(), partitionId.stringify()),
+ partitionConfig);
+ }
}
idealState.setBucketSize(resource.getBucketSize());
idealState.setBatchMessageMode(resource.getBatchMessageMode());
@@ -329,6 +376,8 @@ public class ClusterAccessor {
instanceConfig.setHostName(participant.getHostName());
instanceConfig.setPort(Integer.toString(participant.getPort()));
instanceConfig.setInstanceEnabled(participant.isEnabled());
+ UserConfig userConfig = participant.getUserConfig();
+ instanceConfig.addUserConfig(userConfig);
Set<String> tags = participant.getTags();
for (String tag : tags) {
instanceConfig.addTag(tag);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
index 1585aae..5e4a858 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ClusterConfig.java
@@ -36,22 +36,26 @@ public class ClusterConfig {
private final Map<ResourceId, ResourceConfig> _resourceMap;
private final Map<ParticipantId, ParticipantConfig> _participantMap;
private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private final UserConfig _userConfig;
private final boolean _isPaused;
/**
* Initialize a cluster configuration. Also see ClusterConfig.Builder
- * @param id
- * @param resourceMap
- * @param participantMap
- * @param constraintMap
+ * @param id cluster id
+ * @param resourceMap map of resource id to resource config
+ * @param participantMap map of participant id to participant config
+ * @param constraintMap map of constraint type to all constraints of that type
+ * @param userConfig user-defined cluster properties
+ * @param isPaused true if paused, false if active
*/
public ClusterConfig(ClusterId id, Map<ResourceId, ResourceConfig> resourceMap,
Map<ParticipantId, ParticipantConfig> participantMap,
- Map<ConstraintType, ClusterConstraints> constraintMap, boolean isPaused) {
+ Map<ConstraintType, ClusterConstraints> constraintMap, UserConfig userConfig, boolean isPaused) {
_id = id;
_resourceMap = ImmutableMap.copyOf(resourceMap);
_participantMap = ImmutableMap.copyOf(participantMap);
_constraintMap = ImmutableMap.copyOf(constraintMap);
+ _userConfig = userConfig;
_isPaused = isPaused;
}
@@ -88,6 +92,14 @@ public class ClusterConfig {
}
/**
+ * Get user-specified configuration properties of this cluster
+ * @return UserConfig properties
+ */
+ public UserConfig getUserConfig() {
+ return _userConfig;
+ }
+
+ /**
* Check the pasued status of the cluster
* @return true if paused, false otherwise
*/
@@ -103,6 +115,7 @@ public class ClusterConfig {
private final Map<ResourceId, ResourceConfig> _resourceMap;
private final Map<ParticipantId, ParticipantConfig> _participantMap;
private final Map<ConstraintType, ClusterConstraints> _constraintMap;
+ private UserConfig _userConfig;
private boolean _isPaused;
/**
@@ -115,6 +128,7 @@ public class ClusterConfig {
_participantMap = new HashMap<ParticipantId, ParticipantConfig>();
_constraintMap = new HashMap<ConstraintType, ClusterConstraints>();
_isPaused = false;
+ _userConfig = new UserConfig(id);
}
/**
@@ -194,11 +208,22 @@ public class ClusterConfig {
}
/**
+ * Set the user configuration
+ * @param userConfig user-specified properties
+ * @return Builder
+ */
+ public Builder userConfig(UserConfig userConfig) {
+ _userConfig = userConfig;
+ return this;
+ }
+
+ /**
* Create the cluster configuration
* @return ClusterConfig
*/
public ClusterConfig build() {
- return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _isPaused);
+ return new ClusterConfig(_id, _resourceMap, _participantMap, _constraintMap, _userConfig,
+ _isPaused);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java b/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
new file mode 100644
index 0000000..0a46fa7
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/NamespacedConfig.java
@@ -0,0 +1,195 @@
+package org.apache.helix.api;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+
+import com.google.common.base.Predicate;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Maps;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Generic configuration of Helix components prefixed with a namespace
+ */
+public abstract class NamespacedConfig extends ZNRecord {
+ private final String _prefix;
+
+ /**
+ * Instantiate a NamespacedConfig. It is intended for use only by entities that can be identified
+ * @param id id object
+ */
+ public NamespacedConfig(Id id, String prefix) {
+ super(id.stringify());
+ _prefix = prefix + '_';
+ }
+
+ /**
+ * Instantiate a UserConfig from an existing HelixProperty
+ * @param property property wrapping a configuration
+ */
+ public NamespacedConfig(HelixProperty property, String prefix) {
+ super(property.getRecord());
+ _prefix = prefix + '_';
+ // filter out any configuration that isn't user-defined
+ Predicate<String> keyFilter = new Predicate<String>() {
+ @Override
+ public boolean apply(String key) {
+ return key.contains(_prefix);
+ }
+ };
+ super.setMapFields(Maps.filterKeys(super.getMapFields(), keyFilter));
+ super.setListFields(Maps.filterKeys(super.getListFields(), keyFilter));
+ super.setSimpleFields(Maps.filterKeys(super.getSimpleFields(), keyFilter));
+ }
+
+ @Override
+ public void setMapField(String k, Map<String, String> v) {
+ super.setMapField(_prefix + k, v);
+ }
+
+ @Override
+ public Map<String, String> getMapField(String k) {
+ return super.getMapField(_prefix + k);
+ }
+
+ @Override
+ public void setMapFields(Map<String, Map<String, String>> mapFields) {
+ for (String k : mapFields.keySet()) {
+ setMapField(_prefix + k, mapFields.get(k));
+ }
+ }
+
+ /**
+ * Returns an immutable map of map fields
+ */
+ @Override
+ public Map<String, Map<String, String>> getMapFields() {
+ return convertToPrefixlessMap(super.getMapFields(), _prefix);
+ }
+
+ @Override
+ public void setListField(String k, List<String> v) {
+ super.setListField(_prefix + k, v);
+ }
+
+ @Override
+ public List<String> getListField(String k) {
+ return super.getListField(_prefix + k);
+ }
+
+ @Override
+ public void setListFields(Map<String, List<String>> listFields) {
+ for (String k : listFields.keySet()) {
+ setListField(_prefix + k, listFields.get(k));
+ }
+ }
+
+ /**
+ * Returns an immutable map of list fields
+ */
+ @Override
+ public Map<String, List<String>> getListFields() {
+ return convertToPrefixlessMap(super.getListFields(), _prefix);
+ }
+
+ @Override
+ public void setSimpleField(String k, String v) {
+ super.setSimpleField(_prefix + k, v);
+ }
+
+ @Override
+ public String getSimpleField(String k) {
+ return super.getSimpleField(_prefix + k);
+ }
+
+ @Override
+ public void setSimpleFields(Map<String, String> simpleFields) {
+ for (String k : simpleFields.keySet()) {
+ super.setSimpleField(_prefix + k, simpleFields.get(k));
+ }
+ }
+
+ /**
+ * Returns an immutable map of simple fields
+ */
+ @Override
+ public Map<String, String> getSimpleFields() {
+ return convertToPrefixlessMap(super.getSimpleFields(), _prefix);
+ }
+
+ /**
+ * Get all map fields with prefixed keys
+ * @return prefixed map fields
+ */
+ private Map<String, Map<String, String>> getPrefixedMapFields() {
+ return super.getMapFields();
+ }
+
+ /**
+ * Get all list fields with prefixed keys
+ * @return prefixed list fields
+ */
+ private Map<String, List<String>> getPrefixedListFields() {
+ return super.getListFields();
+ }
+
+ /**
+ * Get all simple fields with prefixed keys
+ * @return prefixed simple fields
+ */
+ private Map<String, String> getPrefixedSimpleFields() {
+ return super.getSimpleFields();
+ }
+
+ /**
+ * Add user configuration to an existing helix property.
+ * @param property the property to update
+ * @param config the user config
+ */
+ public static void addConfigToProperty(HelixProperty property, NamespacedConfig config) {
+ ZNRecord record = property.getRecord();
+ record.getMapFields().putAll(config.getPrefixedMapFields());
+ record.getListFields().putAll(config.getPrefixedListFields());
+ record.getSimpleFields().putAll(config.getPrefixedSimpleFields());
+ if (config.getRawPayload() != null && config.getRawPayload().length > 0) {
+ record.setPayloadSerializer(config.getPayloadSerializer());
+ record.setRawPayload(config.getRawPayload());
+ }
+ }
+
+ /**
+ * Get a copy of a map with the key prefix stripped. The resulting map is immutable
+ * @param rawMap map of key, value pairs where the key is prefixed
+ * @return map of key, value pairs where the key is not prefixed
+ */
+ private static <T> Map<String, T> convertToPrefixlessMap(Map<String, T> rawMap, String prefix) {
+ Map<String, T> convertedMap = new HashMap<String, T>();
+ for (String rawKey : rawMap.keySet()) {
+ String k = rawKey.substring(rawKey.indexOf(prefix) + 1);
+ convertedMap.put(k, rawMap.get(rawKey));
+ }
+ return ImmutableMap.copyOf(convertedMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 0c0cd12..45d0315 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -51,8 +51,11 @@ public class Participant {
*/
public Participant(ParticipantId id, String hostName, int port, boolean isEnabled,
Set<PartitionId> disabledPartitionIdSet, Set<String> tags, RunningInstance runningInstance,
- Map<ResourceId, CurrentState> currentStateMap, Map<MessageId, Message> messageMap) {
- _config = new ParticipantConfig(id, hostName, port, isEnabled, disabledPartitionIdSet, tags);
+ Map<ResourceId, CurrentState> currentStateMap, Map<MessageId, Message> messageMap,
+ UserConfig userConfig) {
+ _config =
+ new ParticipantConfig(id, hostName, port, isEnabled, disabledPartitionIdSet, tags,
+ userConfig);
_runningInstance = runningInstance;
_currentStateMap = ImmutableMap.copyOf(currentStateMap);
_messageMap = ImmutableMap.copyOf(messageMap);
@@ -140,6 +143,14 @@ public class Participant {
}
/**
+ * Get user-specified configuration properties of this participant
+ * @return UserConfig properties
+ */
+ public UserConfig getUserConfig() {
+ return _config.getUserConfig();
+ }
+
+ /**
* Get the participant id
* @return ParticipantId
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index b9d06ef..bce5b2f 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -265,13 +265,14 @@ public class ParticipantAccessor {
* create a participant based on physical model
* @param participantId
* @param instanceConfig
+ * @param userConfig
* @param liveInstance
* @param instanceMsgMap map of message-id to message
* @param instanceCurStateMap map of resource-id to current-state
* @return participant
*/
static Participant createParticipant(ParticipantId participantId, InstanceConfig instanceConfig,
- LiveInstance liveInstance, Map<String, Message> instanceMsgMap,
+ UserConfig userConfig, LiveInstance liveInstance, Map<String, Message> instanceMsgMap,
Map<String, CurrentState> instanceCurStateMap) {
String hostName = instanceConfig.getHostName();
@@ -323,7 +324,7 @@ public class ParticipantAccessor {
}
return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
- runningInstance, curStateMap, msgMap);
+ runningInstance, curStateMap, msgMap, userConfig);
}
/**
@@ -335,6 +336,7 @@ public class ParticipantAccessor {
// read physical model
String participantName = participantId.stringify();
InstanceConfig instanceConfig = _accessor.getProperty(_keyBuilder.instance(participantName));
+ UserConfig userConfig = UserConfig.from(instanceConfig);
LiveInstance liveInstance = _accessor.getProperty(_keyBuilder.liveInstance(participantName));
Map<String, Message> instanceMsgMap = Collections.emptyMap();
@@ -348,8 +350,8 @@ public class ParticipantAccessor {
sessionId.stringify()));
}
- return createParticipant(participantId, instanceConfig, liveInstance, instanceMsgMap,
- instanceCurStateMap);
+ return createParticipant(participantId, instanceConfig, userConfig, liveInstance,
+ instanceMsgMap, instanceCurStateMap);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
index 3c77a40..7164882 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantConfig.java
@@ -34,6 +34,7 @@ public class ParticipantConfig {
private final boolean _isEnabled;
private final Set<PartitionId> _disabledPartitions;
private final Set<String> _tags;
+ private final UserConfig _userConfig;
/**
* Initialize a participant configuration. Also see ParticipantConfig.Builder
@@ -45,13 +46,14 @@ public class ParticipantConfig {
* @param tags tags to set for the participant
*/
public ParticipantConfig(ParticipantId id, String hostName, int port, boolean isEnabled,
- Set<PartitionId> disabledPartitions, Set<String> tags) {
+ Set<PartitionId> disabledPartitions, Set<String> tags, UserConfig userConfig) {
_id = id;
_hostName = hostName;
_port = port;
_isEnabled = isEnabled;
_disabledPartitions = ImmutableSet.copyOf(disabledPartitions);
_tags = ImmutableSet.copyOf(tags);
+ _userConfig = userConfig;
}
/**
@@ -104,6 +106,14 @@ public class ParticipantConfig {
}
/**
+ * Get user-specified configuration properties of this participant
+ * @return UserConfig properties
+ */
+ public UserConfig getUserConfig() {
+ return _userConfig;
+ }
+
+ /**
* Get the participant id
* @return ParticipantId
*/
@@ -121,6 +131,7 @@ public class ParticipantConfig {
private boolean _isEnabled;
private final Set<PartitionId> _disabledPartitions;
private final Set<String> _tags;
+ private UserConfig _userConfig;
/**
* Build a participant with a given id
@@ -131,6 +142,7 @@ public class ParticipantConfig {
_disabledPartitions = new HashSet<PartitionId>();
_tags = new HashSet<String>();
_isEnabled = true;
+ _userConfig = new UserConfig(id);
}
/**
@@ -184,11 +196,22 @@ public class ParticipantConfig {
}
/**
+ * Set the user configuration
+ * @param userConfig user-specified properties
+ * @return Builder
+ */
+ public Builder userConfig(UserConfig userConfig) {
+ _userConfig = userConfig;
+ return this;
+ }
+
+ /**
* Assemble the participant
* @return instantiated Participant
*/
public ParticipantConfig build() {
- return new ParticipantConfig(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags);
+ return new ParticipantConfig(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags,
+ _userConfig);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/Partition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Partition.java b/helix-core/src/main/java/org/apache/helix/api/Partition.java
index 6586105..e76c8d1 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Partition.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Partition.java
@@ -25,6 +25,17 @@ package org.apache.helix.api;
public class Partition {
private final PartitionId _id;
+ private final UserConfig _userConfig;
+
+ /**
+ * Construct a partition with user-specified configuration
+ * @param id
+ * @param userConfig user-defined properties of this partition
+ */
+ public Partition(PartitionId id, UserConfig userConfig) {
+ _id = id;
+ _userConfig = userConfig;
+ }
/**
* Construct a partition
@@ -32,6 +43,7 @@ public class Partition {
*/
public Partition(PartitionId id) {
_id = id;
+ _userConfig = null;
}
/**
@@ -42,6 +54,14 @@ public class Partition {
return _id;
}
+ /**
+ * Get the user config of this partition
+ * @return UserConfig properties, or null if none
+ */
+ public UserConfig getUserConfig() {
+ return _userConfig;
+ }
+
@Override
public String toString() {
return _id.toString();
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index ddd11d6..1f401c8 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -19,9 +19,13 @@ package org.apache.helix.api;
* under the License.
*/
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.helix.HelixConstants;
import org.apache.helix.model.IdealState;
@@ -30,11 +34,12 @@ import org.apache.helix.model.ResourceAssignment;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
/**
* Captures the configuration properties necessary for rebalancing
*/
-public class RebalancerConfig {
+public class RebalancerConfig extends NamespacedConfig {
private final RebalanceMode _rebalancerMode;
private final RebalancerRef _rebalancerRef;
private final StateModelDefId _stateModelDefId;
@@ -46,14 +51,17 @@ public class RebalancerConfig {
private final String _participantGroupTag;
private final int _maxPartitionsPerParticipant;
private final StateModelFactoryId _stateModelFactoryId;
+ private final Map<PartitionId, Partition> _partitionMap;
/**
* Instantiate the configuration of a rebalance task
* @param idealState the physical ideal state
* @param resourceAssignment last mapping of a resource
*/
- public RebalancerConfig(IdealState idealState, ResourceAssignment resourceAssignment,
- int liveParticipantCount) {
+ public RebalancerConfig(Map<PartitionId, Partition> partitionMap, IdealState idealState,
+ ResourceAssignment resourceAssignment, int liveParticipantCount) {
+ super(idealState.getResourceId(), RebalancerConfig.class.getSimpleName());
+ _partitionMap = ImmutableMap.copyOf(partitionMap);
_rebalancerMode = idealState.getRebalanceMode();
_rebalancerRef = idealState.getRebalancerRef();
_stateModelDefId = idealState.getStateModelDefId();
@@ -92,6 +100,33 @@ public class RebalancerConfig {
}
/**
+ * Get the partitions of the resource
+ * @return map of partition id to partition or empty map if none
+ */
+ public Map<PartitionId, Partition> getPartitionMap() {
+ return _partitionMap;
+ }
+
+ /**
+ * Get a partition that the resource contains
+ * @param partitionId the partition id to look up
+ * @return Partition or null if none is present with the given id
+ */
+ public Partition getPartition(PartitionId partitionId) {
+ return _partitionMap.get(partitionId);
+ }
+
+ /**
+ * Get the set of partition ids that the resource contains
+ * @return partition id set, or empty if none
+ */
+ public Set<PartitionId> getPartitionSet() {
+ Set<PartitionId> partitionSet = new HashSet<PartitionId>();
+ partitionSet.addAll(_partitionMap.keySet());
+ return ImmutableSet.copyOf(partitionSet);
+ }
+
+ /**
* Get the rebalancer mode
* @return rebalancer mode
*/
@@ -191,17 +226,21 @@ public class RebalancerConfig {
* Assembles a RebalancerConfig
*/
public static class Builder {
+ private final ResourceId _id;
private final IdealState _idealState;
private boolean _anyLiveParticipant;
private ResourceAssignment _resourceAssignment;
+ private final Map<PartitionId, Partition> _partitionMap;
/**
* Configure the rebalancer for a resource
* @param resourceId the resource to rebalance
*/
public Builder(ResourceId resourceId) {
+ _id = resourceId;
_idealState = new IdealState(resourceId);
_anyLiveParticipant = false;
+ _partitionMap = new HashMap<PartitionId, Partition>();
}
/**
@@ -244,26 +283,6 @@ public class RebalancerConfig {
}
/**
- * Set bucket size
- * @param bucketSize
- * @return Builder
- */
- public Builder bucketSize(int bucketSize) {
- _idealState.setBucketSize(bucketSize);
- return this;
- }
-
- /**
- * Set batch message mode
- * @param batchMessageMode
- * @return Builder
- */
- public Builder batchMessageMode(boolean batchMessageMode) {
- _idealState.setBatchMessageMode(batchMessageMode);
- return this;
- }
-
- /**
* Set the number of replicas
* @param replicaCount number of replicas
* @return Builder
@@ -304,15 +323,55 @@ public class RebalancerConfig {
}
/**
+ * Add a partition that the resource serves
+ * @param partition fully-qualified partition
+ * @return Builder
+ */
+ public Builder addPartition(Partition partition) {
+ _partitionMap.put(partition.getId(), partition);
+ return this;
+ }
+
+ /**
+ * Add a collection of partitions
+ * @param partitions
+ * @return Builder
+ */
+ public Builder addPartitions(Collection<Partition> partitions) {
+ for (Partition partition : partitions) {
+ addPartition(partition);
+ }
+ return this;
+ }
+
+ /**
+ * Add a specified number of partitions with a default naming scheme, namely
+ * resourceId_partitionNumber where partitionNumber starts at 0
+ * These partitions are added without any user configuration properties
+ * @param partitionCount number of partitions to add
+ * @return Builder
+ */
+ public Builder addPartitions(int partitionCount) {
+ for (int i = 0; i < partitionCount; i++) {
+ addPartition(new Partition(Id.partition(_id, Integer.toString(i)), null));
+ }
+ return this;
+ }
+
+ /**
* Assemble a RebalancerConfig
* @return a fully defined rebalancer configuration
*/
public RebalancerConfig build() {
+ // add a single partition if one hasn't been added yet since 1 partition is default
+ if (_partitionMap.isEmpty()) {
+ addPartitions(1);
+ }
if (_anyLiveParticipant) {
- return new RebalancerConfig(_idealState, _resourceAssignment, Integer.parseInt(_idealState
- .getReplicas()));
+ return new RebalancerConfig(_partitionMap, _idealState, _resourceAssignment,
+ Integer.parseInt(_idealState.getReplicas()));
} else {
- return new RebalancerConfig(_idealState, _resourceAssignment, -1);
+ return new RebalancerConfig(_partitionMap, _idealState, _resourceAssignment, -1);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 00f9169..a33f0d6 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -44,7 +44,8 @@ public class Resource {
* @param liveParticipantCount number of live participants in the system
*/
public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment,
- ExternalView externalView, int liveParticipantCount) {
+ ExternalView externalView, UserConfig userConfig,
+ Map<PartitionId, UserConfig> partitionUserConfigs, int liveParticipantCount) {
Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
Map<PartitionId, Map<String, String>> schedulerTaskConfigMap =
new HashMap<PartitionId, Map<String, String>>();
@@ -57,7 +58,11 @@ public class Resource {
}
}
for (PartitionId partitionId : partitionSet) {
- partitionMap.put(partitionId, new Partition(partitionId));
+ UserConfig partitionUserConfig = partitionUserConfigs.get(partitionId);
+ if (partitionUserConfig == null) {
+ partitionUserConfig = new UserConfig(partitionId);
+ }
+ partitionMap.put(partitionId, new Partition(partitionId, partitionUserConfig));
// TODO refactor it
Map<String, String> taskConfigMap = idealState.getInstanceStateMap(partitionId.stringify());
@@ -81,10 +86,10 @@ public class Resource {
SchedulerTaskConfig schedulerTaskConfig =
new SchedulerTaskConfig(transitionTimeoutMap, schedulerTaskConfigMap);
RebalancerConfig rebalancerConfig =
- new RebalancerConfig(idealState, resourceAssignment, liveParticipantCount);
+ new RebalancerConfig(partitionMap, idealState, resourceAssignment, liveParticipantCount);
_config =
- new ResourceConfig(id, partitionMap, schedulerTaskConfig, rebalancerConfig,
+ new ResourceConfig(id, schedulerTaskConfig, rebalancerConfig, userConfig,
idealState.getBucketSize(), idealState.getBatchMessageMode());
_externalView = externalView;
}
@@ -131,6 +136,14 @@ public class Resource {
}
/**
+ * Get user-specified configuration properties of this resource
+ * @return UserConfig properties
+ */
+ public UserConfig getUserConfig() {
+ return _config.getUserConfig();
+ }
+
+ /**
* Get the resource id
* @return ResourceId
*/
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
index a34a6a3..4be28b0 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceConfig.java
@@ -1,14 +1,8 @@
package org.apache.helix.api;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -33,9 +27,9 @@ import com.google.common.collect.ImmutableSet;
*/
public class ResourceConfig {
private final ResourceId _id;
- private final Map<PartitionId, Partition> _partitionMap;
private final RebalancerConfig _rebalancerConfig;
private final SchedulerTaskConfig _schedulerTaskConfig;
+ private final UserConfig _userConfig;
private final int _bucketSize;
private final boolean _batchMessageMode;
@@ -45,16 +39,17 @@ public class ResourceConfig {
* @param partitionMap map of partition identifiers to partition objects
* @param schedulerTaskConfig configuration for scheduler tasks associated with the resource
* @param rebalancerConfig configuration for rebalancing the resource
+ * @param userConfig user-defined resource properties
* @param bucketSize bucket size for this resource
- * @param whether or not batch messaging is allowed
+ * @param batchMessageMode whether or not batch messaging is allowed
*/
- public ResourceConfig(ResourceId id, Map<PartitionId, Partition> partitionMap,
- SchedulerTaskConfig schedulerTaskConfig, RebalancerConfig rebalancerConfig, int bucketSize,
+ public ResourceConfig(ResourceId id, SchedulerTaskConfig schedulerTaskConfig,
+ RebalancerConfig rebalancerConfig, UserConfig userConfig, int bucketSize,
boolean batchMessageMode) {
_id = id;
- _partitionMap = ImmutableMap.copyOf(partitionMap);
_schedulerTaskConfig = schedulerTaskConfig;
_rebalancerConfig = rebalancerConfig;
+ _userConfig = userConfig;
_bucketSize = bucketSize;
_batchMessageMode = batchMessageMode;
}
@@ -64,7 +59,7 @@ public class ResourceConfig {
* @return map of partition id to partition or empty map if none
*/
public Map<PartitionId, Partition> getPartitionMap() {
- return _partitionMap;
+ return _rebalancerConfig.getPartitionMap();
}
/**
@@ -73,7 +68,7 @@ public class ResourceConfig {
* @return Partition or null if none is present with the given id
*/
public Partition getPartition(PartitionId partitionId) {
- return _partitionMap.get(partitionId);
+ return _rebalancerConfig.getPartition(partitionId);
}
/**
@@ -81,9 +76,7 @@ public class ResourceConfig {
* @return partition id set, or empty if none
*/
public Set<PartitionId> getPartitionSet() {
- Set<PartitionId> partitionSet = new HashSet<PartitionId>();
- partitionSet.addAll(_partitionMap.keySet());
- return ImmutableSet.copyOf(partitionSet);
+ return _rebalancerConfig.getPartitionSet();
}
/**
@@ -111,6 +104,14 @@ public class ResourceConfig {
}
/**
+ * Get user-specified configuration properties of this resource
+ * @return UserConfig properties
+ */
+ public UserConfig getUserConfig() {
+ return _userConfig;
+ }
+
+ /**
* Get the bucket size for this resource
* @return bucket size
*/
@@ -128,7 +129,7 @@ public class ResourceConfig {
@Override
public String toString() {
- return _partitionMap.toString();
+ return _rebalancerConfig.getPartitionMap().toString();
}
/**
@@ -136,9 +137,9 @@ public class ResourceConfig {
*/
public static class Builder {
private final ResourceId _id;
- private final Map<PartitionId, Partition> _partitionMap;
private RebalancerConfig _rebalancerConfig;
private SchedulerTaskConfig _schedulerTaskConfig;
+ private UserConfig _userConfig;
private int _bucketSize;
private boolean _batchMessageMode;
@@ -148,53 +149,28 @@ public class ResourceConfig {
*/
public Builder(ResourceId id) {
_id = id;
- _partitionMap = new HashMap<PartitionId, Partition>();
_bucketSize = 0;
_batchMessageMode = false;
+ _userConfig = new UserConfig(id);
}
/**
- * Add a partition that the resource serves
- * @param partition fully-qualified partition
- * @return Builder
- */
- public Builder addPartition(Partition partition) {
- _partitionMap.put(partition.getId(), partition);
- return this;
- }
-
- /**
- * Add a collection of partitions
- * @param partitions
- * @return Builder
- */
- public Builder addPartitions(Collection<Partition> partitions) {
- for (Partition partition : partitions) {
- addPartition(partition);
- }
- return this;
- }
-
- /**
- * Add a specified number of partitions with a default naming scheme, namely
- * resourceId_partitionNumber where partitionNumber starts at 0
- * @param partitionCount number of partitions to add
+ * Set the rebalancer configuration
+ * @param rebalancerConfig properties of interest for rebalancing
* @return Builder
*/
- public Builder addPartitions(int partitionCount) {
- for (int i = 0; i < partitionCount; i++) {
- addPartition(new Partition(Id.partition(_id, Integer.toString(i))));
- }
+ public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
+ _rebalancerConfig = rebalancerConfig;
return this;
}
/**
- * Set the rebalancer configuration
- * @param rebalancerConfig properties of interest for rebalancing
+ * Set the user configuration
+ * @param userConfig user-specified properties
* @return Builder
*/
- public Builder rebalancerConfig(RebalancerConfig rebalancerConfig) {
- _rebalancerConfig = rebalancerConfig;
+ public Builder userConfig(UserConfig userConfig) {
+ _userConfig = userConfig;
return this;
}
@@ -232,7 +208,7 @@ public class ResourceConfig {
* @return instantiated Resource
*/
public ResourceConfig build() {
- return new ResourceConfig(_id, _partitionMap, _schedulerTaskConfig, _rebalancerConfig,
+ return new ResourceConfig(_id, _schedulerTaskConfig, _rebalancerConfig, _userConfig,
_bucketSize, _batchMessageMode);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/api/UserConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/UserConfig.java b/helix-core/src/main/java/org/apache/helix/api/UserConfig.java
new file mode 100644
index 0000000..6bef146
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/UserConfig.java
@@ -0,0 +1,52 @@
+package org.apache.helix.api;
+
+import org.apache.helix.HelixProperty;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Generic user-defined configuration of Helix components
+ */
+public class UserConfig extends NamespacedConfig {
+ /**
+ * Instantiate a UserConfig. It is intended for use only by entities that can be identified
+ * @param id id object
+ */
+ public UserConfig(Id id) {
+ super(id, UserConfig.class.getSimpleName());
+ }
+
+ /**
+ * Instantiate a UserConfig from an existing HelixProperty
+ * @param property property wrapping a configuration
+ */
+ public UserConfig(HelixProperty property) {
+ super(property, UserConfig.class.getSimpleName());
+ }
+
+ /**
+ * Get a UserConfig holding only the user-specific configurations found in a property
+ * @param property the property to check
+ * @return UserConfig
+ */
+ public static UserConfig from(HelixProperty property) {
+ return new UserConfig(property);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
index 50d319f..50a825b 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -19,10 +19,8 @@ package org.apache.helix.controller.stages;
* under the License.
*/
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
-import java.util.Set;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
@@ -63,8 +61,6 @@ public class NewResourceComputationStage extends AbstractBaseStage {
ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
resourceBuilder.rebalancerConfig(rebalancerConfig);
- Set<Partition> partitionSet = new HashSet<Partition>(resource.getPartitionMap().values());
- resourceBuilder.addPartitions(partitionSet);
resourceBuilder.bucketSize(resource.getBucketSize());
resourceBuilder.batchMessageMode(resource.getBatchMessageMode());
resourceBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
@@ -91,17 +87,16 @@ public class NewResourceComputationStage extends AbstractBaseStage {
rebalancerConfigBuilder.stateModelDef(currentState.getStateModelDefId());
rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState
.getStateModelFactoryName()));
- rebalancerConfigBuilder.bucketSize(currentState.getBucketSize());
- rebalancerConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
+ for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
+ rebalancerConfigBuilder.addPartition(new Partition(partitionId));
+ }
ResourceConfig.Builder resourceBuilder = new ResourceConfig.Builder(resourceId);
resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
+ resourceBuilder.bucketSize(currentState.getBucketSize());
+ resourceBuilder.batchMessageMode(currentState.getBatchMessageMode());
resourceBuilderMap.put(resourceId, resourceBuilder);
}
-
- for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
- resourceBuilderMap.get(resourceId).addPartition(new Partition(partitionId));
- }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
index 04a2c82..c7fef8d 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ClusterConfiguration.java
@@ -1,7 +1,10 @@
package org.apache.helix.model;
import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
import org.apache.helix.api.ClusterId;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.UserConfig;
/*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -26,7 +29,6 @@ import org.apache.helix.api.ClusterId;
* Persisted configuration properties for a cluster
*/
public class ClusterConfiguration extends HelixProperty {
-
/**
* Instantiate for an id
* @param id cluster id
@@ -35,4 +37,23 @@ public class ClusterConfiguration extends HelixProperty {
super(id.stringify());
}
+ /**
+ * Instantiate from a record
+ * @param record configuration properties
+ */
+ public ClusterConfiguration(ZNRecord record) {
+ super(record);
+ }
+
+ /**
+ * Create a new ClusterConfiguration from a UserConfig
+ * @param userConfig user-defined configuration properties
+ * @return ClusterConfiguration
+ */
+ public static ClusterConfiguration from(UserConfig userConfig) {
+ ClusterConfiguration clusterConfiguration =
+ new ClusterConfiguration(Id.cluster(userConfig.getId()));
+ clusterConfiguration.addUserConfig(userConfig);
+ return clusterConfiguration;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/model/IdealState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
index 16b3fa1..b2eccad 100644
--- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java
+++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java
@@ -193,7 +193,11 @@ public class IdealState extends HelixProperty {
* @param rebalancerRef a reference to a user-defined rebalancer
*/
public void setRebalancerRef(RebalancerRef rebalancerRef) {
- setRebalancerClassName(rebalancerRef.toString());
+ if (rebalancerRef != null) {
+ setRebalancerClassName(rebalancerRef.toString());
+ } else {
+ setRebalancerClassName(null);
+ }
}
/**
@@ -542,7 +546,7 @@ public class IdealState extends HelixProperty {
break;
default:
replica = "0";
- logger.error("could NOT determine replicas. set to 0");
+ logger.warn("could NOT determine replicas. set to 0");
break;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
new file mode 100644
index 0000000..44d6ac4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/PartitionConfiguration.java
@@ -0,0 +1,59 @@
+package org.apache.helix.model;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.UserConfig;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Persisted configuration properties for a partition
+ */
+public class PartitionConfiguration extends HelixProperty {
+ /**
+ * Instantiate for an id
+ * @param id partition id
+ */
+ public PartitionConfiguration(PartitionId id) {
+ super(id.stringify());
+ }
+
+ /**
+ * Instantiate from a record
+ * @param record configuration properties
+ */
+ public PartitionConfiguration(ZNRecord record) {
+ super(record);
+ }
+
+ /**
+ * Create a new PartitionConfiguration from a UserConfig
+ * @param userConfig user-defined configuration properties
+ * @return PartitionConfiguration
+ */
+ public static PartitionConfiguration from(UserConfig userConfig) {
+ PartitionConfiguration partitionConfiguration =
+ new PartitionConfiguration(Id.partition(userConfig.getId()));
+ partitionConfiguration.addUserConfig(userConfig);
+ return partitionConfiguration;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
new file mode 100644
index 0000000..20c05a4
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -0,0 +1,60 @@
+package org.apache.helix.model;
+
+import org.apache.helix.HelixProperty;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.UserConfig;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * Persisted configuration properties for a resource
+ */
+public class ResourceConfiguration extends HelixProperty {
+ /**
+ * Instantiate for an id
+ * @param id resource id
+ */
+ public ResourceConfiguration(ResourceId id) {
+ super(id.stringify());
+ }
+
+ /**
+ * Instantiate from a record
+ * @param record configuration properties
+ */
+ public ResourceConfiguration(ZNRecord record) {
+ super(record);
+ }
+
+ /**
+ * Create a new ResourceConfiguration from a UserConfig
+ * @param userConfig user-defined configuration properties
+ * @return ResourceConfiguration
+ */
+ public static ResourceConfiguration from(UserConfig userConfig) {
+ ResourceConfiguration resourceConfiguration =
+ new ResourceConfiguration(Id.resource(userConfig.getId()));
+ resourceConfiguration.addUserConfig(userConfig);
+ return resourceConfiguration;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
index f66616c..3c4de68 100644
--- a/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
+++ b/helix-core/src/main/java/org/apache/helix/model/StateModelDefinition.java
@@ -30,7 +30,9 @@ import java.util.TreeMap;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
+import org.apache.helix.api.Id;
import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
import org.apache.helix.model.builder.StateTransitionTableBuilder;
import org.apache.log4j.Logger;
@@ -141,6 +143,14 @@ public class StateModelDefinition extends HelixProperty {
}
/**
+ * Get a concrete state model definition id
+ * @return StateModelDefId
+ */
+ public StateModelDefId getStateModelDefId() {
+ return Id.stateModelDef(getId());
+ }
+
+ /**
* Get an ordered priority list of transitions
* @return transitions in the form SRC-DEST, the first of which is highest priority
*/
@@ -275,8 +285,8 @@ public class StateModelDefinition extends HelixProperty {
* Start building a state model with a name
* @param name state model name
*/
- public Builder(String name) {
- this._statemodelName = name;
+ public Builder(StateModelDefId stateModelDefId) {
+ this._statemodelName = stateModelDefId.stringify();
statesMap = new HashMap<String, Integer>();
transitionMap = new HashMap<Transition, Integer>();
stateConstraintMap = new HashMap<String, String>();
@@ -287,8 +297,8 @@ public class StateModelDefinition extends HelixProperty {
* state is OFFLINE
* @param state
*/
- public Builder initialState(String initialState) {
- this.initialState = initialState;
+ public Builder initialState(State initialState) {
+ this.initialState = initialState.toString();
return this;
}
@@ -300,8 +310,8 @@ public class StateModelDefinition extends HelixProperty {
* Use -1 to indicate states with no constraints, like OFFLINE
* @param states
*/
- public Builder addState(String state, int priority) {
- statesMap.put(state, priority);
+ public Builder addState(State state, int priority) {
+ statesMap.put(state.toString(), priority);
return this;
}
@@ -309,7 +319,7 @@ public class StateModelDefinition extends HelixProperty {
* Sets the priority to Integer.MAX_VALUE
* @param state
*/
- public Builder addState(String state) {
+ public Builder addState(State state) {
addState(state, Integer.MAX_VALUE);
return this;
}
@@ -326,8 +336,8 @@ public class StateModelDefinition extends HelixProperty {
* @param priority priority, higher value is higher priority
* @return Builder
*/
- public Builder addTransition(String fromState, String toState, int priority) {
- transitionMap.put(new Transition(State.from(fromState), State.from(toState)), priority);
+ public Builder addTransition(State fromState, State toState, int priority) {
+ transitionMap.put(new Transition(fromState, toState), priority);
return this;
}
@@ -338,7 +348,7 @@ public class StateModelDefinition extends HelixProperty {
* @param toState
* @return Builder
*/
- public Builder addTransition(String fromState, String toState) {
+ public Builder addTransition(State fromState, State toState) {
addTransition(fromState, toState, Integer.MAX_VALUE);
return this;
}
@@ -349,8 +359,8 @@ public class StateModelDefinition extends HelixProperty {
* @param upperBound maximum
* @return Builder
*/
- public Builder upperBound(String state, int upperBound) {
- stateConstraintMap.put(state, String.valueOf(upperBound));
+ public Builder upperBound(State state, int upperBound) {
+ stateConstraintMap.put(state.toString(), String.valueOf(upperBound));
return this;
}
@@ -368,8 +378,8 @@ public class StateModelDefinition extends HelixProperty {
* @param bound
* @return Builder
*/
- public Builder dynamicUpperBound(String state, String bound) {
- stateConstraintMap.put(state, bound);
+ public Builder dynamicUpperBound(State state, String bound) {
+ stateConstraintMap.put(state.toString(), bound);
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java
new file mode 100644
index 0000000..36cbf61
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/TestUserConfig.java
@@ -0,0 +1,86 @@
+package org.apache.helix.api;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**
+ * A user config is a namespaced subset in the physical model and a separate entity in the logical
+ * model. These tests ensure that that separation is honored.
+ */
+public class TestUserConfig {
+ /**
+ * Ensure that user configs are separated from helix configs in properties that hold both
+ */
+ @Test
+ public void testUserConfigUpdates() {
+ final String testKey = "testKey";
+ final String prefixedKey = UserConfig.class.getSimpleName() + "_testKey";
+ final String testSimpleValue = "testValue";
+ final List<String> testListValue = ImmutableList.of("testValue");
+ final Map<String, String> testMapValue = ImmutableMap.of("testInnerKey", "testValue");
+
+ // first, add Helix configuration to an InstanceConfig
+ ParticipantId participantId = Id.participant("testParticipant");
+ InstanceConfig instanceConfig = new InstanceConfig(participantId);
+ instanceConfig.setHostName("localhost");
+
+ // now, add user configuration
+ UserConfig userConfig = new UserConfig(participantId);
+ userConfig.setSimpleField(testKey, testSimpleValue);
+ userConfig.setListField(testKey, testListValue);
+ userConfig.setMapField(testKey, testMapValue);
+
+ // add the user configuration to the Helix configuration
+ instanceConfig.addUserConfig(userConfig);
+
+ // get the user configuration back from the property
+ UserConfig retrievedConfig = UserConfig.from(instanceConfig);
+
+ // check that the property still has the host name
+ Assert.assertTrue(instanceConfig.getHostName().equals("localhost"));
+
+ // check that the retrieved config does not contain the host name
+ Assert.assertEquals(retrievedConfig.getStringField(
+ InstanceConfigProperty.HELIX_HOST.toString(), "not localhost"), "not localhost");
+
+ // check that both the retrieved config and the original config have the added properties
+ Assert.assertEquals(userConfig.getSimpleField(testKey), testSimpleValue);
+ Assert.assertEquals(userConfig.getListField(testKey), testListValue);
+ Assert.assertEquals(userConfig.getMapField(testKey), testMapValue);
+ Assert.assertEquals(retrievedConfig.getSimpleField(testKey), testSimpleValue);
+ Assert.assertEquals(retrievedConfig.getListField(testKey), testListValue);
+ Assert.assertEquals(retrievedConfig.getMapField(testKey), testMapValue);
+
+ // test that the property stores the user config under keys prefixed with the UserConfig name
+ Assert.assertEquals(instanceConfig.getRecord().getSimpleField(prefixedKey), testSimpleValue);
+ Assert.assertEquals(instanceConfig.getRecord().getListField(prefixedKey), testListValue);
+ Assert.assertEquals(instanceConfig.getRecord().getMapField(prefixedKey), testMapValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 2fbe1f5..f8494bd 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -33,6 +33,7 @@ import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Id;
import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
import org.apache.helix.api.RebalancerConfig;
import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
@@ -161,19 +162,19 @@ public class BaseStageTest {
return defs;
}
- protected Map<ResourceId, ResourceConfig> getResourceMap() {
+ protected Map<ResourceId, ResourceConfig> getResourceMap(List<IdealState> idealStates) {
Map<ResourceId, ResourceConfig> resourceMap = new HashMap<ResourceId, ResourceConfig>();
- ResourceId resourceId = Id.resource("testResourceName");
- ResourceConfig.Builder builder = new ResourceConfig.Builder(resourceId);
- builder.addPartition(new Partition(Id.partition("testResourceName_0")));
- builder.addPartition(new Partition(Id.partition("testResourceName_1")));
- builder.addPartition(new Partition(Id.partition("testResourceName_2")));
- builder.addPartition(new Partition(Id.partition("testResourceName_3")));
- builder.addPartition(new Partition(Id.partition("testResourceName_4")));
- RebalancerConfig.Builder rebalancerConfigBuilder = new RebalancerConfig.Builder(resourceId);
- rebalancerConfigBuilder.stateModelDef(Id.stateModelDef("MasterSlave"));
- builder.rebalancerConfig(rebalancerConfigBuilder.build());
- resourceMap.put(Id.resource("testResourceName"), builder.build());
+ for (IdealState idealState : idealStates) {
+ ResourceId resourceId = idealState.getResourceId();
+ Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
+ for (PartitionId partitionId : idealState.getPartitionSet()) {
+ partitionMap.put(partitionId, new Partition(partitionId));
+ }
+ RebalancerConfig rebalancerConfig = new RebalancerConfig(partitionMap, idealState, null, 0);
+ ResourceConfig resourceConfig =
+ new ResourceConfig.Builder(resourceId).rebalancerConfig(rebalancerConfig).build();
+ resourceMap.put(resourceId, resourceConfig);
+ }
return resourceMap;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
index d024ef4..7dd2d45 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleCalcStageCompatibility.java
@@ -53,11 +53,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
String[] resources = new String[] {
"testResourceName"
};
- setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.AUTO);
+ List<IdealState> idealStates =
+ setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.AUTO);
setupLiveInstances(5);
Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
- Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
@@ -91,11 +92,12 @@ public class TestBestPossibleCalcStageCompatibility extends BaseStageTest {
String[] resources = new String[] {
"testResourceName"
};
- setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.CUSTOMIZED);
+ List<IdealState> idealStates =
+ setupIdealStateDeprecated(5, resources, 10, 1, IdealStateModeProperty.CUSTOMIZED);
setupLiveInstances(5);
Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
- Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
index 1a76615..234c441 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestBestPossibleStateCalcStage.java
@@ -20,6 +20,7 @@ package org.apache.helix.controller.stages;
*/
import java.util.Date;
+import java.util.List;
import java.util.Map;
import org.apache.helix.api.Id;
@@ -28,6 +29,7 @@ import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.StateModelDefinition;
import org.testng.AssertJUnit;
@@ -43,11 +45,11 @@ public class TestBestPossibleStateCalcStage extends BaseStageTest {
String[] resources = new String[] {
"testResourceName"
};
- setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
+ List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
setupLiveInstances(5);
Map<StateModelDefId, StateModelDefinition> stateModelDefs = setupStateModel();
- Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
index 3f567ae..489537f 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCurrentStateComputationStage.java
@@ -19,6 +19,7 @@ package org.apache.helix.controller.stages;
* under the License.
*/
+import java.util.List;
import java.util.Map;
import org.apache.helix.PropertyKey.Builder;
@@ -28,6 +29,8 @@ import org.apache.helix.api.ResourceConfig;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.State;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.Message;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
@@ -36,7 +39,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
@Test
public void testEmptyCS() {
- Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ String[] resources = new String[] {
+ "testResourceName"
+ };
+ List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
NewCurrentStateComputationStage stage = new NewCurrentStateComputationStage();
runStage(event, new NewReadClusterDataStage());
@@ -50,7 +57,11 @@ public class TestCurrentStateComputationStage extends BaseStageTest {
@Test
public void testSimpleCS() {
// setup resource
- Map<ResourceId, ResourceConfig> resourceMap = getResourceMap();
+ String[] resources = new String[] {
+ "testResourceName"
+ };
+ List<IdealState> idealStates = setupIdealState(5, resources, 10, 1, RebalanceMode.SEMI_AUTO);
+ Map<ResourceId, ResourceConfig> resourceMap = getResourceMap(idealStates);
setupLiveInstances(5);
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/d8ef5a2d/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index dcd817a..28b1461 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -37,6 +37,7 @@ import org.apache.helix.api.PartitionId;
import org.apache.helix.api.ResourceId;
import org.apache.helix.api.RunningInstance;
import org.apache.helix.api.State;
+import org.apache.helix.api.UserConfig;
import org.apache.helix.controller.stages.NewMessageSelectionStage.Bounds;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.Message;
@@ -59,10 +60,10 @@ public class TestMsgSelectionStage {
new RunningInstance(Id.session("session_1"), HelixVersion.from("1.2.3.4"), Id.process("1"));
liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_0"),
"localhost", 0, true, disabledPartitions, tags, runningInstance0, currentStateMap,
- messageMap));
+ messageMap, new UserConfig(Id.participant("localhost_0"))));
liveInstances.put(Id.participant("localhost_1"), new Participant(Id.participant("localhost_1"),
"localhost", 1, true, disabledPartitions, tags, runningInstance1, currentStateMap,
- messageMap));
+ messageMap, new UserConfig(Id.participant("localhost_1"))));
Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
currentStates.put(Id.participant("localhost_0"), State.from("SLAVE"));
@@ -109,10 +110,10 @@ public class TestMsgSelectionStage {
new RunningInstance(Id.session("session_1"), HelixVersion.from("1.2.3.4"), Id.process("1"));
liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_0"),
"localhost", 0, true, disabledPartitions, tags, runningInstance0, currentStateMap,
- messageMap));
- liveInstances.put(Id.participant("localhost_0"), new Participant(Id.participant("localhost_1"),
+ messageMap, new UserConfig(Id.participant("localhost_0"))));
+ liveInstances.put(Id.participant("localhost_1"), new Participant(Id.participant("localhost_1"),
"localhost", 1, true, disabledPartitions, tags, runningInstance1, currentStateMap,
- messageMap));
+ messageMap, new UserConfig(Id.participant("localhost_1"))));
Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
currentStates.put(Id.participant("localhost_0"), State.from("SLAVE"));