You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by lx...@apache.org on 2016/08/17 04:27:29 UTC
[33/33] helix git commit: Allow an instance group tag to be
configured for a job,
so all tasks of the job can only be running on the instances containing the
tag.
Allow an instance group tag to be configured for a job, so that all tasks of the job run only on instances that carry the tag.
1. Add instance group tag for jobs.
2. Add a test for job assignment when the only instance that can be assigned is disabled.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/947a7d55
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/947a7d55
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/947a7d55
Branch: refs/heads/helix-0.6.x
Commit: 947a7d55756bdb4f50a3b0b358fb0364542df723
Parents: 6455b8b
Author: Junkai Xue <jx...@linkedin.com>
Authored: Mon Jun 6 13:19:53 2016 -0700
Committer: Lei Xia <lx...@linkedin.com>
Committed: Wed Jul 6 13:18:53 2016 -0700
----------------------------------------------------------------------
.../controller/stages/ClusterDataCache.java | 23 ++++++++++++
.../task/GenericTaskAssignmentCalculator.java | 4 ++
.../java/org/apache/helix/task/JobConfig.java | 39 +++++++++++++++++---
.../org/apache/helix/task/JobRebalancer.java | 14 +++++--
.../apache/helix/task/WorkflowRebalancer.java | 4 ++
.../org/apache/helix/task/beans/JobBean.java | 1 +
.../integration/task/TestTaskAssignment.java | 37 +++++++++++++++++++
7 files changed, 113 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
index fde4959..b77ce0d 100644
--- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
+++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java
@@ -41,6 +41,7 @@ import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.task.JobConfig;
import org.apache.log4j.Logger;
import com.google.common.collect.Lists;
@@ -430,6 +431,28 @@ public class ClusterDataCache {
}
/**
+ * Return the names from {@code allInstances} that are enabled and carry {@code instanceTag}.
+ * @param allInstances Set of instance names to filter by enabled state and instance tag
+ * @param instanceTag The instance group tag an instance must contain to be kept
+ * @return A new set of instance names that have an instance config, are marked enabled, and
+ * contain {@code instanceTag}. {@code allInstances} itself is not modified.
+ */
+ public Set<String> getAllEnabledInstanceWithTag(final Set<String> allInstances,
+ String instanceTag) {
+ Set<String> enabledTagInstances = new HashSet<String>();
+ for (String instance : allInstances) {
+ InstanceConfig instanceConfig = _instanceConfigMap.get(instance);
+ // Instances with no InstanceConfig entry are skipped; the rest must be enabled and tagged.
+ if (instanceConfig != null && instanceConfig.getInstanceEnabled() && instanceConfig
+ .containsTag(instanceTag)) {
+ enabledTagInstances.add(instance);
+ }
+ }
+
+ return enabledTagInstances;
+ }
+
+ /**
* Indicate that a full read should be done on the next refresh
*/
public synchronized void requireFullRefresh() {
http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
index e8d5f5d..b0a1a33 100644
--- a/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
+++ b/helix-core/src/main/java/org/apache/helix/task/GenericTaskAssignmentCalculator.java
@@ -34,8 +34,10 @@ import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.controller.strategy.AutoRebalanceStrategy;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.Partition;
import org.apache.helix.model.ResourceAssignment;
+import org.apache.log4j.Logger;
import com.google.common.base.Function;
import com.google.common.collect.BiMap;
@@ -49,6 +51,8 @@ import com.google.common.collect.Sets;
* assignment to target partitions and states of another resource
*/
public class GenericTaskAssignmentCalculator extends TaskAssignmentCalculator {
+ private static final Logger LOG = Logger.getLogger(GenericTaskAssignmentCalculator.class);
+
/** Reassignment policy for this algorithm */
private RetryPolicy _retryPolicy = new DefaultRetryReassigner();
http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
index d26c83b..7a4e2d3 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobConfig.java
@@ -117,7 +117,12 @@ public class JobConfig {
/**
* The type of the job
*/
- JobType
+ JobType,
+
+ /**
+ * The instance group that task assign to
+ */
+ InstanceGroupTag
}
//Default property values
@@ -133,6 +138,7 @@ public class JobConfig {
private final String _workflow;
private final String _targetResource;
private final String _jobType;
+ private final String _instanceGroupTag;
private final List<String> _targetPartitions;
private final Set<String> _targetPartitionStates;
private final String _command;
@@ -152,7 +158,7 @@ public class JobConfig {
long timeoutPerTask, int numConcurrentTasksPerInstance, int maxAttemptsPerTask,
int maxForcedReassignmentsPerTask, int failureThreshold, long retryDelay,
boolean disableExternalView, boolean ignoreDependentJobFailure,
- Map<String, TaskConfig> taskConfigMap, String jobType) {
+ Map<String, TaskConfig> taskConfigMap, String jobType, String instanceGroupTag) {
_workflow = workflow;
_targetResource = targetResource;
_targetPartitions = targetPartitions;
@@ -173,6 +179,7 @@ public class JobConfig {
_taskConfigMap = Collections.emptyMap();
}
_jobType = jobType;
+ _instanceGroupTag = instanceGroupTag;
}
public String getWorkflow() {
@@ -274,9 +281,12 @@ public class JobConfig {
"" + _numConcurrentTasksPerInstance);
cfgMap.put(JobConfigProperty.IgnoreDependentJobFailure.name(),
Boolean.toString(_ignoreDependentJobFailure));
- if (_jobType != null) {
- cfgMap.put(JobConfigProperty.JobType.name(), _jobType);
- }
+ if (_jobType != null) {
+ cfgMap.put(JobConfigProperty.JobType.name(), _jobType);
+ }
+ if (_instanceGroupTag != null) {
+ cfgMap.put(JobConfigProperty.InstanceGroupTag.name(), _instanceGroupTag);
+ }
return cfgMap;
}
@@ -284,6 +294,10 @@ public class JobConfig {
return _jobType;
}
+ public String getInstanceGroupTag() { // null when no instance group tag was configured
+ return _instanceGroupTag;
+ }
+
public static JobConfig fromHelixProperty(HelixProperty property)
throws IllegalArgumentException {
Map<String, String> configs = property.getRecord().getSimpleFields();
@@ -297,6 +311,7 @@ public class JobConfig {
private String _workflow;
private String _targetResource;
private String _jobType;
+ private String _instanceGroupTag;
private List<String> _targetPartitions;
private Set<String> _targetPartitionStates;
private String _command;
@@ -317,7 +332,8 @@ public class JobConfig {
return new JobConfig(_workflow, _targetResource, _targetPartitions, _targetPartitionStates,
_command, _commandConfig, _timeoutPerTask, _numConcurrentTasksPerInstance,
_maxAttemptsPerTask, _maxForcedReassignmentsPerTask, _failureThreshold, _retryDelay,
- _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType);
+ _disableExternalView, _ignoreDependentJobFailure, _taskConfigMap, _jobType,
+ _instanceGroupTag);
}
/**
@@ -382,6 +398,9 @@ public class JobConfig {
if (cfg.containsKey(JobConfigProperty.JobType.name())) {
b.setJobType(cfg.get(JobConfigProperty.JobType.name()));
}
+ if (cfg.containsKey(JobConfigProperty.InstanceGroupTag.name())) {
+ b.setInstanceGroupTag(cfg.get(JobConfigProperty.InstanceGroupTag.name()));
+ }
return b;
}
@@ -474,6 +493,11 @@ public class JobConfig {
return this;
}
+ public Builder setInstanceGroupTag(String instanceGroupTag) { // tasks then only go to enabled instances with this tag
+ _instanceGroupTag = instanceGroupTag;
+ return this;
+ }
+
private void validate() {
if (_taskConfigMap.isEmpty() && _targetResource == null) {
throw new IllegalArgumentException(
@@ -555,6 +579,9 @@ public class JobConfig {
if (jobBean.jobType != null) {
b.setJobType(jobBean.jobType);
}
+ if (jobBean.instanceGroupTag != null) {
+ b.setInstanceGroupTag(jobBean.instanceGroupTag);
+ }
return b;
}
http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
index fae7ac7..c181ba5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/JobRebalancer.java
@@ -128,11 +128,19 @@ public class JobRebalancer extends TaskRebalancer {
// is stored in zk.
// Fetch the previous resource assignment from the property store. This is required because of
// HELIX-230.
+ Set<String> liveInstances = jobCfg.getInstanceGroupTag() == null
+ ? clusterData.getLiveInstances().keySet()
+ : clusterData.getAllEnabledInstanceWithTag(clusterData.getLiveInstances().keySet(),
+ jobCfg.getInstanceGroupTag());
+
+ if (liveInstances.isEmpty()) {
+ LOG.error("No available instance found for job!");
+ }
+
Set<Integer> partitionsToDrop = new TreeSet<Integer>();
ResourceAssignment newAssignment =
- computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, clusterData
- .getLiveInstances().keySet(), currStateOutput, workflowCtx, jobCtx, partitionsToDrop,
- clusterData);
+ computeResourceMapping(jobName, workflowCfg, jobCfg, prevAssignment, liveInstances,
+ currStateOutput, workflowCtx, jobCtx, partitionsToDrop, clusterData);
if (!partitionsToDrop.isEmpty()) {
for (Integer pId : partitionsToDrop) {
http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
index 2d4ca75..b4f25d5 100644
--- a/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
+++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowRebalancer.java
@@ -208,6 +208,10 @@ public class WorkflowRebalancer extends TaskRebalancer {
builder.setNumPartitions(numPartitions);
builder.setStateModel(TaskConstants.STATE_MODEL_NAME);
+ if (jobConfig.getInstanceGroupTag() != null) {
+ builder.setNodeGroup(jobConfig.getInstanceGroupTag());
+ }
+
if (jobConfig.isDisableExternalView()) {
builder.setDisableExternalView(true);
}
http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
index 0080cc6..dd7ebab 100644
--- a/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
+++ b/helix-core/src/main/java/org/apache/helix/task/beans/JobBean.java
@@ -32,6 +32,7 @@ public class JobBean {
public List<String> parents;
public String targetResource;
public String jobType;
+ public String instanceGroupTag;
public List<String> targetPartitionStates;
public List<String> targetPartitions;
public String command;
http://git-wip-us.apache.org/repos/asf/helix/blob/947a7d55/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
index df976b1..a22b63d 100644
--- a/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestTaskAssignment.java
@@ -19,8 +19,14 @@ package org.apache.helix.integration.task;
* under the License.
*/
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.helix.TestHelper;
import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskState;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.Workflow;
import org.testng.Assert;
@@ -57,6 +63,37 @@ public class TestTaskAssignment extends TaskTestBase {
// The task is not assigned so the task state should be null in this case.
Assert.assertNull(
_driver.getJobContext(TaskUtil.getNamespacedJobName(jobResource)).getPartitionState(0));
+ }
+
+ @Test
+ public void testGenericTaskInstanceGroup() throws InterruptedException {
+ // Restrict the job to "TESTTAG1"; assumes (set up elsewhere) only _participants[1] has it.
+ String queueName = TestHelper.getTestMethodName();
+ String jobName = "Job4InstanceGroup";
+ JobQueue.Builder queueBuilder = TaskTestUtil.buildJobQueue(queueName);
+ JobConfig.Builder jobConfig = new JobConfig.Builder();
+
+ List<TaskConfig> taskConfigs = new ArrayList<TaskConfig>();
+ int num_tasks = 3;
+ for (int j = 0; j < num_tasks; j++) {
+ taskConfigs.add(
+ new TaskConfig.Builder().setTaskId("task_" + j).setCommand(MockTask.TASK_COMMAND)
+ .build());
+ }
+
+ jobConfig.addTaskConfigs(taskConfigs);
+ jobConfig.setInstanceGroupTag("TESTTAG1");
+
+ queueBuilder.enqueueJob(jobName, jobConfig);
+ _driver.start(queueBuilder.build());
+
+ // Wait 1 sec to give the controller time to assign the tasks.
+ Thread.sleep(1000L);
+
+ // Task 0 should have been assigned to the tagged instance, _participants[1].
+ String namedSpaceJob = TaskUtil.getNamespacedJobName(queueName, jobName);
+ Assert.assertEquals(_driver.getJobContext(namedSpaceJob).getAssignedParticipant(0),
+ _participants[1].getInstanceName());
}
}