You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/07/28 23:19:59 UTC
[1/2] [HELIX-483] Simplify logical config classes
Repository: helix
Updated Branches:
refs/heads/master ce1e926c9 -> ff958b19f
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
index 46c0fd9..c09e937 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestMsgSelectionStage.java
@@ -25,23 +25,17 @@ import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.helix.TestHelper;
-import org.apache.helix.api.HelixVersion;
import org.apache.helix.api.Participant;
-import org.apache.helix.api.RunningInstance;
-import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
-import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.ParticipantId;
-import org.apache.helix.api.id.PartitionId;
-import org.apache.helix.api.id.ProcId;
import org.apache.helix.api.id.ResourceId;
-import org.apache.helix.api.id.SessionId;
import org.apache.helix.controller.stages.MessageSelectionStage.Bounds;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -51,25 +45,24 @@ public class TestMsgSelectionStage {
public void testMasterXfer() {
System.out.println("START testMasterXfer at " + new Date(System.currentTimeMillis()));
- Map<ParticipantId, Participant> liveInstances = new HashMap<ParticipantId, Participant>();
- Set<PartitionId> disabledPartitions = Collections.emptySet();
- Set<String> tags = Collections.emptySet();
+ Map<ParticipantId, Participant> liveParticipants = new HashMap<ParticipantId, Participant>();
Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
Map<MessageId, Message> messageMap = Collections.emptyMap();
- RunningInstance runningInstance0 =
- new RunningInstance(SessionId.from("session_0"), HelixVersion.from("1.2.3.4"),
- ProcId.from("0"));
- RunningInstance runningInstance1 =
- new RunningInstance(SessionId.from("session_1"), HelixVersion.from("1.2.3.4"),
- ProcId.from("1"));
- liveInstances.put(ParticipantId.from("localhost_0"),
- new Participant(ParticipantId.from("localhost_0"), "localhost", 0, true,
- disabledPartitions, tags, runningInstance0, currentStateMap, messageMap,
- new UserConfig(Scope.participant(ParticipantId.from("localhost_0"))), null));
- liveInstances.put(ParticipantId.from("localhost_1"),
- new Participant(ParticipantId.from("localhost_1"), "localhost", 1, true,
- disabledPartitions, tags, runningInstance1, currentStateMap, messageMap,
- new UserConfig(Scope.participant(ParticipantId.from("localhost_1"))), null));
+ ParticipantId participantId0 = ParticipantId.from("localhost_0");
+ ParticipantId participantId1 = ParticipantId.from("localhost_1");
+ LiveInstance[] liveInstances = new LiveInstance[2];
+ for (int i = 0; i < 2; i++) {
+ liveInstances[i] = new LiveInstance("localhost_" + i);
+ liveInstances[i].setSessionId("session_" + i);
+ liveInstances[i].setHelixVersion("1.2.3.4");
+ liveInstances[i].setLiveInstance(i + "@localhost");
+ }
+ liveParticipants.put(participantId0, new Participant(new ParticipantConfig.Builder(
+ participantId0).hostName("localhost").port(0).build(), liveInstances[0], currentStateMap,
+ messageMap, null));
+ liveParticipants.put(participantId1, new Participant(new ParticipantConfig.Builder(
+ participantId1).hostName("localhost").port(1).build(), liveInstances[1], currentStateMap,
+ messageMap, null));
Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
currentStates.put(ParticipantId.from("localhost_0"), State.from("SLAVE"));
@@ -92,7 +85,7 @@ public class TestMsgSelectionStage {
stateTransitionPriorities.put("SLAVE-MASTER", 1);
List<Message> selectedMsg =
- new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
+ new MessageSelectionStage().selectMessages(liveParticipants, currentStates, pendingStates,
messages, stateConstraints, stateTransitionPriorities, State.from("OFFLINE"));
Assert.assertEquals(selectedMsg.size(), 1);
@@ -105,25 +98,24 @@ public class TestMsgSelectionStage {
System.out.println("START testMasterXferAfterMasterResume at "
+ new Date(System.currentTimeMillis()));
- Map<ParticipantId, Participant> liveInstances = new HashMap<ParticipantId, Participant>();
- Set<PartitionId> disabledPartitions = Collections.emptySet();
- Set<String> tags = Collections.emptySet();
+ Map<ParticipantId, Participant> liveParticipants = new HashMap<ParticipantId, Participant>();
Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
Map<MessageId, Message> messageMap = Collections.emptyMap();
- RunningInstance runningInstance0 =
- new RunningInstance(SessionId.from("session_0"), HelixVersion.from("1.2.3.4"),
- ProcId.from("0"));
- RunningInstance runningInstance1 =
- new RunningInstance(SessionId.from("session_1"), HelixVersion.from("1.2.3.4"),
- ProcId.from("1"));
- liveInstances.put(ParticipantId.from("localhost_0"),
- new Participant(ParticipantId.from("localhost_0"), "localhost", 0, true,
- disabledPartitions, tags, runningInstance0, currentStateMap, messageMap,
- new UserConfig(Scope.participant(ParticipantId.from("localhost_0"))), null));
- liveInstances.put(ParticipantId.from("localhost_1"),
- new Participant(ParticipantId.from("localhost_1"), "localhost", 1, true,
- disabledPartitions, tags, runningInstance1, currentStateMap, messageMap,
- new UserConfig(Scope.participant(ParticipantId.from("localhost_1"))), null));
+ ParticipantId participantId0 = ParticipantId.from("localhost_0");
+ ParticipantId participantId1 = ParticipantId.from("localhost_1");
+ LiveInstance[] liveInstances = new LiveInstance[2];
+ for (int i = 0; i < 2; i++) {
+ liveInstances[i] = new LiveInstance("localhost_" + i);
+ liveInstances[i].setSessionId("session_" + i);
+ liveInstances[i].setHelixVersion("1.2.3.4");
+ liveInstances[i].setLiveInstance(i + "@localhost");
+ }
+ liveParticipants.put(participantId0, new Participant(new ParticipantConfig.Builder(
+ participantId0).hostName("localhost").port(0).build(), liveInstances[0], currentStateMap,
+ messageMap, null));
+ liveParticipants.put(participantId1, new Participant(new ParticipantConfig.Builder(
+ participantId1).hostName("localhost").port(1).build(), liveInstances[1], currentStateMap,
+ messageMap, null));
Map<ParticipantId, State> currentStates = new HashMap<ParticipantId, State>();
currentStates.put(ParticipantId.from("localhost_0"), State.from("SLAVE"));
@@ -145,7 +137,7 @@ public class TestMsgSelectionStage {
stateTransitionPriorities.put("SLAVE-MASTER", 1);
List<Message> selectedMsg =
- new MessageSelectionStage().selectMessages(liveInstances, currentStates, pendingStates,
+ new MessageSelectionStage().selectMessages(liveParticipants, currentStates, pendingStates,
messages, stateConstraints, stateTransitionPriorities, State.from("OFFLINE"));
Assert.assertEquals(selectedMsg.size(), 0);
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
index c8ec90a..4535df5 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/strategy/TestNewAutoRebalanceStrategy.java
@@ -36,10 +36,9 @@ import java.util.TreeSet;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.Participant;
-import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
import org.apache.helix.api.config.ClusterConfig;
-import org.apache.helix.api.config.UserConfig;
+import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.ParticipantId;
@@ -225,17 +224,10 @@ public class TestNewAutoRebalanceStrategy {
ClusterId clusterId = ClusterId.from("clusterId");
ClusterConfig.Builder clusterConfigBuilder =
new ClusterConfig.Builder(clusterId).addStateModelDefinition(_stateModelDef);
- for (State state : _stateModelDef.getTypedStatesPriorityList()) {
- clusterConfigBuilder.addStateUpperBoundConstraint(Scope.cluster(clusterId),
- _stateModelDef.getStateModelDefId(), state,
- _stateModelDef.getNumParticipantsPerState(state));
- }
ClusterConfig clusterConfig = clusterConfigBuilder.build();
for (String partition : _partitions) {
PartitionId partitionId = PartitionId.from(partition);
Set<ParticipantId> disabledParticipantsForPartition = Collections.emptySet();
- Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
- Set<String> tags = Collections.emptySet();
Map<MessageId, Message> messageMap = Collections.emptyMap();
Map<ResourceId, CurrentState> currentStateMap = Collections.emptyMap();
Map<ParticipantId, Participant> liveParticipantMap =
@@ -243,10 +235,10 @@ public class TestNewAutoRebalanceStrategy {
// set up some participants
for (String nodeName : _liveNodes) {
ParticipantId participantId = ParticipantId.from(nodeName);
+ ParticipantConfig participantConfig =
+ new ParticipantConfig.Builder(participantId).hostName("hostname").port(0).build();
Participant participant =
- new Participant(participantId, "hostname", 0, true, disabledPartitionIdSet, tags,
- null, currentStateMap, messageMap, new UserConfig(
- Scope.participant(participantId)), null);
+ new Participant(participantConfig, null, currentStateMap, messageMap, null);
liveParticipantMap.put(participantId, participant);
}
List<ParticipantId> participantPreferenceList =
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index 911832a..e39615d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -88,7 +88,8 @@ public class TestTaskRebalancer extends ZkTestBase {
// Set up target db
_setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
MASTER_SLAVE_STATE_MODEL);
- _setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
+ _setupTool
+ .rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
taskFactoryReg.put("Reindex", new TaskFactory() {
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
----------------------------------------------------------------------
diff --git a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
index 163ac5e..9e897a0 100644
--- a/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
+++ b/helix-examples/src/main/java/org/apache/helix/examples/LogicalModelExample.java
@@ -90,11 +90,7 @@ public class LogicalModelExample {
new ClusterConfig.Builder(clusterId).addResource(resource).addParticipant(participant)
.addStateModelDefinition(lockUnlock).userConfig(userConfig).autoJoin(true);
- // add a state constraint that is more restrictive than what is in the state model
- clusterBuilder.addStateUpperBoundConstraint(Scope.cluster(clusterId),
- lockUnlock.getStateModelDefId(), State.from("LOCKED"), 1);
-
- // add a transition constraint (this time with a resource scope)
+ // add a transition constraint (with a resource scope)
clusterBuilder.addTransitionConstraint(Scope.resource(resource.getId()),
lockUnlock.getStateModelDefId(),
Transition.from(State.from("RELEASED"), State.from("LOCKED")), 1);
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
index fcc137b..b84fae6 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/ContainerAdmin.java
@@ -62,7 +62,7 @@ public class ContainerAdmin {
if (participant != null && participant.isAlive()) {
Message message = new Message(MessageType.SHUTDOWN, UUID.randomUUID().toString());
message.setTgtName(participant.getId().toString());
- message.setTgtSessionId(participant.getRunningInstance().getSessionId());
+ message.setTgtSessionId(participant.getLiveInstance().getSessionId());
message.setMsgId(message.getId());
dataAccessor.createProperty(
dataAccessor.keyBuilder().message(participant.getId().toString(), message.getId()),
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
index 7d7883e..558d033 100644
--- a/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/yarn/NMCallbackHandler.java
@@ -86,18 +86,18 @@ class NMCallbackHandler implements NMClientAsync.CallbackHandler {
@Override
public void onStartContainerError(ContainerId containerId, Throwable t) {
- LOG.error("Failed to start Container " + containerId);
+ LOG.error("Failed to start Container " + containerId, t);
containers.remove(containerId);
}
@Override
public void onGetContainerStatusError(ContainerId containerId, Throwable t) {
- LOG.error("Failed to query the status of Container " + containerId);
+ LOG.error("Failed to query the status of Container " + containerId, t);
}
@Override
public void onStopContainerError(ContainerId containerId, Throwable t) {
- LOG.error("Failed to stop Container " + containerId);
+ LOG.error("Failed to stop Container " + containerId, t);
containers.remove(containerId);
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
----------------------------------------------------------------------
diff --git a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
index ddbf27a..e588ea8 100644
--- a/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
+++ b/recipes/jobrunner-yarn/src/main/java/org/apache/helix/provisioning/yarn/example/JobRunnerMain.java
@@ -34,12 +34,12 @@ import org.apache.helix.HelixRole;
import org.apache.helix.InstanceType;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
-import org.apache.helix.api.RunningInstance;
import org.apache.helix.api.accessor.ClusterAccessor;
import org.apache.helix.api.config.ContainerConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.Id;
import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.provisioning.ApplicationSpec;
import org.apache.helix.provisioning.ApplicationSpecFactory;
import org.apache.helix.provisioning.HelixYarnUtil;
@@ -138,8 +138,8 @@ public class JobRunnerMain {
+ containerConfig.getState());
}
if (participant.isAlive()) {
- RunningInstance runningInstance = participant.getRunningInstance();
- System.out.println("\tProcess: " + runningInstance.getPid());
+ LiveInstance runningInstance = participant.getLiveInstance();
+ System.out.println("\tProcess: " + runningInstance.getProcessId());
}
}
System.out.println("----------------");
[2/2] git commit: [HELIX-483] Simplify logical config classes
Posted by ka...@apache.org.
[HELIX-483] Simplify logical config classes
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ff958b19
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ff958b19
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ff958b19
Branch: refs/heads/master
Commit: ff958b19f0911ad337439616ada746eee7c95e23
Parents: ce1e926
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Fri Jul 25 16:33:35 2014 -0700
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Fri Jul 25 16:33:35 2014 -0700
----------------------------------------------------------------------
.../main/java/org/apache/helix/api/Cluster.java | 13 --
.../java/org/apache/helix/api/Controller.java | 16 +-
.../java/org/apache/helix/api/Participant.java | 22 ++-
.../java/org/apache/helix/api/Resource.java | 39 +----
.../org/apache/helix/api/RunningInstance.java | 69 --------
.../helix/api/accessor/ClusterAccessor.java | 92 ++---------
.../apache/helix/api/config/ClusterConfig.java | 161 -------------------
.../helix/api/config/ParticipantConfig.java | 82 +++++-----
.../apache/helix/api/config/ResourceConfig.java | 139 ++--------------
.../util/ConstraintBasedAssignment.java | 5 +-
.../stages/CompatibilityCheckStage.java | 4 +-
.../stages/ContainerProvisioningStage.java | 2 +-
.../stages/CurrentStateComputationStage.java | 5 +-
.../stages/ExternalViewComputeStage.java | 4 +-
.../stages/MessageGenerationStage.java | 9 +-
.../stages/MessageSelectionStage.java | 5 +-
.../stages/ResourceComputationStage.java | 11 +-
.../controller/stages/TaskAssignmentStage.java | 5 +-
.../org/apache/helix/model/InstanceConfig.java | 7 +-
.../helix/model/ResourceConfiguration.java | 18 ---
.../org/apache/helix/task/TaskRebalancer.java | 4 +-
.../apache/helix/api/TestNamespacedConfig.java | 5 +-
.../org/apache/helix/api/TestUpdateConfig.java | 55 ++-----
.../helix/api/accessor/TestAtomicAccessors.java | 19 +--
.../helix/controller/stages/BaseStageTest.java | 11 +-
.../stages/TestMsgSelectionStage.java | 80 +++++----
.../strategy/TestNewAutoRebalanceStrategy.java | 16 +-
.../integration/task/TestTaskRebalancer.java | 3 +-
.../helix/examples/LogicalModelExample.java | 6 +-
.../provisioning/tools/ContainerAdmin.java | 2 +-
.../provisioning/yarn/NMCallbackHandler.java | 6 +-
.../yarn/example/JobRunnerMain.java | 6 +-
32 files changed, 196 insertions(+), 725 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/api/Cluster.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Cluster.java b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
index adaf200..421ff60 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Cluster.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Cluster.java
@@ -252,19 +252,6 @@ public class Cluster {
}
/**
- * Get the maximum number of participants that can be in a state
- * @param scope the scope for the bound
- * @param stateModelDefId the state model of the state
- * @param state the constrained state
- * @return The upper bound, which can be "-1" if unspecified, a numerical upper bound, "R" for
- * number of replicas, or "N" for number of participants
- */
- public String getStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state) {
- return _config.getStateUpperBoundConstraint(scope, stateModelDefId, state);
- }
-
- /**
* Get the limit of simultaneous execution of a transition
* @param scope the scope under which the transition is constrained
* @param stateModelDefId the state model of which the transition is a part
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/api/Controller.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Controller.java b/helix-core/src/main/java/org/apache/helix/api/Controller.java
index c94a6e1..d4cfaf3 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Controller.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Controller.java
@@ -27,7 +27,7 @@ import org.apache.helix.model.LiveInstance;
*/
public class Controller {
private final ControllerId _id;
- private final RunningInstance _runningInstance;
+ private final LiveInstance _liveInstance;
private final boolean _isLeader;
/**
@@ -36,15 +36,7 @@ public class Controller {
*/
public Controller(ControllerId id, LiveInstance liveInstance, boolean isLeader) {
_id = id;
-
- if (liveInstance != null) {
- _runningInstance =
- new RunningInstance(liveInstance.getTypedSessionId(),
- liveInstance.getTypedHelixVersion(), liveInstance.getProcessId());
- } else {
- _runningInstance = null;
- }
-
+ _liveInstance = liveInstance;
_isLeader = isLeader;
}
@@ -68,7 +60,7 @@ public class Controller {
* Get the running instance
* @return running instance or null if not running
*/
- public RunningInstance getRunningInstance() {
- return _runningInstance;
+ public LiveInstance getLiveInstance() {
+ return _liveInstance;
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/api/Participant.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Participant.java b/helix-core/src/main/java/org/apache/helix/api/Participant.java
index 3ed395b..ad6811b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Participant.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Participant.java
@@ -30,6 +30,7 @@ import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import com.google.common.collect.ImmutableMap;
@@ -39,10 +40,10 @@ import com.google.common.collect.ImmutableMap;
*/
public class Participant {
private final ParticipantConfig _config;
-
+
private final ContainerConfig _containerConfig;
- private final RunningInstance _runningInstance;
+ private final LiveInstance _liveInstance;
/**
* map of resource-id to current-state
@@ -58,15 +59,12 @@ public class Participant {
* Construct a participant
* @param config
*/
- public Participant(ParticipantId id, String hostName, int port, boolean isEnabled,
- Set<PartitionId> disabledPartitionIdSet, Set<String> tags, RunningInstance runningInstance,
+ public Participant(ParticipantConfig participantConfig, LiveInstance liveInstance,
Map<ResourceId, CurrentState> currentStateMap, Map<MessageId, Message> messageMap,
- UserConfig userConfig, ContainerConfig containerConfig) {
- _config =
- new ParticipantConfig(id, hostName, port, isEnabled, disabledPartitionIdSet, tags,
- userConfig);
+ ContainerConfig containerConfig) {
+ _config = participantConfig;
_containerConfig = containerConfig;
- _runningInstance = runningInstance;
+ _liveInstance = liveInstance;
_currentStateMap = ImmutableMap.copyOf(currentStateMap);
_messageMap = ImmutableMap.copyOf(messageMap);
}
@@ -100,15 +98,15 @@ public class Participant {
* @return true if running or false otherwise
*/
public boolean isAlive() {
- return _runningInstance != null;
+ return _liveInstance != null;
}
/**
* Get the running instance
* @return running instance or null if not running
*/
- public RunningInstance getRunningInstance() {
- return _runningInstance;
+ public LiveInstance getLiveInstance() {
+ return _liveInstance;
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index 239748c..3a7a9f2 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.config.ResourceConfig.ResourceType;
import org.apache.helix.api.config.SchedulerTaskConfig;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.PartitionId;
@@ -47,25 +46,13 @@ public class Resource {
/**
* Construct a resource
- * @param id resource id
- * @param type ResourceType type
- * @param idealState ideal state of the resource
+ * @param resourceConfig full resource configuration
* @param externalView external view of the resource
* @param resourceAssignment current resource assignment of the cluster
- * @param rebalancerConfig parameters that the rebalancer should be aware of
- * @param provisionerConfig parameters that the provisioner should be aware of
- * @param userConfig any resource user-defined configuration
- * @param bucketSize the bucket size to use for physically saved state
- * @param batchMessageMode true if batch messaging allowed, false otherwise
*/
- public Resource(ResourceId id, ResourceType type, IdealState idealState,
- ResourceAssignment resourceAssignment, ExternalView externalView,
- RebalancerConfig rebalancerConfig, ProvisionerConfig provisionerConfig,
- UserConfig userConfig, int bucketSize, boolean batchMessageMode) {
- SchedulerTaskConfig schedulerTaskConfig = schedulerTaskConfig(idealState);
- _config =
- new ResourceConfig(id, type, idealState, schedulerTaskConfig, rebalancerConfig,
- provisionerConfig, userConfig, bucketSize, batchMessageMode);
+ public Resource(ResourceConfig resourceConfig, ResourceAssignment resourceAssignment,
+ ExternalView externalView) {
+ _config = resourceConfig;
_externalView = externalView;
_resourceAssignment = resourceAssignment;
}
@@ -75,7 +62,7 @@ public class Resource {
* @param idealState
* @return scheduler-task config or null if state-model-def is not SchedulerTaskQueue
*/
- SchedulerTaskConfig schedulerTaskConfig(IdealState idealState) {
+ public static SchedulerTaskConfig schedulerTaskConfig(IdealState idealState) {
if (idealState == null) {
return null;
}
@@ -169,22 +156,6 @@ public class Resource {
}
/**
- * Get bucket size
- * @return bucket size
- */
- public int getBucketSize() {
- return _config.getBucketSize();
- }
-
- /**
- * Get batch message mode
- * @return true if in batch message mode, false otherwise
- */
- public boolean getBatchMessageMode() {
- return _config.getBatchMessageMode();
- }
-
- /**
* Get the properties configuring provisioning
* @return ProvisionerConfig properties
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/api/RunningInstance.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RunningInstance.java b/helix-core/src/main/java/org/apache/helix/api/RunningInstance.java
deleted file mode 100644
index 4e5aedb..0000000
--- a/helix-core/src/main/java/org/apache/helix/api/RunningInstance.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.helix.api;
-
-import org.apache.helix.api.id.ProcId;
-import org.apache.helix.api.id.SessionId;
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * A running attributes of a helix instance
- */
-public class RunningInstance {
- private final SessionId _sessionId;
- private final HelixVersion _version;
- private final ProcId _pid;
-
- /**
- * Construct running instance
- * @param sessionId zookeeper session-id
- * @param version helix-version
- * @param pid running jvm name
- */
- public RunningInstance(SessionId sessionId, HelixVersion version, ProcId pid) {
- _sessionId = sessionId;
- _version = version;
- _pid = pid;
- }
-
- /**
- * Get session id of the running instance
- * session id is the zookeeper session id
- * @return session id
- */
- public SessionId getSessionId() {
- return _sessionId;
- }
-
- /**
- * Get helix version of the running instance
- * @return helix version
- */
- public HelixVersion getVersion() {
- return _version;
- }
-
- /**
- * Get the name of the running jvm of the running instance
- * @return running jvm name (e.g. 1111@host)
- */
- public ProcId getPid() {
- return _pid;
- }
-}
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
index 21d40b1..70e600b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/accessor/ClusterAccessor.java
@@ -22,10 +22,8 @@ package org.apache.helix.api.accessor;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
@@ -35,13 +33,11 @@ import org.apache.helix.api.Cluster;
import org.apache.helix.api.Controller;
import org.apache.helix.api.Participant;
import org.apache.helix.api.Resource;
-import org.apache.helix.api.RunningInstance;
import org.apache.helix.api.Scope;
import org.apache.helix.api.config.ClusterConfig;
import org.apache.helix.api.config.ContainerConfig;
import org.apache.helix.api.config.ParticipantConfig;
import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.config.ResourceConfig.ResourceType;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ContextId;
@@ -82,7 +78,6 @@ import org.apache.helix.util.HelixUtil;
import org.apache.log4j.Logger;
import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
public class ClusterAccessor {
private static Logger LOG = Logger.getLogger(ClusterAccessor.class);
@@ -360,11 +355,8 @@ public class ClusterAccessor {
}
// populate all the resources
- Set<String> allResources = Sets.newHashSet();
- allResources.addAll(idealStateMap.keySet());
- allResources.addAll(resourceConfigMap.keySet());
Map<ResourceId, Resource> resourceMap = Maps.newHashMap();
- for (String resourceName : allResources) {
+ for (String resourceName : idealStateMap.keySet()) {
ResourceId resourceId = ResourceId.from(resourceName);
resourceMap.put(
resourceId,
@@ -518,8 +510,8 @@ public class ClusterAccessor {
// Create an IdealState from a RebalancerConfig (if the resource supports it)
IdealState idealState =
- PartitionedRebalancerConfig.rebalancerConfigToIdealState(resource.getRebalancerConfig(),
- resource.getBucketSize(), resource.getBatchMessageMode());
+ PartitionedRebalancerConfig.rebalancerConfigToIdealState(resource.getRebalancerConfig(), 0,
+ false);
if (idealState != null) {
_accessor.setProperty(_keyBuilder.idealStates(resourceId.stringify()), idealState);
}
@@ -527,7 +519,6 @@ public class ClusterAccessor {
// Add resource user config
if (resource.getUserConfig() != null) {
ResourceConfiguration configuration = new ResourceConfiguration(resourceId);
- configuration.setType(resource.getType());
configuration.addNamespacedConfig(resource.getUserConfig());
PartitionedRebalancerConfig partitionedConfig = PartitionedRebalancerConfig.from(config);
if (idealState == null
@@ -636,21 +627,8 @@ public class ClusterAccessor {
initParticipantStructure(participantId);
// add the config
- InstanceConfig instanceConfig = new InstanceConfig(participant.getId());
- instanceConfig.setHostName(participant.getHostName());
- instanceConfig.setPort(Integer.toString(participant.getPort()));
- instanceConfig.setInstanceEnabled(participant.isEnabled());
- UserConfig userConfig = participant.getUserConfig();
- instanceConfig.addNamespacedConfig(userConfig);
- Set<String> tags = participant.getTags();
- for (String tag : tags) {
- instanceConfig.addTag(tag);
- }
- Set<PartitionId> disabledPartitions = participant.getDisabledPartitions();
- for (PartitionId partitionId : disabledPartitions) {
- instanceConfig.setParticipantEnabledForPartition(partitionId, false);
- }
- _accessor.setProperty(_keyBuilder.instanceConfig(participantId.stringify()), instanceConfig);
+ _accessor.setProperty(_keyBuilder.instanceConfig(participantId.stringify()),
+ participant.getInstanceConfig());
return true;
}
@@ -770,39 +748,6 @@ public class ClusterAccessor {
private static Participant createParticipant(ParticipantId participantId,
InstanceConfig instanceConfig, UserConfig userConfig, LiveInstance liveInstance,
Map<String, Message> instanceMsgMap, Map<String, CurrentState> instanceCurStateMap) {
-
- String hostName = instanceConfig.getHostName();
-
- int port = -1;
- try {
- port = Integer.parseInt(instanceConfig.getPort());
- } catch (IllegalArgumentException e) {
- // keep as -1
- }
- if (port < 0 || port > 65535) {
- port = -1;
- }
- boolean isEnabled = instanceConfig.getInstanceEnabled();
-
- List<String> disabledPartitions = instanceConfig.getDisabledPartitions();
- Set<PartitionId> disabledPartitionIdSet = Collections.emptySet();
- if (disabledPartitions != null) {
- disabledPartitionIdSet = new HashSet<PartitionId>();
- for (String partitionId : disabledPartitions) {
- disabledPartitionIdSet.add(PartitionId.from(PartitionId.extractResourceId(partitionId),
- PartitionId.stripResourceId(partitionId)));
- }
- }
-
- Set<String> tags = new HashSet<String>(instanceConfig.getTags());
-
- RunningInstance runningInstance = null;
- if (liveInstance != null) {
- runningInstance =
- new RunningInstance(liveInstance.getTypedSessionId(),
- liveInstance.getTypedHelixVersion(), liveInstance.getProcessId());
- }
-
Map<MessageId, Message> msgMap = new HashMap<MessageId, Message>();
if (instanceMsgMap != null) {
for (String msgId : instanceMsgMap.keySet()) {
@@ -828,8 +773,9 @@ public class ClusterAccessor {
containerConfig = new ContainerConfig(containerId, containerSpec, containerState);
}
- return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
- runningInstance, curStateMap, msgMap, userConfig, containerConfig);
+ // Populate the logical class
+ ParticipantConfig participantConfig = ParticipantConfig.from(instanceConfig);
+ return new Participant(participantConfig, liveInstance, curStateMap, msgMap, containerConfig);
}
/**
@@ -876,7 +822,7 @@ public class ClusterAccessor {
_accessor.getProperty(_keyBuilder.resourceConfig(resourceId.stringify()));
IdealState idealState = _accessor.getProperty(_keyBuilder.idealStates(resourceId.stringify()));
- if (config == null && idealState == null) {
+ if (idealState == null) {
LOG.error("Resource " + resourceId + " not present on the cluster");
return null;
}
@@ -958,8 +904,6 @@ public class ClusterAccessor {
// only persist if this is not easily convertible to an ideal state
config.addNamespacedConfig(new RebalancerConfigHolder(resourceConfig.getRebalancerConfig())
.toNamespacedConfig());
- config.setBucketSize(resourceConfig.getBucketSize());
- config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
} else if (userConfig == null) {
config = null;
}
@@ -967,8 +911,6 @@ public class ClusterAccessor {
config.addNamespacedConfig(new ProvisionerConfigHolder(resourceConfig.getProvisionerConfig())
.toNamespacedConfig());
}
- config.setBucketSize(resourceConfig.getBucketSize());
- config.setBatchMessageMode(resourceConfig.getBatchMessageMode());
setResourceConfiguration(resourceId, config, resourceConfig.getRebalancerConfig());
return true;
}
@@ -988,15 +930,11 @@ public class ClusterAccessor {
UserConfig userConfig;
ProvisionerConfig provisionerConfig = null;
RebalancerConfig rebalancerConfig = null;
- ResourceType type = ResourceType.DATA;
if (resourceConfiguration != null) {
userConfig = resourceConfiguration.getUserConfig();
- type = resourceConfiguration.getType();
} else {
userConfig = new UserConfig(Scope.resource(resourceId));
}
- int bucketSize = 0;
- boolean batchMessageMode = false;
if (idealState != null) {
if (resourceConfiguration != null
&& idealState.getRebalanceMode() == RebalanceMode.USER_DEFINED) {
@@ -1008,12 +946,8 @@ public class ClusterAccessor {
// prefer ideal state for non-user_defined data rebalancing
rebalancerConfig = PartitionedRebalancerConfig.from(idealState);
}
- bucketSize = idealState.getBucketSize();
- batchMessageMode = idealState.getBatchMessageMode();
idealState.updateUserConfig(userConfig);
} else if (resourceConfiguration != null) {
- bucketSize = resourceConfiguration.getBucketSize();
- batchMessageMode = resourceConfiguration.getBatchMessageMode();
rebalancerConfig = resourceConfiguration.getRebalancerConfig(RebalancerConfig.class);
}
if (rebalancerConfig == null) {
@@ -1022,8 +956,12 @@ public class ClusterAccessor {
if (resourceConfiguration != null) {
provisionerConfig = resourceConfiguration.getProvisionerConfig(ProvisionerConfig.class);
}
- return new Resource(resourceId, type, idealState, resourceAssignment, externalView,
- rebalancerConfig, provisionerConfig, userConfig, bucketSize, batchMessageMode);
+ ResourceConfig resourceConfig =
+ new ResourceConfig.Builder(resourceId).idealState(idealState)
+ .rebalancerConfig(rebalancerConfig).provisionerConfig(provisionerConfig)
+ .schedulerTaskConfig(Resource.schedulerTaskConfig(idealState)).userConfig(userConfig)
+ .build();
+ return new Resource(resourceConfig, resourceAssignment, externalView);
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
index d5dd337..f39482c 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ClusterConfig.java
@@ -6,7 +6,6 @@ import java.util.Map;
import java.util.Set;
import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ConstraintId;
import org.apache.helix.api.id.ParticipantId;
@@ -15,7 +14,6 @@ import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.model.ClusterConstraints;
import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
import org.apache.helix.model.ClusterConstraints.ConstraintType;
-import org.apache.helix.model.ClusterConstraints.ConstraintValue;
import org.apache.helix.model.ConstraintItem;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
@@ -112,57 +110,6 @@ public class ClusterConfig {
}
/**
- * Get the maximum number of participants that can be in a state
- * @param scope the scope for the bound
- * @param stateModelDefId the state model of the state
- * @param state the constrained state
- * @return The upper bound, which can be "-1" if unspecified, a numerical upper bound, "R" for
- * number of replicas, or "N" for number of participants
- */
- public String getStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state) {
- // set up attributes to match based on the scope
- ClusterConstraints stateConstraints = getConstraintMap().get(ConstraintType.STATE_CONSTRAINT);
- Map<ConstraintAttribute, String> matchAttributes = Maps.newHashMap();
- matchAttributes.put(ConstraintAttribute.STATE, state.toString());
- matchAttributes.put(ConstraintAttribute.STATE_MODEL, stateModelDefId.toString());
- switch (scope.getType()) {
- case CLUSTER:
- // cluster is implicit
- break;
- case RESOURCE:
- matchAttributes.put(ConstraintAttribute.RESOURCE, scope.getScopedId().stringify());
- break;
- default:
- LOG.error("Unsupported scope for state constraint: " + scope);
- return "-1";
- }
- Set<ConstraintItem> matches = stateConstraints.match(matchAttributes);
- int value = -1;
- for (ConstraintItem item : matches) {
- // match: if an R or N is found, always choose that one
- // otherwise, take the minimum of the counts specified in the constraints
- String constraintValue = item.getConstraintValue();
- if (constraintValue != null) {
- if (constraintValue.equals(ConstraintValue.N.toString())
- || constraintValue.equals(ConstraintValue.R.toString())) {
- return constraintValue;
- } else {
- try {
- int current = Integer.parseInt(constraintValue);
- if (value == -1 || current < value) {
- value = current;
- }
- } catch (NumberFormatException e) {
- LOG.error("Invalid state upper bound: " + constraintValue);
- }
- }
- }
- }
- return Integer.toString(value);
- }
-
- /**
* Get the limit of simultaneous execution of a transition
* @param scope the scope under which the transition is constrained
* @param stateModelDefId the state model of which the transition is a part
@@ -281,55 +228,6 @@ public class ClusterConfig {
}
/**
- * Add a state upper bound constraint
- * @param scope scope under which the constraint is valid
- * @param stateModelDefId identifier of the state model that owns the state
- * @param state the state to constrain
- * @param upperBound maximum number of replicas per partition in the state
- * @return Delta
- */
- public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state, int upperBound) {
- return addStateUpperBoundConstraint(scope, stateModelDefId, state,
- Integer.toString(upperBound));
- }
-
- /**
- * Add a state upper bound constraint
- * @param scope scope under which the constraint is valid
- * @param stateModelDefId identifier of the state model that owns the state
- * @param state the state to constrain
- * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
- * number, or the currently supported special bound values:<br />
- * "R" - Refers to the number of replicas specified during resource
- * creation. This allows having different replication factor for each
- * resource without having to create a different state machine. <br />
- * "N" - Refers to all nodes in the cluster. Useful for resources that need
- * to exist on all nodes. This way one can add/remove nodes without having
- * the change the bounds.
- * @return Delta
- */
- public Delta addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state, String dynamicUpperBound) {
- _builder.addStateUpperBoundConstraint(scope, stateModelDefId, state, dynamicUpperBound);
- return this;
- }
-
- /**
- * Remove state upper bound constraint
- * @param scope scope under which the constraint is valid
- * @param stateModelDefId identifier of the state model that owns the state
- * @param state the state to constrain
- * @return Delta
- */
- public Delta removeStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state) {
- _removedConstraints.get(ConstraintType.STATE_CONSTRAINT).add(
- ConstraintId.from(scope, stateModelDefId, state));
- return this;
- }
-
- /**
* Add a constraint on the maximum number of in-flight transitions of a certain type
* @param scope scope of the constraint
* @param stateModelDefId identifies the state model containing the transition
@@ -604,71 +502,12 @@ public class ClusterConfig {
}
/**
- * Add a state upper bound constraint
- * @param scope scope under which the constraint is valid
- * @param stateModelDefId identifier of the state model that owns the state
- * @param state the state to constrain
- * @param upperBound maximum number of replicas per partition in the state
- * @return Builder
- */
- public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state, int upperBound) {
- return addStateUpperBoundConstraint(scope, stateModelDefId, state,
- Integer.toString(upperBound));
- }
-
- /**
- * Add a state upper bound constraint
- * @param scope scope under which the constraint is valid
- * @param stateModelDefId identifier of the state model that owns the state
- * @param state the state to constrain
- * @param dynamicUpperBound the upper bound of replicas per partition in the state, can be a
- * number, or the currently supported special bound values:<br />
- * "R" - Refers to the number of replicas specified during resource
- * creation. This allows having different replication factor for each
- * resource without having to create a different state machine. <br />
- * "N" - Refers to all nodes in the cluster. Useful for resources that need
- * to exist on all nodes. This way one can add/remove nodes without having
- * the change the bounds.
- * @return Builder
- */
- public Builder addStateUpperBoundConstraint(Scope<?> scope, StateModelDefId stateModelDefId,
- State state, String dynamicUpperBound) {
- Map<String, String> attributes = Maps.newHashMap();
- attributes.put(ConstraintAttribute.STATE.toString(), state.toString());
- attributes.put(ConstraintAttribute.STATE_MODEL.toString(), stateModelDefId.stringify());
- attributes.put(ConstraintAttribute.CONSTRAINT_VALUE.toString(), dynamicUpperBound);
- switch (scope.getType()) {
- case CLUSTER:
- // cluster is implicit
- break;
- case RESOURCE:
- attributes.put(ConstraintAttribute.RESOURCE.toString(), scope.getScopedId().stringify());
- break;
- default:
- LOG.error("Unsupported scope for adding a state constraint: " + scope);
- return this;
- }
- ConstraintItem item = new ConstraintItemBuilder().addConstraintAttributes(attributes).build();
- ClusterConstraints constraints = getConstraintsInstance(ConstraintType.STATE_CONSTRAINT);
- constraints.addConstraintItem(ConstraintId.from(scope, stateModelDefId, state), item);
- return this;
- }
-
- /**
* Add a state model definition to the cluster
* @param stateModelDef state model definition of the cluster
* @return Builder
*/
public Builder addStateModelDefinition(StateModelDefinition stateModelDef) {
_stateModelMap.put(stateModelDef.getStateModelDefId(), stateModelDef);
- // add state constraints from the state model definition
- for (State state : stateModelDef.getTypedStatesPriorityList()) {
- if (!stateModelDef.getNumParticipantsPerState(state).equals("-1")) {
- addStateUpperBoundConstraint(Scope.cluster(_id), stateModelDef.getStateModelDefId(),
- state, stateModelDef.getNumParticipantsPerState(state));
- }
- }
return this;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/api/config/ParticipantConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ParticipantConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ParticipantConfig.java
index 119ff52..4fd42b9 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ParticipantConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ParticipantConfig.java
@@ -1,13 +1,12 @@
package org.apache.helix.api.config;
-import java.util.HashSet;
import java.util.Set;
-import org.apache.helix.api.Scope;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.InstanceConfig.InstanceConfigProperty;
-import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
/*
@@ -33,10 +32,7 @@ import com.google.common.collect.Sets;
* Configuration properties of a Helix participant
*/
public class ParticipantConfig {
- private final ParticipantId _id;
- private final String _hostName;
- private final int _port;
- private final boolean _isEnabled;
+ private final InstanceConfig _instanceConfig;
private final Set<PartitionId> _disabledPartitions;
private final Set<String> _tags;
private final UserConfig _userConfig;
@@ -50,15 +46,14 @@ public class ParticipantConfig {
* @param disabledPartitions set of partitions, if any to disable on this participant
* @param tags tags to set for the participant
*/
- public ParticipantConfig(ParticipantId id, String hostName, int port, boolean isEnabled,
- Set<PartitionId> disabledPartitions, Set<String> tags, UserConfig userConfig) {
- _id = id;
- _hostName = hostName;
- _port = port;
- _isEnabled = isEnabled;
- _disabledPartitions = ImmutableSet.copyOf(disabledPartitions);
- _tags = ImmutableSet.copyOf(tags);
- _userConfig = userConfig;
+ private ParticipantConfig(InstanceConfig instanceConfig) {
+ _instanceConfig = instanceConfig;
+ _disabledPartitions = Sets.newHashSet();
+ for (String partitionName : instanceConfig.getDisabledPartitions()) {
+ _disabledPartitions.add(PartitionId.from(partitionName));
+ }
+ _tags = Sets.newHashSet(instanceConfig.getTags());
+ _userConfig = instanceConfig.getUserConfig();
}
/**
@@ -66,7 +61,7 @@ public class ParticipantConfig {
* @return host name, or null if not applicable
*/
public String getHostName() {
- return _hostName;
+ return _instanceConfig.getHostName();
}
/**
@@ -74,7 +69,8 @@ public class ParticipantConfig {
* @return port number, or -1 if not applicable
*/
public int getPort() {
- return _port;
+ return _instanceConfig.getRecord()
+ .getIntField(InstanceConfigProperty.HELIX_PORT.toString(), -1);
}
/**
@@ -82,7 +78,7 @@ public class ParticipantConfig {
* @return true if enabled or false otherwise
*/
public boolean isEnabled() {
- return _isEnabled;
+ return _instanceConfig.getInstanceEnabled();
}
/**
@@ -123,7 +119,15 @@ public class ParticipantConfig {
* @return ParticipantId
*/
public ParticipantId getId() {
- return _id;
+ return _instanceConfig.getParticipantId();
+ }
+
+ /**
+ * Get the physical instance config
+ * @return InstanceConfig
+ */
+ public InstanceConfig getInstanceConfig() {
+ return _instanceConfig;
}
/**
@@ -284,24 +288,14 @@ public class ParticipantConfig {
* Assemble a participant
*/
public static class Builder {
- private final ParticipantId _id;
- private String _hostName;
- private int _port;
- private boolean _isEnabled;
- private final Set<PartitionId> _disabledPartitions;
- private final Set<String> _tags;
- private UserConfig _userConfig;
+ private final InstanceConfig _instanceConfig;
/**
* Build a participant with a given id
* @param id participant id
*/
public Builder(ParticipantId id) {
- _id = id;
- _disabledPartitions = new HashSet<PartitionId>();
- _tags = new HashSet<String>();
- _isEnabled = true;
- _userConfig = new UserConfig(Scope.participant(id));
+ _instanceConfig = new InstanceConfig(id);
}
/**
@@ -310,7 +304,7 @@ public class ParticipantConfig {
* @return Builder
*/
public Builder hostName(String hostName) {
- _hostName = hostName;
+ _instanceConfig.setHostName(hostName);
return this;
}
@@ -320,7 +314,7 @@ public class ParticipantConfig {
* @return Builder
*/
public Builder port(int port) {
- _port = port;
+ _instanceConfig.setPort(String.valueOf(port));
return this;
}
@@ -330,7 +324,7 @@ public class ParticipantConfig {
* @return Builder
*/
public Builder enabled(boolean isEnabled) {
- _isEnabled = isEnabled;
+ _instanceConfig.setInstanceEnabled(isEnabled);
return this;
}
@@ -340,7 +334,7 @@ public class ParticipantConfig {
* @return Builder
*/
public Builder addDisabledPartition(PartitionId partitionId) {
- _disabledPartitions.add(partitionId);
+ _instanceConfig.setInstanceEnabledForPartition(partitionId.toString(), false);
return this;
}
@@ -350,7 +344,7 @@ public class ParticipantConfig {
* @return Builder
*/
public Builder addTag(String tag) {
- _tags.add(tag);
+ _instanceConfig.addTag(tag);
return this;
}
@@ -360,7 +354,7 @@ public class ParticipantConfig {
* @return Builder
*/
public Builder userConfig(UserConfig userConfig) {
- _userConfig = userConfig;
+ _instanceConfig.addNamespacedConfig(userConfig);
return this;
}
@@ -369,8 +363,16 @@ public class ParticipantConfig {
* @return instantiated Participant
*/
public ParticipantConfig build() {
- return new ParticipantConfig(_id, _hostName, _port, _isEnabled, _disabledPartitions, _tags,
- _userConfig);
+ return new ParticipantConfig(_instanceConfig);
}
}
+
+ /**
+ * Get a participant config from a physical instance config
+ * @param instanceConfig the populated config
+ * @return ParticipantConfig instance
+ */
+ public static ParticipantConfig from(InstanceConfig instanceConfig) {
+ return new ParticipantConfig(instanceConfig);
+ }
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
index 26df5d7..fd9f20f 100644
--- a/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/config/ResourceConfig.java
@@ -34,15 +34,6 @@ import com.google.common.collect.Sets;
* Full configuration of a Helix resource. Typically used to add or modify resources on a cluster
*/
public class ResourceConfig {
- /**
- * Type of a resource. A resource is any entity that can be managed by Helix.
- */
- public enum ResourceType {
- /**
- * A resource that is persistent, and potentially partitioned and replicated.
- */
- DATA
- }
private final ResourceId _id;
private final RebalancerConfig _rebalancerConfig;
@@ -50,34 +41,25 @@ public class ResourceConfig {
private final SchedulerTaskConfig _schedulerTaskConfig;
private final ProvisionerConfig _provisionerConfig;
private final UserConfig _userConfig;
- private final int _bucketSize;
- private final boolean _batchMessageMode;
- private final ResourceType _resourceType;
/**
* Instantiate a configuration. Consider using ResourceConfig.Builder
* @param id resource id
- * @param partitionMap map of partition identifiers to partition objects
+ * @param idealState the ideal state of the resource
* @param schedulerTaskConfig configuration for scheduler tasks associated with the resource
* @param rebalancerConfig configuration for rebalancing the resource
* @param provisionerConfig configuration for provisioning for the resource
* @param userConfig user-defined resource properties
- * @param bucketSize bucket size for this resource
- * @param batchMessageMode whether or not batch messaging is allowed
*/
- public ResourceConfig(ResourceId id, ResourceType resourceType, IdealState idealState,
+ private ResourceConfig(ResourceId id, IdealState idealState,
SchedulerTaskConfig schedulerTaskConfig, RebalancerConfig rebalancerConfig,
- ProvisionerConfig provisionerConfig, UserConfig userConfig, int bucketSize,
- boolean batchMessageMode) {
+ ProvisionerConfig provisionerConfig, UserConfig userConfig) {
_id = id;
- _resourceType = resourceType;
_schedulerTaskConfig = schedulerTaskConfig;
_idealState = idealState;
_rebalancerConfig = rebalancerConfig;
_provisionerConfig = provisionerConfig;
_userConfig = userConfig;
- _bucketSize = bucketSize;
- _batchMessageMode = batchMessageMode;
}
/**
@@ -113,14 +95,6 @@ public class ResourceConfig {
}
/**
- * Get the resource type
- * @return ResourceType
- */
- public ResourceType getType() {
- return _resourceType;
- }
-
- /**
* Get the properties configuring scheduler tasks
* @return SchedulerTaskConfig properties
*/
@@ -144,22 +118,6 @@ public class ResourceConfig {
return _userConfig;
}
- /**
- * Get the bucket size for this resource
- * @return bucket size
- */
- public int getBucketSize() {
- return _bucketSize;
- }
-
- /**
- * Get the batch message mode
- * @return true if enabled, false if disabled
- */
- public boolean getBatchMessageMode() {
- return _batchMessageMode;
- }
-
@Override
public String toString() {
return _idealState.toString();
@@ -170,12 +128,10 @@ public class ResourceConfig {
*/
public static class Delta {
private enum Fields {
- TYPE,
+ IDEAL_STATE,
REBALANCER_CONFIG,
PROVISIONER_CONFIG,
USER_CONFIG,
- BUCKET_SIZE,
- BATCH_MESSAGE_MODE
}
private Set<Fields> _updateFields;
@@ -191,13 +147,13 @@ public class ResourceConfig {
}
/**
- * Set the type of this resource
- * @param type ResourceType
+ * Set the ideal state
+ * @param idealState updated ideal state
* @return Delta
*/
- public Delta setType(ResourceType type) {
- _builder.type(type);
- _updateFields.add(Fields.TYPE);
+ public Delta setIdealState(IdealState idealState) {
+ _builder.idealState(idealState);
+ _updateFields.add(Fields.IDEAL_STATE);
return this;
}
@@ -235,28 +191,6 @@ public class ResourceConfig {
}
/**
- * Set the bucket size
- * @param bucketSize the size to use
- * @return Delta
- */
- public Delta setBucketSize(int bucketSize) {
- _builder.bucketSize(bucketSize);
- _updateFields.add(Fields.BUCKET_SIZE);
- return this;
- }
-
- /**
- * Set the batch message mode
- * @param batchMessageMode true to enable, false to disable
- * @return Delta
- */
- public Delta setBatchMessageMode(boolean batchMessageMode) {
- _builder.batchMessageMode(batchMessageMode);
- _updateFields.add(Fields.BATCH_MESSAGE_MODE);
- return this;
- }
-
- /**
* Create a ResourceConfig that is the combination of an existing ResourceConfig and this delta
* @param orig the original ResourceConfig
* @return updated ResourceConfig
@@ -264,15 +198,14 @@ public class ResourceConfig {
public ResourceConfig mergeInto(ResourceConfig orig) {
ResourceConfig deltaConfig = _builder.build();
Builder builder =
- new Builder(orig.getId()).type(orig.getType())
+ new Builder(orig.getId()).idealState(orig.getIdealState())
.rebalancerConfig(orig.getRebalancerConfig())
.provisionerConfig(orig.getProvisionerConfig())
- .schedulerTaskConfig(orig.getSchedulerTaskConfig()).userConfig(orig.getUserConfig())
- .bucketSize(orig.getBucketSize()).batchMessageMode(orig.getBatchMessageMode());
+ .schedulerTaskConfig(orig.getSchedulerTaskConfig()).userConfig(orig.getUserConfig());
for (Fields field : _updateFields) {
switch (field) {
- case TYPE:
- builder.type(deltaConfig.getType());
+ case IDEAL_STATE:
+ builder.idealState(deltaConfig.getIdealState());
break;
case REBALANCER_CONFIG:
builder.rebalancerConfig(deltaConfig.getRebalancerConfig());
@@ -283,12 +216,6 @@ public class ResourceConfig {
case USER_CONFIG:
builder.userConfig(deltaConfig.getUserConfig());
break;
- case BUCKET_SIZE:
- builder.bucketSize(deltaConfig.getBucketSize());
- break;
- case BATCH_MESSAGE_MODE:
- builder.batchMessageMode(deltaConfig.getBatchMessageMode());
- break;
}
}
return builder.build();
@@ -300,14 +227,11 @@ public class ResourceConfig {
*/
public static class Builder {
private final ResourceId _id;
- private ResourceType _type;
private IdealState _idealState;
private RebalancerConfig _rebalancerConfig;
private SchedulerTaskConfig _schedulerTaskConfig;
private ProvisionerConfig _provisionerConfig;
private UserConfig _userConfig;
- private int _bucketSize;
- private boolean _batchMessageMode;
/**
* Build a Resource with an id
@@ -315,23 +239,10 @@ public class ResourceConfig {
*/
public Builder(ResourceId id) {
_id = id;
- _type = ResourceType.DATA;
- _bucketSize = 0;
- _batchMessageMode = false;
_userConfig = new UserConfig(Scope.resource(id));
}
/**
- * Set the type of this resource
- * @param type ResourceType
- * @return Builder
- */
- public Builder type(ResourceType type) {
- _type = type;
- return this;
- }
-
- /**
* Set the rebalancer configuration
* @param rebalancerConfig properties of interest for rebalancing
* @return Builder
@@ -380,32 +291,12 @@ public class ResourceConfig {
}
/**
- * Set the bucket size
- * @param bucketSize the size to use
- * @return Builder
- */
- public Builder bucketSize(int bucketSize) {
- _bucketSize = bucketSize;
- return this;
- }
-
- /**
- * Set the batch message mode
- * @param batchMessageMode true to enable, false to disable
- * @return Builder
- */
- public Builder batchMessageMode(boolean batchMessageMode) {
- _batchMessageMode = batchMessageMode;
- return this;
- }
-
- /**
* Create a Resource object
* @return instantiated Resource
*/
public ResourceConfig build() {
- return new ResourceConfig(_id, _type, _idealState, _schedulerTaskConfig, _rebalancerConfig,
- _provisionerConfig, _userConfig, _bucketSize, _batchMessageMode);
+ return new ResourceConfig(_id, _idealState, _schedulerTaskConfig, _rebalancerConfig,
+ _provisionerConfig, _userConfig);
}
}
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
index addd652..d51d546 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/util/ConstraintBasedAssignment.java
@@ -33,7 +33,6 @@ import org.apache.helix.HelixConstants.StateModelToken;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
-import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
import org.apache.helix.api.config.ClusterConfig;
import org.apache.helix.api.id.ParticipantId;
@@ -102,9 +101,7 @@ public class ConstraintBasedAssignment {
ResourceId resourceId, ClusterConfig cluster) {
Map<State, String> stateMap = Maps.newHashMap();
for (State state : stateModelDef.getTypedStatesPriorityList()) {
- String num =
- cluster.getStateUpperBoundConstraint(Scope.resource(resourceId),
- stateModelDef.getStateModelDefId(), state);
+ String num = stateModelDef.getNumParticipantsPerState(state);
stateMap.put(state, num);
}
return stateMap;
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
index 15264ca..a498773 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CompatibilityCheckStage.java
@@ -24,7 +24,6 @@ import java.util.Map;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
import org.apache.helix.api.Cluster;
-import org.apache.helix.api.HelixVersion;
import org.apache.helix.api.Participant;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
@@ -50,8 +49,7 @@ public class CompatibilityCheckStage extends AbstractBaseStage {
// Map<String, LiveInstance> liveInstanceMap = cache.getLiveInstances();
Map<ParticipantId, Participant> liveParticipants = cluster.getLiveParticipantMap();
for (Participant liveParticipant : liveParticipants.values()) {
- HelixVersion version = liveParticipant.getRunningInstance().getVersion();
- String participantVersion = (version != null) ? version.toString() : null;
+ String participantVersion = liveParticipant.getLiveInstance().getHelixVersion();
if (!properties.isParticipantCompatible(participantVersion)) {
String errorMsg =
"incompatible participant. pipeline will not continue. " + "controller: "
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
index 25645d3..f062766 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ContainerProvisioningStage.java
@@ -276,7 +276,7 @@ public class ContainerProvisioningStage extends AbstractBaseStage {
if (participant.isAlive()) {
Message message = new Message(MessageType.SHUTDOWN, UUID.randomUUID().toString());
message.setTgtName(participant.getId().toString());
- message.setTgtSessionId(participant.getRunningInstance().getSessionId());
+ message.setTgtSessionId(participant.getLiveInstance().getSessionId());
message.setMsgId(message.getId());
accessor.createProperty(
keyBuilder.message(participant.getId().toString(), message.getId()), message);
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
index 4cc1b9f..e554324 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/CurrentStateComputationStage.java
@@ -68,8 +68,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
continue;
}
- if (!liveParticipant.getRunningInstance().getSessionId()
- .equals(message.getTypedTgtSessionId())) {
+ if (!liveParticipant.getLiveInstance().getSessionId().equals(message.getTgtSessionId())) {
continue;
}
@@ -104,7 +103,7 @@ public class CurrentStateComputationStage extends AbstractBaseStage {
}
// add current state
- SessionId sessionId = liveParticipant.getRunningInstance().getSessionId();
+ SessionId sessionId = SessionId.from(liveParticipant.getLiveInstance().getSessionId());
Map<ResourceId, CurrentState> curStateMap = liveParticipant.getCurrentStateMap();
for (CurrentState curState : curStateMap.values()) {
if (!sessionId.equals(curState.getTypedSessionId())) {
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
index 9f2721f..deabb56 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ExternalViewComputeStage.java
@@ -96,8 +96,8 @@ public class ExternalViewComputeStage extends AbstractBaseStage {
RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
SchedulerTaskConfig schedulerTaskConfig = resource.getSchedulerTaskConfig();
- if (resource.getBucketSize() > 0) {
- view.setBucketSize(resource.getBucketSize());
+ if (resource.getIdealState().getBucketSize() > 0) {
+ view.setBucketSize(resource.getIdealState().getBucketSize());
} else {
view.setBucketSize(currentStateOutput.getBucketSize(resourceId));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
index 893e116..61da673 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationStage.java
@@ -74,7 +74,10 @@ public class MessageGenerationStage extends AbstractBaseStage {
for (ResourceId resourceId : resourceMap.keySet()) {
ResourceConfig resourceConfig = resourceMap.get(resourceId);
- int bucketSize = resourceConfig.getBucketSize();
+ int bucketSize = 0;
+ if (resourceConfig.getIdealState() != null) {
+ bucketSize = resourceConfig.getIdealState().getBucketSize();
+ }
RebalancerConfig rebalancerCfg = resourceConfig.getRebalancerConfig();
StateModelDefinition stateModelDef = stateModelDefMap.get(rebalancerCfg.getStateModelDefId());
@@ -129,8 +132,8 @@ public class MessageGenerationStage extends AbstractBaseStage {
} else {
// TODO check if instance is alive
SessionId sessionId =
- cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
- .getSessionId();
+ SessionId.from(cluster.getLiveParticipantMap().get(participantId).getLiveInstance()
+ .getSessionId());
RebalancerConfig rebalancerConfig = resourceConfig.getRebalancerConfig();
Message message =
createMessage(manager, resourceId, subUnitId, participantId, currentState,
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
index b5ed39e..2408e29 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageSelectionStage.java
@@ -29,7 +29,6 @@ import java.util.TreeMap;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.Participant;
import org.apache.helix.api.Resource;
-import org.apache.helix.api.Scope;
import org.apache.helix.api.State;
import org.apache.helix.api.config.ResourceConfig;
import org.apache.helix.api.id.ParticipantId;
@@ -279,9 +278,7 @@ public class MessageSelectionStage extends AbstractBaseStage {
List<State> statePriorityList = stateModelDefinition.getTypedStatesPriorityList();
for (State state : statePriorityList) {
- String numInstancesPerState =
- cluster.getStateUpperBoundConstraint(Scope.cluster(cluster.getId()),
- stateModelDefinition.getStateModelDefId(), state);
+ String numInstancesPerState = stateModelDefinition.getNumParticipantsPerState(state);
int max = -1;
if ("N".equals(numInstancesPerState)) {
max = cluster.getLiveParticipantMap().size();
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
index 1fc7142..1036b35 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java
@@ -68,8 +68,6 @@ public class ResourceComputationStage extends AbstractBaseStage {
RebalancerConfig rebalancerCfg = resource.getRebalancerConfig();
ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
- resCfgBuilder.bucketSize(resource.getBucketSize());
- resCfgBuilder.batchMessageMode(resource.getBatchMessageMode());
resCfgBuilder.schedulerTaskConfig(resource.getSchedulerTaskConfig());
resCfgBuilder.rebalancerConfig(rebalancerCfg);
resCfgBuilder.provisionerConfig(resource.getProvisionerConfig());
@@ -93,6 +91,9 @@ public class ResourceComputationStage extends AbstractBaseStage {
Map<ResourceId, PartitionedRebalancerConfig.Builder> rebCtxBuilderMap =
new HashMap<ResourceId, PartitionedRebalancerConfig.Builder>();
+ Map<ResourceId, Integer> bucketSizeMap = new HashMap<ResourceId, Integer>();
+ Map<ResourceId, Boolean> batchModeMap = new HashMap<ResourceId, Boolean>();
+
for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
for (ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
@@ -119,9 +120,9 @@ public class ResourceComputationStage extends AbstractBaseStage {
rebCtxBuilderMap.put(resourceId, rebCtxBuilder);
ResourceConfig.Builder resCfgBuilder = new ResourceConfig.Builder(resourceId);
- resCfgBuilder.bucketSize(currentState.getBucketSize());
- resCfgBuilder.batchMessageMode(currentState.getBatchMessageMode());
resCfgBuilderMap.put(resourceId, resCfgBuilder);
+ bucketSizeMap.put(resourceId, currentState.getBucketSize());
+ batchModeMap.put(resourceId, currentState.getBatchMessageMode());
}
PartitionedRebalancerConfig.Builder rebCtxBuilder = rebCtxBuilderMap.get(resourceId);
@@ -138,7 +139,7 @@ public class ResourceComputationStage extends AbstractBaseStage {
RebalancerConfig rebalancerConfig = rebCtxBuilder.build();
resCfgBuilder.rebalancerConfig(rebalancerConfig);
resCfgBuilder.idealState(PartitionedRebalancerConfig.rebalancerConfigToIdealState(
- rebalancerConfig, 0, false));
+ rebalancerConfig, bucketSizeMap.get(resourceId), batchModeMap.get(resourceId)));
resCfgMap.put(resourceId, resCfgBuilder.build());
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
index aa47b4b..9d6228e 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/TaskAssignmentStage.java
@@ -109,10 +109,11 @@ public class TaskAssignmentStage extends AbstractBaseStage {
Participant liveParticipant = liveParticipantMap.get(participantId);
String participantVersion = null;
if (liveParticipant != null) {
- participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
+ participantVersion = liveParticipant.getLiveInstance().getHelixVersion();
}
- if (resource == null || !resource.getBatchMessageMode() || participantVersion == null
+ if (resource == null || resource.getIdealState() == null
+ || !resource.getIdealState().getBatchMessageMode() || participantVersion == null
|| !properties.isFeatureSupported("batch_message", participantVersion)) {
outputMessages.add(message);
continue;
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
index 2dde23e..c386035 100644
--- a/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/model/InstanceConfig.java
@@ -207,7 +207,12 @@ public class InstanceConfig extends HelixProperty {
* @return a list of partition names
*/
public List<String> getDisabledPartitions() {
- return _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+ List<String> disabledPartitions =
+ _record.getListField(InstanceConfigProperty.HELIX_DISABLED_PARTITION.toString());
+ if (disabledPartitions == null) {
+ disabledPartitions = Collections.emptyList();
+ }
+ return disabledPartitions;
}
/**
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
index 452ca65..001b792 100644
--- a/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
+++ b/helix-core/src/main/java/org/apache/helix/model/ResourceConfiguration.java
@@ -22,7 +22,6 @@ package org.apache.helix.model;
import org.apache.helix.HelixProperty;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.config.NamespacedConfig;
-import org.apache.helix.api.config.ResourceConfig.ResourceType;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.provisioner.ProvisionerConfig;
@@ -40,7 +39,6 @@ public class ResourceConfiguration extends HelixProperty {
private static final Logger LOG = Logger.getLogger(ResourceConfiguration.class);
public enum Fields {
- TYPE
}
/**
@@ -68,22 +66,6 @@ public class ResourceConfiguration extends HelixProperty {
}
/**
- * Set the resource type
- * @param type ResourceType type
- */
- public void setType(ResourceType type) {
- _record.setEnumField(Fields.TYPE.toString(), type);
- }
-
- /**
- * Get the resource type
- * @return ResourceType type
- */
- public ResourceType getType() {
- return _record.getEnumField(Fields.TYPE.toString(), ResourceType.class, ResourceType.DATA);
- }
-
- /**
* Get a backward-compatible resource user config
* @return UserConfig
*/
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 29990ed..7fdf734 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -116,10 +116,10 @@ public abstract class TaskRebalancer implements HelixRebalancer {
RebalancerConfig rebalancerConfig, ResourceAssignment prevAssignment, Cluster cluster,
ResourceCurrentState currentState) {
return computeBestPossiblePartitionState(cluster, taskIs,
- cluster.getResource(rebalancerConfig.getResourceId()), currentState);
+ cluster.getResource(taskIs.getResourceId()), currentState);
}
- public ResourceAssignment computeBestPossiblePartitionState(Cluster clusterData,
+ private ResourceAssignment computeBestPossiblePartitionState(Cluster clusterData,
IdealState taskIs, Resource resource, ResourceCurrentState currStateOutput) {
final String resourceName = resource.getId().toString();
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
index 761ffe2..0e1402d 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNamespacedConfig.java
@@ -3,7 +3,6 @@ package org.apache.helix.api;
import java.util.List;
import java.util.Map;
-import org.apache.helix.api.config.ResourceConfig.ResourceType;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
import org.apache.helix.api.id.ParticipantId;
@@ -109,20 +108,18 @@ public class TestNamespacedConfig {
UserConfig userConfig = new UserConfig(Scope.resource(resourceId));
userConfig.setSimpleField(KEY1, VALUE1);
ResourceConfiguration resourceConfig = new ResourceConfiguration(resourceId);
- resourceConfig.setType(ResourceType.DATA);
resourceConfig.addNamespacedConfig(userConfig);
resourceConfig.getRecord().setSimpleField(KEY2, VALUE2);
IdealState idealState = new IdealState(resourceId);
idealState.setRebalanceMode(RebalanceMode.USER_DEFINED);
idealState.getRecord().setSimpleField(KEY3, VALUE3);
- // should have key1, key2, and key3, not type or rebalance mode
+ // should have key1, key2, and key3, not rebalance mode
UserConfig result = resourceConfig.getUserConfig();
idealState.updateUserConfig(result);
Assert.assertEquals(result.getSimpleField(KEY1), VALUE1);
Assert.assertEquals(result.getSimpleField(KEY2), VALUE2);
Assert.assertEquals(result.getSimpleField(KEY3), VALUE3);
- Assert.assertNull(result.getSimpleField(ResourceConfiguration.Fields.TYPE.toString()));
Assert
.assertNull(result.getSimpleField(IdealState.IdealStateProperty.REBALANCE_MODE.toString()));
}
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
index a8d1589..feed534 100644
--- a/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
+++ b/helix-core/src/test/java/org/apache/helix/api/TestUpdateConfig.java
@@ -78,19 +78,16 @@ public class TestUpdateConfig {
Assert.assertFalse(updated.getDisabledPartitions().contains(partition1));
Assert.assertTrue(updated.getDisabledPartitions().contains(partition2));
Assert.assertTrue(updated.getDisabledPartitions().contains(partition3));
- Assert.assertNull(updated.getUserConfig().getSimpleField("key1"));
Assert.assertEquals(updated.getUserConfig().getSimpleField("key2"), "value2");
+ Assert.assertEquals(updated.getUserConfig().getSimpleField("key1"), "value1");
Assert.assertFalse(updated.isEnabled());
}
@Test
public void testResourceConfigUpdate() {
- final int OLD_BUCKET_SIZE = 0;
- final int NEW_BUCKET_SIZE = 1;
final ResourceId resourceId = ResourceId.from("resource");
- // start: add a user config, a semi auto rebalancer context, and set bucket size and batch
- // message mode
+ // start: add a user config, a semi auto rebalancer context
UserConfig userConfig = new UserConfig(Scope.resource(resourceId));
userConfig.setSimpleField("key1", "value1");
SemiAutoRebalancerConfig rebalancerContext =
@@ -101,21 +98,21 @@ public class TestUpdateConfig {
.userConfig(userConfig)
.rebalancerConfig(rebalancerContext)
.idealState(
- PartitionedRebalancerConfig.rebalancerConfigToIdealState(rebalancerContext, 0,
- false)).bucketSize(OLD_BUCKET_SIZE).batchMessageMode(true).build();
+ PartitionedRebalancerConfig
+ .rebalancerConfigToIdealState(rebalancerContext, 0, true)).build();
- // update: overwrite user config, change to full auto rebalancer context, and change the bucket
- // size
+ // update: overwrite user config, change to full auto rebalancer context
UserConfig newUserConfig = new UserConfig(Scope.resource(resourceId));
newUserConfig.setSimpleField("key2", "value2");
FullAutoRebalancerConfig newRebalancerContext =
- new FullAutoRebalancerConfig.Builder(resourceId).build();
+ new FullAutoRebalancerConfig.Builder(resourceId).stateModelDefId(
+ StateModelDefId.from("MasterSlave")).build();
ResourceConfig updated =
- new ResourceConfig.Delta(resourceId).setBucketSize(NEW_BUCKET_SIZE)
- .setUserConfig(newUserConfig).setRebalancerConfig(newRebalancerContext)
- .mergeInto(config);
- Assert.assertEquals(updated.getBucketSize(), NEW_BUCKET_SIZE);
- Assert.assertTrue(updated.getBatchMessageMode());
+ new ResourceConfig.Delta(resourceId)
+ .setUserConfig(newUserConfig)
+ .setIdealState(
+ PartitionedRebalancerConfig.rebalancerConfigToIdealState(newRebalancerContext, 0,
+ true)).setRebalancerConfig(newRebalancerContext).mergeInto(config);
Assert.assertNull(BasicRebalancerConfig.convert(updated.getRebalancerConfig(),
SemiAutoRebalancerConfig.class));
Assert.assertNotNull(BasicRebalancerConfig.convert(updated.getRebalancerConfig(),
@@ -127,36 +124,18 @@ public class TestUpdateConfig {
@Test
public void testClusterConfigUpdate() {
final ClusterId clusterId = ClusterId.from("cluster");
- final StateModelDefId masterSlave = StateModelDefId.from("MasterSlave");
- final State master = State.from("MASTER");
- final State slave = State.from("SLAVE");
- final State offline = State.from("OFFLINE");
-
- // start: add a user config, add master and slave constraints
+ // start: add a user config
UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
userConfig.setSimpleField("key1", "value1");
ClusterConfig config =
- new ClusterConfig.Builder(clusterId)
- .addStateUpperBoundConstraint(Scope.cluster(clusterId), masterSlave, master, 2)
- .addStateUpperBoundConstraint(Scope.cluster(clusterId), masterSlave, slave, 3)
- .userConfig(userConfig).autoJoin(true).build();
+ new ClusterConfig.Builder(clusterId).userConfig(userConfig).autoJoin(true).build();
- // update: overwrite user config, change master constraint, remove slave constraint, add offline
- // constraint, change auto join
+ // update: overwrite user config, change auto join
UserConfig newUserConfig = new UserConfig(Scope.cluster(clusterId));
newUserConfig.setSimpleField("key2", "value2");
ClusterConfig updated =
- new ClusterConfig.Delta(clusterId)
- .addStateUpperBoundConstraint(Scope.cluster(clusterId), masterSlave, master, 1)
- .removeStateUpperBoundConstraint(Scope.cluster(clusterId), masterSlave, slave)
- .addStateUpperBoundConstraint(Scope.cluster(clusterId), masterSlave, offline, "R")
- .setUserConfig(newUserConfig).setAutoJoin(false).mergeInto(config);
- Assert.assertEquals(
- updated.getStateUpperBoundConstraint(Scope.cluster(clusterId), masterSlave, master), "1");
- Assert.assertEquals(
- updated.getStateUpperBoundConstraint(Scope.cluster(clusterId), masterSlave, slave), "-1");
- Assert.assertEquals(
- updated.getStateUpperBoundConstraint(Scope.cluster(clusterId), masterSlave, offline), "R");
+ new ClusterConfig.Delta(clusterId).setUserConfig(newUserConfig).setAutoJoin(false)
+ .mergeInto(config);
Assert.assertNull(updated.getUserConfig().getSimpleField("key1"));
Assert.assertEquals(updated.getUserConfig().getSimpleField("key2"), "value2");
Assert.assertFalse(updated.autoJoinAllowed());
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
index 9122e62..fd5bc76 100644
--- a/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
+++ b/helix-core/src/test/java/org/apache/helix/api/accessor/TestAtomicAccessors.java
@@ -2,20 +2,15 @@ package org.apache.helix.api.accessor;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
-import org.apache.helix.ZNRecord;
import org.apache.helix.api.Scope;
-import org.apache.helix.api.State;
import org.apache.helix.api.config.ClusterConfig;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ClusterId;
-import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.lock.HelixLock;
import org.apache.helix.lock.HelixLockable;
import org.apache.helix.lock.zk.ZKHelixLock;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
-import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.testutil.ZkTestBase;
import org.testng.Assert;
@@ -53,10 +48,6 @@ public class TestAtomicAccessors extends ZkTestBase {
final HelixDataAccessor helixAccessor =
new ZKHelixDataAccessor(clusterId.stringify(), _baseAccessor);
final LockProvider lockProvider = new LockProvider();
- final StateModelDefId stateModelDefId = StateModelDefId.from("FakeModel");
- final State state = State.from("fake");
- final int constraint1 = 10;
- final int constraint2 = 11;
final String key1 = "key1";
final String key2 = "key2";
@@ -72,10 +63,7 @@ public class TestAtomicAccessors extends ZkTestBase {
public void run() {
UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
userConfig.setBooleanField(key1, true);
- ClusterConfig.Delta delta =
- new ClusterConfig.Delta(clusterId).addStateUpperBoundConstraint(
- Scope.cluster(clusterId), stateModelDefId, state, constraint1).setUserConfig(
- userConfig);
+ ClusterConfig.Delta delta = new ClusterConfig.Delta(clusterId).setUserConfig(userConfig);
ClusterAccessor accessor =
new AtomicClusterAccessor(clusterId, helixAccessor, lockProvider);
accessor.updateCluster(delta);
@@ -88,10 +76,7 @@ public class TestAtomicAccessors extends ZkTestBase {
public void run() {
UserConfig userConfig = new UserConfig(Scope.cluster(clusterId));
userConfig.setBooleanField(key2, true);
- ClusterConfig.Delta delta =
- new ClusterConfig.Delta(clusterId).addStateUpperBoundConstraint(
- Scope.cluster(clusterId), stateModelDefId, state, constraint2).setUserConfig(
- userConfig);
+ ClusterConfig.Delta delta = new ClusterConfig.Delta(clusterId).setUserConfig(userConfig);
ClusterAccessor accessor =
new AtomicClusterAccessor(clusterId, helixAccessor, lockProvider);
accessor.updateCluster(delta);
http://git-wip-us.apache.org/repos/asf/helix/blob/ff958b19/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
index 8b26d86..5456884 100644
--- a/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
+++ b/helix-core/src/test/java/org/apache/helix/controller/stages/BaseStageTest.java
@@ -31,10 +31,8 @@ import org.apache.helix.HelixManager;
import org.apache.helix.Mocks;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
-import org.apache.helix.api.Resource;
import org.apache.helix.api.Scope;
import org.apache.helix.api.config.ResourceConfig;
-import org.apache.helix.api.config.ResourceConfig.ResourceType;
import org.apache.helix.api.config.UserConfig;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.ResourceId;
@@ -170,11 +168,10 @@ public class BaseStageTest {
for (IdealState idealState : idealStates) {
ResourceId resourceId = idealState.getResourceId();
RebalancerConfig context = PartitionedRebalancerConfig.from(idealState);
- Resource resource =
- new Resource(resourceId, ResourceType.DATA, idealState, null, null, context, null,
- new UserConfig(Scope.resource(resourceId)), idealState.getBucketSize(),
- idealState.getBatchMessageMode());
- resourceMap.put(resourceId, resource.getConfig());
+ ResourceConfig resourceConfig =
+ new ResourceConfig.Builder(resourceId).idealState(idealState).rebalancerConfig(context)
+ .userConfig(new UserConfig(Scope.resource(resourceId))).build();
+ resourceMap.put(resourceId, resourceConfig);
}
return resourceMap;