You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by zz...@apache.org on 2013/09/04 22:15:00 UTC
[1/2] [HELIX-109] Review Helix model package,
first half of rebalance-pipeline refactor
Updated Branches:
refs/heads/helix-logical-model 75b534ddb -> 9c7de4c33
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
new file mode 100644
index 0000000..95862ae
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewTaskAssignmentStage.java
@@ -0,0 +1,154 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerProperties;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyKey.Builder;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.log4j.Logger;
+
+public class NewTaskAssignmentStage extends AbstractBaseStage {
+ private static Logger logger = Logger.getLogger(NewTaskAssignmentStage.class);
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ long startTime = System.currentTimeMillis();
+ logger.info("START TaskAssignmentStage.process()");
+
+ HelixManager manager = event.getAttribute("helixmanager");
+ Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ MessageThrottleStageOutput messageOutput =
+ event.getAttribute(AttributeName.MESSAGES_THROTTLE.toString());
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
+
+ if (manager == null || resourceMap == null || messageOutput == null || cluster == null
+ || liveParticipantMap == null) {
+ throw new StageException("Missing attributes in event:" + event
+ + ". Requires HelixManager|RESOURCES|MESSAGES_THROTTLE|DataCache|liveInstanceMap");
+ }
+
+ HelixDataAccessor dataAccessor = manager.getHelixDataAccessor();
+ List<Message> messagesToSend = new ArrayList<Message>();
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ Resource resource = resourceMap.get(resourceId);
+ // TODO fix it
+ // for (Partition partition : resource.getPartitions()) {
+ // List<Message> messages = messageOutput.getMessages(resourceName, partition);
+ // messagesToSend.addAll(messages);
+ // }
+ }
+
+ List<Message> outputMessages =
+ batchMessage(dataAccessor.keyBuilder(), messagesToSend, resourceMap, liveParticipantMap,
+ manager.getProperties());
+ sendMessages(dataAccessor, outputMessages);
+
+ long endTime = System.currentTimeMillis();
+ logger.info("END TaskAssignmentStage.process(). took: " + (endTime - startTime) + " ms");
+
+ }
+
+ List<Message> batchMessage(Builder keyBuilder, List<Message> messages,
+ Map<ResourceId, Resource> resourceMap, Map<ParticipantId, Participant> liveParticipantMap,
+ HelixManagerProperties properties) {
+ // group messages by its CurrentState path + "/" + fromState + "/" + toState
+ Map<String, Message> batchMessages = new HashMap<String, Message>();
+ List<Message> outputMessages = new ArrayList<Message>();
+
+ Iterator<Message> iter = messages.iterator();
+ while (iter.hasNext()) {
+ Message message = iter.next();
+ ResourceId resourceId = message.getResourceId();
+ Resource resource = resourceMap.get(resourceId.stringify());
+
+ String participantId = message.getTgtName();
+ Participant liveParticipant = liveParticipantMap.get(participantId);
+ String participantVersion = null;
+ if (liveParticipant != null) {
+ participantVersion = liveParticipant.getRunningInstance().getVersion().toString();
+ }
+
+ if (resource == null || !resource.getRebalancerConfig().getBatchMessageMode()
+ || participantVersion == null
+ || !properties.isFeatureSupported("batch_message", participantVersion)) {
+ outputMessages.add(message);
+ continue;
+ }
+
+ String key =
+ keyBuilder.currentState(message.getTgtName(), message.getTgtSessionId().stringify(),
+ message.getResourceId().stringify()).getPath()
+ + "/" + message.getFromState() + "/" + message.getToState();
+
+ if (!batchMessages.containsKey(key)) {
+ Message batchMessage = new Message(message.getRecord());
+ batchMessage.setBatchMessageMode(true);
+ outputMessages.add(batchMessage);
+ batchMessages.put(key, batchMessage);
+ }
+ batchMessages.get(key).addPartitionName(message.getPartitionId().stringify());
+ }
+
+ return outputMessages;
+ }
+
+ protected void sendMessages(HelixDataAccessor dataAccessor, List<Message> messages) {
+ if (messages == null || messages.isEmpty()) {
+ return;
+ }
+
+ Builder keyBuilder = dataAccessor.keyBuilder();
+
+ List<PropertyKey> keys = new ArrayList<PropertyKey>();
+ for (Message message : messages) {
+ logger.info("Sending Message " + message.getMsgId() + " to " + message.getTgtName()
+ + " transit " + message.getPartitionId() + "|" + message.getPartitionIds() + " from:"
+ + message.getFromState() + " to:" + message.getToState());
+
+ // System.out.println("[dbg] Sending Message " + message.getMsgId() + " to " +
+ // message.getTgtName()
+ // + " transit " + message.getPartitionName() + "|" + message.getPartitionNames()
+ // + " from: " + message.getFromState() + " to: " + message.getToState());
+
+ keys.add(keyBuilder.message(message.getTgtName(), message.getId()));
+ }
+
+ dataAccessor.createChildren(keys, new ArrayList<Message>(messages));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
new file mode 100644
index 0000000..98ae60b
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/api/TestNewStages.java
@@ -0,0 +1,142 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Date;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.mock.controller.ClusterController;
+import org.apache.helix.mock.participant.MockParticipant;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.ClusterStateVerifier.BestPossAndExtViewZkVerifier;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestNewStages extends ZkUnitTestBase {
+ final int n = 2;
+ final int p = 8;
+ MockParticipant[] _participants = new MockParticipant[n];
+ ClusterController _controller;
+
+ ClusterId _clusterId;
+ HelixDataAccessor _dataAccessor;
+
+ @Test
+ public void testReadClusterDataStage() {
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String testName = className + "_" + methodName;
+
+ System.out.println("START " + testName + " at " + new Date(System.currentTimeMillis()));
+
+ ClusterAccessor clusterAccessor = new ClusterAccessor(_clusterId, _dataAccessor);
+ Cluster cluster = clusterAccessor.readCluster();
+
+ ClusterId id = cluster.getId();
+ Assert.assertEquals(id, _clusterId);
+ Map<ParticipantId, Participant> liveParticipantMap = cluster.getLiveParticipantMap();
+ Assert.assertEquals(liveParticipantMap.size(), n);
+
+ for (ParticipantId participantId : liveParticipantMap.keySet()) {
+ Participant participant = liveParticipantMap.get(participantId);
+ Map<ResourceId, CurrentState> curStateMap = participant.getCurrentStateMap();
+ Assert.assertEquals(curStateMap.size(), 1);
+
+ ResourceId resourceId = new ResourceId("TestDB0");
+ Assert.assertTrue(curStateMap.containsKey(resourceId));
+ CurrentState curState = curStateMap.get(resourceId);
+ Map<PartitionId, State> partitionStateMap = curState.getPartitionStateMap();
+ Assert.assertEquals(partitionStateMap.size(), p);
+ }
+
+ Map<ResourceId, Resource> resourceMap = cluster.getResourceMap();
+ Assert.assertEquals(resourceMap.size(), 1);
+
+ ResourceId resourceId = new ResourceId("TestDB0");
+ Assert.assertTrue(resourceMap.containsKey(resourceId));
+ Resource resource = resourceMap.get(resourceId);
+ Assert
+ .assertEquals(resource.getRebalancerConfig().getRebalancerMode(), RebalanceMode.SEMI_AUTO);
+
+ System.out.println("END " + testName + " at " + new Date(System.currentTimeMillis()));
+ }
+
+
+ @BeforeClass
+ public void beforeClass() throws Exception {
+ // set up a running class
+ // Logger.getRootLogger().setLevel(Level.INFO);
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+ _clusterId = new ClusterId(clusterName);
+
+ System.out.println("START " + _clusterId + " at " + new Date(System.currentTimeMillis()));
+
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestDB", // resource name prefix
+ 1, // resources
+ p, // partitions per resource
+ n, // number of nodes
+ 2, // replicas
+ "MasterSlave", true); // do rebalance
+
+ _controller = new ClusterController(clusterName, "controller_0", ZK_ADDR);
+ _controller.syncStart();
+
+ // start participants
+ for (int i = 0; i < n; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+
+ _participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null);
+ _participants[i].syncStart();
+ }
+
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new BestPossAndExtViewZkVerifier(ZK_ADDR,
+ clusterName));
+ Assert.assertTrue(result);
+
+ _dataAccessor =
+ new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_gZkClient));
+ }
+
+ @AfterClass
+ public void afterClass() {
+ // tear down the cluster
+ _controller.syncStop();
+ for (int i = 0; i < n; i++) {
+ _participants[i].syncStop();
+ }
+
+ System.out.println("END " + _clusterId + " at " + new Date(System.currentTimeMillis()));
+ }
+}
[2/2] git commit: [HELIX-109] Review Helix model package,
first half of rebalance-pipeline refactor
Posted by zz...@apache.org.
[HELIX-109] Review Helix model package, first half of rebalance-pipeline refactor
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/9c7de4c3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/9c7de4c3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/9c7de4c3
Branch: refs/heads/helix-logical-model
Commit: 9c7de4c33ec57fc677a8b05153655a2306b14c68
Parents: 75b534d
Author: zzhang <zz...@apache.org>
Authored: Wed Sep 4 13:14:33 2013 -0700
Committer: zzhang <zz...@apache.org>
Committed: Wed Sep 4 13:14:33 2013 -0700
----------------------------------------------------------------------
.../src/main/java/org/apache/helix/api/Id.java | 15 +-
.../apache/helix/api/ParticipantAccessor.java | 2 +-
.../org/apache/helix/api/RebalancerConfig.java | 74 ++++-
.../java/org/apache/helix/api/Resource.java | 69 +++--
.../org/apache/helix/api/ResourceAccessor.java | 2 +
.../main/java/org/apache/helix/api/State.java | 6 +-
.../helix/api/StateModelDefinitionAccessor.java | 65 +++++
.../apache/helix/api/StateModelFactoryId.java | 34 +++
.../stages/NewBestPossibleStateCalcStage.java | 140 +++++++++
.../stages/NewBestPossibleStateOutput.java | 19 ++
.../stages/NewCurrentStateComputationStage.java | 143 +++++++++
.../stages/NewCurrentStateOutput.java | 242 ++++++++++++++++
.../stages/NewMessageGenerationPhase.java | 233 +++++++++++++++
.../stages/NewMessageSelectionStage.java | 290 +++++++++++++++++++
.../stages/NewMessageThrottleStage.java | 196 +++++++++++++
.../stages/NewRebalanceIdealStateStage.java | 84 ++++++
.../stages/NewResourceComputationStage.java | 108 +++++++
.../stages/NewTaskAssignmentStage.java | 154 ++++++++++
.../org/apache/helix/api/TestNewStages.java | 142 +++++++++
19 files changed, 1990 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/Id.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Id.java b/helix-core/src/main/java/org/apache/helix/api/Id.java
index 7024d22..6c9556a 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Id.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Id.java
@@ -20,7 +20,7 @@ package org.apache.helix.api;
*/
/**
- *
+ *
*/
public abstract class Id implements Comparable<Id> {
public abstract String stringify();
@@ -35,7 +35,7 @@ public abstract class Id implements Comparable<Id> {
if (that instanceof Id) {
return this.stringify().equals(((Id) that).stringify());
} else if (that instanceof String) {
- return this.stringify().equals((String) that);
+ return this.stringify().equals(that);
}
return false;
}
@@ -149,6 +149,17 @@ public abstract class Id implements Comparable<Id> {
}
/**
+ * @param stateModelFactoryId
+ * @return
+ */
+ public static StateModelFactoryId stateModelFactory(String stateModelFactoryId) {
+ if (stateModelFactoryId == null) {
+ return null;
+ }
+ return new StateModelFactoryId(stateModelFactoryId);
+ }
+
+ /**
* Get a concrete message id
* @param messageId string message identifier
* @return MsgId
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
index 4bc502e..d2ae927 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ParticipantAccessor.java
@@ -321,7 +321,7 @@ public class ParticipantAccessor {
}
return new Participant(participantId, hostName, port, isEnabled, disabledPartitionIdSet, tags,
- runningInstance, null, msgMap);
+ runningInstance, curStateMap, msgMap);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
index e2e33eb..2baf63b 100644
--- a/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/api/RebalancerConfig.java
@@ -35,9 +35,13 @@ public class RebalancerConfig {
private final int _replicaCount;
private final String _participantGroupTag;
private final int _maxPartitionsPerParticipant;
+ private final int _bucketSize;
+ private final boolean _batchMessageMode;
+ private final StateModelFactoryId _stateModelFactoryId;
public RebalancerConfig(RebalanceMode mode, RebalancerRef rebalancerRef,
- StateModelDefId stateModelDefId, ResourceAssignment resourceAssignment) {
+ StateModelDefId stateModelDefId, ResourceAssignment resourceAssignment, int bucketSize,
+ boolean batchMessageMode, StateModelFactoryId stateModelFactoryId) {
_rebalancerMode = mode;
_rebalancerRef = rebalancerRef;
_stateModelDefId = stateModelDefId;
@@ -46,6 +50,9 @@ public class RebalancerConfig {
_replicaCount = 0; // TODO: stub
_participantGroupTag = null; // TODO: stub
_maxPartitionsPerParticipant = Integer.MAX_VALUE; // TODO: stub
+ _bucketSize = bucketSize;
+ _batchMessageMode = batchMessageMode;
+ _stateModelFactoryId = stateModelFactoryId;
}
/**
@@ -114,6 +121,35 @@ public class RebalancerConfig {
}
/**
+ * Get bucket size
+ * @return bucket size
+ */
+ public int getBucketSize() {
+ return _bucketSize;
+ }
+
+ /**
+ * Get batch message mode
+ * @return true if in batch message mode, false otherwise
+ */
+ public boolean getBatchMessageMode() {
+ return _batchMessageMode;
+ }
+
+ /**
+ * Get state model factory id
+ * @return state model factory id
+ */
+ public StateModelFactoryId getStateModelFactoryId() {
+ return _stateModelFactoryId;
+ }
+
+ // TODO impl this
+ public String getRebalancerClassName() {
+ throw new UnsupportedOperationException("impl this");
+ }
+
+ /**
* Assembles a RebalancerConfig
*/
public static class Builder {
@@ -121,6 +157,9 @@ public class RebalancerConfig {
private RebalancerRef _rebalancerRef;
private StateModelDefId _stateModelDefId;
private ResourceAssignment _resourceAssignment;
+ private int _bucketSize;
+ private boolean _batchMessageMode;
+ private StateModelFactoryId _stateModelFactoryId;
/**
* Set the rebalancer mode
@@ -162,11 +201,42 @@ public class RebalancerConfig {
}
/**
+ * Set bucket size
+ * @param bucketSize
+ * @return Builder
+ */
+ public Builder bucketSize(int bucketSize) {
+ _bucketSize = bucketSize;
+ return this;
+ }
+
+ /**
+ * Set batch message mode
+ * @param batchMessageMode
+ * @return Builder
+ */
+ public Builder batchMessageMode(boolean batchMessageMode) {
+ _batchMessageMode = batchMessageMode;
+ return this;
+ }
+
+ /**
+ * Set state model factory
+ * @param stateModelFactoryId
+ * @return Builder
+ */
+ public Builder stateModelFactoryId(StateModelFactoryId stateModelFactoryId) {
+ _stateModelFactoryId = stateModelFactoryId;
+ return this;
+ }
+
+ /**
* Assemble a RebalancerConfig
* @return a fully defined rebalancer configuration
*/
public RebalancerConfig build() {
- return new RebalancerConfig(_mode, _rebalancerRef, _stateModelDefId, _resourceAssignment);
+ return new RebalancerConfig(_mode, _rebalancerRef, _stateModelDefId, _resourceAssignment,
+ _bucketSize, _batchMessageMode, _stateModelFactoryId);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/Resource.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/Resource.java b/helix-core/src/main/java/org/apache/helix/api/Resource.java
index a916025..f976fad 100644
--- a/helix-core/src/main/java/org/apache/helix/api/Resource.java
+++ b/helix-core/src/main/java/org/apache/helix/api/Resource.java
@@ -19,13 +19,16 @@ package org.apache.helix.api;
* under the License.
*/
+import java.util.HashMap;
import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
/**
@@ -35,32 +38,28 @@ public class Resource {
private final ResourceId _id;
private final RebalancerConfig _rebalancerConfig;
- private final Set<Partition> _partitionSet;
+ private final Map<PartitionId, Partition> _partitionMap;
private final ExternalView _externalView;
private final ExternalView _pendingExternalView;
- // TODO move construct logic to ResourceAccessor
/**
* Construct a resource
* @param idealState
* @param currentStateMap map of participant-id to current state
*/
- public Resource(ResourceId id, IdealState idealState, ResourceAssignment rscAssignment) {
+ public Resource(ResourceId id, IdealState idealState, ResourceAssignment resourceAssignment) {
_id = id;
- // _rebalancerMode = idealState.getRebalanceMode();
- // _rebalancerRef = new RebalancerRef(idealState.getRebalancerClassName());
- // _stateModelDefId = new StateModelDefId(idealState.getStateModelDefRef());
- _rebalancerConfig = null;
+ _rebalancerConfig = new RebalancerConfig(idealState.getRebalanceMode(), idealState.getRebalancerRef(),
+ idealState.getStateModelDefId(), resourceAssignment, idealState.getBucketSize(),
+ idealState.getBatchMessageMode(), Id.stateModelFactory(
+ idealState.getStateModelFactoryName()));
- Set<Partition> partitionSet = new HashSet<Partition>();
+ Map<PartitionId, Partition> partitionMap = new HashMap<PartitionId, Partition>();
for (PartitionId partitionId : idealState.getPartitionSet()) {
- partitionSet.add(new Partition(partitionId));
+ partitionMap.put(partitionId, new Partition(partitionId));
}
- _partitionSet = ImmutableSet.copyOf(partitionSet);
-
- // TODO
- // _resourceAssignment = null;
+ _partitionMap = ImmutableMap.copyOf(partitionMap);
_externalView = null;
_pendingExternalView = null; // TODO: stub
@@ -74,10 +73,11 @@ public class Resource {
* @param pendingExternalView pending external view based on unprocessed messages
* @param rebalancerConfig configuration properties for rebalancing this resource
*/
- public Resource(ResourceId id, Set<Partition> partitionSet, ExternalView externalView,
+ public Resource(ResourceId id, Map<PartitionId, Partition> partitionMap,
+ ExternalView externalView,
ExternalView pendingExternalView, RebalancerConfig rebalancerConfig) {
_id = id;
- _partitionSet = ImmutableSet.copyOf(partitionSet);
+ _partitionMap = ImmutableMap.copyOf(partitionMap);
_externalView = externalView;
_pendingExternalView = pendingExternalView;
_rebalancerConfig = rebalancerConfig;
@@ -87,8 +87,25 @@ public class Resource {
* Get the set of partitions of the resource
* @return set of partitions or empty set if none
*/
+ public Map<PartitionId, Partition> getPartitionMap() {
+ return _partitionMap;
+ }
+
+ /**
+ * @param partitionId
+ * @return
+ */
+ public Partition getPartition(PartitionId partitionId) {
+ return _partitionMap.get(partitionId);
+ }
+
+ /**
+ * @return
+ */
public Set<Partition> getPartitionSet() {
- return _partitionSet;
+ Set<Partition> partitionSet = new HashSet<Partition>();
+ partitionSet.addAll(_partitionMap.values());
+ return ImmutableSet.copyOf(partitionSet);
}
/**
@@ -120,7 +137,7 @@ public class Resource {
*/
public static class Builder {
private final ResourceId _id;
- private final Set<Partition> _partitionSet;
+ private final Map<PartitionId, Partition> _partitionMap;
private ExternalView _externalView;
private ExternalView _pendingExternalView;
private RebalancerConfig _rebalancerConfig;
@@ -131,7 +148,7 @@ public class Resource {
*/
public Builder(ResourceId id) {
_id = id;
- _partitionSet = new HashSet<Partition>();
+ _partitionMap = new HashMap<PartitionId, Partition>();
}
/**
@@ -140,7 +157,19 @@ public class Resource {
* @return Builder
*/
public Builder addPartition(Partition partition) {
- _partitionSet.add(partition);
+ _partitionMap.put(partition.getId(), partition);
+ return this;
+ }
+
+ /**
+ * Add a set of partitions
+ * @param partitions
+ * @return Builder
+ */
+ public Builder addPartitions(Set<Partition> partitions) {
+ for (Partition partition : partitions) {
+ addPartition(partition);
+ }
return this;
}
@@ -179,7 +208,7 @@ public class Resource {
* @return instantiated Resource
*/
public Resource build() {
- return new Resource(_id, _partitionSet, _externalView, _pendingExternalView,
+ return new Resource(_id, _partitionMap, _externalView, _pendingExternalView,
_rebalancerConfig);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
index ba7cd3d..c3bcb37 100644
--- a/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/api/ResourceAccessor.java
@@ -23,6 +23,7 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.ResourceAssignment;
public class ResourceAccessor {
@@ -69,4 +70,5 @@ public class ResourceAccessor {
public void dropExternalView(ResourceId resourceId) {
_accessor.removeProperty(_keyBuilder.externalView(resourceId.stringify()));
}
+
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/State.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/State.java b/helix-core/src/main/java/org/apache/helix/api/State.java
index b8c38ea..5d0af41 100644
--- a/helix-core/src/main/java/org/apache/helix/api/State.java
+++ b/helix-core/src/main/java/org/apache/helix/api/State.java
@@ -22,13 +22,13 @@ import org.apache.helix.HelixDefinedState;
*/
/**
- *
+ *
*/
public class State {
private final String _state;
public State(String state) {
- _state = state;
+ _state = state.toUpperCase();
}
@Override
@@ -41,7 +41,7 @@ public class State {
if (that instanceof State) {
return this.toString().equals(((State) that).toString());
} else if (that instanceof String) {
- return _state.equals((String) that);
+ return _state.equals(that);
}
return false;
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
new file mode 100644
index 0000000..89b1979
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/StateModelDefinitionAccessor.java
@@ -0,0 +1,65 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.ImmutableMap;
+
+public class StateModelDefinitionAccessor {
+ private static Logger LOG = Logger.getLogger(StateModelDefinitionAccessor.class);
+
+ private final HelixDataAccessor _accessor;
+ private final PropertyKey.Builder _keyBuilder;
+ private final ClusterId _clusterId;
+
+ /**
+ * @param clusterId
+ * @param accessor
+ */
+ public StateModelDefinitionAccessor(ClusterId clusterId, HelixDataAccessor accessor) {
+ _accessor = accessor;
+ _keyBuilder = accessor.keyBuilder();
+ _clusterId = clusterId;
+ }
+
+ /**
+ * @return
+ */
+ public Map<StateModelDefId, StateModelDefinition> readStateModelDefinitions() {
+ Map<String, StateModelDefinition> stateModelDefs =
+ _accessor.getChildValuesMap(_keyBuilder.stateModelDefs());
+ Map<StateModelDefId, StateModelDefinition> stateModelDefMap =
+ new HashMap<StateModelDefId, StateModelDefinition>();
+
+ for (String stateModelDefName : stateModelDefs.keySet()) {
+ stateModelDefMap.put(new StateModelDefId(stateModelDefName),
+ stateModelDefs.get(stateModelDefName));
+ }
+
+ return ImmutableMap.copyOf(stateModelDefMap);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/api/StateModelFactoryId.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/api/StateModelFactoryId.java b/helix-core/src/main/java/org/apache/helix/api/StateModelFactoryId.java
new file mode 100644
index 0000000..37be836
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/api/StateModelFactoryId.java
@@ -0,0 +1,34 @@
+package org.apache.helix.api;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+public class StateModelFactoryId extends Id {
+ private final String _id;
+
+ public StateModelFactoryId(String id) {
+ _id = id;
+ }
+
+ @Override
+ public String stringify() {
+ return _id;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
new file mode 100644
index 0000000..995bb74
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateCalcStage.java
@@ -0,0 +1,140 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.controller.rebalancer.AutoRebalancer;
+import org.apache.helix.controller.rebalancer.CustomRebalancer;
+import org.apache.helix.controller.rebalancer.NewAutoRebalancer;
+import org.apache.helix.controller.rebalancer.NewCustomRebalancer;
+import org.apache.helix.controller.rebalancer.NewRebalancer;
+import org.apache.helix.controller.rebalancer.NewSemiAutoRebalancer;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * For partition compute best possible (instance,state) pair based on
+ * IdealState,StateModel,LiveInstance
+ */
+public class NewBestPossibleStateCalcStage extends AbstractBaseStage {
+ private static final Logger LOG = Logger.getLogger(NewBestPossibleStateCalcStage.class
+ .getName());
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ long startTime = System.currentTimeMillis();
+ LOG.info("START BestPossibleStateCalcStage.process()");
+
+ NewCurrentStateOutput currentStateOutput =
+ event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+
+ if (currentStateOutput == null || resourceMap == null || cluster == null) {
+ throw new StageException("Missing attributes in event:" + event
+ + ". Requires CURRENT_STATE|RESOURCES|DataCache");
+ }
+
+ NewBestPossibleStateOutput bestPossibleStateOutput =
+ compute(cluster, event, resourceMap, currentStateOutput);
+ event.addAttribute(AttributeName.BEST_POSSIBLE_STATE.toString(), bestPossibleStateOutput);
+
+ long endTime = System.currentTimeMillis();
+ LOG.info("END BestPossibleStateCalcStage.process(). took: " + (endTime - startTime) + " ms");
+ }
+
+ // TODO check this
+ private NewBestPossibleStateOutput compute(Cluster cluster, ClusterEvent event,
+ Map<ResourceId, Resource> resourceMap, NewCurrentStateOutput currentStateOutput) {
+ // for each ideal state
+ // read the state model def
+ // for each resource
+ // get the preference list
+ // for each instanceName check if its alive then assign a state
+ // ClusterDataCache cache = event.getAttribute("ClusterDataCache");
+
+ NewBestPossibleStateOutput output = new NewBestPossibleStateOutput();
+
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ LOG.debug("Processing resource:" + resourceId);
+
+ Resource resource = resourceMap.get(resourceId);
+ // Ideal state may be gone. In that case we need to get the state model name
+ // from the current state
+ // IdealState idealState = cache.getIdealState(resourceName);
+
+ Resource existResource = cluster.getResource(resourceId);
+ if (existResource == null) {
+ // if ideal state is deleted, use an empty one
+ LOG.info("resource:" + resourceId + " does not exist anymore");
+ // TODO
+ // existResource = new Resource();
+ }
+
+ RebalancerConfig rebalancerConfig = existResource.getRebalancerConfig();
+ NewRebalancer rebalancer = null;
+ if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
+ && rebalancerConfig.getRebalancerClassName() != null) {
+ String rebalancerClassName = rebalancerConfig.getRebalancerClassName();
+ LOG.info("resource " + resourceId + " use idealStateRebalancer " + rebalancerClassName);
+ try {
+ rebalancer =
+ (NewRebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
+ } catch (Exception e) {
+ LOG.warn("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+ }
+ }
+ if (rebalancer == null) {
+ if (rebalancerConfig.getRebalancerMode() == RebalanceMode.FULL_AUTO) {
+ rebalancer = new NewAutoRebalancer();
+ } else if (rebalancerConfig.getRebalancerMode() == RebalanceMode.SEMI_AUTO) {
+ rebalancer = new NewSemiAutoRebalancer();
+ } else {
+ rebalancer = new NewCustomRebalancer();
+ }
+ }
+
+ // TODO pass state model definition
+ ResourceAssignment resourceAssignment =
+ rebalancer.computeResourceMapping(resource, cluster, null);
+
+ output.setResourceAssignment(resourceId, resourceAssignment);
+ }
+
+ return output;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
new file mode 100644
index 0000000..474f463
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewBestPossibleStateOutput.java
@@ -0,0 +1,19 @@
+package org.apache.helix.controller.stages;
+
+import java.util.Map;
+
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.model.ResourceAssignment;
+
+public class NewBestPossibleStateOutput {
+
+ Map<ResourceId, ResourceAssignment> _resourceAssignmentMap;
+
+ /**
+ * @param resourceId
+ * @param resourceAssignment
+ */
+ public void setResourceAssignment(ResourceId resourceId, ResourceAssignment resourceAssignment) {
+ _resourceAssignmentMap.put(resourceId, resourceAssignment);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
new file mode 100644
index 0000000..0f8c33e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateComputationStage.java
@@ -0,0 +1,143 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SessionId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+
+/**
+ * For each LiveInstances select currentState and message whose sessionId matches
+ * sessionId from LiveInstance Get Partition,State for all the resources computed in
+ * previous State [ResourceComputationStage]
+ */
+public class NewCurrentStateComputationStage extends AbstractBaseStage {
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+
+ if (cluster == null || resourceMap == null) {
+ throw new StageException("Missing attributes in event:" + event
+ + ". Requires DataCache|RESOURCE");
+ }
+
+ NewCurrentStateOutput currentStateOutput = new NewCurrentStateOutput();
+
+ for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+ ParticipantId participantId = liveParticipant.getId();
+
+ // add pending messages
+ Map<MessageId, Message> instanceMsgs = liveParticipant.getMessageMap();
+ for (Message message : instanceMsgs.values()) {
+ if (!MessageType.STATE_TRANSITION.toString().equalsIgnoreCase(message.getMsgType())) {
+ continue;
+ }
+
+ if (!liveParticipant.getRunningInstance().getSessionId().equals(message.getTgtSessionId())) {
+ continue;
+ }
+
+ ResourceId resourceId = message.getResourceId();
+ Resource resource = resourceMap.get(resourceId);
+ if (resource == null) {
+ continue;
+ }
+
+ if (!message.getBatchMessageMode()) {
+ PartitionId partitionId = message.getPartitionId();
+ Partition partition = resource.getPartition(partitionId);
+ if (partition != null) {
+ currentStateOutput.setPendingState(resourceId, partitionId, participantId,
+ message.getToState());
+ } else {
+ // log
+ }
+ } else {
+ List<PartitionId> partitionNames = message.getPartitionIds();
+ if (!partitionNames.isEmpty()) {
+ for (PartitionId partitionId : partitionNames) {
+ Partition partition = resource.getPartition(partitionId);
+ if (partition != null) {
+ currentStateOutput.setPendingState(resourceId, partitionId,
+ participantId,
+ message.getToState());
+ } else {
+ // log
+ }
+ }
+ }
+ }
+ }
+
+ // add current state
+ SessionId sessionId = liveParticipant.getRunningInstance().getSessionId();
+ Map<ResourceId, CurrentState> curStateMap = liveParticipant.getCurrentStateMap();
+ for (CurrentState curState : curStateMap.values()) {
+ if (!sessionId.equals(curState.getSessionId())) {
+ continue;
+ }
+
+ ResourceId resourceId = curState.getResourceId();
+ StateModelDefId stateModelDefId = curState.getStateModelDefId();
+ Resource resource = resourceMap.get(resourceId);
+ if (resource == null) {
+ continue;
+ }
+
+ if (stateModelDefId != null) {
+ currentStateOutput.setResourceStateModelDef(resourceId, stateModelDefId);
+ }
+
+ currentStateOutput.setBucketSize(resourceId, curState.getBucketSize());
+
+ Map<PartitionId, State> partitionStateMap = curState.getPartitionStateMap();
+ for (PartitionId partitionId : partitionStateMap.keySet()) {
+ Partition partition = resource.getPartition(partitionId);
+ if (partition != null) {
+ currentStateOutput.setCurrentState(resourceId, partitionId, participantId,
+ curState.getState(partitionId));
+
+ } else {
+ // log
+ }
+ }
+ }
+ }
+
+ event.addAttribute(AttributeName.CURRENT_STATE.toString(), currentStateOutput);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
new file mode 100644
index 0000000..417512e
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewCurrentStateOutput.java
@@ -0,0 +1,242 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.Partition;
+
+public class NewCurrentStateOutput {
+ /**
+ * map of resource-id to map of partition-id to map of participant-id to state
+ * represent current-state for the participant
+ */
+ private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _currentStateMap;
+
+ /**
+ * map of resource-id to map of partition-id to map of participant-id to state
+ * represent pending messages for the participant
+ */
+ private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _pendingStateMap;
+
+ /**
+ * map of resource-id to state model definition id
+ */
+ private final Map<ResourceId, StateModelDefId> _resourceStateModelMap;
+
+ /**
+ * map of resource-id to current-state config's
+ */
+ private final Map<ResourceId, CurrentState> _curStateMetaMap;
+
+ /**
+ * construct
+ */
+ public NewCurrentStateOutput() {
+ _currentStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+ _pendingStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+ _resourceStateModelMap = new HashMap<ResourceId, StateModelDefId>();
+ _curStateMetaMap = new HashMap<ResourceId, CurrentState>();
+
+ }
+
+ /**
+ * @param resourceId
+ * @param stateModelDefId
+ */
+ public void setResourceStateModelDef(ResourceId resourceId, StateModelDefId stateModelDefId) {
+ _resourceStateModelMap.put(resourceId, stateModelDefId);
+ }
+
+ /**
+ * @param resourceId
+ * @return
+ */
+ public StateModelDefId getResourceStateModelDef(ResourceId resourceId) {
+ return _resourceStateModelMap.get(resourceId);
+ }
+
+ /**
+ * @param resourceId
+ * @param bucketSize
+ */
+ public void setBucketSize(ResourceId resourceId, int bucketSize) {
+ CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
+ if (curStateMeta == null) {
+ curStateMeta = new CurrentState(resourceId);
+ _curStateMetaMap.put(resourceId, curStateMeta);
+ }
+ curStateMeta.setBucketSize(bucketSize);
+ }
+
+ /**
+ * @param resourceId
+ * @return
+ */
+ public int getBucketSize(ResourceId resourceId) {
+ int bucketSize = 0;
+ CurrentState curStateMeta = _curStateMetaMap.get(resourceId);
+ if (curStateMeta != null) {
+ bucketSize = curStateMeta.getBucketSize();
+ }
+
+ return bucketSize;
+ }
+
+ /**
+ * @param stateMap
+ * @param resourceId
+ * @param partitionId
+ * @param participantId
+ * @param state
+ */
+ static void setStateMap(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
+ ResourceId resourceId, PartitionId partitionId, ParticipantId participantId, State state) {
+ if (!stateMap.containsKey(resourceId)) {
+ stateMap.put(resourceId, new HashMap<PartitionId, Map<ParticipantId, State>>());
+ }
+
+ if (!stateMap.get(resourceId).containsKey(partitionId)) {
+ stateMap.get(resourceId).put(partitionId, new HashMap<ParticipantId, State>());
+ }
+ stateMap.get(resourceId).get(partitionId).put(participantId, state);
+ }
+
+ /**
+ * @param stateMap
+ * @param resourceId
+ * @param partitionId
+ * @param participantId
+ * @return state
+ */
+ static State getState(Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap,
+ ResourceId resourceId, PartitionId partitionId, ParticipantId participantId) {
+ Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
+ if (map != null) {
+ Map<ParticipantId, State> instanceStateMap = map.get(partitionId);
+ if (instanceStateMap != null) {
+ return instanceStateMap.get(participantId);
+ }
+ }
+ return null;
+
+ }
+
+ /**
+ * @param stateMap
+ * @param resourceId
+ * @param partitionId
+ * @return
+ */
+ static Map<ParticipantId, State> getStateMap(
+ Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> stateMap, ResourceId resourceId,
+ PartitionId partitionId) {
+ if (stateMap.containsKey(resourceId)) {
+ Map<PartitionId, Map<ParticipantId, State>> map = stateMap.get(resourceId);
+ if (map.containsKey(partitionId)) {
+ return map.get(partitionId);
+ }
+ }
+ return Collections.emptyMap();
+ }
+
+ /**
+ * @param resourceId
+ * @param partitionId
+ * @param participantId
+ * @param state
+ */
+ public void setCurrentState(ResourceId resourceId, PartitionId partitionId,
+ ParticipantId participantId, State state) {
+ setStateMap(_currentStateMap, resourceId, partitionId, participantId, state);
+ }
+
+ /**
+ * @param resourceId
+ * @param partitionId
+ * @param participantId
+ * @param state
+ */
+ public void setPendingState(ResourceId resourceId, PartitionId partitionId,
+ ParticipantId participantId, State state) {
+ setStateMap(_pendingStateMap, resourceId, partitionId, participantId, state);
+ }
+
+ /**
+ * given (resource, partition, instance), returns currentState
+ * @param resourceName
+ * @param partition
+ * @param instanceName
+ * @return
+ */
+ public State getCurrentState(ResourceId resourceId, PartitionId partitionId,
+ ParticipantId participantId) {
+ return getState(_currentStateMap, resourceId, partitionId, participantId);
+ }
+
+ /**
+ * given (resource, partition, instance), returns toState
+ * @param resourceName
+ * @param partition
+ * @param instanceName
+ * @return
+ */
+ public State getPendingState(ResourceId resourceId, PartitionId partitionId,
+ ParticipantId participantId) {
+ return getState(_pendingStateMap, resourceId, partitionId, participantId);
+ }
+
+ /**
+ * @param resourceId
+ * @param partitionId
+ * @return
+ */
+ public Map<ParticipantId, State> getCurrentStateMap(ResourceId resourceId, PartitionId partitionId) {
+ return getStateMap(_currentStateMap, resourceId, partitionId);
+ }
+
+ /**
+ * @param resourceId
+ * @param partitionId
+ * @return
+ */
+ public Map<ParticipantId, State> getPendingStateMap(ResourceId resourceId, PartitionId partitionId) {
+ return getStateMap(_currentStateMap, resourceId, partitionId);
+
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("current state= ").append(_currentStateMap);
+ sb.append(", pending state= ").append(_pendingStateMap);
+ return sb.toString();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
new file mode 100644
index 0000000..0fdfe56
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageGenerationPhase.java
@@ -0,0 +1,233 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Id;
+import org.apache.helix.api.MessageId;
+import org.apache.helix.api.ParticipantId;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.SessionId;
+import org.apache.helix.api.State;
+import org.apache.helix.api.StateModelDefId;
+import org.apache.helix.api.StateModelFactoryId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.manager.zk.DefaultSchedulerMessageHandlerFactory;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageState;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+/**
+ * Compares the currentState, pendingState with IdealState and generate messages
+ */
+public class NewMessageGenerationPhase extends AbstractBaseStage {
+ private static Logger LOG = Logger.getLogger(NewMessageGenerationPhase.class);
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ HelixManager manager = event.getAttribute("helixmanager");
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ NewCurrentStateOutput currentStateOutput =
+ event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ BestPossibleStateOutput bestPossibleStateOutput =
+ event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.toString());
+ if (manager == null || cluster == null || resourceMap == null || currentStateOutput == null
+ || bestPossibleStateOutput == null) {
+ throw new StageException("Missing attributes in event:" + event
+ + ". Requires HelixManager|DataCache|RESOURCES|CURRENT_STATE|BEST_POSSIBLE_STATE");
+ }
+
+ // Map<String, LiveInstance> liveInstances = cache.getLiveInstances();
+ // Map<String, String> sessionIdMap = new HashMap<String, String>();
+
+ // for (LiveInstance liveInstance : liveInstances.values()) {
+ // sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getSessionId().stringify());
+ // }
+ MessageGenerationOutput output = new MessageGenerationOutput();
+
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ Resource resource = resourceMap.get(resourceId);
+ int bucketSize = resource.getRebalancerConfig().getBucketSize();
+
+ // TODO fix it
+ StateModelDefinition stateModelDef = null;
+ // cache.getStateModelDef(resource.getStateModelDefRef());
+
+ for (PartitionId partitionId : resource.getPartitionMap().keySet()) {
+ // TODO fix it
+ Map<ParticipantId, State> instanceStateMap = null;
+ // bestPossibleStateOutput.getInstanceStateMap(resourceId, partition);
+
+ // we should generate message based on the desired-state priority
+ // so keep generated messages in a temp map keyed by state
+ // desired-state->list of generated-messages
+ Map<State, List<Message>> messageMap = new HashMap<State, List<Message>>();
+
+ for (ParticipantId participantId : instanceStateMap.keySet()) {
+ State desiredState = instanceStateMap.get(participantId);
+
+ State currentState =
+ currentStateOutput.getCurrentState(resourceId, partitionId, participantId);
+ if (currentState == null) {
+ // TODO fix it
+ // currentState = stateModelDef.getInitialStateString();
+ }
+
+ if (desiredState.equals(currentState)) {
+ continue;
+ }
+
+ State pendingState =
+ currentStateOutput.getPendingState(resourceId, partitionId, participantId);
+
+ // TODO fix it
+ State nextState = new State("");
+ // stateModelDef.getNextStateForTransition(currentState, desiredState);
+ if (nextState == null) {
+ LOG.error("Unable to find a next state for partition: " + partitionId
+ + " from stateModelDefinition"
+ + stateModelDef.getClass() + " from:" + currentState + " to:" + desiredState);
+ continue;
+ }
+
+ if (pendingState != null) {
+ if (nextState.equals(pendingState)) {
+ LOG.debug("Message already exists for " + participantId + " to transit "
+ + partitionId + " from " + currentState + " to " + nextState);
+ } else if (currentState.equals(pendingState)) {
+ LOG.info("Message hasn't been removed for " + participantId + " to transit"
+ + partitionId + " to " + pendingState + ", desiredState: "
+ + desiredState);
+ } else {
+ LOG.info("IdealState changed before state transition completes for " + partitionId
+ + " on " + participantId + ", pendingState: "
+ + pendingState + ", currentState: " + currentState + ", nextState: " + nextState);
+ }
+ } else {
+ // TODO check if instance is alive
+ SessionId sessionId =
+ cluster.getLiveParticipantMap().get(participantId).getRunningInstance()
+ .getSessionId();
+ Message message =
+ createMessage(manager, resourceId, partitionId, participantId, currentState,
+ nextState, sessionId, new StateModelDefId(stateModelDef.getId()), resource
+ .getRebalancerConfig()
+ .getStateModelFactoryId(), bucketSize);
+
+ // TODO fix this
+ // IdealState idealState = cache.getIdealState(resourceName);
+ // if (idealState != null
+ // && idealState.getStateModelDefRef().equalsIgnoreCase(
+ // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ // if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+ // message.getRecord().setMapField(Message.Attributes.INNER_MESSAGE.toString(),
+ // idealState.getRecord().getMapField(partition.getPartitionName()));
+ // }
+ // }
+ // Set timeout of needed
+ // String stateTransition =
+ // currentState + "-" + nextState + "_" + Message.Attributes.TIMEOUT;
+ // if (idealState != null) {
+ // String timeOutStr = idealState.getRecord().getSimpleField(stateTransition);
+ // if (timeOutStr == null
+ // && idealState.getStateModelDefRef().equalsIgnoreCase(
+ // DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE)) {
+ // // scheduled task queue
+ // if (idealState.getRecord().getMapField(partition.getPartitionName()) != null) {
+ // timeOutStr =
+ // idealState.getRecord().getMapField(partition.getPartitionName())
+ // .get(Message.Attributes.TIMEOUT.toString());
+ // }
+ // }
+ // if (timeOutStr != null) {
+ // try {
+ // int timeout = Integer.parseInt(timeOutStr);
+ // if (timeout > 0) {
+ // message.setExecutionTimeout(timeout);
+ // }
+ // } catch (Exception e) {
+ // logger.error("", e);
+ // }
+ // }
+ // }
+ // message.getRecord().setSimpleField("ClusterEventName", event.getName());
+
+ if (!messageMap.containsKey(desiredState)) {
+ messageMap.put(desiredState, new ArrayList<Message>());
+ }
+ messageMap.get(desiredState).add(message);
+ }
+ }
+
+ // add generated messages to output according to state priority
+ List<String> statesPriorityList = stateModelDef.getStatesPriorityStringList();
+ for (String state : statesPriorityList) {
+ if (messageMap.containsKey(state)) {
+ for (Message message : messageMap.get(state)) {
+ // TODO fix it
+ // output.addMessage(resourceId, partitionId, message);
+ }
+ }
+ }
+
+ } // end of for-each-partition
+ }
+ event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output);
+ }
+
+ private Message createMessage(HelixManager manager, ResourceId resourceId,
+ PartitionId partitionId, ParticipantId participantId, State currentState, State nextState,
+ SessionId sessionId, StateModelDefId stateModelDefId,
+ StateModelFactoryId stateModelFactoryId, int bucketSize) {
+ // MessageId uuid = Id.message(UUID.randomUUID().toString());
+ // Message message = new Message(MessageType.STATE_TRANSITION, uuid);
+ // message.setSrcName(manager.getInstanceName());
+ // message.setTgtName(instanceName);
+ // message.setMsgState(MessageState.NEW);
+ // message.setPartitionId(Id.partition(partitionName));
+ // message.setResourceId(Id.resource(resourceName));
+ // message.setFromState(State.from(currentState));
+ // message.setToState(State.from(nextState));
+ // message.setTgtSessionId(Id.session(sessionId));
+ // message.setSrcSessionId(Id.session(manager.getSessionId()));
+ // message.setStateModelDef(Id.stateModelDef(stateModelDefName));
+ // message.setStateModelFactoryName(stateModelFactoryName);
+ // message.setBucketSize(bucketSize);
+ //
+ // return message;
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
new file mode 100644
index 0000000..4038c69
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageSelectionStage.java
@@ -0,0 +1,290 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.State;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.log4j.Logger;
+
+public class NewMessageSelectionStage extends AbstractBaseStage {
+ private static final Logger LOG = Logger.getLogger(NewMessageSelectionStage.class);
+
+ public static class Bounds {
+ private int upper;
+ private int lower;
+
+ public Bounds(int lower, int upper) {
+ this.lower = lower;
+ this.upper = upper;
+ }
+
+ public void increaseUpperBound() {
+ upper++;
+ }
+
+ public void increaseLowerBound() {
+ lower++;
+ }
+
+ public void decreaseUpperBound() {
+ upper--;
+ }
+
+ public void decreaseLowerBound() {
+ lower--;
+ }
+
+ public int getLowerBound() {
+ return lower;
+ }
+
+ public int getUpperBound() {
+ return upper;
+ }
+ }
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+ NewCurrentStateOutput currentStateOutput =
+ event.getAttribute(AttributeName.CURRENT_STATE.toString());
+ MessageGenerationOutput messageGenOutput =
+ event.getAttribute(AttributeName.MESSAGES_ALL.toString());
+ if (cluster == null || resourceMap == null || currentStateOutput == null
+ || messageGenOutput == null) {
+ throw new StageException("Missing attributes in event:" + event
+ + ". Requires DataCache|RESOURCES|CURRENT_STATE|MESSAGES_ALL");
+ }
+
+ MessageSelectionStageOutput output = new MessageSelectionStageOutput();
+
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ Resource resource = resourceMap.get(resourceId);
+ // TODO fix it
+ StateModelDefinition stateModelDef = null;
+ // cache.getStateModelDef(resource.getStateModelDefRef());
+
+ Map<String, Integer> stateTransitionPriorities = getStateTransitionPriorityMap(stateModelDef);
+ // IdealState idealState = cache.getIdealState(resourceName);
+ Map<String, Bounds> stateConstraints =
+ computeStateConstraints(stateModelDef, resource.getRebalancerConfig(), cluster);
+
+ // TODO fix it
+ // for (Partition partition : resource.getPartitions()) {
+ // List<Message> messages = messageGenOutput.getMessages(resourceId.stringify(), partition);
+ // List<Message> selectedMessages =
+ // selectMessages(cache.getLiveInstances(),
+ // currentStateOutput.getCurrentStateMap(resourceName, partition),
+ // currentStateOutput.getPendingStateMap(resourceName, partition), messages,
+ // stateConstraints, stateTransitionPriorities, stateModelDef.getInitialStateString());
+ // output.addMessages(resourceId.stringify(), partition, selectedMessages);
+ // }
+ }
+ event.addAttribute(AttributeName.MESSAGES_SELECTED.toString(), output);
+ }
+
+ // TODO: This method deserves its own class. The class should not understand helix but
+ // just be
+ // able to solve the problem using the algo. I think the method is following that but if
+ // we don't move it to another class its quite easy to break that contract
+ /**
+ * greedy message selection algorithm: 1) calculate CS+PS state lower/upper-bounds 2)
+ * group messages by state transition and sorted by priority 3) from highest priority to
+ * lowest, for each message group with the same transition add message one by one and
+ * make sure state constraint is not violated update state lower/upper-bounds when a new
+ * message is selected
+ * @param currentStates
+ * @param pendingStates
+ * @param messages
+ * @param stateConstraints
+ * : STATE -> bound (lower:upper)
+ * @param stateTransitionPriorities
+ * : FROME_STATE-TO_STATE -> priority
+ * @return: selected messages
+ */
+ List<Message> selectMessages(Map<String, LiveInstance> liveInstances,
+ Map<String, String> currentStates, Map<String, String> pendingStates, List<Message> messages,
+ Map<String, Bounds> stateConstraints, final Map<String, Integer> stateTransitionPriorities,
+ String initialState) {
+ if (messages == null || messages.isEmpty()) {
+ return Collections.emptyList();
+ }
+
+ List<Message> selectedMessages = new ArrayList<Message>();
+ Map<String, Bounds> bounds = new HashMap<String, Bounds>();
+
+ // count currentState, if no currentState, count as in initialState
+ for (String instance : liveInstances.keySet()) {
+ String state = initialState;
+ if (currentStates.containsKey(instance)) {
+ state = currentStates.get(instance);
+ }
+
+ if (!bounds.containsKey(state)) {
+ bounds.put(state, new Bounds(0, 0));
+ }
+ bounds.get(state).increaseLowerBound();
+ bounds.get(state).increaseUpperBound();
+ }
+
+ // count pendingStates
+ for (String instance : pendingStates.keySet()) {
+ String state = pendingStates.get(instance);
+ if (!bounds.containsKey(state)) {
+ bounds.put(state, new Bounds(0, 0));
+ }
+ // TODO: add lower bound, need to refactor pendingState to include fromState also
+ bounds.get(state).increaseUpperBound();
+ }
+
+ // group messages based on state transition priority
+ Map<Integer, List<Message>> messagesGroupByStateTransitPriority =
+ new TreeMap<Integer, List<Message>>();
+ for (Message message : messages) {
+ State fromState = message.getFromState();
+ State toState = message.getToState();
+ String transition = fromState + "-" + toState;
+ int priority = Integer.MAX_VALUE;
+
+ if (stateTransitionPriorities.containsKey(transition)) {
+ priority = stateTransitionPriorities.get(transition);
+ }
+
+ if (!messagesGroupByStateTransitPriority.containsKey(priority)) {
+ messagesGroupByStateTransitPriority.put(priority, new ArrayList<Message>());
+ }
+ messagesGroupByStateTransitPriority.get(priority).add(message);
+ }
+
+ // select messages
+ for (List<Message> messageList : messagesGroupByStateTransitPriority.values()) {
+ for (Message message : messageList) {
+ State fromState = message.getFromState();
+ State toState = message.getToState();
+
+ if (!bounds.containsKey(fromState)) {
+ LOG.error("Message's fromState is not in currentState. message: " + message);
+ continue;
+ }
+
+ if (!bounds.containsKey(toState)) {
+ bounds.put(toState.toString(), new Bounds(0, 0));
+ }
+
+ // check lower bound of fromState
+ if (stateConstraints.containsKey(fromState)) {
+ int newLowerBound = bounds.get(fromState).getLowerBound() - 1;
+ if (newLowerBound < 0) {
+ LOG.error("Number of currentState in " + fromState
+ + " is less than number of messages transiting from " + fromState);
+ continue;
+ }
+
+ if (newLowerBound < stateConstraints.get(fromState).getLowerBound()) {
+ continue;
+ }
+ }
+
+ // check upper bound of toState
+ if (stateConstraints.containsKey(toState)) {
+ int newUpperBound = bounds.get(toState).getUpperBound() + 1;
+ if (newUpperBound > stateConstraints.get(toState).getUpperBound()) {
+ continue;
+ }
+ }
+
+ selectedMessages.add(message);
+ bounds.get(fromState).increaseLowerBound();
+ bounds.get(toState).increaseUpperBound();
+ }
+ }
+
+ return selectedMessages;
+ }
+
+ // TODO change to return Map<State, Bounds>
+ /**
+ * TODO: This code is duplicate in multiple places. Can we do it in to one place in the
+ * beginning and compute the stateConstraint instance once and re use at other places.
+ * Each IdealState must have a constraint object associated with it
+ */
+ private Map<String, Bounds> computeStateConstraints(StateModelDefinition stateModelDefinition,
+ RebalancerConfig rebalancerConfig, Cluster cluster) {
+ Map<String, Bounds> stateConstraints = new HashMap<String, Bounds>();
+
+ List<String> statePriorityList = stateModelDefinition.getStatesPriorityStringList();
+ for (String state : statePriorityList) {
+ String numInstancesPerState = stateModelDefinition.getNumInstancesPerState(state);
+ int max = -1;
+ if ("N".equals(numInstancesPerState)) {
+ max = cluster.getLiveParticipantMap().size();
+ } else if ("R".equals(numInstancesPerState)) {
+ // idealState is null when resource has been dropped,
+ // R can't be evaluated and ignore state constraints
+ if (rebalancerConfig != null) {
+ max = rebalancerConfig.getReplicaCount();
+ }
+ } else {
+ try {
+ max = Integer.parseInt(numInstancesPerState);
+ } catch (Exception e) {
+ // use -1
+ }
+ }
+
+ if (max > -1) {
+ // if state has no constraint, will not put in map
+ stateConstraints.put(state, new Bounds(0, max));
+ }
+ }
+
+ return stateConstraints;
+ }
+
+ // TODO: if state transition priority is not provided then use lexicographical sorting
+ // so that behavior is consistent
+ private Map<String, Integer> getStateTransitionPriorityMap(StateModelDefinition stateModelDef) {
+ Map<String, Integer> stateTransitionPriorities = new HashMap<String, Integer>();
+ List<String> stateTransitionPriorityList = stateModelDef.getStateTransitionPriorityStringList();
+ for (int i = 0; i < stateTransitionPriorityList.size(); i++) {
+ stateTransitionPriorities.put(stateTransitionPriorityList.get(i), i);
+ }
+
+ return stateTransitionPriorities;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
new file mode 100644
index 0000000..e45cd38
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewMessageThrottleStage.java
@@ -0,0 +1,196 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.ClusterConstraints;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Partition;
+import org.apache.helix.model.ClusterConstraints.ConstraintAttribute;
+import org.apache.helix.model.ConstraintItem;
+import org.apache.helix.model.ClusterConstraints.ConstraintType;
+import org.apache.helix.model.ClusterConstraints.ConstraintValue;
+import org.apache.log4j.Logger;
+
+public class NewMessageThrottleStage extends AbstractBaseStage {
+ private static final Logger LOG = Logger.getLogger(NewMessageThrottleStage.class.getName());
+
+ int valueOf(String valueStr) {
+ int value = Integer.MAX_VALUE;
+
+ try {
+ ConstraintValue valueToken = ConstraintValue.valueOf(valueStr);
+ switch (valueToken) {
+ case ANY:
+ value = Integer.MAX_VALUE;
+ break;
+ default:
+ LOG.error("Invalid constraintValue token:" + valueStr + ". Use default value:"
+ + Integer.MAX_VALUE);
+ break;
+ }
+ } catch (Exception e) {
+ try {
+ value = Integer.parseInt(valueStr);
+ } catch (NumberFormatException ne) {
+ LOG.error("Invalid constraintValue string:" + valueStr + ". Use default value:"
+ + Integer.MAX_VALUE);
+ }
+ }
+ return value;
+ }
+
+ /**
+ * constraints are selected in the order of the following rules: 1) don't select
+ * constraints with CONSTRAINT_VALUE=ANY; 2) if one constraint is more specific than the
+ * other, select the most specific one 3) if a message matches multiple constraints of
+ * incomparable specificity, select the one with the minimum value 4) if a message
+ * matches multiple constraints of incomparable specificity, and they all have the same
+ * value, select the first in alphabetic order
+ */
+ Set<ConstraintItem> selectConstraints(Set<ConstraintItem> items,
+ Map<ConstraintAttribute, String> attributes) {
+ Map<String, ConstraintItem> selectedItems = new HashMap<String, ConstraintItem>();
+ for (ConstraintItem item : items) {
+ // don't select constraints with CONSTRAINT_VALUE=ANY
+ if (item.getConstraintValue().equals(ConstraintValue.ANY.toString())) {
+ continue;
+ }
+
+ String key = item.filter(attributes).toString();
+ if (!selectedItems.containsKey(key)) {
+ selectedItems.put(key, item);
+ } else {
+ ConstraintItem existingItem = selectedItems.get(key);
+ if (existingItem.match(item.getAttributes())) {
+ // item is more specific than existingItem
+ selectedItems.put(key, item);
+ } else if (!item.match(existingItem.getAttributes())) {
+ // existingItem and item are of incomparable specificity
+ int value = valueOf(item.getConstraintValue());
+ int existingValue = valueOf(existingItem.getConstraintValue());
+ if (value < existingValue) {
+ // item's constraint value is less than that of existingItem
+ selectedItems.put(key, item);
+ } else if (value == existingValue) {
+ if (item.toString().compareTo(existingItem.toString()) < 0) {
+ // item is ahead of existingItem in alphabetic order
+ selectedItems.put(key, item);
+ }
+ }
+ }
+ }
+ }
+ return new HashSet<ConstraintItem>(selectedItems.values());
+ }
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ MessageSelectionStageOutput msgSelectionOutput =
+ event.getAttribute(AttributeName.MESSAGES_SELECTED.toString());
+ Map<ResourceId, Resource> resourceMap = event.getAttribute(AttributeName.RESOURCES.toString());
+
+ if (cluster == null || resourceMap == null || msgSelectionOutput == null) {
+ throw new StageException("Missing attributes in event: " + event
+ + ". Requires ClusterDataCache|RESOURCES|MESSAGES_SELECTED");
+ }
+
+ MessageThrottleStageOutput output = new MessageThrottleStageOutput();
+
+ // TODO fix it
+ ClusterConstraints constraint = null;
+ // cache.getConstraint(ConstraintType.MESSAGE_CONSTRAINT);
+ Map<String, Integer> throttleCounterMap = new HashMap<String, Integer>();
+
+ // TODO fix it
+ // if (constraint != null) {
+ // // go through all pending messages, they should be counted but not throttled
+ // for (String instance : cache.getLiveInstances().keySet()) {
+ // throttle(throttleCounterMap, constraint, new ArrayList<Message>(cache.getMessages(instance)
+ // .values()), false);
+ // }
+ // }
+
+ // go through all new messages, throttle if necessary
+ // assume messages should be sorted by state transition priority in messageSelection stage
+ for (ResourceId resourceId : resourceMap.keySet()) {
+ Resource resource = resourceMap.get(resourceId);
+ // TODO fix it
+ // for (Partition partition : resource.getPartitions()) {
+ // List<Message> messages = msgSelectionOutput.getMessages(resourceName, partition);
+ // if (constraint != null && messages != null && messages.size() > 0) {
+ // messages = throttle(throttleCounterMap, constraint, messages, true);
+ // }
+ // output.addMessages(resourceName, partition, messages);
+ // }
+ }
+
+ event.addAttribute(AttributeName.MESSAGES_THROTTLE.toString(), output);
+ }
+
+ private List<Message> throttle(Map<String, Integer> throttleMap, ClusterConstraints constraint,
+ List<Message> messages, final boolean needThrottle) {
+
+ List<Message> throttleOutputMsgs = new ArrayList<Message>();
+ for (Message message : messages) {
+ Map<ConstraintAttribute, String> msgAttr = ClusterConstraints.toConstraintAttributes(message);
+
+ Set<ConstraintItem> matches = constraint.match(msgAttr);
+ matches = selectConstraints(matches, msgAttr);
+
+ boolean msgThrottled = false;
+ for (ConstraintItem item : matches) {
+ String key = item.filter(msgAttr).toString();
+ if (!throttleMap.containsKey(key)) {
+ throttleMap.put(key, valueOf(item.getConstraintValue()));
+ }
+ int value = throttleMap.get(key);
+ throttleMap.put(key, --value);
+
+ if (needThrottle && value < 0) {
+ msgThrottled = true;
+
+ if (LOG.isDebugEnabled()) {
+ // TODO: printout constraint item that throttles the message
+ LOG.debug("message: " + message + " is throttled by constraint: " + item);
+ }
+ }
+ }
+
+ if (!msgThrottled) {
+ throttleOutputMsgs.add(message);
+ }
+ }
+
+ return throttleOutputMsgs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
new file mode 100644
index 0000000..3359b50
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewRebalanceIdealStateStage.java
@@ -0,0 +1,84 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixManager;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.rebalancer.NewRebalancer;
+import org.apache.helix.controller.rebalancer.Rebalancer;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.ResourceAssignment;
+import org.apache.helix.util.HelixUtil;
+import org.apache.log4j.Logger;
+
+/**
+ * Check and invoke custom implementation idealstate rebalancers.<br/>
+ * If the resourceConfig has specified className of the customized rebalancer, <br/>
+ * the rebalancer will be invoked to re-write the idealstate of the resource<br/>
+ */
+public class NewRebalanceIdealStateStage extends AbstractBaseStage {
+ private static final Logger LOG = Logger.getLogger(NewRebalanceIdealStateStage.class.getName());
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ NewCurrentStateOutput currentStateOutput =
+ event.getAttribute(AttributeName.CURRENT_STATE.toString());
+
+ // Map<String, IdealState> updatedIdealStates = new HashMap<String, IdealState>();
+ for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
+ // IdealState currentIdealState = idealStateMap.get(resourceName);
+ Resource resource = cluster.getResource(resourceId);
+ RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+ if (rebalancerConfig.getRebalancerMode() == RebalanceMode.USER_DEFINED
+ && rebalancerConfig.getRebalancerClassName() != null) {
+ String rebalancerClassName = rebalancerConfig.getRebalancerClassName();
+ LOG.info("resource " + resourceId + " use idealStateRebalancer " + rebalancerClassName);
+ try {
+ NewRebalancer balancer =
+ (NewRebalancer) (HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
+
+ // TODO add state model def
+ ResourceAssignment resourceAssignment =
+ balancer.computeResourceMapping(resource, cluster, null);
+
+ // TODO impl this
+ // currentIdealState.updateFromAssignment(resourceAssignment);
+ // updatedIdealStates.put(resourceName, currentIdealState);
+ } catch (Exception e) {
+ LOG.error("Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
+ }
+ }
+ }
+
+ // TODO
+ // if (updatedIdealStates.size() > 0) {
+ // cache.getIdealStates().putAll(updatedIdealStates);
+ // }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/9c7de4c3/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
new file mode 100644
index 0000000..b8c1ecf
--- /dev/null
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/NewResourceComputationStage.java
@@ -0,0 +1,108 @@
+package org.apache.helix.controller.stages;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.LinkedHashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.Participant;
+import org.apache.helix.api.Partition;
+import org.apache.helix.api.PartitionId;
+import org.apache.helix.api.RebalancerConfig;
+import org.apache.helix.api.Resource;
+import org.apache.helix.api.ResourceId;
+import org.apache.helix.api.StateModelFactoryId;
+import org.apache.helix.controller.pipeline.AbstractBaseStage;
+import org.apache.helix.controller.pipeline.StageException;
+import org.apache.helix.model.CurrentState;
+import org.apache.log4j.Logger;
+
+/**
+ * This stage computes all the resources in a cluster. The resources are
+ * computed from IdealStates -> this gives all the resources currently active
+ * CurrentState for liveInstance-> Helps in finding resources that are inactive
+ * and needs to be dropped
+ */
+public class NewResourceComputationStage extends AbstractBaseStage {
+ private static Logger LOG = Logger.getLogger(NewResourceComputationStage.class);
+
+ @Override
+ public void process(ClusterEvent event) throws Exception {
+ Cluster cluster = event.getAttribute("ClusterDataCache");
+ if (cluster == null) {
+ throw new StageException("Missing attributes in event:" + event + ". Requires Cluster");
+ }
+
+ Map<ResourceId, Resource.Builder> resourceBuilderMap =
+ new LinkedHashMap<ResourceId, Resource.Builder>();
+ // include all resources in ideal-state
+ for (ResourceId resourceId : cluster.getResourceMap().keySet()) {
+ Resource resource = cluster.getResource(resourceId);
+ RebalancerConfig rebalancerConfig = resource.getRebalancerConfig();
+
+ Resource.Builder resourceBuilder = new Resource.Builder(resourceId);
+ resourceBuilder.rebalancerConfig(rebalancerConfig);
+ resourceBuilder.addPartitions(resource.getPartitionSet());
+ resourceBuilderMap.put(resourceId, resourceBuilder);
+ }
+
+ // include all partitions from CurrentState as well since idealState might be removed
+ for (Participant liveParticipant : cluster.getLiveParticipantMap().values()) {
+ for ( ResourceId resourceId : liveParticipant.getCurrentStateMap().keySet()) {
+ CurrentState currentState = liveParticipant.getCurrentStateMap().get(resourceId);
+
+ if (currentState.getStateModelDefRef() == null) {
+ LOG.error("state model def is null." + "resource:" + currentState.getResourceId()
+ + ", partitions: " + currentState.getPartitionStateStringMap().keySet()
+ + ", states: " + currentState.getPartitionStateStringMap().values());
+ throw new StageException("State model def is null for resource:"
+ + currentState.getResourceId());
+ }
+
+ // don't overwrite ideal state configs
+ if (!resourceBuilderMap.containsKey(resourceId)) {
+ RebalancerConfig.Builder rebalancerConfigBuilder = new RebalancerConfig.Builder();
+ rebalancerConfigBuilder.stateModelDef(currentState.getStateModelDefId());
+ rebalancerConfigBuilder.stateModelFactoryId(new StateModelFactoryId(currentState.getStateModelFactoryName()));
+ rebalancerConfigBuilder.bucketSize(currentState.getBucketSize());
+ rebalancerConfigBuilder.batchMessageMode(currentState.getBatchMessageMode());
+
+ org.apache.helix.api.Resource.Builder resourceBuilder = new org.apache.helix.api.Resource.Builder(resourceId);
+ resourceBuilder.rebalancerConfig(rebalancerConfigBuilder.build());
+ resourceBuilderMap.put(resourceId, resourceBuilder);
+ }
+
+ for (PartitionId partitionId : currentState.getPartitionStateMap().keySet()) {
+ resourceBuilderMap.get(resourceId).addPartition(new Partition(partitionId));
+ }
+ }
+ }
+
+ // convert builder-map to resource-map
+ Map<ResourceId, Resource> resourceMap = new LinkedHashMap<ResourceId, Resource>();
+ for (ResourceId resourceId : resourceBuilderMap.keySet()) {
+ resourceMap.put(resourceId, resourceBuilderMap.get(resourceId).build());
+ }
+
+ event.addAttribute(AttributeName.RESOURCES.toString(), resourceMap);
+ }
+}