Posted to commits@helix.apache.org by jx...@apache.org on 2018/06/28 21:17:06 UTC

helix git commit: Support batch adding jobs to JobQueue

Repository: helix
Updated Branches:
  refs/heads/master 94ac4253b -> 6e047915d


Support batch adding jobs to JobQueue

To better support compliance event emission requests, we add a new API to enqueue jobs in a required order.

With this API:
1. Batch-added jobs keep the order of the input list.
2. Jobs stay in the same bucket.
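
Example usage (a minimal sketch mirroring the test added in this commit; the
queue name, job names, and task configs are illustrative, and "manager" is
assumed to be an already-connected HelixManager):

    TaskDriver driver = new TaskDriver(manager);
    driver.createQueue(new JobQueue.Builder("MyQueue").build());

    List<String> jobNames = new ArrayList<>();
    List<JobConfig.Builder> jobBuilders = new ArrayList<>();
    for (int i = 0; i < 10; i++) {
      jobNames.add("JOB_" + i);
      jobBuilders.add(new JobConfig.Builder().addTaskConfigs(Collections.singletonList(
          new TaskConfig("CMD", null, UUID.randomUUID().toString(), "TARGET"))));
    }

    // Jobs are appended to the queue in list order within a single DAG update.
    driver.enqueueJobs("MyQueue", jobNames, jobBuilders);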


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/6e047915
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/6e047915
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/6e047915

Branch: refs/heads/master
Commit: 6e047915d7c4aefb976ca888dcba2b4b684bec90
Parents: 94ac425
Author: Junkai Xue <jx...@linkedin.com>
Authored: Wed Apr 25 13:39:20 2018 -0700
Committer: Junkai Xue <jx...@jxue-mn2.linkedin.biz>
Committed: Tue Jun 26 17:53:18 2018 -0700

----------------------------------------------------------------------
 .../java/org/apache/helix/task/TaskDriver.java  | 119 +++++++++++++------
 .../integration/task/TestBatchAddJobs.java      | 109 +++++++++++++++++
 2 files changed, 193 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/6e047915/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
index a628dd3..b04534f 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java
@@ -329,6 +329,20 @@ public class TaskDriver {
    */
   public void enqueueJob(final String queue, final String job,
       JobConfig.Builder jobBuilder) {
+    enqueueJobs(queue, Collections.singletonList(job), Collections.singletonList(jobBuilder));
+  }
+
+  /**
+   * Batch add jobs to a queue. The jobs are appended to the queue in the order of the
+   * input list, and the DAG update is applied as a single write.
+   *
+   * @param queue       the name of the job queue
+   * @param jobs        the job names, in enqueue order
+   * @param jobBuilders the job config builders, matching the order of {@code jobs}
+   */
+  public void enqueueJobs(final String queue, final List<String> jobs,
+      final List<JobConfig.Builder> jobBuilders) {
     // Get the job queue config and capacity
     WorkflowConfig workflowConfig = TaskUtil.getWorkflowConfig(_accessor, queue);
     if (workflowConfig == null) {
@@ -357,14 +371,29 @@ public class TaskDriver {
     }
 
     validateZKNodeLimitation(1);
-
-    // Create the job to ensure that it validates
-    JobConfig jobConfig = jobBuilder.setWorkflow(queue).build();
-    final String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
-
-    // add job config first.
-    addJobConfig(namespacedJobName, jobConfig);
-    final String jobType = jobConfig.getJobType();
+    final List<JobConfig> jobConfigs = new ArrayList<>();
+    final List<String> namespacedJobNames = new ArrayList<>();
+    final List<String> jobTypeList = new ArrayList<>();
+
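+    // Validate and persist every job config up front; on any failure, remove them all.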
+    try {
+      for (int i = 0; i < jobBuilders.size(); i++) {
+        // Create the job to ensure that it validates
+        JobConfig jobConfig = jobBuilders.get(i).setWorkflow(queue).build();
+        String namespacedJobName = TaskUtil.getNamespacedJobName(queue, jobs.get(i));
+
+        // add job config first.
+        addJobConfig(namespacedJobName, jobConfig);
+        jobConfigs.add(jobConfig);
+        namespacedJobNames.add(namespacedJobName);
+        jobTypeList.add(jobConfig.getJobType());
+      }
+    } catch (HelixException e) {
+      LOG.error("Failed to add job configs {}. Removing all of them!", jobs.toString());
+      for (String job : jobs) {
+        String namespacedJobName = TaskUtil.getNamespacedJobName(queue, job);
+        TaskUtil.removeJobConfig(_accessor, namespacedJobName);
+      }
+      // Rethrow so the DAG is not updated with partially added jobs.
+      throw e;
+    }
 
     // update the job dag to append the job to the end of the queue.
     DataUpdater<ZNRecord> updater = new DataUpdater<ZNRecord>() {
@@ -374,53 +403,73 @@ public class TaskDriver {
         JobDag jobDag = JobDag.fromJson(
             currentData.getSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name()));
         Set<String> allNodes = jobDag.getAllNodes();
-        if (capacity > 0 && allNodes.size() >= capacity) {
-          throw new IllegalStateException(
-              "Queue " + queue + " already reaches its max capacity, failed to add " + job);
+        if (capacity > 0 && allNodes.size() + jobConfigs.size() > capacity) {
+          throw new IllegalStateException(String
+              .format("Queue %s already reaches its max capacity %d, failed to add %s", queue,
+                  capacity, jobs.toString()));
         }
-        if (allNodes.contains(namespacedJobName)) {
-          throw new IllegalStateException(
-              "Could not add to queue " + queue + ", job " + job + " already exists");
-        }
-        jobDag.addNode(namespacedJobName);
-
-        // Add the node to the end of the queue
-        String candidate = null;
-        for (String node : allNodes) {
-          if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
-            candidate = node;
-            break;
+
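+        // Remember the previously appended job so each subsequent job chains after it,
+        // preserving the order of the input list.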
+        String lastNodeName = null;
+        for (int i = 0; i < namespacedJobNames.size(); i++) {
+          String namespacedJobName = namespacedJobNames.get(i);
+          if (allNodes.contains(namespacedJobName)) {
+            throw new IllegalStateException(String
+                .format("Could not add to queue %s, job %s already exists", queue, jobs.get(i)));
+          }
+          jobDag.addNode(namespacedJobName);
+
+          // Add the node to the end of the queue
+          String candidate = null;
+          if (lastNodeName == null) {
+            for (String node : allNodes) {
+              if (!node.equals(namespacedJobName) && jobDag.getDirectChildren(node).isEmpty()) {
+                candidate = node;
+                break;
+              }
+            }
+          } else {
+            candidate = lastNodeName;
+          }
+          if (candidate != null) {
+            jobDag.addParentToChild(candidate, namespacedJobName);
+            lastNodeName = namespacedJobName;
           }
-        }
-        if (candidate != null) {
-          jobDag.addParentToChild(candidate, namespacedJobName);
         }
 
         // Add job type if job type is not null
-        if (jobType != null) {
-          Map<String, String> jobTypes =
-              currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
-          if (jobTypes == null) {
-            jobTypes = new HashMap<String, String>();
+        Map<String, String> jobTypes =
+            currentData.getMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name());
+        for (String jobType : jobTypeList) {
+          if (jobType != null) {
+            if (jobTypes == null) {
+              jobTypes = new HashMap<>();
+            }
+            jobTypes.put(queue, jobType);
           }
-          jobTypes.put(queue, jobType);
-          currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
         }
 
+        if (jobTypes != null) {
+          currentData.setMapField(WorkflowConfig.WorkflowConfigProperty.JobTypes.name(), jobTypes);
+        }
         // Save the updated DAG
         try {
           currentData
               .setSimpleField(WorkflowConfig.WorkflowConfigProperty.Dag.name(), jobDag.toJson());
         } catch (Exception e) {
-          throw new IllegalStateException("Could not add job " + job + " to queue " + queue,
-              e);
+          throw new IllegalStateException(
+              String.format("Could not add jobs %s to queue %s", jobs.toString(), queue), e);
         }
         return currentData;
       }
     };
+
     String path = _accessor.keyBuilder().resourceConfig(queue).getPath();
     boolean status = _accessor.getBaseDataAccessor().update(path, updater, AccessOption.PERSISTENT);
     if (!status) {
+      LOG.error("Failed to update WorkflowConfig, removing all jobs {}", jobs.toString());
+      for (String job : jobs) {
+        // Remove by namespaced name, matching how the configs were added above.
+        TaskUtil.removeJobConfig(_accessor, TaskUtil.getNamespacedJobName(queue, job));
+      }
       throw new HelixException("Failed to enqueue job");
     }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/6e047915/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
----------------------------------------------------------------------
diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
new file mode 100644
index 0000000..d50845e
--- /dev/null
+++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestBatchAddJobs.java
@@ -0,0 +1,109 @@
+package org.apache.helix.integration.task;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.UUID;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.TestHelper;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.task.JobConfig;
+import org.apache.helix.task.JobDag;
+import org.apache.helix.task.JobQueue;
+import org.apache.helix.task.TaskConfig;
+import org.apache.helix.task.TaskDriver;
+import org.apache.helix.task.WorkflowConfig;
+import org.apache.helix.tools.ClusterSetup;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestBatchAddJobs extends ZkIntegrationTestBase {
+  private static final String CLUSTER_NAME = CLUSTER_PREFIX + "_TestBatchAddJobs";
+  private static final String QUEUE_NAME = "TestBatchAddJobQueue";
+  private ClusterSetup _setupTool;
+  private List<SubmitJobTask> _submitJobTasks;
+
+  @BeforeClass
+  public void beforeClass() {
+    String namespace = "/" + CLUSTER_NAME;
+    if (_gZkClient.exists(namespace)) {
+      _gZkClient.deleteRecursively(namespace);
+    }
+
+    _setupTool = new ClusterSetup(ZK_ADDR);
+    _setupTool.addCluster(CLUSTER_NAME, true);
+    _submitJobTasks = new ArrayList<>();
+  }
+
+  @Test
+  public void testBatchAddJobs() throws Exception {
+    TaskDriver driver = new TaskDriver(_gZkClient, CLUSTER_NAME);
+    driver.createQueue(new JobQueue.Builder(QUEUE_NAME).build());
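+    // Kick off 10 submitter threads, each batch-enqueueing 10 jobs into the shared queue.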
+    for (int i = 0; i < 10; i++) {
+      _submitJobTasks.add(new SubmitJobTask(ZK_ADDR, i));
+      _submitJobTasks.get(i).start();
+    }
+
+    WorkflowConfig workflowConfig = driver.getWorkflowConfig(QUEUE_NAME);
+    while (workflowConfig.getJobDag().getAllNodes().size() < 100) {
+      Thread.sleep(50);
+      // Re-read the config, otherwise the loop never observes newly added jobs.
+      workflowConfig = driver.getWorkflowConfig(QUEUE_NAME);
+    }
+
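+    // Walk down the queue chain; each submitter's batch must stay consecutive, so the
+    // prefix may only change right after a batch's last job (index 9).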
+    JobDag dag = workflowConfig.getJobDag();
+    String currentJob = dag.getAllNodes().iterator().next();
+    while (dag.getDirectChildren(currentJob).size() > 0) {
+      String childJob = dag.getDirectChildren(currentJob).iterator().next();
+      if (!getPrefix(currentJob).equals(getPrefix(childJob))
+          && currentJob.charAt(currentJob.length() - 1) != '9') {
+        Assert.fail();
+      }
+      currentJob = childJob;
+    }
+  }
+
+  private String getPrefix(String job) {
+    return job.split("#")[0];
+  }
+
+  @AfterClass
+  public void afterClass() {
+    for (SubmitJobTask submitJobTask : _submitJobTasks) {
+      submitJobTask.interrupt();
+    }
+  }
+
+  static class SubmitJobTask extends Thread {
+    private TaskDriver _driver;
+    private String _jobPrefixName;
+
+    public SubmitJobTask(String zkAddress, int index) throws Exception {
+      HelixManager manager = HelixManagerFactory
+          .getZKHelixManager(CLUSTER_NAME, "Administrator", InstanceType.ADMINISTRATOR, zkAddress);
+      manager.connect();
+      _driver = new TaskDriver(manager);
+      _jobPrefixName = "JOB_" + index + "#";
+    }
+
+    // Override run() (not start()) so each submitter enqueues on its own thread.
+    @Override
+    public void run() {
+      List<String> jobNames = new ArrayList<>();
+      List<JobConfig.Builder> jobConfigBuilders = new ArrayList<>();
+      for (int i = 0; i < 10; i++) {
+        String jobName = _jobPrefixName + i;
+        jobNames.add(jobName);
+        jobConfigBuilders.add(new JobConfig.Builder().addTaskConfigs(Collections
+            .singletonList(new TaskConfig("CMD", null, UUID.randomUUID().toString(), "TARGET"))));
+      }
+
+      _driver.enqueueJobs(QUEUE_NAME, jobNames, jobConfigBuilders);
+    }
+  }
+}