You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by ki...@apache.org on 2015/07/28 23:42:41 UTC

[2/3] helix git commit: [HELIX-601] Allow work flow to schedule dependency jobs in parallel

[HELIX-601] Allow work flow to schedule dependency jobs in parallel

Currently, Helix does not schedule dependent jobs within the same workflow in parallel. For example, if Job2 depends on Job1, Job2 is not scheduled until every partition of Job1 has completed.
However, if one participant is very slow, all dependent jobs end up waiting on that single participant.
Helix should be able to schedule multiple jobs according to a parameter.
Acceptance criteria:
1. Introduce a parallel job count parameter in workflows and job queues.
2. Dependent jobs can be scheduled in parallel, up to the limit set by this parameter (for now the parameter is always 1, so no jobs actually run in parallel).
3. If Job2 depends on Job1, Job1 is scheduled before Job2.
4. No parallel jobs on the same instance: if an instance is running Job1, it won't run Job2 until Job1 has finished.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/1afbcbd8
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/1afbcbd8
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/1afbcbd8

Branch: refs/heads/helix-0.6.x
Commit: 1afbcbd8f899e3e8baf616fa1a7fbf7d534ce98c
Parents: 8819220
Author: Congrui Ji <cj...@linkedin.com>
Authored: Fri Jun 19 11:51:19 2015 -0700
Committer: Congrui Ji <cj...@linkedin.com>
Committed: Mon Jun 22 10:51:57 2015 -0700

----------------------------------------------------------------------
 .../org/apache/helix/task/TaskRebalancer.java    | 12 ++++++------
 .../task/TestTaskRebalancerParallel.java         | 19 +++++++++++++++++++
 2 files changed, 25 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/1afbcbd8/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
index 963247d..d9068cf 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java
@@ -91,7 +91,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
    * @param prevAssignment the previous task partition assignment
    * @param instances the instances
    * @param jobCfg the task configuration
-   * @param taskCtx the task context
+   * @param jobContext the task context
    * @param workflowCfg the workflow configuration
    * @param workflowCtx the workflow context
    * @param partitionSet the partitions to assign
@@ -135,18 +135,18 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     }
 
     // check ancestor job status
-    int unStartCount = 0;
+    int notStartedCount = 0;
     int inCompleteCount = 0;
     for (String ancestor : workflowCfg.getJobDag().getAncestors(resourceName)) {
       TaskState jobState = workflowCtx.getJobState(ancestor);
       if (jobState == null || jobState == TaskState.NOT_STARTED) {
-        ++unStartCount;
+        ++notStartedCount;
       } else if (jobState == TaskState.IN_PROGRESS || jobState == TaskState.STOPPED) {
         ++inCompleteCount;
       }
     }
 
-    if (unStartCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
+    if (notStartedCount > 0 || inCompleteCount >= workflowCfg.getParallelJobs()) {
       return emptyAssignment(resourceName, currStateOutput);
     }
 
@@ -227,7 +227,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     return newAssignment;
   }
 
-  private Set<String> getWorkflowAssignedInstances(String currentJobName,
+  private Set<String> getInstancesAssignedToOtherJobs(String currentJobName,
       WorkflowConfig workflowCfg) {
 
     Set<String> ret = new HashSet<String>();
@@ -282,7 +282,7 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator {
     // Keeps a mapping of (partition) -> (instance, state)
     Map<Integer, PartitionAssignment> paMap = new TreeMap<Integer, PartitionAssignment>();
 
-    Set<String> excludedInstances = getWorkflowAssignedInstances(jobResource, workflowConfig);
+    Set<String> excludedInstances = getInstancesAssignedToOtherJobs(jobResource, workflowConfig);
 
     // Process all the current assignments of tasks.
     Set<Integer> allPartitions =

http://git-wip-us.apache.org/repos/asf/helix/blob/1afbcbd8/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
index 368ef9f..f6fc53a 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskRebalancerParallel.java
@@ -1,5 +1,24 @@
 package org.apache.helix.integration.task;
 
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;