You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ka...@apache.org on 2014/03/04 03:28:33 UTC

git commit: Start changing the task rebalancer to work at a finer granularity

Repository: helix
Updated Branches:
  refs/heads/helix-provisioning 1bc9354d5 -> 080a15ff9


Start changing the task rebalancer to work at a finer granularity


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/080a15ff
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/080a15ff
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/080a15ff

Branch: refs/heads/helix-provisioning
Commit: 080a15ff9241f1a7d2c218447ad2569d62ca120d
Parents: 1bc9354
Author: Kanak Biscuitwala <ka...@apache.org>
Authored: Mon Mar 3 18:28:20 2014 -0800
Committer: Kanak Biscuitwala <ka...@apache.org>
Committed: Mon Mar 3 18:28:20 2014 -0800

----------------------------------------------------------------------
 .../helix/task/AbstractTaskRebalancer.java      |   7 +-
 .../helix/task/IndependentTaskRebalancer.java   |  32 +++-
 .../java/org/apache/helix/task/TaskConfig.java  |  35 ++++-
 .../helix/provisioning/tools/TaskManager.java   | 152 +++++++++++++++++++
 .../provisioning/tools/TestTaskManager.java     | 115 ++++++++++++++
 5 files changed, 331 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/080a15ff/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
index 9a9538c..329d02f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/AbstractTaskRebalancer.java
@@ -357,7 +357,9 @@ public abstract class AbstractTaskRebalancer implements HelixRebalancer {
     }
 
     if (isTaskComplete(taskCtx, allPartitions)) {
-      workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
+      if (!taskCfg.isLongLived()) {
+        workflowCtx.setTaskState(taskResource, TaskState.COMPLETED);
+      }
       if (isWorkflowComplete(workflowCtx, workflowConfig)) {
         workflowCtx.setWorkflowState(TaskState.COMPLETED);
         workflowCtx.setFinishTime(System.currentTimeMillis());
@@ -553,6 +555,9 @@ public abstract class AbstractTaskRebalancer implements HelixRebalancer {
   private static List<Integer> getNextPartitions(SortedSet<Integer> candidatePartitions,
       Set<Integer> excluded, int n) {
     List<Integer> result = new ArrayList<Integer>(n);
+    if (candidatePartitions == null || candidatePartitions.isEmpty()) {
+      return result;
+    }
     for (Integer pId : candidatePartitions) {
       if (result.size() >= n) {
         break;

http://git-wip-us.apache.org/repos/asf/helix/blob/080a15ff/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
index 80ec23c..2bc4081 100644
--- a/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/IndependentTaskRebalancer.java
@@ -42,6 +42,7 @@ import org.apache.helix.model.ResourceAssignment;
 import com.google.common.base.Function;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
 
 /**
  * A task rebalancer that evenly assigns tasks to nodes
@@ -63,11 +64,25 @@ public class IndependentTaskRebalancer extends AbstractTaskRebalancer {
   @Override
   public Map<String, SortedSet<Integer>> getTaskAssignment(ResourceCurrentState currStateOutput,
       ResourceAssignment prevAssignment, Iterable<ParticipantId> instanceList, TaskConfig taskCfg,
-      TaskContext taskContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
+      final TaskContext taskContext, WorkflowConfig workflowCfg, WorkflowContext workflowCtx,
       Set<Integer> partitionSet, Cluster cluster) {
     // Gather input to the full auto rebalancing algorithm
     LinkedHashMap<State, Integer> states = new LinkedHashMap<State, Integer>();
     states.put(State.from("ONLINE"), 1);
+
+    // Only track partitions with no recorded state yet, or in INIT, RUNNING, or STOPPED
+    final Set<TaskPartitionState> honoredStates =
+        Sets.newHashSet(TaskPartitionState.INIT, TaskPartitionState.RUNNING,
+            TaskPartitionState.STOPPED);
+    Set<Integer> filteredPartitionSet = Sets.newHashSet();
+    for (Integer p : partitionSet) {
+      TaskPartitionState state = (taskContext == null) ? null : taskContext.getPartitionState(p);
+      if (state == null || honoredStates.contains(state)) {
+        filteredPartitionSet.add(p);
+      }
+    }
+
+    // Transform from partition id to fully qualified partition name
     List<Integer> partitionNums = Lists.newArrayList(partitionSet);
     Collections.sort(partitionNums);
     final ResourceId resourceId = prevAssignment.getResourceId();
@@ -79,10 +94,21 @@ public class IndependentTaskRebalancer extends AbstractTaskRebalancer {
                 return PartitionId.from(resourceId, partitionNum.toString());
               }
             }));
+
+    // Compute the current assignment
     Map<PartitionId, Map<ParticipantId, State>> currentMapping = Maps.newHashMap();
     for (PartitionId partitionId : currStateOutput.getCurrentStateMappedPartitions(resourceId)) {
-      currentMapping.put(partitionId, currStateOutput.getCurrentStateMap(resourceId, partitionId));
-      currentMapping.put(partitionId, currStateOutput.getPendingStateMap(resourceId, partitionId));
+      if (!filteredPartitionSet.contains(pId(partitionId.toString()))) {
+        // skip partitions filtered out above (state not INIT, RUNNING, or STOPPED)
+        continue;
+      }
+      Map<ParticipantId, State> allPreviousDecisionMap = Maps.newHashMap();
+      if (prevAssignment != null) {
+        allPreviousDecisionMap.putAll(prevAssignment.getReplicaMap(partitionId));
+      }
+      allPreviousDecisionMap.putAll(currStateOutput.getCurrentStateMap(resourceId, partitionId));
+      allPreviousDecisionMap.putAll(currStateOutput.getPendingStateMap(resourceId, partitionId));
+      currentMapping.put(partitionId, allPreviousDecisionMap);
     }
 
     // Get the assignment keyed on partition

http://git-wip-us.apache.org/repos/asf/helix/blob/080a15ff/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
index 2834e85..be9db79 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskConfig.java
@@ -62,6 +62,10 @@ public class TaskConfig {
   public static final String MAX_ATTEMPTS_PER_PARTITION = "MaxAttemptsPerPartition";
   /** The number of concurrent tasks that are allowed to run on an instance. */
   public static final String NUM_CONCURRENT_TASKS_PER_INSTANCE = "ConcurrentTasksPerInstance";
+  /** If true, the task is not marked COMPLETED once all of its partitions are complete. */
+  public static final String LONG_LIVED = "LongLived";
+  /** Map field associating partition ids with custom task names. */
+  public static final String PARTITION_NAME_MAP = "PartitionNameMap";
 
   // // Default property values ////
 
@@ -78,10 +82,12 @@ public class TaskConfig {
   private final long _timeoutPerPartition;
   private final int _numConcurrentTasksPerInstance;
   private final int _maxAttemptsPerPartition;
+  private final boolean _longLived;
 
   private TaskConfig(String workflow, String targetResource, List<Integer> targetPartitions,
       Set<String> targetPartitionStates, String command, String commandConfig,
-      long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition) {
+      long timeoutPerPartition, int numConcurrentTasksPerInstance, int maxAttemptsPerPartition,
+      boolean longLived) {
     _workflow = workflow;
     _targetResource = targetResource;
     _targetPartitions = targetPartitions;
@@ -91,6 +97,7 @@ public class TaskConfig {
     _timeoutPerPartition = timeoutPerPartition;
     _numConcurrentTasksPerInstance = numConcurrentTasksPerInstance;
     _maxAttemptsPerPartition = maxAttemptsPerPartition;
+    _longLived = longLived;
   }
 
   public String getWorkflow() {
@@ -129,6 +136,10 @@ public class TaskConfig {
     return _maxAttemptsPerPartition;
   }
 
+  public boolean isLongLived() {
+    return _longLived;
+  }
+
   public Map<String, String> getResourceConfigMap() {
     Map<String, String> cfgMap = new HashMap<String, String>();
     cfgMap.put(TaskConfig.WORKFLOW_ID, _workflow);
@@ -143,7 +154,9 @@ public class TaskConfig {
     }
     cfgMap.put(TaskConfig.TIMEOUT_PER_PARTITION, "" + _timeoutPerPartition);
     cfgMap.put(TaskConfig.MAX_ATTEMPTS_PER_PARTITION, "" + _maxAttemptsPerPartition);
-
+    cfgMap.put(TaskConfig.LONG_LIVED + "", String.valueOf(_longLived));
+    cfgMap.put(TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE + "",
+        String.valueOf(_numConcurrentTasksPerInstance));
     return cfgMap;
   }
 
@@ -160,13 +173,14 @@ public class TaskConfig {
     private long _timeoutPerPartition = DEFAULT_TIMEOUT_PER_PARTITION;
     private int _numConcurrentTasksPerInstance = DEFAULT_NUM_CONCURRENT_TASKS_PER_INSTANCE;
     private int _maxAttemptsPerPartition = DEFAULT_MAX_ATTEMPTS_PER_PARTITION;
+    private boolean _longLived = false;
 
     public TaskConfig build() {
       validate();
 
       return new TaskConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerPartition, _numConcurrentTasksPerInstance,
-          _maxAttemptsPerPartition);
+          _maxAttemptsPerPartition, _longLived);
     }
 
     /**
@@ -205,7 +219,9 @@ public class TaskConfig {
       if (cfg.containsKey(MAX_ATTEMPTS_PER_PARTITION)) {
         b.setMaxAttemptsPerPartition(Integer.parseInt(cfg.get(MAX_ATTEMPTS_PER_PARTITION)));
       }
-
+      if (cfg.containsKey(LONG_LIVED)) {
+        b.setLongLived(Boolean.parseBoolean(cfg.get(LONG_LIVED)));
+      }
       return b;
     }
 
@@ -254,8 +270,13 @@ public class TaskConfig {
       return this;
     }
 
+    public Builder setLongLived(boolean isLongLived) {
+      _longLived = isLongLived;
+      return this;
+    }
+
     private void validate() {
-      if (_targetResource == null && (_targetPartitions == null || _targetPartitions.isEmpty())) {
+      if (_targetResource == null && _targetPartitions == null) {
         throw new IllegalArgumentException(String.format(
             "%s cannot be null without specified partitions", TARGET_RESOURCE));
       }
@@ -288,7 +309,9 @@ public class TaskConfig {
       String[] vals = csv.split(",");
       List<Integer> l = new ArrayList<Integer>();
       for (String v : vals) {
-        l.add(Integer.parseInt(v));
+        if (v != null && !v.isEmpty()) {
+          l.add(Integer.parseInt(v));
+        }
       }
 
       return l;

http://git-wip-us.apache.org/repos/asf/helix/blob/080a15ff/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
new file mode 100644
index 0000000..2a80841
--- /dev/null
+++ b/helix-provisioning/src/main/java/org/apache/helix/provisioning/tools/TaskManager.java
@@ -0,0 +1,152 @@
+package org.apache.helix.provisioning.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.I0Itec.zkclient.DataUpdater;
+import org.apache.helix.AccessOption;
+import org.apache.helix.ClusterMessagingService;
+import org.apache.helix.HelixConnection;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixRole;
+import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.ZNRecord;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.Id;
+import org.apache.helix.manager.zk.HelixConnectionAdaptor;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.Workflow;
+import org.apache.log4j.Logger;
+
+import com.google.common.collect.Maps;
+
+public class TaskManager {
+  private static final Logger LOG = Logger.getLogger(TaskManager.class);
+
+  private final ClusterId _clusterId;
+  private final HelixConnection _connection;
+  private final TaskDriver _driver;
+
+  public TaskManager(final ClusterId clusterId, final HelixConnection connection) {
+    HelixRole dummyRole = new HelixRole() {
+      @Override
+      public HelixConnection getConnection() {
+        return connection;
+      }
+
+      @Override
+      public ClusterId getClusterId() {
+        return clusterId;
+      }
+
+      @Override
+      public Id getId() {
+        return clusterId;
+      }
+
+      @Override
+      public InstanceType getType() {
+        return InstanceType.ADMINISTRATOR;
+      }
+
+      @Override
+      public ClusterMessagingService getMessagingService() {
+        return null;
+      }
+    };
+    _driver = new TaskDriver(new HelixConnectionAdaptor(dummyRole));
+    _clusterId = clusterId;
+    _connection = connection;
+  }
+
+  public boolean createTaskQueue(String queueName, boolean isParallel) {
+    Workflow.Builder builder = new Workflow.Builder(queueName);
+    builder.addConfig(queueName, TaskConfig.COMMAND, queueName);
+    builder.addConfig(queueName, TaskConfig.TARGET_PARTITIONS, "");
+    builder.addConfig(queueName, TaskConfig.COMMAND_CONFIG, "");
+    builder.addConfig(queueName, TaskConfig.LONG_LIVED + "", String.valueOf(true));
+    if (isParallel) {
+      builder
+          .addConfig(queueName, TaskConfig.NUM_CONCURRENT_TASKS_PER_INSTANCE, String.valueOf(10));
+    }
+    Workflow workflow = builder.build();
+    try {
+      _driver.start(workflow);
+    } catch (Exception e) {
+      LOG.error("Failed to start queue " + queueName, e);
+      return false;
+    }
+    return true;
+  }
+
+  public void addTaskToQueue(final String taskName, final String queueName) {
+    HelixDataAccessor accessor = _connection.createDataAccessor(_clusterId);
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+    String configPath = keyBuilder.resourceConfig(queueName + "_" + queueName).getPath();
+    DataUpdater<ZNRecord> dataUpdater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        // Append the next partition id (the current number of target partitions) to the
+        // target partition list, and map that id to the task name in PARTITION_NAME_MAP
+        String current = currentData.getSimpleField(TaskConfig.TARGET_PARTITIONS);
+        int currentId = 0;
+        if (current == null || current.isEmpty()) {
+          currentData.setSimpleField(TaskConfig.TARGET_PARTITIONS, String.valueOf(currentId));
+        } else {
+          String[] parts = current.split(",");
+          currentId = parts.length;
+          currentData.setSimpleField(TaskConfig.TARGET_PARTITIONS, current + "," + currentId);
+        }
+        Map<String, String> partitionMap = currentData.getMapField(TaskConfig.PARTITION_NAME_MAP);
+        if (partitionMap == null) {
+          partitionMap = Maps.newHashMap();
+          currentData.setMapField(TaskConfig.PARTITION_NAME_MAP, partitionMap);
+        }
+        partitionMap.put(String.valueOf(currentId), taskName);
+        return currentData;
+      }
+    };
+    List<DataUpdater<ZNRecord>> dataUpdaters = new ArrayList<DataUpdater<ZNRecord>>();
+    dataUpdaters.add(dataUpdater);
+    accessor.updateChildren(Arrays.asList(configPath), dataUpdaters, AccessOption.PERSISTENT);
+
+    // Update the ideal state to trigger a change event
+    DataUpdater<ZNRecord> noOpUpdater = new DataUpdater<ZNRecord>() {
+      @Override
+      public ZNRecord update(ZNRecord currentData) {
+        return currentData;
+      }
+    };
+    String idealStatePath = keyBuilder.idealStates(queueName + "_" + queueName).getPath();
+    dataUpdaters.clear();
+    dataUpdaters.add(noOpUpdater);
+    accessor.updateChildren(Arrays.asList(idealStatePath), dataUpdaters, AccessOption.PERSISTENT);
+  }
+
+  public void shutdownQueue(String queueName) {
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/helix/blob/080a15ff/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
----------------------------------------------------------------------
diff --git a/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
new file mode 100644
index 0000000..7016661
--- /dev/null
+++ b/helix-provisioning/src/test/java/org/apache/helix/provisioning/tools/TestTaskManager.java
@@ -0,0 +1,115 @@
+package org.apache.helix.provisioning.tools;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.helix.HelixConnection;
+import org.apache.helix.TestHelper;
+import org.apache.helix.ZkUnitTestBase;
+import org.apache.helix.api.id.ClusterId;
+import org.apache.helix.api.id.StateModelDefId;
+import org.apache.helix.integration.TestHelixConnection;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZkHelixConnection;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.task.Task;
+import org.apache.helix.task.TaskFactory;
+import org.apache.helix.task.TaskResult;
+import org.apache.helix.task.TaskStateModelFactory;
+import org.testng.annotations.Test;
+
+public class TestTaskManager extends ZkUnitTestBase {
+  @Test
+  public void testBasic() throws Exception {
+    final int NUM_PARTICIPANTS = 3;
+    final int NUM_PARTITIONS = 1;
+    final int NUM_REPLICAS = 1;
+
+    String className = TestHelper.getTestClassName();
+    String methodName = TestHelper.getTestMethodName();
+    String clusterName = className + "_" + methodName;
+
+    // Set up cluster
+    TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port
+        "localhost", // participant name prefix
+        "TestService", // resource name prefix
+        1, // resources
+        NUM_PARTITIONS, // partitions per resource
+        NUM_PARTICIPANTS, // number of nodes
+        NUM_REPLICAS, // replicas
+        "StatelessService", RebalanceMode.FULL_AUTO, // just get everything up
+        true); // do rebalance
+
+    Map<String, TaskFactory> taskFactoryReg = new HashMap<String, TaskFactory>();
+    taskFactoryReg.put("myqueue", new TaskFactory() {
+      @Override
+      public Task createNewTask(String config) {
+        return new MyTask();
+      }
+    });
+    MockParticipantManager[] participants = new MockParticipantManager[NUM_PARTICIPANTS];
+    for (int i = 0; i < participants.length; i++) {
+      String instanceName = "localhost_" + (12918 + i);
+      participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName);
+      participants[i].getStateMachineEngine()
+          .registerStateModelFactory(StateModelDefId.from("StatelessService"),
+              new TestHelixConnection.MockStateModelFactory());
+      participants[i].getStateMachineEngine().registerStateModelFactory(
+          StateModelDefId.from("Task"), new TaskStateModelFactory(participants[i], taskFactoryReg));
+      participants[i].syncStart();
+    }
+
+    ClusterControllerManager controller =
+        new ClusterControllerManager(ZK_ADDR, clusterName, "controller_1");
+    controller.syncStart();
+
+    HelixConnection connection = new ZkHelixConnection(ZK_ADDR);
+    connection.connect();
+    ClusterId clusterId = ClusterId.from(clusterName);
+    TaskManager taskManager = new TaskManager(clusterId, connection);
+    taskManager.createTaskQueue("myqueue", true);
+    taskManager.addTaskToQueue("mytask", "myqueue");
+    taskManager.addTaskToQueue("mytask2", "myqueue");
+
+    controller.syncStop();
+    for (MockParticipantManager participant : participants) {
+      participant.syncStop();
+    }
+  }
+
+  public static class MyTask implements Task {
+    @Override
+    public TaskResult run() {
+      try {
+        Thread.sleep(10000);
+      } catch (InterruptedException e) {
+      }
+      System.err.println("task complete");
+      return new TaskResult(TaskResult.Status.COMPLETED, "");
+    }
+
+    @Override
+    public void cancel() {
+    }
+  }
+}