You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/03/04 03:28:33 UTC
git commit: Start changing the task rebalancer to work at a finer
granularity
Repository: helix
Updated Branches:
refs/heads/helix-provisioning 1bc9354d5 -> 080a15ff9
Start changing the task rebalancer to work at a finer granularity
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/080a15ff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/080a15ff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/080a15ff
Branch: refs/heads/helix-provisioning
Commit: 080a15ff9241f1a7d2c218447ad2569d62ca120d
Parents: 1bc9354
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Mar 3 18:28:20 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Mar 3 18:28:20 2014 -0800
----------------------------------------------------------------------
.../helix/task/AbstractTaskRebalancer.java | 7 +-
.../helix/task/IndependentTaskRebalancer.java | 32 +++-
.../java/org/apache/helix/task/TaskConfig.java | 35 ++++-
.../helix/provisioning/tools/TaskManager.java | 152 +++++++++++++++++++
.../provisioning/tools/TestTaskManager.java | 115 ++++++++++++++
5 files changed, 331 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/080a15ff/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
index 9a9538c..329d02f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
@@ -357,7 +357,9 @@ public abstract class AbstractTaskRebalancer implements HelixRebalancer {
}
if (isTaskComplete(taskCtx, allPartitions)) {
- workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
+ if (!taskCfg.isLongLived()) {
+ workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
+ }
if (isWorkflowComplete(workflowCtx, workflowConfig)) {
workflowCtx.setWorkflowState(TaskState.COMPLETED);
workflowCtx.setFinishTime(System.currentTimeMillis());
@@ -553,6 +555,9 @@ public abstract class AbstractTaskRebalancer implements HelixRebalancer {
private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
Set<Integer> excluded, int n) {
List<Integer> result = new ArrayList<Integer>(n);
+ if (candidatePartitions == null || candidatePartitions.isEmpty()) {
+ return result;
+ }
for (Integer pId : candidatePartitions) {
if (result.size() >= n) {
break;
http://git-wip-us.apache.org/repos/asf/helix/blob/080a15ff/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
index 80ec23c..2bc4081 100644
--- a/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.ResourceAssignment;
import com.google.common.base.Function;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
/**
* A task rebalancer that evenly assigns tasks to nodes
@@ -63,11 +64,25 @@ public class IndependentTaskRebalancer extends AbstractTaskRebalancer {
@Override
public Map<String, SortedSet<Integer>> getTaskAssignment(ResourceCurrentState currStateOutput,
ResourceAssignment prevAssignment, Iterable<ParticipantId> instanceList, TaskConfig taskCfg,
- TaskContext taskContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+ final TaskContext taskContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
Set<Integer> partitionSet, Cluster cluster) {
// Gather input to the full auto rebalancing algorithm
LinkedHashMap<State, Integer> states = new LinkedHashMap<State, Integer>();
states.put(State.from("ONLINE"), 1);
+
+ // Only map partitions whose assignment we care about
+ final Set<TaskPartitionState> honoredStates =
+ Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
+ TaskPartitionState.STOPPED);
+ Set<Integer> filteredPartitionSet = Sets.newHashSet();
+ for (Integer p : partitionSet) {
+ TaskPartitionState state = (taskContext == null) ? null : taskContext.getPartitionState(p);
+ if (state == null || honoredStates.contains(state)) {
+ filteredPartitionSet.add(p);
+ }
+ }
+
+ // Transform from partition id to fully qualified partition name
List<Integer> partitionNums = Lists.newArrayList(partitionSet);
Collections.sort(partitionNums);
final ResourceId resourceId = prevAssignment.getResourceId();
@@ -79,10 +94,21 @@ public class IndependentTaskRebalancer extends AbstractTaskRebalancer {
return PartitionId.from(resourceId, partitionNum.toString());
}
}));
+
+ // Compute the current assignment
Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
for (PartitionId partitionId : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
- currentMapping.put(partitionId, currStateOutput.getCurrentStateMap(resourceId, partitionId));
- currentMapping.put(partitionId, currStateOutput.getPendingStateMap(resourceId, partitionId));
+ if (!filteredPartitionSet.contains(pId(partitionId.toString()))) {
+ // not computing old partitions
+ continue;
+ }
+ Map<ParticipantId, State> allPreviousDecisionMap = Maps.newHashMap();
+ if (prevAssignment != null) {
+ allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partitionId));
+ }
+ allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partitionId));
+ allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partitionId));
+ currentMapping.put(partitionId, allPreviousDecisionMap);
}
// Get the assignment keyed on partition
http://git-wip-us.apache.org/repos/asf/helix/blob/080a15ff/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 2834e85..be9db79 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -62,6 +62,10 @@ public class TaskConfig {
public static final String MAX_ATTEMPTS_PER_PARTITION = "MaxAttemptsPerPartition";
/** The number of concurrent tasks that are allowed to run on an instance. */
public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+ /** Support overarching tasks that hang around for a while */
+ public static final String LONG_LIVED = "LongLived";
+ /** Support giving tasks a custom name **/
+ public static final String PARTITION_NAME_MAP = "PartitionNameMap";
// // Default property values ////
@@ -78,10 +82,12 @@ public class TaskConfig {
private final long _timeoutPerPartition;
private final int _numConcurrentTasksPerInstance;
private final int _maxAttemptsPerPartition;
+ private final boolean _longLived;
private TaskConfig(String workflow, String targetResource, List<Integer> targetPartitions,
Set<String> targetPartitionStates, String command, String commandConfig,
- long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition) {
+ long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition,
+ boolean longLived) {
_workflow = workflow;
_targetResource = targetResource;
_targetPartitions = targetPartitions;
@@ -91,6 +97,7 @@ public class TaskConfig {
_timeoutPerPartition = timeoutPerPartition;
_numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
_maxAttemptsPerPartition = maxAttemptsPerPartition;
+ _longLived = longLived;
}
public String getWorkflow() {
@@ -129,6 +136,10 @@ public class TaskConfig {
return _maxAttemptsPerPartition;
}
+ public boolean isLongLived() {
+ return _longLived;
+ }
+
public Map<String, String> getResourceConfigMap() {
Map<String, String> cfgMap = new HashMap<String, String>();
cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
@@ -143,7 +154,9 @@ public class TaskConfig {
}
cfgMap.put(TaskConfig.TIMEOUT_PER_PARTITION, "" + _timeoutPerPartition);
cfgMap.put(TaskConfig.MAX_ATTEMPTS_PER_PARTITION, "" + _maxAttemptsPerPartition);
-
+ cfgMap.put(TaskConfig.LONG_LIVED + "", String.valueOf(_longLived));
+ cfgMap.put(TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE + "",
+ String.valueOf(_numConcurrentTasksPerInstance));
return cfgMap;
}
@@ -160,13 +173,14 @@ public class TaskConfig {
private long _timeoutPerPartition = DEFAULT_TIMEOUT_PER_PARTITION;
private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
+ private boolean _longLived = false;
public TaskConfig build() {
validate();
return new TaskConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
_command, _commandConfig, _timeoutPerPartition, _numConcurrentTasksPerInstance,
- _maxAttemptsPerPartition);
+ _maxAttemptsPerPartition, _longLived);
}
/**
@@ -205,7 +219,9 @@ public class TaskConfig {
if (cfg.containsKey(MAX_ATTEMPTS_PER_PARTITION)) {
b.setMaxAttemptsPerPartition(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_PARTITION)));
}
-
+ if (cfg.containsKey(LONG_LIVED)) {
+ b.setLongLived(Boolean.parseBoolean(cfg.get(LONG_LIVED)));
+ }
return b;
}
@@ -254,8 +270,13 @@ public class TaskConfig {
return this;
}
+ public Builder setLongLived(boolean isLongLived) {
+ _longLived = isLongLived;
+ return this;
+ }
+
private void validate() {
- if (_targetResource == null && (_targetPartitions == null || _targetPartitions.isEmpty())) {
+ if (_targetResource == null && _targetPartitions == null) {
throw new IllegalArgumentException(String.format(
"%s cannot be null without specified partitions", TARGET_RESOURCE));
}
@@ -288,7 +309,9 @@ public class TaskConfig {
String[] vals = csv.split(",");
List<Integer> l = new ArrayList<Integer>();
for (String v : vals) {
- l.add(Integer.parseInt(v));
+ if (v != null && !v.isEmpty()) {
+ l.add(Integer.parseInt(v));
+ }
}
return l;
http://git-wip-us.apache.org/repos/asf/helix/blob/080a15ff/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
new file mode 100644
index 0000000..2a80841
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
@@ -0,0 +1,152 @@
+package org.apache.helix.provisioning.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixRole;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+public class TaskManager {
+ private static final Logger LOG = Logger.getLogger(TaskManager.class);
+
+ private final ClusterId _clusterId;
+ private final HelixConnection _connection;
+ private final TaskDriver _driver;
+
+ public TaskManager(final ClusterId clusterId, final HelixConnection connection) {
+ HelixRole dummyRole = new HelixRole() {
+ @Override
+ public HelixConnection getConnection() {
+ return connection;
+ }
+
+ @Override
+ public ClusterId getClusterId() {
+ return clusterId;
+ }
+
+ @Override
+ public Id getId() {
+ return clusterId;
+ }
+
+ @Override
+ public InstanceType getType() {
+ return InstanceType.ADMINISTRATOR;
+ }
+
+ @Override
+ public ClusterMessagingService getMessagingService() {
+ return null;
+ }
+ };
+ _driver = new TaskDriver(new HelixConnectionAdaptor(dummyRole));
+ _clusterId = clusterId;
+ _connection = connection;
+ }
+
+ public boolean createTaskQueue(String queueName, boolean isParallel) {
+ Workflow.Builder builder = new Workflow.Builder(queueName);
+ builder.addConfig(queueName, TaskConfig.COMMAND, queueName);
+ builder.addConfig(queueName, TaskConfig.TARGET_PARTITIONS, "");
+ builder.addConfig(queueName, TaskConfig.COMMAND_CONFIG, "");
+ builder.addConfig(queueName, TaskConfig.LONG_LIVED + "", String.valueOf(true));
+ if (isParallel) {
+ builder
+ .addConfig(queueName, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, String.valueOf(10));
+ }
+ Workflow workflow = builder.build();
+ try {
+ _driver.start(workflow);
+ } catch (Exception e) {
+ LOG.error("Failed to start queue " + queueName, e);
+ return false;
+ }
+ return true;
+ }
+
+ public void addTaskToQueue(final String taskName, final String queueName) {
+ HelixDataAccessor accessor = _connection.createDataAccessor(_clusterId);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ String configPath = keyBuilder.resourceConfig(queueName + "_" + queueName).getPath();
+ DataUpdater<ZNRecord> dataUpdater = new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ // Update the partition integers to add one to the end, and have that integer map to the
+ // task name
+ String current = currentData.getSimpleField(TaskConfig.TARGET_PARTITIONS);
+ int currentId = 0;
+ if (current == null || current.isEmpty()) {
+ currentData.setSimpleField(TaskConfig.TARGET_PARTITIONS, String.valueOf(currentId));
+ } else {
+ String[] parts = current.split(",");
+ currentId = parts.length;
+ currentData.setSimpleField(TaskConfig.TARGET_PARTITIONS, current + "," + currentId);
+ }
+ Map<String, String> partitionMap = currentData.getMapField(TaskConfig.PARTITION_NAME_MAP);
+ if (partitionMap == null) {
+ partitionMap = Maps.newHashMap();
+ currentData.setMapField(TaskConfig.PARTITION_NAME_MAP, partitionMap);
+ }
+ partitionMap.put(String.valueOf(currentId), taskName);
+ return currentData;
+ }
+ };
+ List<DataUpdater<ZNRecord>> dataUpdaters = new ArrayList<DataUpdater<ZNRecord>>();
+ dataUpdaters.add(dataUpdater);
+ accessor.updateChildren(Arrays.asList(configPath), dataUpdaters, AccessOption.PERSISTENT);
+
+ // Update the ideal state to trigger a change event
+ DataUpdater<ZNRecord> noOpUpdater = new DataUpdater<ZNRecord>() {
+ @Override
+ public ZNRecord update(ZNRecord currentData) {
+ return currentData;
+ }
+ };
+ String idealStatePath = keyBuilder.idealStates(queueName + "_" + queueName).getPath();
+ dataUpdaters.clear();
+ dataUpdaters.add(noOpUpdater);
+ accessor.updateChildren(Arrays.asList(idealStatePath), dataUpdaters, AccessOption.PERSISTENT);
+ }
+
+ public void shutdownQueue(String queueName) {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/helix/blob/080a15ff/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
new file mode 100644
index 0000000..7016661
--- /dev/null
+++ b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
@@ -0,0 +1,115 @@
+package org.apache.helix.provisioning.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.integration.TestHelixConnection;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.testng.annotations.Test;
+
+public class TestTaskManager extends ZkUnitTestBase {
+ @Test
+ public void testBasic() throws Exception {
+ final int NUM_PARTICIPANTS = 3;
+ final int NUM_PARTITIONS = 1;
+ final int NUM_REPLICAS = 1;
+
+ String className = TestHelper.getTestClassName();
+ String methodName = TestHelper.getTestMethodName();
+ String clusterName = className + "_" + methodName;
+
+ // Set up cluster
+ TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+ "localhost", // participant name prefix
+ "TestService", // resource name prefix
+ 1, // resources
+ NUM_PARTITIONS, // partitions per resource
+ NUM_PARTICIPANTS, // number of nodes
+ NUM_REPLICAS, // replicas
+ "StatelessService", RebalanceMode.FULL_AUTO, // just get everything up
+ true); // do rebalance
+
+ Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+ taskFactoryReg.put("myqueue", new TaskFactory() {
+ @Override
+ public Task createNewTask(String config) {
+ return new MyTask();
+ }
+ });
+ MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+ for (int i = 0; i < participants.length; i++) {
+ String instanceName = "localhost_" + (12918 + i);
+ participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+ participants[i].getStateMachineEngine()
+ .registerStateModelFactory(StateModelDefId.from("StatelessService"),
+ new TestHelixConnection.MockStateModelFactory());
+ participants[i].getStateMachineEngine().registerStateModelFactory(
+ StateModelDefId.from("Task"), new TaskStateModelFactory(participants[i], taskFactoryReg));
+ participants[i].syncStart();
+ }
+
+ ClusterControllerManager controller =
+ new ClusterControllerManager(ZK_ADDR, clusterName, "controller_1");
+ controller.syncStart();
+
+ HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+ connection.connect();
+ ClusterId clusterId = ClusterId.from(clusterName);
+ TaskManager taskManager = new TaskManager(clusterId, connection);
+ taskManager.createTaskQueue("myqueue", true);
+ taskManager.addTaskToQueue("mytask", "myqueue");
+ taskManager.addTaskToQueue("mytask2", "myqueue");
+
+ controller.syncStop();
+ for (MockParticipantManager participant : participants) {
+ participant.syncStop();
+ }
+ }
+
+ public static class MyTask implements Task {
+ @Override
+ public TaskResult run() {
+ try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ }
+ System.err.println("task complete");
+ return new TaskResult(TaskResult.Status.COMPLETED, "");
+ }
+
+ @Override
+ public void cancel() {
+ }
+ }
+}