You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@druid.apache.org by ka...@apache.org on 2022/10/31 09:48:31 UTC
[druid] branch master updated: Correct task status returned by controller (#13288)
This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 675fd982fb Correct task status returned by controller (#13288)
675fd982fb is described below
commit 675fd982fb5ca274b057495a90563ecc248ad823
Author: Adarsh Sanjeev <ad...@gmail.com>
AuthorDate: Mon Oct 31 15:18:19 2022 +0530
Correct task status returned by controller (#13288)
* Correct worker status returned by controller
* Address review comments
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 7 ++--
.../druid/msq/indexing/MSQWorkerTaskLauncher.java | 5 +--
.../org/apache/druid/msq/indexing/WorkerCount.java | 46 ++++++++++++++++++++++
3 files changed, 52 insertions(+), 6 deletions(-)
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index 8686abbc2d..eb7609c32e 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -90,6 +90,7 @@ import org.apache.druid.msq.indexing.MSQTuningConfig;
import org.apache.druid.msq.indexing.MSQWorkerTaskLauncher;
import org.apache.druid.msq.indexing.SegmentGeneratorFrameProcessorFactory;
import org.apache.druid.msq.indexing.TaskReportMSQDestination;
+import org.apache.druid.msq.indexing.WorkerCount;
import org.apache.druid.msq.indexing.error.CanceledFault;
import org.apache.druid.msq.indexing.error.CannotParseExternalDataFault;
import org.apache.druid.msq.indexing.error.FaultsExceededChecker;
@@ -1817,9 +1818,9 @@ public class ControllerImpl implements Controller
int runningTasks = 1;
if (taskLauncher != null) {
- Pair<Integer, Integer> workerTaskStatus = taskLauncher.getWorkerTaskStatus();
- pendingTasks = workerTaskStatus.lhs;
- runningTasks = workerTaskStatus.rhs + 1; // To account for controller.
+ WorkerCount workerTaskCount = taskLauncher.getWorkerTaskCount();
+ pendingTasks = workerTaskCount.getPendingWorkerCount();
+ runningTasks = workerTaskCount.getRunningWorkerCount() + 1; // To account for controller.
}
return new MSQStatusReport(
taskState,
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
index 12c869d23c..2273822402 100644
--- a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/MSQWorkerTaskLauncher.java
@@ -30,7 +30,6 @@ import org.apache.druid.indexer.TaskLocation;
import org.apache.druid.indexer.TaskState;
import org.apache.druid.indexer.TaskStatus;
import org.apache.druid.java.util.common.ISE;
-import org.apache.druid.java.util.common.Pair;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.java.util.common.logger.Logger;
@@ -350,12 +349,12 @@ public class MSQWorkerTaskLauncher
* Returns a pair which contains the number of currently running worker tasks and the number of worker tasks that are
* not yet fully started as left and right respectively.
*/
- public Pair<Integer, Integer> getWorkerTaskStatus()
+ public WorkerCount getWorkerTaskCount()
{
synchronized (taskIds) {
int runningTasks = fullyStartedTasks.size();
int pendingTasks = desiredTaskCount - runningTasks;
- return Pair.of(runningTasks, pendingTasks);
+ return new WorkerCount(runningTasks, pendingTasks);
}
}
diff --git a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerCount.java b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerCount.java
new file mode 100644
index 0000000000..83031468e0
--- /dev/null
+++ b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/indexing/WorkerCount.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.indexing;
+
+/**
+ * Information about current status of worker tasks
+ */
+public class WorkerCount
+{
+ private final int runningWorkerCount;
+
+ private final int pendingWorkerCount;
+
+ public WorkerCount(int runningWorkerCount, int pendingWorkerCount)
+ {
+ this.runningWorkerCount = runningWorkerCount;
+ this.pendingWorkerCount = pendingWorkerCount;
+ }
+
+ public int getRunningWorkerCount()
+ {
+ return runningWorkerCount;
+ }
+
+ public int getPendingWorkerCount()
+ {
+ return pendingWorkerCount;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@druid.apache.org
For additional commands, e-mail: commits-help@druid.apache.org