You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@helix.apache.org by jx...@apache.org on 2018/06/28 21:17:06 UTC
helix git commit: Support batch add jobs to JobQueue
Repository: helix
Updated Branches:
refs/heads/master 94ac4253b -> 6e047915d
Support batch add jobs to JobQueue
To better support compliance event emission requests, we can add a new API to enqueue jobs in a required order.
With this API:
1. Batch-added jobs should keep the order of the input list.
2. Jobs should stay in the same bucket.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6e047915
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6e047915
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6e047915
Branch: refs/heads/master
Commit: 6e047915d7c4aefb976ca888dcba2b4b684bec90
Parents: 94ac425
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Apr 25 13:39:20 2018 -0700
Committer: Junkai Xue <jx...@jxue-mn2.linkedin.biz>
Committed: Tue Jun 26 17:53:18 2018 -0700
----------------------------------------------------------------------
.../java/org/apache/helix/task/TaskDriver.java | 119 +++++++++++++------
.../integration/task/TestBatchAddJobs.java | 109 +++++++++++++++++
2 files changed, 193 insertions(+), 35 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/helix/blob/6e047915/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index a628dd3..b04534f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -329,6 +329,20 @@ public class TaskDriver {
*/
public void enqueueJob(final String queue, final String job,
JobConfig.Builder jobBuilder) {
+ enqueueJobs(queue, Collections.singletonList(job), Collections.singletonList(jobBuilder));
+ }
+
+ /**
+ * Batch add jobs to a queue, guaranteeing that the jobs keep the order of the input list.
+ *
+ * @param queue
+ * @param jobs
+ * @param jobBuilders
+ */
+ public void enqueueJobs(final String queue, final List<String> jobs,
+ final List<JobConfig.Builder> jobBuilders) {
+
+
// Get the job queue config and capacity
WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
if (workflowConfig == null) {
@@ -357,14 +371,29 @@ public class TaskDriver {
}
validateZKNodeLimitation(1);
-
- // Create the job to ensure that it validates
- JobConfig jobConfig = jobBuilder.setWorkflow(queue).build();
- final String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
-
- // add job config first.
- addJobConfig(namespacedJobName, jobConfig);
- final String jobType = jobConfig.getJobType();
+ final List<JobConfig> jobConfigs = new ArrayList<>();
+ final List<String> namespacedJobNames = new ArrayList<>();
+ final List<String> jobTypeList = new ArrayList<>();
+
+ try {
+ for (int i = 0; i < jobBuilders.size(); i++) {
+ // Create the job to ensure that it validates
+ JobConfig jobConfig = jobBuilders.get(i).setWorkflow(queue).build();
+ String namespacedJobName = TaskUtil.getNamespacedJobName(queue, jobs.get(i));
+
+ // add job config first.
+ addJobConfig(namespacedJobName, jobConfig);
+ jobConfigs.add(jobConfig);
+ namespacedJobNames.add(namespacedJobName);
+ jobTypeList.add(jobConfig.getJobType());
+ }
+ } catch (HelixException e) {
+ LOG.error("Failed to add job configs {}. Remove them all!", jobs.toString());
+ for (String job : jobs) {
+ String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
+ TaskUtil.removeJobConfig(_accessor, namespacedJobName);
+ }
+ }
// update the job dag to append the job to the end of the queue.
DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@@ -374,53 +403,73 @@ public class TaskDriver {
JobDag jobDag = JobDag.fromJson(
currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
Set<String> allNodes = jobDag.getAllNodes();
- if (capacity > 0 && allNodes.size() >= capacity) {
- throw new IllegalStateException(
- "Queue " + queue + " already reaches its max capacity, failed to add " + job);
+ if (capacity > 0 && allNodes.size() + jobConfigs.size() >= capacity) {
+ throw new IllegalStateException(String
+ .format("Queue %s already reaches its max capacity %f, failed to add %s", capacity,
+ queue, jobs.toString()));
}
- if (allNodes.contains(namespacedJobName)) {
- throw new IllegalStateException(
- "Could not add to queue " + queue + ", job " + job + " already exists");
- }
- jobDag.addNode(namespacedJobName);
-
- // Add the node to the end of the queue
- String candidate = null;
- for (String node : allNodes) {
- if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
- candidate = node;
- break;
+
+ String lastNodeName = null;
+ for (int i = 0; i < namespacedJobNames.size(); i++) {
+ String namespacedJobName = namespacedJobNames.get(i);
+ if (allNodes.contains(namespacedJobName)) {
+ throw new IllegalStateException(String
+ .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i)));
+ }
+ jobDag.addNode(namespacedJobName);
+
+ // Add the node to the end of the queue
+ String candidate = null;
+ if (lastNodeName == null) {
+ for (String node : allNodes) {
+ if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
+ candidate = node;
+ break;
+ }
+ }
+ } else {
+ candidate = lastNodeName;
+ }
+ if (candidate != null) {
+ jobDag.addParentToChild(candidate, namespacedJobName);
+ lastNodeName = namespacedJobName;
}
- }
- if (candidate != null) {
- jobDag.addParentToChild(candidate, namespacedJobName);
}
// Add job type if job type is not null
- if (jobType != null) {
- Map<String, String> jobTypes =
- currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
- if (jobTypes == null) {
- jobTypes = new HashMap<String, String>();
+ Map<String, String> jobTypes =
+ currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
+ for (String jobType : jobTypeList) {
+ if (jobType != null) {
+ if (jobTypes == null) {
+ jobTypes = new HashMap<>();
+ }
+ jobTypes.put(queue, jobType);
}
- jobTypes.put(queue, jobType);
- currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
}
+ if (jobTypes != null) {
+ currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
+ }
// Save the updated DAG
try {
currentData
.setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
} catch (Exception e) {
- throw new IllegalStateException("Could not add job " + job + " to queue " + queue,
- e);
+ throw new IllegalStateException(
+ String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e);
}
return currentData;
}
};
+
String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
if (!status) {
+ LOG.error("Failed to update WorkflowConfig, remove all jobs {}", jobs.toString());
+ for (String job : jobs) {
+ TaskUtil.removeJobConfig(_accessor, job);
+ }
throw new HelixException("Failed to enqueue job");
}
http://git-wip-us.apache.org/repos/asf/helix/blob/6e047915/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
new file mode 100644
index 0000000..d50845e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
@@ -0,0 +1,109 @@
+package org.apache.helix.integration.task;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestBatchAddJobs extends ZkIntegrationTestBase {
+ private static final String CLUSTER_NAME = CLUSTER_PREFIX + "_TestBatchAddJobs";
+ private static final String QUEUE_NAME = "TestBatchAddJobQueue";
+ private ClusterSetup _setupTool;
+ private List<SubmitJobTask> _submitJobTasks;
+
+ @BeforeClass
+ public void beforeClass() {
+ String namespace = "/" + CLUSTER_NAME;
+ if (_gZkClient.exists(namespace)) {
+ _gZkClient.deleteRecursively(namespace);
+ }
+
+ _setupTool = new ClusterSetup(ZK_ADDR);
+ _setupTool.addCluster(CLUSTER_NAME, true);
+ _submitJobTasks = new ArrayList<>();
+
+ }
+
+ @Test
+ public void testBatchAddJobs() throws Exception {
+ TaskDriver driver = new TaskDriver(_gZkClient, CLUSTER_NAME);
+ driver.createQueue(new JobQueue.Builder(QUEUE_NAME).build());
+ for (int i = 0; i < 10; i++) {
+ _submitJobTasks.add(new SubmitJobTask(ZK_ADDR, i));
+ _submitJobTasks.get(i).start();
+ }
+
+ WorkflowConfig workflowConfig = driver.getWorkflowConfig(QUEUE_NAME);
+ while (workflowConfig.getJobDag().getAllNodes().size() < 100) {
+ Thread.sleep(50);
+ driver.getWorkflowConfig(QUEUE_NAME);
+ }
+
+ JobDag dag = workflowConfig.getJobDag();
+ String currentJob = dag.getAllNodes().iterator().next();
+ while (dag.getDirectChildren(currentJob).size() > 0) {
+ String childJob = dag.getDirectChildren(currentJob).iterator().next();
+ if (!getPrefix(currentJob).equals(getPrefix(childJob))
+ && currentJob.charAt(currentJob.length() - 1) != '9') {
+ Assert.fail();
+ }
+ currentJob = childJob;
+ }
+ }
+
+ private String getPrefix(String job) {
+ return job.split("#")[0];
+ }
+
+ @AfterClass
+ public void afterClass() {
+ for (SubmitJobTask submitJobTask : _submitJobTasks) {
+ submitJobTask.interrupt();
+ }
+ }
+
+ static class SubmitJobTask extends Thread {
+ private TaskDriver _driver;
+ private String _jobPrefixName;
+
+ public SubmitJobTask(String zkAddress, int index) throws Exception {
+ HelixManager manager = HelixManagerFactory
+ .getZKHelixManager(CLUSTER_NAME, "Administrator", InstanceType.ADMINISTRATOR, zkAddress);
+ manager.connect();
+ _driver = new TaskDriver(manager);
+ _jobPrefixName = "JOB_" + index + "#";
+ }
+
+ @Override
+ public void start() {
+ List<String> jobNames = new ArrayList<>();
+ List<JobConfig.Builder> jobConfigBuilders = new ArrayList<>();
+ for (int i = 0; i < 10; i++) {
+ String jobName = _jobPrefixName + i;
+ jobNames.add(jobName);
+ jobConfigBuilders.add(new JobConfig.Builder().addTaskConfigs(Collections
+ .singletonList(new TaskConfig("CMD", null, UUID.randomUUID().toString(), "TARGET"))));
+ }
+
+ _driver.enqueueJobs(QUEUE_NAME, jobNames, jobConfigBuilders);
+ }
+ }
+}