You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ka...@apache.org on 2022/10/31 09:48:31 UTC

[druid] branch master updated: Correct task status returned by controller (#13288)

This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 675fd982fb Correct task status returned by controller (#13288)
675fd982fb is described below

commit 675fd982fb5ca274b057495a90563ecc248ad823
Author: Adarsh Sanjeev <ad...@gmail.com>
AuthorDate: Mon Oct 31 15:18:19 2022 +0530

    Correct task status returned by controller (#13288)
    
    * Correct worker status returned by controller
    
    * Address review comments
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  |  7 ++--
 .../druid/msq/indexing/MSQWorkerTaskLauncher.java  |  5 +--
 .../org/apache/druid/msq/indexing/WorkerCount.java | 46 ++++++++++++++++++++++
 3 files changed, 52 insertions(+), 6 deletions(-)

diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 8686abbc2d..eb7609c32e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -90,6 +90,7 @@ import org.apache.druid.msq.indexing.MSQTuningConfig;
 import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
 import org.apache.druid.msq.indexing.SegmentGeneratorFrameProcessorFactory;
 import org.apache.druid.msq.indexing.TaskReportMSQDestination;
+import org.apache.druid.msq.indexing.WorkerCount;
 import org.apache.druid.msq.indexing.error.CanceledFault;
 import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
 import org.apache.druid.msq.indexing.error.FaultsExceededChecker;
@@ -1817,9 +1818,9 @@ public class ControllerImpl implements Controller
     int runningTasks = 1;
 
     if (taskLauncher != null) {
-      Pair<Integer, Integer> workerTaskStatus = taskLauncher.getWorkerTaskStatus();
-      pendingTasks = workerTaskStatus.lhs;
-      runningTasks = workerTaskStatus.rhs + 1; // To account for controller.
+      WorkerCount workerTaskCount = taskLauncher.getWorkerTaskCount();
+      pendingTasks = workerTaskCount.getPendingWorkerCount();
+      runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To account for controller.
     }
     return new MSQStatusReport(
         taskState,
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index 12c869d23c..2273822402 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -30,7 +30,6 @@ import org.apache.druid.indexer.TaskLocation;
 import org.apache.druid.indexer.TaskState;
 import org.apache.druid.indexer.TaskStatus;
 import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.java.util.common.concurrent.Execs;
 import org.apache.druid.java.util.common.logger.Logger;
@@ -350,12 +349,12 @@ public class MSQWorkerTaskLauncher
    * Returns a pair which contains the number of currently running worker tasks and the number of worker tasks that are
    * not yet fully started as left and right respectively.
    */
-  public Pair<Integer, Integer> getWorkerTaskStatus()
+  public WorkerCount getWorkerTaskCount()
   {
     synchronized (taskIds) {
       int runningTasks = fullyStartedTasks.size();
       int pendingTasks = desiredTaskCount - runningTasks;
-      return Pair.of(runningTasks, pendingTasks);
+      return new WorkerCount(runningTasks, pendingTasks);
     }
   }
 
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerCount.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerCount.java
new file mode 100644
index 0000000000..83031468e0
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerCount.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+/**
+ * Information about current status of worker tasks
+ */
+public class WorkerCount
+{
+  private final int runningWorkerCount;
+
+  private final int pendingWorkerCount;
+
+  public WorkerCount(int runningWorkerCount, int pendingWorkerCount)
+  {
+    this.runningWorkerCount = runningWorkerCount;
+    this.pendingWorkerCount = pendingWorkerCount;
+  }
+
+  public int getRunningWorkerCount()
+  {
+    return runningWorkerCount;
+  }
+
+  public int getPendingWorkerCount()
+  {
+    return pendingWorkerCount;
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org