You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:29 UTC

[33/33] helix git commit: Allow an instance group tag to be configured for a job, so that all tasks of the job run only on instances that carry the tag.

Allow an instance group tag to be configured for a job, so that all tasks of the job run only on instances that carry the tag.

1. Add an instance group tag for jobs.
2. Add a test for job assignment when the only instance that can be assigned is disabled.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/947a7d55
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/947a7d55
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/947a7d55

Branch: refs/heads/helix-0.6.x
Commit: 947a7d55756bdb4f50a3b0b358fb0364542df723
Parents: 6455b8b
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Jun 6 13:19:53 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Jul 6 13:18:53 2016 -0700

----------------------------------------------------------------------
 .../controller/stages/ClusterDataCache.java     | 23 ++++++++++++
 .../task/GenericTaskAssignmentCalculator.java   |  4 ++
 .../java/org/apache/helix/task/JobConfig.java   | 39 +++++++++++++++++---
 .../org/apache/helix/task/JobRebalancer.java    | 14 +++++--
 .../apache/helix/task/WorkflowRebalancer.java   |  4 ++
 .../org/apache/helix/task/beans/JobBean.java    |  1 +
 .../integration/task/TestTaskAssignment.java    | 37 +++++++++++++++++++
 7 files changed, 113 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index fde4959..b77ce0d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -41,6 +41,7 @@ import org.apache.helix.model.LiveInstance;
 import org.apache.helix.model.Message;
 import org.apache.helix.model.ResourceConfig;
 import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.JobConfig;
 import org.apache.log4j.Logger;
 
 import com.google.common.collect.Lists;
@@ -430,6 +431,28 @@ public class ClusterDataCache {
   }
 
   /**
+   * Return all the nodes that are enabled and tagged same as the job.
+   * @param allInstances List of instances to filter with instance tag
+   * @param instanceTag The instance group tag
+   * @return A new set contains instance name and that are marked enabled and have same
+   *         tag with job. The original set will not be changed during the filtering
+   */
+  public Set<String> getAllEnabledInstanceWithTag(final Set<String> allInstances,
+      String instanceTag) {
+    Set<String> enabledTagInstances = new HashSet<String>();
+    for (String instance : allInstances) {
+      InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
+
+      if (instanceConfig != null && instanceConfig.getInstanceEnabled() && instanceConfig
+          .containsTag(instanceTag)) {
+        enabledTagInstances.add(instance);
+      }
+    }
+
+    return enabledTagInstances;
+  }
+
+  /**
    * Indicate that a full read should be done on the next refresh
    */
   public synchronized void requireFullRefresh() {

http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index e8d5f5d..b0a1a33 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -34,8 +34,10 @@ import org.apache.helix.controller.stages.ClusterDataCache;
 import org.apache.helix.controller.stages.CurrentStateOutput;
 import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
 import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.Partition;
 import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
 
 import com.google.common.base.Function;
 import com.google.common.collect.BiMap;
@@ -49,6 +51,8 @@ import com.google.common.collect.Sets;
  * assignment to target partitions and states of another resource
  */
 public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
+  private static final Logger LOG = Logger.getLogger(GenericTaskAssignmentCalculator.class);
+
   /** Reassignment policy for this algorithm */
   private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
 

http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index d26c83b..7a4e2d3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -117,7 +117,12 @@ public class JobConfig {
     /**
      * The type of the job
      */
-    JobType
+    JobType,
+
+    /**
+     * The instance group that task assign to
+     */
+    InstanceGroupTag
   }
 
   //Default property values
@@ -133,6 +138,7 @@ public class JobConfig {
   private final String _workflow;
   private final String _targetResource;
   private final String _jobType;
+  private final String _instanceGroupTag;
   private final List<String> _targetPartitions;
   private final Set<String> _targetPartitionStates;
   private final String _command;
@@ -152,7 +158,7 @@ public class JobConfig {
       long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
       int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
       boolean disableExternalView, boolean ignoreDependentJobFailure,
-      Map<String, TaskConfig> taskConfigMap, String jobType) {
+      Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag) {
     _workflow = workflow;
     _targetResource = targetResource;
     _targetPartitions = targetPartitions;
@@ -173,6 +179,7 @@ public class JobConfig {
       _taskConfigMap = Collections.emptyMap();
     }
     _jobType = jobType;
+    _instanceGroupTag = instanceGroupTag;
   }
 
   public String getWorkflow() {
@@ -274,9 +281,12 @@ public class JobConfig {
         "" + _numConcurrentTasksPerInstance);
     cfgMap.put(JobConfigProperty.IgnoreDependentJobFailure.name(),
         Boolean.toString(_ignoreDependentJobFailure));
-   if (_jobType != null) {
-     cfgMap.put(JobConfigProperty.JobType.name(), _jobType);
-   }
+    if (_jobType != null) {
+      cfgMap.put(JobConfigProperty.JobType.name(), _jobType);
+    }
+    if (_instanceGroupTag != null) {
+      cfgMap.put(JobConfigProperty.InstanceGroupTag.name(), _instanceGroupTag);
+    }
     return cfgMap;
   }
 
@@ -284,6 +294,10 @@ public class JobConfig {
     return _jobType;
   }
 
+  public String getInstanceGroupTag() {
+    return _instanceGroupTag;
+  }
+
   public static JobConfig fromHelixProperty(HelixProperty property)
       throws IllegalArgumentException {
     Map<String, String> configs = property.getRecord().getSimpleFields();
@@ -297,6 +311,7 @@ public class JobConfig {
     private String _workflow;
     private String _targetResource;
     private String _jobType;
+    private String _instanceGroupTag;
     private List<String> _targetPartitions;
     private Set<String> _targetPartitionStates;
     private String _command;
@@ -317,7 +332,8 @@ public class JobConfig {
       return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
           _command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
           _maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
-          _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType);
+          _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType,
+          _instanceGroupTag);
     }
 
     /**
@@ -382,6 +398,9 @@ public class JobConfig {
       if (cfg.containsKey(JobConfigProperty.JobType.name())) {
         b.setJobType(cfg.get(JobConfigProperty.JobType.name()));
       }
+      if (cfg.containsKey(JobConfigProperty.InstanceGroupTag.name())) {
+        b.setInstanceGroupTag(cfg.get(JobConfigProperty.InstanceGroupTag.name()));
+      }
       return b;
     }
 
@@ -474,6 +493,11 @@ public class JobConfig {
       return this;
     }
 
+    public Builder setInstanceGroupTag(String instanceGroupTag) {
+      _instanceGroupTag = instanceGroupTag;
+      return this;
+    }
+
     private void validate() {
       if (_taskConfigMap.isEmpty() && _targetResource == null) {
         throw new IllegalArgumentException(
@@ -555,6 +579,9 @@ public class JobConfig {
       if (jobBean.jobType != null) {
         b.setJobType(jobBean.jobType);
       }
+      if (jobBean.instanceGroupTag != null) {
+        b.setInstanceGroupTag(jobBean.instanceGroupTag);
+      }
       return b;
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index fae7ac7..c181ba5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -128,11 +128,19 @@ public class JobRebalancer extends TaskRebalancer {
     // is stored in zk.
     // Fetch the previous resource assignment from the property store. This is required because of
     // HELIX-230.
+    Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
+        ? clusterData.getLiveInstances().keySet()
+        : clusterData.getAllEnabledInstanceWithTag(clusterData.getLiveInstances().keySet(),
+            jobCfg.getInstanceGroupTag());
+
+    if (liveInstances.isEmpty()) {
+      LOG.error("No available instance found for job!");
+    }
+
     Set<Integer> partitionsToDrop = new TreeSet<Integer>();
     ResourceAssignment newAssignment =
-        computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, clusterData
-            .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
-            clusterData);
+        computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, liveInstances,
+            currStateOutput, workflowCtx, jobCtx, partitionsToDrop, clusterData);
 
     if (!partitionsToDrop.isEmpty()) {
       for (Integer pId : partitionsToDrop) {

http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 2d4ca75..b4f25d5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -208,6 +208,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
     builder.setNumPartitions(numPartitions);
     builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
 
+    if (jobConfig.getInstanceGroupTag() != null) {
+      builder.setNodeGroup(jobConfig.getInstanceGroupTag());
+    }
+
     if (jobConfig.isDisableExternalView()) {
       builder.setDisableExternalView(true);
     }

http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 0080cc6..dd7ebab 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -32,6 +32,7 @@ public class JobBean {
   public List<String> parents;
   public String targetResource;
   public String jobType;
+  public String instanceGroupTag;
   public List<String> targetPartitionStates;
   public List<String> targetPartitions;
   public String command;

http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
index df976b1..a22b63d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
@@ -19,8 +19,14 @@ package org.apache.helix.integration.task;
  * under the License.
  */
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.helix.TestHelper;
 import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskState;
 import org.apache.helix.task.TaskUtil;
 import org.apache.helix.task.Workflow;
 import org.testng.Assert;
@@ -57,6 +63,37 @@ public class TestTaskAssignment extends TaskTestBase {
     // The task is not assigned so the task state should be null in this case.
     Assert.assertNull(
         _driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource)).getPartitionState(0));
+  }
+
+  @Test
+  public void testGenericTaskInstanceGroup() throws InterruptedException {
+    // Disable the only instance can be assigned.
+    String queueName = TestHelper.getTestMethodName();
+    String jobName = "Job4InstanceGroup";
+    JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
+    JobConfig.Builder jobConfig = new JobConfig.Builder();
+
+    List<TaskConfig> taskConfigs = new ArrayList<TaskConfig>();
+    int num_tasks = 3;
+    for (int j = 0; j < num_tasks; j++) {
+      taskConfigs.add(
+          new TaskConfig.Builder().setTaskId("task_" + j).setCommand(MockTask.TASK_COMMAND)
+              .build());
+    }
+
+    jobConfig.addTaskConfigs(taskConfigs);
+    jobConfig.setInstanceGroupTag("TESTTAG1");
+
+    queueBuilder.enqueueJob(jobName, jobConfig);
+    _driver.start(queueBuilder.build());
+
+    // Wait 1 sec. The task should not be complete since it is not assigned.
+    Thread.sleep(1000L);
+
+    // The task is not assigned so the task state should be null in this case.
+    String namedSpaceJob = TaskUtil.getNamespacedJobName(queueName, jobName);
 
+    Assert.assertEquals(_driver.getJobContext(namedSpaceJob).getAssignedParticipant(0),
+        _participants[1].getInstanceName());
   }
 }