Posted to reviews@helix.apache.org by GitBox <gi...@apache.org> on 2020/05/18 19:26:06 UTC

[GitHub] [helix] pkuwm commented on a change in pull request #1006: Fix the RuntimeJobDag issue for queues

pkuwm commented on a change in pull request #1006:
URL: https://github.com/apache/helix/pull/1006#discussion_r426842209



##########
File path: helix-core/src/test/java/org/apache/helix/integration/task/TestEnqueueJobs.java
##########
@@ -96,4 +102,60 @@ public void testJobSubmitGenericWorkflows() throws InterruptedException {
 
     _driver.pollForWorkflowState(workflowName, TaskState.COMPLETED);
   }
-}
\ No newline at end of file
+
+  @Test
+  public void testQueueParallelJobs() throws InterruptedException {
+    String queueName = TestHelper.getTestMethodName();
+    JobQueue.Builder builder = TaskTestUtil.buildJobQueue(queueName);
+    WorkflowConfig.Builder workflowCfgBuilder = new WorkflowConfig.Builder()
+        .setWorkflowId(queueName).setParallelJobs(3).setAllowOverlapJobAssignment(true);
+    _driver.start(builder.setWorkflowConfig(workflowCfgBuilder.build()).build());
+    JobConfig.Builder jobBuilder =
+        new JobConfig.Builder().setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB)
+            .setCommand(MockTask.TASK_COMMAND).setMaxAttemptsPerTask(2)
+            .setJobCommandConfigMap(Collections.singletonMap(MockTask.JOB_DELAY, "10000"));
+
+    // Add 4 jobs to the queue
+    for (int i = 0; i <= 3; i++) {
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+    }
+
+    // Wait until all of the enqueued jobs (Job0 to Job3) are finished
+    for (int i = 0; i <= 3; i++) {
+      _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + i),
+          TaskState.COMPLETED);
+    }
+
+    // Stop the Controller
+    _controller.syncStop();
+
+    // Add 3 more jobs to the queue which should run in parallel after the Controller is started
+    for (int i = 4; i <= 6; i++) {
+      _driver.enqueueJob(queueName, "JOB" + i, jobBuilder);
+    }
+
+    // Start the Controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
+    _controller.syncStart();
+
+    // Wait until all of the newly added jobs (Job4 to Job6) are finished
+    for (int i = 4; i <= 6; i++) {
+      _driver.pollForJobState(queueName, TaskUtil.getNamespacedJobName(queueName, "JOB" + i),
+          TaskState.COMPLETED);
+    }
+
+    // Make sure the jobs have been running in parallel by checking the jobs start time and finish
+    // time
+    Set<Long> startTimes = new HashSet<>();
+    Set<Long> endTime = new HashSet<>();
+
+    for (int i = 4; i <= 6; i++) {
+      JobContext jobContext =
+          _driver.getJobContext(TaskUtil.getNamespacedJobName(queueName, "JOB" + i));
+      startTimes.add(jobContext.getStartTime());
+      endTime.add(jobContext.getFinishTime());
+    }
+    Assert.assertTrue(Collections.min(endTime) > Collections.max(startTimes));

Review comment:
       Could you help me understand: if the test only checks a single comparison, min(endTime) > max(startTimes), why are HashSets necessary?
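
For context on the question above: a group of jobs all overlapped in time exactly when the earliest finish time is later than the latest start time. Only those two extremes matter, so they can be tracked with two long values rather than collected into sets. The following is a minimal sketch of that idea, assuming each job's times are available as a {start, finish} pair in milliseconds. `OverlapCheck` and `allOverlap` are hypothetical names for illustration, not Helix APIs.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper (not part of Helix): checks that a group of jobs overlapped in time. */
final class OverlapCheck {

  /**
   * Each element is {startTime, finishTime} in milliseconds.
   * Returns true when every job was still running at the moment the last job started.
   * An empty list is treated as trivially overlapping.
   */
  static boolean allOverlap(List<long[]> intervals) {
    long maxStart = Long.MIN_VALUE;
    long minEnd = Long.MAX_VALUE;
    for (long[] interval : intervals) {
      maxStart = Math.max(maxStart, interval[0]);
      minEnd = Math.min(minEnd, interval[1]);
    }
    // Same comparison as the test's assertion, without intermediate sets.
    return minEnd > maxStart;
  }

  public static void main(String[] args) {
    List<long[]> parallel =
        Arrays.asList(new long[] {0, 100}, new long[] {10, 110}, new long[] {20, 120});
    List<long[]> sequential =
        Arrays.asList(new long[] {0, 10}, new long[] {20, 30}, new long[] {40, 50});
    System.out.println(allOverlap(parallel));   // prints "true"
    System.out.println(allOverlap(sequential)); // prints "false"
  }
}
```

In the test itself, the same effect could presumably be had by updating two local long variables inside the existing loop over JOB4 to JOB6.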

##########
File path: helix-core/src/main/java/org/apache/helix/task/RuntimeJobDag.java
##########
@@ -146,8 +146,13 @@ public boolean finishJob(String job) {
     }
     // Add finished job's successors to ready-list
     if (_isJobQueue) {
-      if (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
-        _readyJobList.offer(_parentsToChildren.get(_lastJob).iterator().next());
+      while (_lastJob != null && _parentsToChildren.containsKey(_lastJob)) {
+        String nextJob = _parentsToChildren.get(_lastJob).iterator().next();
+        if (!_readyJobList.contains(nextJob)) {

Review comment:
       What's the time complexity of this operation?
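
As background for this question: the declared type of `_readyJobList` is not visible in this diff. If it is a standard `java.util` queue such as `ArrayDeque` or `LinkedList`, then `contains` is a linear scan, O(n) in the number of queued jobs, and it runs once per iteration of the new `while` loop. One possible alternative, sketched below under that assumption, pairs the queue with a `HashSet` so the membership check is O(1) on average. `DedupReadyQueue` and its methods are hypothetical names, not the RuntimeJobDag implementation.

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch (not Helix code): a FIFO ready-queue that rejects duplicates in O(1). */
final class DedupReadyQueue {
  private final ArrayDeque<String> queue = new ArrayDeque<>();
  // Mirrors the contents of the queue so membership checks avoid a linear scan.
  private final Set<String> queued = new HashSet<>();

  /** Enqueues the job unless it is already waiting; returns true if it was added. */
  boolean offerIfAbsent(String job) {
    if (!queued.add(job)) {
      return false; // already in the queue
    }
    queue.offer(job);
    return true;
  }

  /** Removes and returns the oldest job, or null if the queue is empty. */
  String poll() {
    String job = queue.poll();
    if (job != null) {
      queued.remove(job);
    }
    return job;
  }

  int size() {
    return queue.size();
  }
}
```

Whether the extra set is worth it depends on how large the ready list gets. With a small number of parallel jobs, the linear scan may well be cheap enough in practice.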
