Posted to commits@helix.apache.org by ka...@apache.org on 2013/11/20 23:33:30 UTC
[2/2] git commit: fixing compile errors
fixing compile errors
Project: http://git-wip-us.apache.org/repos/asf/incubator-helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-helix/commit/925b7e94
Tree: http://git-wip-us.apache.org/repos/asf/incubator-helix/tree/925b7e94
Diff: http://git-wip-us.apache.org/repos/asf/incubator-helix/diff/925b7e94
Branch: refs/heads/helix-yarn
Commit: 925b7e94e2c73d9cd133a109a87d4cc46ad3b94d
Parents: 1ec06f5
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Wed Nov 20 14:33:20 2013 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Wed Nov 20 14:33:20 2013 -0800
----------------------------------------------------------------------
.../controller/stages/ResourceCurrentState.java | 20 +
.../org/apache/helix/task/TaskRebalancer.java | 638 ++++++++-----------
.../java/org/apache/helix/task/TaskUtil.java | 124 ++--
.../integration/task/TestTaskRebalancer.java | 247 ++++---
.../task/TestTaskRebalancerStopResume.java | 150 ++---
.../apache/helix/integration/task/TestUtil.java | 113 ++--
.../autoscale/provider/ProviderRebalancer.java | 102 +--
.../apache/helix/autoscale/HelixYarnTest.java | 1 +
8 files changed, 632 insertions(+), 763 deletions(-)
----------------------------------------------------------------------
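For context, the compile errors come from the rebalancer interface change: TaskRebalancer now implements HelixRebalancer instead of Rebalancer, and takes a RebalancerConfig, Cluster, and ResourceCurrentState. Below is a minimal sketch of the new interface shape, using only the classes named in this diff (the skeleton class name is illustrative, not part of the commit):

import org.apache.helix.HelixManager;
import org.apache.helix.api.Cluster;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.rebalancer.HelixRebalancer;
import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.ResourceAssignment;

public class SkeletonRebalancer implements HelixRebalancer {
  private HelixManager _manager;

  @Override
  public void init(HelixManager manager) {
    // Keep the manager around for data-accessor and property-store access.
    _manager = manager;
  }

  @Override
  public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
      Cluster cluster, ResourceCurrentState currentState) {
    // The resource id now comes from the rebalancer context instead of a Resource argument.
    PartitionedRebalancerContext context =
        rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
    ResourceId resourceId = context.getResourceId();
    // A real rebalancer would fill in per-partition replica maps here.
    return new ResourceAssignment(resourceId);
  }
}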
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
index f04afd0..f986765 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceCurrentState.java
@@ -47,6 +47,13 @@ public class ResourceCurrentState {
private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _pendingStateMap;
/**
+ * map of resource-id to map of partition-id to map of participant-id to state
+ * represents the requested state for each participant
+ * TODO: this isn't populated
+ */
+ private final Map<ResourceId, Map<PartitionId, Map<ParticipantId, State>>> _requestedStateMap;
+
+ /**
* map of resource-id to state model definition id
*/
private final Map<ResourceId, StateModelDefId> _resourceStateModelMap;
@@ -62,6 +69,7 @@ public class ResourceCurrentState {
public ResourceCurrentState() {
_currentStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
_pendingStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
+ _requestedStateMap = new HashMap<ResourceId, Map<PartitionId, Map<ParticipantId, State>>>();
_resourceStateModelMap = new HashMap<ResourceId, StateModelDefId>();
_curStateMetaMap = new HashMap<ResourceId, CurrentState>();
@@ -225,6 +233,18 @@ public class ResourceCurrentState {
}
/**
+ * given (resource, partition, participant), returns the requested state
+ * @param resourceId
+ * @param partitionId
+ * @param participantId
+ * @return the requested state, or null if none has been recorded
+ */
+ public State getRequestedState(ResourceId resourceId, PartitionId partitionId,
+ ParticipantId participantId) {
+ return getState(_requestedStateMap, resourceId, partitionId, participantId);
+ }
+
+ /**
* @param resourceId
* @param partitionId
* @return
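The new getRequestedState accessor mirrors getCurrentState and getPendingState but is keyed by the typed ids. A small usage sketch follows, assuming a ResourceCurrentState instance and illustrative resource/partition/participant names; the null tolerance matters because, per the TODO above, the requested-state map is not yet populated:

import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.controller.stages.ResourceCurrentState;

public class RequestedStateLookup {
  // Returns the requested state for (resource, partition, participant), or null if none.
  public static State requestedOrNull(ResourceCurrentState currentState, String resource,
      String partition, String participant) {
    State requested =
        currentState.getRequestedState(ResourceId.from(resource), PartitionId.from(partition),
            ParticipantId.from(participant));
    // Callers must tolerate null: the requested-state map is not populated yet (see TODO).
    return requested;
  }
}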
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 5664713..997fe3b 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -3,10 +3,6 @@
*/
package org.apache.helix.task;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -16,45 +12,57 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
+
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
-import org.apache.helix.controller.rebalancer.Rebalancer;
-import org.apache.helix.controller.stages.ClusterDataCache;
-import org.apache.helix.controller.stages.CurrentStateOutput;
+import org.apache.helix.api.Cluster;
+import org.apache.helix.api.State;
+import org.apache.helix.api.id.ParticipantId;
+import org.apache.helix.api.id.PartitionId;
+import org.apache.helix.api.id.ResourceId;
+import org.apache.helix.controller.rebalancer.HelixRebalancer;
+import org.apache.helix.controller.rebalancer.context.PartitionedRebalancerContext;
+import org.apache.helix.controller.rebalancer.context.RebalancerConfig;
+import org.apache.helix.controller.stages.ResourceCurrentState;
import org.apache.helix.model.IdealState;
-import org.apache.helix.model.Partition;
-import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.log4j.Logger;
+import com.google.common.base.Functions;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
/**
* Custom rebalancer implementation for the {@code Task} state model.
- *
* @author Abe <as...@linkedin.com>
* @version $Revision$
*/
-public class TaskRebalancer implements Rebalancer
-{
+public class TaskRebalancer implements HelixRebalancer {
private static final Logger LOG = Logger.getLogger(TaskRebalancer.class);
private HelixManager _manager;
@Override
- public void init(HelixManager manager)
- {
+ public void init(HelixManager manager) {
_manager = manager;
}
@Override
- public ResourceAssignment computeResourceMapping(Resource resource,
- IdealState taskIs,
- CurrentStateOutput currStateOutput,
- ClusterDataCache clusterData)
- {
- final String resourceName = resource.getResourceName();
+ public ResourceAssignment computeResourceMapping(RebalancerConfig rebalancerConfig,
+ Cluster cluster, ResourceCurrentState currentState) {
+ PartitionedRebalancerContext context =
+ rebalancerConfig.getRebalancerContext(PartitionedRebalancerContext.class);
+ ResourceId resourceId = context.getResourceId();
+ String resourceName = resourceId.toString();
+
+ // get the ideal state
+ HelixDataAccessor accessor = _manager.getHelixDataAccessor();
+ PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
+ IdealState taskIs = accessor.getProperty(propertyKey);
// Fetch task configuration
TaskConfig taskCfg = TaskUtil.getTaskCfg(_manager, resourceName);
@@ -65,33 +73,29 @@ public class TaskRebalancer implements Rebalancer
WorkflowContext workflowCtx = TaskUtil.getWorkflowContext(_manager, workflowResource);
// Initialize workflow context if needed
- if (workflowCtx == null)
- {
+ if (workflowCtx == null) {
workflowCtx = new WorkflowContext(new ZNRecord("WorkflowContext"));
workflowCtx.setStartTime(System.currentTimeMillis());
}
// Check parent dependencies
- for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName))
- {
- if (workflowCtx.getTaskState(parent) == null || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED))
- {
+ for (String parent : workflowCfg.getTaskDag().getDirectParents(resourceName)) {
+ if (workflowCtx.getTaskState(parent) == null
+ || !workflowCtx.getTaskState(parent).equals(TaskState.COMPLETED)) {
return emptyAssignment(resourceName);
}
}
// Clean up if workflow marked for deletion
TargetState targetState = workflowCfg.getTargetState();
- if (targetState == TargetState.DELETE)
- {
+ if (targetState == TargetState.DELETE) {
cleanup(_manager, resourceName, workflowCfg, workflowResource);
return emptyAssignment(resourceName);
}
// Check if this workflow has been finished past its expiry.
if (workflowCtx.getFinishTime() != WorkflowContext.UNFINISHED
- && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis())
- {
+ && workflowCtx.getFinishTime() + workflowCfg.getExpiry() <= System.currentTimeMillis()) {
markForDeletion(_manager, workflowResource);
cleanup(_manager, resourceName, workflowCfg, workflowResource);
return emptyAssignment(resourceName);
@@ -99,48 +103,42 @@ public class TaskRebalancer implements Rebalancer
// Fetch any existing context information from the property store.
TaskContext taskCtx = TaskUtil.getTaskContext(_manager, resourceName);
- if (taskCtx == null)
- {
+ if (taskCtx == null) {
taskCtx = new TaskContext(new ZNRecord("TaskContext"));
taskCtx.setStartTime(System.currentTimeMillis());
}
// The task is already in a final state (completed/failed).
if (workflowCtx.getTaskState(resourceName) == TaskState.FAILED
- || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED)
- {
+ || workflowCtx.getTaskState(resourceName) == TaskState.COMPLETED) {
return emptyAssignment(resourceName);
}
ResourceAssignment prevAssignment = TaskUtil.getPrevResourceAssignment(_manager, resourceName);
- if (prevAssignment == null)
- {
- prevAssignment = new ResourceAssignment(resourceName);
+ if (prevAssignment == null) {
+ prevAssignment = new ResourceAssignment(resourceId);
}
- // Will contain the list of partitions that must be explicitly dropped from the ideal state that is stored in zk.
- // Fetch the previous resource assignment from the property store. This is required because of HELIX-230.
+ // Will contain the list of partitions that must be explicitly dropped from the ideal state that
+ // is stored in zk.
+ // Fetch the previous resource assignment from the property store. This is required because of
+ // HELIX-230.
Set<Integer> partitionsToDrop = new TreeSet<Integer>();
- ResourceAssignment newAssignment = computeResourceMapping(resourceName,
- workflowCfg,
- taskCfg,
- prevAssignment,
- clusterData.getIdealState(taskCfg.getTargetResource()),
- clusterData.getLiveInstances().keySet(),
- currStateOutput,
- workflowCtx,
- taskCtx,
- partitionsToDrop);
-
- if (!partitionsToDrop.isEmpty())
- {
- for (Integer pId : partitionsToDrop)
- {
+ IdealState targetIs =
+ accessor.getProperty(accessor.keyBuilder().idealStates(taskCfg.getTargetResource()));
+ List<ParticipantId> liveParticipants =
+ Lists.newArrayList(cluster.getLiveParticipantMap().keySet());
+ List<String> rawLiveParticipants =
+ Lists.transform(liveParticipants, Functions.toStringFunction());
+ ResourceAssignment newAssignment =
+ computeResourceMapping(resourceName, workflowCfg, taskCfg, prevAssignment, targetIs,
+ rawLiveParticipants, currentState, workflowCtx, taskCtx, partitionsToDrop);
+
+ if (!partitionsToDrop.isEmpty()) {
+ for (Integer pId : partitionsToDrop) {
taskIs.getRecord().getMapFields().remove(pName(resourceName, pId));
}
- HelixDataAccessor accessor = _manager.getHelixDataAccessor();
- PropertyKey propertyKey = accessor.keyBuilder().idealStates(resourceName);
accessor.setProperty(propertyKey, taskIs);
}
@@ -153,30 +151,20 @@ public class TaskRebalancer implements Rebalancer
}
private static ResourceAssignment computeResourceMapping(String taskResource,
- WorkflowConfig workflowConfig,
- TaskConfig taskCfg,
- ResourceAssignment prevAssignment,
- IdealState tgtResourceIs,
- Iterable<String> liveInstances,
- CurrentStateOutput currStateOutput,
- WorkflowContext workflowCtx,
- TaskContext taskCtx,
- Set<Integer> partitionsToDropFromIs)
- {
+ WorkflowConfig workflowConfig, TaskConfig taskCfg, ResourceAssignment prevAssignment,
+ IdealState tgtResourceIs, Iterable<String> liveInstances,
+ ResourceCurrentState currStateOutput, WorkflowContext workflowCtx, TaskContext taskCtx,
+ Set<Integer> partitionsToDropFromIs) {
TargetState taskTgtState = workflowConfig.getTargetState();
// Update running status in workflow context
- if (taskTgtState == TargetState.STOP)
- {
+ if (taskTgtState == TargetState.STOP) {
workflowCtx.setTaskState(taskResource, TaskState.STOPPED);
// Workflow has been stopped if all tasks are stopped
- if (isWorkflowStopped(workflowCtx, workflowConfig))
- {
+ if (isWorkflowStopped(workflowCtx, workflowConfig)) {
workflowCtx.setWorkflowState(TaskState.STOPPED);
}
- }
- else
- {
+ } else {
workflowCtx.setTaskState(taskResource, TaskState.IN_PROGRESS);
// Workflow is in progress if any task is in progress
workflowCtx.setWorkflowState(TaskState.IN_PROGRESS);
@@ -190,134 +178,121 @@ public class TaskRebalancer implements Rebalancer
// Process all the current assignments of task partitions.
Set<Integer> allPartitions = getAllTaskPartitions(tgtResourceIs, taskCfg);
- Map<String, SortedSet<Integer>> taskAssignments = getTaskPartitionAssignments(liveInstances,
- prevAssignment,
- allPartitions);
- for (String instance : taskAssignments.keySet())
- {
+ Map<String, SortedSet<Integer>> taskAssignments =
+ getTaskPartitionAssignments(liveInstances, prevAssignment, allPartitions);
+ for (String instance : taskAssignments.keySet()) {
Set<Integer> pSet = taskAssignments.get(instance);
- // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT, TASK_ERROR, ERROR.
+ // Used to keep track of partitions that are in one of the final states: COMPLETED, TIMED_OUT,
+ // TASK_ERROR, ERROR.
Set<Integer> donePartitions = new TreeSet<Integer>();
- for (int pId : pSet)
- {
+ for (int pId : pSet) {
final String pName = pName(taskResource, pId);
// Check for pending state transitions on this (partition, instance).
- String pendingState = currStateOutput.getPendingState(taskResource,
- new Partition(pName),
- instance);
- if (pendingState != null)
- {
- // There is a pending state transition for this (partition, instance). Just copy forward the state
+ State pendingState =
+ currStateOutput.getPendingState(ResourceId.from(taskResource), PartitionId.from(pName),
+ ParticipantId.from(instance));
+ if (pendingState != null) {
+ // There is a pending state transition for this (partition, instance). Just copy forward
+ // the state
// assignment from the previous ideal state.
- Map<String, String> stateMap = prevAssignment.getReplicaMap(new Partition(pName));
- if (stateMap != null)
- {
- String prevState = stateMap.get(instance);
- paMap.put(pId, new PartitionAssignment(instance, prevState));
+ Map<ParticipantId, State> stateMap =
+ prevAssignment.getReplicaMap(PartitionId.from(pName));
+ if (stateMap != null) {
+ State prevState = stateMap.get(ParticipantId.from(instance));
+ paMap.put(pId, new PartitionAssignment(instance, prevState.toString()));
assignedPartitions.add(pId);
- LOG.debug(String.format("Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
- pName,
- instance,
- prevState));
+ LOG.debug(String
+ .format(
+ "Task partition %s has a pending state transition on instance %s. Using the previous ideal state which was %s.",
+ pName, instance, prevState));
}
continue;
}
- TaskPartitionState currState = TaskPartitionState.valueOf(currStateOutput.getCurrentState(taskResource,
- new Partition(pName),
- instance));
+ TaskPartitionState currState =
+ TaskPartitionState.valueOf(currStateOutput.getCurrentState(
+ ResourceId.from(taskResource), PartitionId.from(pName),
+ ParticipantId.from(instance)).toString());
// Process any requested state transitions.
- String requestedStateStr = currStateOutput.getRequestedState(taskResource,
- new Partition(pName),
- instance);
- if (requestedStateStr != null && !requestedStateStr.isEmpty())
- {
+ String requestedStateStr =
+ currStateOutput.getRequestedState(ResourceId.from(taskResource),
+ PartitionId.from(pName), ParticipantId.from(instance)).toString();
+ if (requestedStateStr != null && !requestedStateStr.isEmpty()) {
TaskPartitionState requestedState = TaskPartitionState.valueOf(requestedStateStr);
- if (requestedState.equals(currState))
- {
- LOG.warn(String.format("Requested state %s is the same as the current state for instance %s.",
- requestedState,
- instance));
+ if (requestedState.equals(currState)) {
+ LOG.warn(String.format(
+ "Requested state %s is the same as the current state for instance %s.",
+ requestedState, instance));
}
paMap.put(pId, new PartitionAssignment(instance, requestedState.name()));
assignedPartitions.add(pId);
- LOG.debug(String.format("Instance %s requested a state transition to %s for partition %s.",
- instance,
- requestedState,
- pName));
+ LOG.debug(String.format(
+ "Instance %s requested a state transition to %s for partition %s.", instance,
+ requestedState, pName));
continue;
}
- switch (currState)
- {
- case RUNNING:
- case STOPPED:
- {
- TaskPartitionState nextState;
- if (taskTgtState == TargetState.START)
- {
- nextState = TaskPartitionState.RUNNING;
- }
- else
- {
- nextState = TaskPartitionState.STOPPED;
- }
-
- paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
- assignedPartitions.add(pId);
- LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
- pName,
- nextState,
- instance));
+ switch (currState) {
+ case RUNNING:
+ case STOPPED: {
+ TaskPartitionState nextState;
+ if (taskTgtState == TargetState.START) {
+ nextState = TaskPartitionState.RUNNING;
+ } else {
+ nextState = TaskPartitionState.STOPPED;
}
+
+ paMap.put(pId, new PartitionAssignment(instance, nextState.name()));
+ assignedPartitions.add(pId);
+ LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+ nextState, instance));
+ }
break;
- case COMPLETED:
- {
- // The task has completed on this partition. Mark as such in the context object.
- donePartitions.add(pId);
- LOG.debug(String.format("Task partition %s has completed with state %s. Marking as such in rebalancer context.",
- pName,
- currState));
- partitionsToDropFromIs.add(pId);
- markPartitionCompleted(taskCtx, pId);
- }
+ case COMPLETED: {
+ // The task has completed on this partition. Mark as such in the context object.
+ donePartitions.add(pId);
+ LOG.debug(String
+ .format(
+ "Task partition %s has completed with state %s. Marking as such in rebalancer context.",
+ pName, currState));
+ partitionsToDropFromIs.add(pId);
+ markPartitionCompleted(taskCtx, pId);
+ }
break;
- case TIMED_OUT:
- case TASK_ERROR:
- case ERROR:
- {
- donePartitions.add(pId); // The task may be rescheduled on a different instance.
- LOG.debug(String.format("Task partition %s has error state %s. Marking as such in rebalancer context.",
- pName,
- currState));
- markPartitionError(taskCtx, pId, currState);
- // The error policy is to fail the task as soon a single partition fails for a specified maximum number of
- // attempts.
- if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition())
- {
- workflowCtx.setTaskState(taskResource, TaskState.FAILED);
- workflowCtx.setWorkflowState(TaskState.FAILED);
- addAllPartitions(tgtResourceIs.getPartitionSet(), partitionsToDropFromIs);
- return emptyAssignment(taskResource);
- }
+ case TIMED_OUT:
+ case TASK_ERROR:
+ case ERROR: {
+ donePartitions.add(pId); // The task may be rescheduled on a different instance.
+ LOG.debug(String.format(
+ "Task partition %s has error state %s. Marking as such in rebalancer context.",
+ pName, currState));
+ markPartitionError(taskCtx, pId, currState);
+ // The error policy is to fail the task as soon as a single partition fails for a
+ // specified maximum number of attempts.
+ if (taskCtx.getPartitionNumAttempts(pId) >= taskCfg.getMaxAttemptsPerPartition()) {
+ workflowCtx.setTaskState(taskResource, TaskState.FAILED);
+ workflowCtx.setWorkflowState(TaskState.FAILED);
+ addAllPartitions(tgtResourceIs.getPartitionSet(), partitionsToDropFromIs);
+ return emptyAssignment(taskResource);
}
+ }
break;
- case INIT:
- case DROPPED:
- {
- // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
- donePartitions.add(pId);
- LOG.debug(String.format("Task partition %s has state %s. It will be dropped from the current ideal state.",
- pName,
- currState));
- }
+ case INIT:
+ case DROPPED: {
+ // currState in [INIT, DROPPED]. Do nothing, the partition is eligible to be reassigned.
+ donePartitions.add(pId);
+ LOG.debug(String.format(
+ "Task partition %s has state %s. It will be dropped from the current ideal state.",
+ pName, currState));
+ }
break;
- default:
- throw new AssertionError("Unknown enum symbol: " + currState);
+ default:
+ throw new AssertionError("Unknown enum symbol: " + currState);
}
}
@@ -325,60 +300,50 @@ public class TaskRebalancer implements Rebalancer
pSet.removeAll(donePartitions);
}
- if (isTaskComplete(taskCtx, allPartitions))
- {
+ if (isTaskComplete(taskCtx, allPartitions)) {
workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
- if (isWorkflowComplete(workflowCtx, workflowConfig))
- {
+ if (isWorkflowComplete(workflowCtx, workflowConfig)) {
workflowCtx.setWorkflowState(TaskState.COMPLETED);
workflowCtx.setFinishTime(System.currentTimeMillis());
}
}
// Make additional task assignments if needed.
- if (taskTgtState == TargetState.START)
- {
- // Contains the set of task partitions that must be excluded from consideration when making any new assignments.
+ if (taskTgtState == TargetState.START) {
+ // Contains the set of task partitions that must be excluded from consideration when making
+ // any new assignments.
// This includes all completed, failed, already assigned partitions.
Set<Integer> excludeSet = Sets.newTreeSet(assignedPartitions);
addCompletedPartitions(excludeSet, taskCtx, allPartitions);
// Get instance->[partition, ...] mappings for the target resource.
- Map<String, SortedSet<Integer>> tgtPartitionAssignments = getTgtPartitionAssignment(currStateOutput,
- liveInstances,
- tgtResourceIs,
- taskCfg.getTargetPartitionStates(),
- allPartitions);
- for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet())
- {
+ Map<String, SortedSet<Integer>> tgtPartitionAssignments =
+ getTgtPartitionAssignment(currStateOutput, liveInstances, tgtResourceIs,
+ taskCfg.getTargetPartitionStates(), allPartitions);
+ for (Map.Entry<String, SortedSet<Integer>> entry : taskAssignments.entrySet()) {
String instance = entry.getKey();
// Contains the set of task partitions currently assigned to the instance.
Set<Integer> pSet = entry.getValue();
int numToAssign = taskCfg.getNumConcurrentTasksPerInstance() - pSet.size();
- if (numToAssign > 0)
- {
- List<Integer> nextPartitions = getNextPartitions(tgtPartitionAssignments.get(instance),
- excludeSet,
- numToAssign);
- for (Integer pId : nextPartitions)
- {
+ if (numToAssign > 0) {
+ List<Integer> nextPartitions =
+ getNextPartitions(tgtPartitionAssignments.get(instance), excludeSet, numToAssign);
+ for (Integer pId : nextPartitions) {
String pName = pName(taskResource, pId);
paMap.put(pId, new PartitionAssignment(instance, TaskPartitionState.RUNNING.name()));
excludeSet.add(pId);
- LOG.debug(String.format("Setting task partition %s state to %s on instance %s.",
- pName,
- TaskPartitionState.RUNNING,
- instance));
+ LOG.debug(String.format("Setting task partition %s state to %s on instance %s.", pName,
+ TaskPartitionState.RUNNING, instance));
}
}
}
}
// Construct a ResourceAssignment object from the map of partition assignments.
- ResourceAssignment ra = new ResourceAssignment(taskResource);
- for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet())
- {
+ ResourceAssignment ra = new ResourceAssignment(ResourceId.from(taskResource));
+ for (Map.Entry<Integer, PartitionAssignment> e : paMap.entrySet()) {
PartitionAssignment pa = e.getValue();
- ra.addReplicaMap(new Partition(pName(taskResource, e.getKey())), ImmutableMap.of(pa._instance, pa._state));
+ ra.addReplicaMap(PartitionId.from(pName(taskResource, e.getKey())),
+ ImmutableMap.of(ParticipantId.from(pa._instance), State.from(pa._state)));
}
return ra;
@@ -386,20 +351,16 @@ public class TaskRebalancer implements Rebalancer
/**
* Checks if the task has completed.
- *
- * @param ctx The rebalancer context.
+ * @param ctx The rebalancer context.
* @param allPartitions The set of partitions to check.
- *
- * @return true if all task partitions have been marked with status {@link TaskPartitionState#COMPLETED} in the rebalancer
+ * @return true if all task partitions have been marked with status
+ * {@link TaskPartitionState#COMPLETED} in the rebalancer
* context, false otherwise.
*/
- private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions)
- {
- for (Integer pId : allPartitions)
- {
+ private static boolean isTaskComplete(TaskContext ctx, Set<Integer> allPartitions) {
+ for (Integer pId : allPartitions) {
TaskPartitionState state = ctx.getPartitionState(pId);
- if (state != TaskPartitionState.COMPLETED)
- {
+ if (state != TaskPartitionState.COMPLETED) {
return false;
}
}
@@ -408,18 +369,13 @@ public class TaskRebalancer implements Rebalancer
/**
* Checks if the workflow has completed.
- *
* @param ctx Workflow context containing task states
* @param cfg Workflow config containing set of tasks
- *
* @return returns true if all tasks are {@link TaskState#COMPLETED}, false otherwise.
*/
- private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg)
- {
- for (String task : cfg.getTaskDag().getAllNodes())
- {
- if(ctx.getTaskState(task) != TaskState.COMPLETED)
- {
+ private static boolean isWorkflowComplete(WorkflowContext ctx, WorkflowConfig cfg) {
+ for (String task : cfg.getTaskDag().getAllNodes()) {
+ if (ctx.getTaskState(task) != TaskState.COMPLETED) {
return false;
}
}
@@ -428,139 +384,122 @@ public class TaskRebalancer implements Rebalancer
/**
* Checks if the workflow has been stopped.
- *
* @param ctx Workflow context containing task states
* @param cfg Workflow config containing set of tasks
- *
* @return returns true if all tasks are {@link TaskState#STOPPED}, false otherwise.
*/
- private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg)
- {
- for (String task : cfg.getTaskDag().getAllNodes())
- {
- if(ctx.getTaskState(task) != TaskState.STOPPED && ctx.getTaskState(task) != null)
- {
+ private static boolean isWorkflowStopped(WorkflowContext ctx, WorkflowConfig cfg) {
+ for (String task : cfg.getTaskDag().getAllNodes()) {
+ if (ctx.getTaskState(task) != TaskState.STOPPED && ctx.getTaskState(task) != null) {
return false;
}
}
return true;
}
- private static void markForDeletion(HelixManager mgr, String resourceName)
- {
- mgr.getConfigAccessor().set(TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
- WorkflowConfig.TARGET_STATE,
- TargetState.DELETE.name());
+ private static void markForDeletion(HelixManager mgr, String resourceName) {
+ mgr.getConfigAccessor().set(
+ TaskUtil.getResourceConfigScope(mgr.getClusterName(), resourceName),
+ WorkflowConfig.TARGET_STATE, TargetState.DELETE.name());
}
/**
- * Cleans up all Helix state associated with this task, wiping workflow-level information if this is the last
+ * Cleans up all Helix state associated with this task, wiping workflow-level information if this
+ * is the last
* remaining task in its workflow.
*/
- private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg, String workflowResource)
- {
+ private static void cleanup(HelixManager mgr, String resourceName, WorkflowConfig cfg,
+ String workflowResource) {
HelixDataAccessor accessor = mgr.getHelixDataAccessor();
// Delete resource configs.
PropertyKey cfgKey = getConfigPropertyKey(accessor, resourceName);
- if (!accessor.removeProperty(cfgKey))
- {
- throw new RuntimeException(String.format(
- "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
- resourceName,
- cfgKey));
+ if (!accessor.removeProperty(cfgKey)) {
+ throw new RuntimeException(
+ String
+ .format(
+ "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+ resourceName, cfgKey));
}
// Delete property store information for this resource.
String propStoreKey = getRebalancerPropStoreKey(resourceName);
- if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT))
- {
- throw new RuntimeException(String.format(
- "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
- resourceName,
- propStoreKey));
+ if (!mgr.getHelixPropertyStore().remove(propStoreKey, AccessOption.PERSISTENT)) {
+ throw new RuntimeException(
+ String
+ .format(
+ "Error occurred while trying to clean up task %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+ resourceName, propStoreKey));
}
// Finally, delete the ideal state itself.
PropertyKey isKey = getISPropertyKey(accessor, resourceName);
- if (!accessor.removeProperty(isKey))
- {
+ if (!accessor.removeProperty(isKey)) {
throw new RuntimeException(String.format(
"Error occurred while trying to clean up task %s. Failed to remove node %s from Helix.",
- resourceName,
- isKey));
+ resourceName, isKey));
}
LOG.info(String.format("Successfully cleaned up task resource %s.", resourceName));
boolean lastInWorkflow = true;
- for(String task : cfg.getTaskDag().getAllNodes())
- {
+ for (String task : cfg.getTaskDag().getAllNodes()) {
// check if property store information or resource configs exist for this task
- if(mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task), AccessOption.PERSISTENT)
- || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
- || accessor.getProperty(getISPropertyKey(accessor, task)) != null)
- {
+ if (mgr.getHelixPropertyStore().exists(getRebalancerPropStoreKey(task),
+ AccessOption.PERSISTENT)
+ || accessor.getProperty(getConfigPropertyKey(accessor, task)) != null
+ || accessor.getProperty(getISPropertyKey(accessor, task)) != null) {
lastInWorkflow = false;
}
}
// clean up task-level info if this was the last in workflow
- if(lastInWorkflow)
- {
+ if (lastInWorkflow) {
// delete workflow config
PropertyKey workflowCfgKey = getConfigPropertyKey(accessor, workflowResource);
- if (!accessor.removeProperty(workflowCfgKey))
- {
- throw new RuntimeException(String.format(
- "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
- workflowResource,
- workflowCfgKey));
+ if (!accessor.removeProperty(workflowCfgKey)) {
+ throw new RuntimeException(
+ String
+ .format(
+ "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+ workflowResource, workflowCfgKey));
}
// Delete property store information for this workflow
String workflowPropStoreKey = getRebalancerPropStoreKey(workflowResource);
- if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT))
- {
- throw new RuntimeException(String.format(
- "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
- workflowResource,
- workflowPropStoreKey));
+ if (!mgr.getHelixPropertyStore().remove(workflowPropStoreKey, AccessOption.PERSISTENT)) {
+ throw new RuntimeException(
+ String
+ .format(
+ "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.",
+ workflowResource, workflowPropStoreKey));
}
}
}
- private static String getRebalancerPropStoreKey(String resource)
- {
+ private static String getRebalancerPropStoreKey(String resource) {
return Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resource);
}
- private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource)
- {
+ private static PropertyKey getISPropertyKey(HelixDataAccessor accessor, String resource) {
return accessor.keyBuilder().idealStates(resource);
}
- private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource)
- {
+ private static PropertyKey getConfigPropertyKey(HelixDataAccessor accessor, String resource) {
return accessor.keyBuilder().resourceConfig(resource);
}
- private static void addAllPartitions(Set<String> pNames, Set<Integer> pIds)
- {
- for (String pName : pNames)
- {
+ private static void addAllPartitions(Set<String> pNames, Set<Integer> pIds) {
+ for (String pName : pNames) {
pIds.add(pId(pName));
}
}
- private static ResourceAssignment emptyAssignment(String name)
- {
- return new ResourceAssignment(name);
+ private static ResourceAssignment emptyAssignment(String name) {
+ return new ResourceAssignment(ResourceId.from(name));
}
- private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx, Iterable<Integer> pIds)
- {
- for (Integer pId : pIds)
- {
+ private static void addCompletedPartitions(Set<Integer> set, TaskContext ctx,
+ Iterable<Integer> pIds) {
+ for (Integer pId : pIds) {
TaskPartitionState state = ctx.getPartitionState(pId);
- if (state == TaskPartitionState.COMPLETED)
- {
+ if (state == TaskPartitionState.COMPLETED) {
set.add(pId);
}
}
@@ -569,23 +508,17 @@ public class TaskRebalancer implements Rebalancer
/**
* Returns the set of all partition ids for a task.
* <p/>
- * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we use the list of all
- * partition ids from the target resource.
+ * If a set of partition ids was explicitly specified in the config, that is used. Otherwise, we
+ * use the list of all partition ids from the target resource.
*/
- private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg)
- {
+ private static Set<Integer> getAllTaskPartitions(IdealState tgtResourceIs, TaskConfig taskCfg) {
Set<Integer> taskPartitions = new HashSet<Integer>();
- if (taskCfg.getTargetPartitions() != null)
- {
- for (Integer pId : taskCfg.getTargetPartitions())
- {
+ if (taskCfg.getTargetPartitions() != null) {
+ for (Integer pId : taskCfg.getTargetPartitions()) {
taskPartitions.add(pId);
}
- }
- else
- {
- for (String pName : tgtResourceIs.getPartitionSet())
- {
+ } else {
+ for (String pName : tgtResourceIs.getPartitionSet()) {
taskPartitions.add(pId(pName));
}
}
@@ -593,18 +526,15 @@ public class TaskRebalancer implements Rebalancer
return taskPartitions;
}
- private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions, Set<Integer> excluded, int n)
- {
+ private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
+ Set<Integer> excluded, int n) {
List<Integer> result = new ArrayList<Integer>(n);
- for (Integer pId : candidatePartitions)
- {
- if (result.size() >= n)
- {
+ for (Integer pId : candidatePartitions) {
+ if (result.size() >= n) {
break;
}
- if (!excluded.contains(pId))
- {
+ if (!excluded.contains(pId)) {
result.add(pId);
}
}
@@ -612,15 +542,13 @@ public class TaskRebalancer implements Rebalancer
return result;
}
- private static void markPartitionCompleted(TaskContext ctx, int pId)
- {
+ private static void markPartitionCompleted(TaskContext ctx, int pId) {
ctx.setPartitionState(pId, TaskPartitionState.COMPLETED);
ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
ctx.incrementNumAttempts(pId);
}
- private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state)
- {
+ private static void markPartitionError(TaskContext ctx, int pId, TaskPartitionState state) {
ctx.setPartitionState(pId, state);
ctx.setPartitionFinishTime(pId, System.currentTimeMillis());
ctx.incrementNumAttempts(pId);
@@ -628,38 +556,31 @@ public class TaskRebalancer implements Rebalancer
/**
* Get partition assignments for the target resource, but only for the partitions of interest.
- *
* @param currStateOutput The current state of the instances in the cluster.
- * @param instanceList The set of instances.
- * @param tgtIs The ideal state of the target resource.
- * @param tgtStates Only partitions in this set of states will be considered. If null, partitions do not need to
- * be in any specific state to be considered.
- * @param includeSet The set of partitions to consider.
- *
+ * @param instanceList The set of instances.
+ * @param tgtIs The ideal state of the target resource.
+ * @param tgtStates Only partitions in this set of states will be considered. If null, partitions
+ * do not need to
+ * be in any specific state to be considered.
+ * @param includeSet The set of partitions to consider.
* @return A map of instance vs set of partition ids assigned to that instance.
*/
- private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(CurrentStateOutput currStateOutput,
- Iterable<String> instanceList,
- IdealState tgtIs,
- Set<String> tgtStates,
- Set<Integer> includeSet)
- {
+ private static Map<String, SortedSet<Integer>> getTgtPartitionAssignment(
+ ResourceCurrentState currStateOutput, Iterable<String> instanceList, IdealState tgtIs,
+ Set<String> tgtStates, Set<Integer> includeSet) {
Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
- for (String instance : instanceList)
- {
+ for (String instance : instanceList) {
result.put(instance, new TreeSet<Integer>());
}
- for (String pName : tgtIs.getPartitionSet())
- {
+ for (String pName : tgtIs.getPartitionSet()) {
int pId = pId(pName);
- if (includeSet.contains(pId))
- {
- for (String instance : instanceList)
- {
- String state = currStateOutput.getCurrentState(tgtIs.getResourceName(), new Partition(pName), instance);
- if (tgtStates == null || tgtStates.contains(state))
- {
+ if (includeSet.contains(pId)) {
+ for (String instance : instanceList) {
+ State state =
+ currStateOutput.getCurrentState(tgtIs.getResourceId(), PartitionId.from(pName),
+ ParticipantId.from(instance));
+ if (tgtStates == null || tgtStates.contains(state.toString())) {
result.get(instance).add(pId);
}
}
@@ -672,27 +593,20 @@ public class TaskRebalancer implements Rebalancer
/**
* Return the assignment of task partitions per instance.
*/
- private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(Iterable<String> instanceList,
- ResourceAssignment assignment,
- Set<Integer> includeSet)
- {
+ private static Map<String, SortedSet<Integer>> getTaskPartitionAssignments(
+ Iterable<String> instanceList, ResourceAssignment assignment, Set<Integer> includeSet) {
Map<String, SortedSet<Integer>> result = new HashMap<String, SortedSet<Integer>>();
- for (String instance : instanceList)
- {
+ for (String instance : instanceList) {
result.put(instance, new TreeSet<Integer>());
}
- for (Partition partition : assignment.getMappedPartitions())
- {
- int pId = pId(partition.getPartitionName());
- if (includeSet.contains(pId))
- {
- Map<String, String> replicaMap = assignment.getReplicaMap(partition);
- for (String instance : replicaMap.keySet())
- {
+ for (PartitionId partition : assignment.getMappedPartitionIds()) {
+ int pId = pId(partition.toString());
+ if (includeSet.contains(pId)) {
+ Map<ParticipantId, State> replicaMap = assignment.getReplicaMap(partition);
+ for (ParticipantId instance : replicaMap.keySet()) {
SortedSet<Integer> pList = result.get(instance);
- if (pList != null)
- {
+ if (pList != null) {
pList.add(pId);
}
}
@@ -705,16 +619,14 @@ public class TaskRebalancer implements Rebalancer
/**
* Computes the partition name given the resource name and partition id.
*/
- private static String pName(String resource, int pId)
- {
+ private static String pName(String resource, int pId) {
return resource + "_" + pId;
}
/**
* Extracts the partition id from the given partition name.
*/
- private static int pId(String pName)
- {
+ private static int pId(String pName) {
String[] tokens = pName.split("_");
return Integer.valueOf(tokens[tokens.length - 1]);
}
@@ -722,13 +634,11 @@ public class TaskRebalancer implements Rebalancer
/**
* An (instance, state) pair.
*/
- private static class PartitionAssignment
- {
+ private static class PartitionAssignment {
private final String _instance;
private final String _state;
- private PartitionAssignment(String instance, String state)
- {
+ private PartitionAssignment(String instance, String state) {
_instance = instance;
_state = state;
}
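The rebalancer now also builds its output with typed ids. Below is a hedged sketch of constructing a single-replica ResourceAssignment with the ResourceId/PartitionId/ParticipantId/State factories used in this diff (class and argument names are illustrative):

import org.apache.helix.api.State;
import org.apache.helix.api.id.ParticipantId;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.api.id.ResourceId;
import org.apache.helix.model.ResourceAssignment;

import com.google.common.collect.ImmutableMap;

public class TypedAssignmentExample {
  // Builds an assignment that places one partition replica on one participant in one state.
  public static ResourceAssignment singleReplica(String resource, String partition,
      String participant, String state) {
    ResourceAssignment ra = new ResourceAssignment(ResourceId.from(resource));
    ra.addReplicaMap(PartitionId.from(partition),
        ImmutableMap.of(ParticipantId.from(participant), State.from(state)));
    return ra;
  }
}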
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
index d7b235e..741ed4d 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskUtil.java
@@ -3,11 +3,10 @@
*/
package org.apache.helix.task;
-
-import com.google.common.base.Joiner;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+
import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
@@ -20,139 +19,124 @@ import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.log4j.Logger;
+import com.google.common.base.Joiner;
/**
* Static utility methods.
- *
- * @author Abe <as...@linkedin.com>
- * @version $Revision$
*/
-public class TaskUtil
-{
+public class TaskUtil {
private static final Logger LOG = Logger.getLogger(TaskUtil.class);
private static final String CONTEXT_NODE = "Context";
private static final String PREV_RA_NODE = "PreviousResourceAssignment";
/**
* Parses task resource configurations in Helix into a {@link TaskConfig} object.
- *
- * @param manager HelixManager object used to connect to Helix.
+ * @param manager HelixManager object used to connect to Helix.
* @param taskResource The name of the task resource.
- *
- * @return A {@link TaskConfig} object if Helix contains valid configurations for the task, null otherwise.
+ * @return A {@link TaskConfig} object if Helix contains valid configurations for the task, null
+ * otherwise.
*/
- public static TaskConfig getTaskCfg(HelixManager manager, String taskResource)
- {
+ public static TaskConfig getTaskCfg(HelixManager manager, String taskResource) {
Map<String, String> taskCfg = getResourceConfigMap(manager, taskResource);
TaskConfig.Builder b = TaskConfig.Builder.fromMap(taskCfg);
return b.build();
}
- public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource)
- {
+ public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);
return b.build();
}
- public static boolean setRequestedState(HelixDataAccessor accessor,
- String instance,
- String sessionId,
- String resource,
- String partition,
- TaskPartitionState state)
- {
- LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state, partition));
- try
- {
+
+ public static boolean setRequestedState(HelixDataAccessor accessor, String instance,
+ String sessionId, String resource, String partition, TaskPartitionState state) {
+ LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state,
+ partition));
+ try {
PropertyKey.Builder keyBuilder = accessor.keyBuilder();
PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
CurrentState currStateDelta = new CurrentState(resource);
currStateDelta.setRequestedState(partition, state.name());
return accessor.updateProperty(key, currStateDelta);
- }
- catch (Exception e)
- {
- LOG.error(String.format("Error when requesting a state transition to %s for partition %s.", state, partition), e);
+ } catch (Exception e) {
+ LOG.error(String.format("Error when requesting a state transition to %s for partition %s.",
+ state, partition), e);
return false;
}
}
- public static HelixConfigScope getResourceConfigScope(String clusterName, String resource)
- {
+ public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
- .forCluster(clusterName).forResource(resource).build();
+ .forCluster(clusterName).forResource(resource).build();
}
- public static ResourceAssignment getPrevResourceAssignment(HelixManager manager, String resourceName)
- {
- ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
- resourceName, PREV_RA_NODE), null, AccessOption.PERSISTENT);
+ public static ResourceAssignment getPrevResourceAssignment(HelixManager manager,
+ String resourceName) {
+ ZNRecord r =
+ manager.getHelixPropertyStore().get(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
+ null, AccessOption.PERSISTENT);
return r != null ? new ResourceAssignment(r) : null;
}
- public static void setPrevResourceAssignment(HelixManager manager, String resourceName, ResourceAssignment ra)
- {
- manager.getHelixPropertyStore().set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
- resourceName, PREV_RA_NODE), ra.getRecord(), AccessOption.PERSISTENT);
+ public static void setPrevResourceAssignment(HelixManager manager, String resourceName,
+ ResourceAssignment ra) {
+ manager.getHelixPropertyStore().set(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
+ ra.getRecord(), AccessOption.PERSISTENT);
}
- public static TaskContext getTaskContext(HelixManager manager, String taskResource)
- {
- ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
- taskResource,
- CONTEXT_NODE), null, AccessOption.PERSISTENT);
+ public static TaskContext getTaskContext(HelixManager manager, String taskResource) {
+ ZNRecord r =
+ manager.getHelixPropertyStore().get(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource, CONTEXT_NODE),
+ null, AccessOption.PERSISTENT);
return r != null ? new TaskContext(r) : null;
}
- public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx)
- {
- manager.getHelixPropertyStore().set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
- taskResource,
- CONTEXT_NODE), ctx.getRecord(), AccessOption.PERSISTENT);
+ public static void setTaskContext(HelixManager manager, String taskResource, TaskContext ctx) {
+ manager.getHelixPropertyStore().set(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskResource, CONTEXT_NODE),
+ ctx.getRecord(), AccessOption.PERSISTENT);
}
- public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource)
- {
- ZNRecord r = manager.getHelixPropertyStore().get(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
- workflowResource,
- CONTEXT_NODE), null, AccessOption.PERSISTENT);
+ public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
+ ZNRecord r =
+ manager.getHelixPropertyStore().get(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
+ CONTEXT_NODE), null, AccessOption.PERSISTENT);
return r != null ? new WorkflowContext(r) : null;
}
- public static void setWorkflowContext(HelixManager manager, String workflowResource, WorkflowContext ctx)
- {
- manager.getHelixPropertyStore().set(Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT,
- workflowResource,
- CONTEXT_NODE), ctx.getRecord(), AccessOption.PERSISTENT);
+ public static void setWorkflowContext(HelixManager manager, String workflowResource,
+ WorkflowContext ctx) {
+ manager.getHelixPropertyStore().set(
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
+ ctx.getRecord(), AccessOption.PERSISTENT);
}
- public static String getNamespacedTaskName(String singleTaskWorkflow)
- {
+ public static String getNamespacedTaskName(String singleTaskWorkflow) {
return getNamespacedTaskName(singleTaskWorkflow, singleTaskWorkflow);
}
- public static String getNamespacedTaskName(String workflowResource, String taskName)
- {
+ public static String getNamespacedTaskName(String workflowResource, String taskName) {
return workflowResource + "_" + taskName;
}
- private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource)
- {
+ private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
ConfigAccessor configAccessor = manager.getConfigAccessor();
Map<String, String> taskCfg = new HashMap<String, String>();
List<String> cfgKeys = configAccessor.getKeys(scope);
- if (cfgKeys == null || cfgKeys.isEmpty())
- {
+ if (cfgKeys == null || cfgKeys.isEmpty()) {
return null;
}
- for (String cfgKey : cfgKeys)
- {
+ for (String cfgKey : cfgKeys) {
taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
}
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
index dec884b..7db753c 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancer.java
@@ -3,17 +3,30 @@
*/
package org.apache.helix.integration.task;
-
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
-import org.apache.helix.*;
-import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.AccessOption;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
import org.apache.helix.integration.ZkIntegrationTestBase;
-import org.apache.helix.task.*;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskConstants;
+import org.apache.helix.task.TaskContext;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskPartitionState;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.TaskUtil;
+import org.apache.helix.task.Workflow;
+import org.apache.helix.task.WorkflowContext;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
@@ -21,142 +34,120 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import com.google.common.base.Joiner;
+import com.google.common.collect.ImmutableList;
/**
* @author Abe <as...@linkedin.com>
* @version $Revision$
*/
-public class TestTaskRebalancer extends ZkIntegrationTestBase
-{
+public class TestTaskRebalancer extends ZkIntegrationTestBase {
private static final int NUM_NODES = 5;
private static final int START_PORT = 12918;
private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave";
private static final int NUM_PARTITIONS = 20;
private static final int NUM_REPLICAS = 3;
private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
- private final Map<String, TestHelper.StartCMResult> _startCMResultMap = new HashMap<String, TestHelper.StartCMResult>();
+ // private final Map<String, TestHelper.StartCMResult> _startCMResultMap = new HashMap<String,
+ // TestHelper.StartCMResult>();
+ private final MockParticipantManager[] _participants = new MockParticipantManager[NUM_NODES];
+ ClusterControllerManager _controller;
private HelixManager _manager;
private TaskDriver _driver;
@BeforeClass
- public void beforeClass()
- throws Exception
- {
+ public void beforeClass() throws Exception {
String namespace = "/" + CLUSTER_NAME;
- if (_gZkClient.exists(namespace))
- {
+ if (_gZkClient.exists(namespace)) {
_gZkClient.deleteRecursive(namespace);
}
ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
setupTool.addCluster(CLUSTER_NAME, true);
- for (int i = 0; i < NUM_NODES; i++)
- {
+ for (int i = 0; i < NUM_NODES; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
}
// Set up target db
- setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL);
+ setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS,
+ MASTER_SLAVE_STATE_MODEL);
setupTool.rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS);
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put("Reindex", new TaskFactory()
- {
+ taskFactoryReg.put("Reindex", new TaskFactory() {
@Override
- public Task createNewTask(String config)
- {
+ public Task createNewTask(String config) {
return new ReindexTask(config);
}
});
// start dummy participants
- for (int i = 0; i < NUM_NODES; i++)
- {
+ for (int i = 0; i < NUM_NODES; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- TestHelper.StartCMResult result = TestUtil.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName, taskFactoryReg);
- _startCMResultMap.put(instanceName, result);
+ // TestHelper.StartCMResult result =
+ _participants[i] =
+ TestUtil.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName, taskFactoryReg);
+ // _startCMResultMap.put(instanceName, result);
}
// start controller
String controllerName = CONTROLLER_PREFIX + "_0";
- TestHelper.StartCMResult startResult = TestHelper.startController(CLUSTER_NAME,
- controllerName,
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
-
+ // TestHelper.StartCMResult startResult =
+ // TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
+ // HelixControllerMain.STANDALONE);
+ // _startCMResultMap.put(controllerName, startResult);
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
// create cluster manager
- _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager =
+ HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+ ZK_ADDR);
_manager.connect();
_driver = new TaskDriver(_manager);
- boolean result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR,
- CLUSTER_NAME));
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
+ ZK_ADDR, CLUSTER_NAME));
Assert.assertTrue(result);
- result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ CLUSTER_NAME));
Assert.assertTrue(result);
}
@AfterClass
- public void afterClass()
- throws Exception
- {
+ public void afterClass() throws Exception {
/**
* shutdown order: 1) disconnect the controller 2) disconnect participants
*/
-
- TestHelper.StartCMResult result;
- Iterator<Map.Entry<String, TestHelper.StartCMResult>> it = _startCMResultMap.entrySet().iterator();
- while (it.hasNext())
- {
- String instanceName = it.next().getKey();
- if (instanceName.startsWith(CONTROLLER_PREFIX))
- {
- result = _startCMResultMap.get(instanceName);
- result._manager.disconnect();
- result._thread.interrupt();
- it.remove();
- }
- }
-
- Thread.sleep(100);
- it = _startCMResultMap.entrySet().iterator();
- while (it.hasNext())
- {
- String instanceName = it.next().getKey();
- result = _startCMResultMap.get(instanceName);
- result._manager.disconnect();
- result._thread.interrupt();
- it.remove();
+ _controller.syncStop();
+ for (MockParticipantManager participant : _participants) {
+ participant.syncStop();
}
-
_manager.disconnect();
}
@Test
- public void basic()
- throws Exception
- {
+ public void basic() throws Exception {
basic(100);
}
@Test
- public void zeroTaskCompletionTime()
- throws Exception
- {
+ public void zeroTaskCompletionTime() throws Exception {
basic(0);
}
@Test
- public void testExpiry() throws Exception
- {
+ public void testExpiry() throws Exception {
String taskName = "Expiry";
long expiry = 1000;
- Workflow flow = WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskName,
- TaskConfig.COMMAND_CONFIG, String.valueOf(100)).setExpiry(expiry).build();
+ Workflow flow =
+ WorkflowGenerator
+ .generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskName,
+ TaskConfig.COMMAND_CONFIG, String.valueOf(100)).setExpiry(expiry).build();
_driver.start(flow);
TestUtil.pollForWorkflowState(_manager, taskName, TaskState.IN_PROGRESS);
@@ -164,10 +155,12 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
// Running workflow should have config and context viewable through accessor
HelixDataAccessor accessor = _manager.getHelixDataAccessor();
PropertyKey workflowCfgKey = accessor.keyBuilder().resourceConfig(taskName);
- String workflowPropStoreKey = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskName);
+ String workflowPropStoreKey =
+ Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, taskName);
// Ensure context and config exist
- Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT));
+ Assert.assertTrue(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
+ AccessOption.PERSISTENT));
Assert.assertNotSame(accessor.getProperty(workflowCfgKey), null);
// Wait for task to finish and expire
@@ -177,18 +170,21 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
Thread.sleep(expiry);
// Ensure workflow config and context were cleaned up by now
- Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey, AccessOption.PERSISTENT));
+ Assert.assertFalse(_manager.getHelixPropertyStore().exists(workflowPropStoreKey,
+ AccessOption.PERSISTENT));
Assert.assertEquals(accessor.getProperty(workflowCfgKey), null);
}
- private void basic(long taskCompletionTime)
- throws Exception
- {
- // We use a different resource name in each test method as a work around for a helix participant bug where it does
- // not clear locally cached state when a resource partition is dropped. Once that is fixed we should change these
- // tests to use the same resource name and implement a beforeMethod that deletes the task resource.
+ private void basic(long taskCompletionTime) throws Exception {
+ // We use a different resource name in each test method as a workaround for a Helix
+ // participant bug where it does not clear locally cached state when a resource partition is
+ // dropped. Once that is fixed, we should change these tests to use the same resource name and
+ // implement a beforeMethod that deletes the task resource.
final String taskResource = "basic" + taskCompletionTime;
- Workflow flow = WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+ Workflow flow =
+ WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
TaskConfig.COMMAND_CONFIG, String.valueOf(taskCompletionTime)).build();
_driver.start(flow);
@@ -196,26 +192,25 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.COMPLETED);
// Ensure all partitions are completed individually
- TaskContext ctx = TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
- for (int i = 0; i < NUM_PARTITIONS; i++)
- {
+ TaskContext ctx =
+ TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
}
}
@Test
- public void partitionSet()
- throws Exception
- {
+ public void partitionSet() throws Exception {
final String taskResource = "partitionSet";
ImmutableList<Integer> targetPartitions = ImmutableList.of(1, 2, 3, 5, 8, 13);
// construct and submit our basic workflow
- Workflow flow = WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
- TaskConfig.COMMAND_CONFIG, String.valueOf(100),
- TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(1),
- TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions)).build();
+ Workflow flow =
+ WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+ TaskConfig.COMMAND_CONFIG, String.valueOf(100), TaskConfig.MAX_ATTEMPTS_PER_PARTITION,
+ String.valueOf(1), TaskConfig.TARGET_PARTITIONS, Joiner.on(",").join(targetPartitions))
+ .build();
_driver.start(flow);
// wait for task completeness/timeout
@@ -229,36 +224,33 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
Assert.assertNotNull(ctx);
Assert.assertNotNull(workflowContext);
Assert.assertEquals(workflowContext.getTaskState(namespacedName), TaskState.COMPLETED);
- for (int i : targetPartitions)
- {
+ for (int i : targetPartitions) {
Assert.assertEquals(ctx.getPartitionState(i), TaskPartitionState.COMPLETED);
Assert.assertEquals(ctx.getPartitionNumAttempts(i), 1);
}
}
@Test
- public void testRepeatedWorkflow() throws Exception
- {
+ public void testRepeatedWorkflow() throws Exception {
String workflowName = "SomeWorkflow";
- Workflow flow = WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflowName).build();
+ Workflow flow =
+ WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflowName).build();
new TaskDriver(_manager).start(flow);
// Wait until the task completes
TestUtil.pollForWorkflowState(_manager, workflowName, TaskState.COMPLETED);
// Assert completion for all tasks within two minutes
- for(String task : flow.getTaskConfigs().keySet())
- {
+ for (String task : flow.getTaskConfigs().keySet()) {
TestUtil.pollForTaskState(_manager, workflowName, task, TaskState.COMPLETED);
}
}
@Test
- public void timeouts()
- throws Exception
- {
+ public void timeouts() throws Exception {
final String taskResource = "timeouts";
- Workflow flow = WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
+ Workflow flow =
+ WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(taskResource,
TaskConfig.MAX_ATTEMPTS_PER_PARTITION, String.valueOf(2),
TaskConfig.TIMEOUT_PER_PARTITION, String.valueOf(100)).build();
_driver.start(flow);
@@ -267,13 +259,12 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
TestUtil.pollForWorkflowState(_manager, taskResource, TaskState.FAILED);
// Check that all partitions timed out up to maxAttempts
- TaskContext ctx = TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
+ TaskContext ctx =
+ TaskUtil.getTaskContext(_manager, TaskUtil.getNamespacedTaskName(taskResource));
int maxAttempts = 0;
- for (int i = 0; i < NUM_PARTITIONS; i++)
- {
+ for (int i = 0; i < NUM_PARTITIONS; i++) {
TaskPartitionState state = ctx.getPartitionState(i);
- if (state != null)
- {
+ if (state != null) {
Assert.assertEquals(state, TaskPartitionState.TIMED_OUT);
maxAttempts = Math.max(maxAttempts, ctx.getPartitionNumAttempts(i));
}
@@ -281,48 +272,40 @@ public class TestTaskRebalancer extends ZkIntegrationTestBase
Assert.assertEquals(maxAttempts, 2);
}
- private static class ReindexTask implements Task
- {
+ private static class ReindexTask implements Task {
private final long _delay;
private volatile boolean _canceled;
- public ReindexTask(String cfg)
- {
+ public ReindexTask(String cfg) {
_delay = Long.parseLong(cfg);
}
@Override
- public TaskResult run()
- {
+ public TaskResult run() {
long expiry = System.currentTimeMillis() + _delay;
long timeLeft;
- while (System.currentTimeMillis() < expiry)
- {
- if (_canceled)
- {
+ while (System.currentTimeMillis() < expiry) {
+ if (_canceled) {
timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+ return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+ : timeLeft));
}
sleep(50);
}
timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+ return new TaskResult(TaskResult.Status.COMPLETED,
+ String.valueOf(timeLeft < 0 ? 0 : timeLeft));
}
@Override
- public void cancel()
- {
+ public void cancel() {
_canceled = true;
}
- private static void sleep(long d)
- {
- try
- {
+ private static void sleep(long d) {
+ try {
Thread.sleep(d);
- }
- catch (InterruptedException e)
- {
+ } catch (InterruptedException e) {
e.printStackTrace();
}
}
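For reference, the setup/teardown pattern this commit migrates to (replacing the TestHelper.StartCMResult bookkeeping) can be summarized as below. This is a minimal sketch assembled from the calls visible in this diff, not part of the commit itself; ZK_ADDR, CLUSTER_NAME, NUM_NODES, PARTICIPANT_PREFIX, START_PORT, CONTROLLER_PREFIX and taskFactoryReg are the test constants and factory map defined in the class above.

    // Sketch only: managed participant/controller lifecycle used by the reworked tests.
    MockParticipantManager[] participants = new MockParticipantManager[NUM_NODES];
    for (int i = 0; i < NUM_NODES; i++) {
      String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
      // startDummyProcess registers the task factories and returns a running mock participant
      participants[i] = TestUtil.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName, taskFactoryReg);
    }
    ClusterControllerManager controller =
        new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, CONTROLLER_PREFIX + "_0");
    controller.syncStart();   // blocks until the controller is connected

    // Teardown mirrors afterClass: stop the controller first, then the participants.
    controller.syncStop();
    for (MockParticipantManager participant : participants) {
      participant.syncStop();
    }

Compared with the old StartCMResult map, the managed wrappers own their connection threads, so the tests no longer have to disconnect managers and interrupt threads by hand.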
http://git-wip-us.apache.org/repos/asf/incubator-helix/blob/925b7e94/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
index 4c17397..05ed55e 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerStopResume.java
@@ -3,18 +3,23 @@
*/
package org.apache.helix.integration.task;
-
import java.util.HashMap;
-import java.util.Iterator;
import java.util.Map;
+
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
-import org.apache.helix.TestHelper;
-import org.apache.helix.controller.HelixControllerMain;
import org.apache.helix.integration.ZkIntegrationTestBase;
import org.apache.helix.integration.ZkStandAloneCMTestBase;
-import org.apache.helix.task.*;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskState;
+import org.apache.helix.task.Workflow;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.apache.log4j.Logger;
@@ -23,13 +28,11 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
-
/**
* @author Abe <as...@linkedin.com>
* @version $Revision$
*/
-public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
-{
+public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase {
private static final Logger LOG = Logger.getLogger(ZkStandAloneCMTestBase.class);
private static final int NUM_NODES = 5;
private static final int START_PORT = 12918;
@@ -39,24 +42,23 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
private static final int NUM_PARTITIONS = 20;
private static final int NUM_REPLICAS = 3;
private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName();
- private final Map<String, TestHelper.StartCMResult> _startCMResultMap = new HashMap<String, TestHelper.StartCMResult>();
+ // private final Map<String, TestHelper.StartCMResult> _startCMResultMap = new HashMap<String,
+ // TestHelper.StartCMResult>();
+ private MockParticipantManager _participants[] = new MockParticipantManager[NUM_NODES];
+ private ClusterControllerManager _controller;
private HelixManager _manager;
private TaskDriver _driver;
@BeforeClass
- public void beforeClass()
- throws Exception
- {
+ public void beforeClass() throws Exception {
String namespace = "/" + CLUSTER_NAME;
- if (_gZkClient.exists(namespace))
- {
+ if (_gZkClient.exists(namespace)) {
_gZkClient.deleteRecursive(namespace);
}
ClusterSetup setupTool = new ClusterSetup(ZK_ADDR);
setupTool.addCluster(CLUSTER_NAME, true);
- for (int i = 0; i < NUM_NODES; i++)
- {
+ for (int i = 0; i < NUM_NODES; i++) {
String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName);
}
@@ -66,87 +68,69 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS);
Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
- taskFactoryReg.put("Reindex", new TaskFactory()
- {
+ taskFactoryReg.put("Reindex", new TaskFactory() {
@Override
- public Task createNewTask(String config)
- {
+ public Task createNewTask(String config) {
return new ReindexTask(config);
}
});
// start dummy participants
- for (int i = 0; i < NUM_NODES; i++)
- {
+ for (int i = 0; i < NUM_NODES; i++) {
String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
- TestHelper.StartCMResult result = TestUtil.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName, taskFactoryReg);
- _startCMResultMap.put(instanceName, result);
+ // TestHelper.StartCMResult result =
+ _participants[i] =
+ TestUtil.startDummyProcess(ZK_ADDR, CLUSTER_NAME, instanceName, taskFactoryReg);
+ // _startCMResultMap.put(instanceName, result);
}
// start controller
String controllerName = CONTROLLER_PREFIX + "_0";
- TestHelper.StartCMResult startResult = TestHelper.startController(CLUSTER_NAME,
- controllerName,
- ZK_ADDR,
- HelixControllerMain.STANDALONE);
- _startCMResultMap.put(controllerName, startResult);
+ // TestHelper.StartCMResult startResult =
+ // TestHelper.startController(CLUSTER_NAME, controllerName, ZK_ADDR,
+ // HelixControllerMain.STANDALONE);
+ // _startCMResultMap.put(controllerName, startResult);
+ _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+ _controller.syncStart();
// create cluster manager
- _manager = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR);
+ _manager =
+ HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR,
+ ZK_ADDR);
_manager.connect();
_driver = new TaskDriver(_manager);
- boolean result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(ZK_ADDR,
- CLUSTER_NAME));
+ boolean result =
+ ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier(
+ ZK_ADDR, CLUSTER_NAME));
Assert.assertTrue(result);
- result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
- CLUSTER_NAME));
+ result =
+ ClusterStateVerifier
+ .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR,
+ CLUSTER_NAME));
Assert.assertTrue(result);
}
@AfterClass
- public void afterClass()
- throws Exception
- {
+ public void afterClass() throws Exception {
/**
* shutdown order: 1) disconnect the controller 2) disconnect participants
*/
- TestHelper.StartCMResult result;
- Iterator<Map.Entry<String, TestHelper.StartCMResult>> it = _startCMResultMap.entrySet().iterator();
- while (it.hasNext())
- {
- String instanceName = it.next().getKey();
- if (instanceName.startsWith(CONTROLLER_PREFIX))
- {
- result = _startCMResultMap.get(instanceName);
- result._manager.disconnect();
- result._thread.interrupt();
- it.remove();
- }
- }
-
- Thread.sleep(100);
- it = _startCMResultMap.entrySet().iterator();
- while (it.hasNext())
- {
- String instanceName = it.next().getKey();
- result = _startCMResultMap.get(instanceName);
- result._manager.disconnect();
- result._thread.interrupt();
- it.remove();
+ _controller.syncStop();
+ for (MockParticipantManager participant : _participants) {
+ participant.syncStop();
}
_manager.disconnect();
}
@Test
- public void stopAndResume()
- throws Exception
- {
- Workflow flow = WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE,
+ public void stopAndResume() throws Exception {
+ Workflow flow =
+ WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE,
TaskConfig.COMMAND_CONFIG, String.valueOf(100)).build();
LOG.info("Starting flow " + flow.getName());
@@ -163,9 +147,7 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
}
@Test
- public void stopAndResumeWorkflow()
- throws Exception
- {
+ public void stopAndResumeWorkflow() throws Exception {
String workflow = "SomeWorkflow";
Workflow flow = WorkflowGenerator.generateDefaultRepeatedTaskWorkflowBuilder(workflow).build();
@@ -182,48 +164,40 @@ public class TestTaskRebalancerStopResume extends ZkIntegrationTestBase
TestUtil.pollForWorkflowState(_manager, workflow, TaskState.COMPLETED);
}
- public static class ReindexTask implements Task
- {
+ public static class ReindexTask implements Task {
private final long _delay;
private volatile boolean _canceled;
- public ReindexTask(String cfg)
- {
+ public ReindexTask(String cfg) {
_delay = Long.parseLong(cfg);
}
@Override
- public TaskResult run()
- {
+ public TaskResult run() {
long expiry = System.currentTimeMillis() + _delay;
long timeLeft;
- while (System.currentTimeMillis() < expiry)
- {
- if (_canceled)
- {
+ while (System.currentTimeMillis() < expiry) {
+ if (_canceled) {
timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+ return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0
+ : timeLeft));
}
sleep(50);
}
timeLeft = expiry - System.currentTimeMillis();
- return new TaskResult(TaskResult.Status.COMPLETED, String.valueOf(timeLeft < 0 ? 0 : timeLeft));
+ return new TaskResult(TaskResult.Status.COMPLETED,
+ String.valueOf(timeLeft < 0 ? 0 : timeLeft));
}
@Override
- public void cancel()
- {
+ public void cancel() {
_canceled = true;
}
- private static void sleep(long d)
- {
- try
- {
+ private static void sleep(long d) {
+ try {
Thread.sleep(d);
- }
- catch (InterruptedException e)
- {
+ } catch (InterruptedException e) {
e.printStackTrace();
}
}
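For completeness, the workflow-driving pattern these stop/resume tests rely on (largely elided by the hunks above) looks roughly like the sketch below. It is not commit content; the stop(...) and resume(...) calls on TaskDriver are assumptions inferred from the test names stopAndResume/stopAndResumeWorkflow and are not shown in this diff, while the remaining calls appear elsewhere in the changed tests.

    // Sketch only: submit a single-task workflow, pause it, resume it, and wait for completion.
    Workflow flow =
        WorkflowGenerator.generateDefaultSingleTaskWorkflowBuilderWithExtraConfigs(TASK_RESOURCE,
            TaskConfig.COMMAND_CONFIG, String.valueOf(100)).build();
    _driver.start(flow);
    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.IN_PROGRESS);

    _driver.stop(TASK_RESOURCE);    // assumed API: halt the running workflow
    _driver.resume(TASK_RESOURCE);  // assumed API: pick the workflow back up

    TestUtil.pollForWorkflowState(_manager, TASK_RESOURCE, TaskState.COMPLETED);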