You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by ke...@apache.org on 2022/10/18 01:28:23 UTC

[dolphinscheduler] branch 3.1.1-prepare updated (39c1144fab -> 703f9991b4)

This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a change to branch 3.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


    from 39c1144fab [Improvement][Task Plugin] Modify the comment of 'deployMode'. (#12163)
     new 5cfe6a96b4 [Improvement-12372][k8s] Update the deprecated k8s api (#12373)
     new 703f9991b4 [DS-12154][worker] Optimize the log printing of the worker module (#12183)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dolphinscheduler/plugin/task/api/utils/K8sUtils.java  |  7 ++++++-
 .../server/worker/processor/TaskDispatchProcessor.java    |  5 +++--
 .../worker/processor/TaskExecuteResultAckProcessor.java   |  2 +-
 .../server/worker/processor/TaskRejectAckProcessor.java   |  3 +++
 .../server/worker/processor/TaskSavePointProcessor.java   | 15 +++++++++++----
 .../server/worker/runner/WorkerManagerThread.java         |  1 +
 6 files changed, 25 insertions(+), 8 deletions(-)


[dolphinscheduler] 02/02: [DS-12154][worker] Optimize the log printing of the worker module (#12183)

Posted by ke...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 703f9991b42ef99ee9d153e527f13eea3f414c79
Author: sgw <56...@users.noreply.github.com>
AuthorDate: Thu Sep 29 16:01:29 2022 +0800

    [DS-12154][worker] Optimize the log printing of the worker module (#12183)
    
    * [DS-12154][worker] Optimize the log printing of the worker module according to the log specification.
---
 .../server/worker/processor/TaskDispatchProcessor.java    |  5 +++--
 .../worker/processor/TaskExecuteResultAckProcessor.java   |  2 +-
 .../server/worker/processor/TaskRejectAckProcessor.java   |  3 +++
 .../server/worker/processor/TaskSavePointProcessor.java   | 15 +++++++++++----
 .../server/worker/runner/WorkerManagerThread.java         |  1 +
 5 files changed, 19 insertions(+), 7 deletions(-)

diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
index 4bb0c92124..288a5d9b33 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskDispatchProcessor.java
@@ -95,7 +95,7 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
             return;
         }
         final String workflowMasterAddress = taskDispatchCommand.getMessageSenderAddress();
-        logger.info("task execute request message: {}", taskDispatchCommand);
+        logger.info("Receive task dispatch request, command: {}", taskDispatchCommand);
 
         TaskExecutionContext taskExecutionContext = taskDispatchCommand.getTaskExecutionContext();
 
@@ -133,7 +133,8 @@ public class TaskDispatchProcessor implements NettyRequestProcessor {
             if (!offer) {
                 logger.warn("submit task to wait queue error, queue is full, current queue size is {}, will send a task reject message to master", workerManager.getWaitSubmitQueueSize());
                 workerMessageSender.sendMessageWithRetry(taskExecutionContext, workflowMasterAddress, CommandType.TASK_REJECT);
-            }
+            } else
+                logger.info("Submit task to wait queue success, current queue size is {}", workerManager.getWaitSubmitQueueSize());
         } finally {
             LoggerUtils.removeWorkflowAndTaskInstanceIdMDC();
         }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
index e59902d6fc..bff92e5d11 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskExecuteResultAckProcessor.java
@@ -54,10 +54,10 @@ public class TaskExecuteResultAckProcessor implements NettyRequestProcessor {
             logger.error("task execute response ack command is null");
             return;
         }
-        logger.info("task execute response ack command : {}", taskExecuteAckMessage);
 
         try {
             LoggerUtils.setTaskInstanceIdMDC(taskExecuteAckMessage.getTaskInstanceId());
+            logger.info("Receive task execute response ack command : {}", taskExecuteAckMessage);
             if (taskExecuteAckMessage.isSuccess()) {
                 messageRetryRunner.removeRetryMessage(taskExecuteAckMessage.getTaskInstanceId(),
                         CommandType.TASK_EXECUTE_RESULT);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
index a18223b90d..b6eb954b5d 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskRejectAckProcessor.java
@@ -47,10 +47,13 @@ public class TaskRejectAckProcessor implements NettyRequestProcessor {
         TaskRejectAckCommand taskRejectAckMessage = JSONUtils.parseObject(command.getBody(),
                 TaskRejectAckCommand.class);
         if (taskRejectAckMessage == null) {
+            logger.error("Receive task reject response, the response message is null");
             return;
         }
+
         try {
             LoggerUtils.setTaskInstanceIdMDC(taskRejectAckMessage.getTaskInstanceId());
+            logger.info("Receive task reject response ack command: {}", taskRejectAckMessage);
             if (taskRejectAckMessage.isSuccess()) {
                 messageRetryRunner.removeRetryMessage(taskRejectAckMessage.getTaskInstanceId(),
                         CommandType.TASK_REJECT);
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
index 899ac7d9b7..1a621a39e7 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskSavePointProcessor.java
@@ -22,6 +22,7 @@ import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
 import io.netty.channel.ChannelFutureListener;
 import org.apache.dolphinscheduler.common.utils.JSONUtils;
+import org.apache.dolphinscheduler.common.utils.LoggerUtils;
 import org.apache.dolphinscheduler.plugin.task.api.AbstractTask;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
 import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContextCacheManager;
@@ -67,7 +68,7 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
             logger.error("task savepoint request command is null");
             return;
         }
-        logger.info("task savepoint command : {}", taskSavePointRequestCommand);
+        logger.info("Receive task savepoint command : {}", taskSavePointRequestCommand);
 
         int taskInstanceId = taskSavePointRequestCommand.getTaskInstanceId();
         TaskExecutionContext taskExecutionContext = TaskExecutionContextCacheManager.getByTaskInstanceId(taskInstanceId);
@@ -76,9 +77,14 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
             return;
         }
 
-        doSavePoint(taskInstanceId);
+        try {
+            LoggerUtils.setTaskInstanceIdMDC(taskInstanceId);
+            doSavePoint(taskInstanceId);
 
-        sendTaskSavePointResponseCommand(channel, taskExecutionContext);
+            sendTaskSavePointResponseCommand(channel, taskExecutionContext);
+        } finally {
+            LoggerUtils.removeTaskInstanceIdMDC();
+        }
     }
 
     private void sendTaskSavePointResponseCommand(Channel channel, TaskExecutionContext taskExecutionContext) {
@@ -89,7 +95,8 @@ public class TaskSavePointProcessor implements NettyRequestProcessor {
             public void operationComplete(ChannelFuture future) throws Exception {
                 if (!future.isSuccess()) {
                     logger.error("Submit kill response to master error, kill command: {}", taskSavePointResponseCommand);
-                }
+                } else
+                    logger.info("Submit kill response to master success, kill command: {}", taskSavePointResponseCommand);
             }
         });
     }
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
index 41824a9a44..1e6a0ba6b7 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerManagerThread.java
@@ -93,6 +93,7 @@ public class WorkerManagerThread implements Runnable {
 
     public boolean offer(WorkerDelayTaskExecuteRunnable workerDelayTaskExecuteRunnable) {
         if (waitSubmitQueue.size() > workerExecThreads) {
+            logger.warn("Wait submit queue is full, will retry submit task later");
             WorkerServerMetrics.incWorkerSubmitQueueIsFullCount();
             // if waitSubmitQueue is full, it will wait 1s, then try add
             ThreadUtils.sleep(Constants.SLEEP_TIME_MILLIS);


[dolphinscheduler] 01/02: [Improvement-12372][k8s] Update the deprecated k8s api (#12373)

Posted by ke...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kerwin pushed a commit to branch 3.1.1-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit 5cfe6a96b41d95da067702d6dfcf499690b17f6c
Author: rickchengx <38...@users.noreply.github.com>
AuthorDate: Fri Oct 14 16:18:35 2022 +0800

    [Improvement-12372][k8s] Update the deprecated k8s api (#12373)
---
 .../apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java    | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java
index 2733cebae3..c50596bb02 100644
--- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java
+++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/K8sUtils.java
@@ -44,6 +44,7 @@ public class K8sUtils {
     public void createJob(String namespace, Job job) {
         try {
             client.batch()
+                    .v1()
                     .jobs()
                     .inNamespace(namespace)
                     .create(job);
@@ -55,6 +56,7 @@ public class K8sUtils {
     public void deleteJob(String jobName, String namespace) {
         try {
             client.batch()
+                    .v1()
                     .jobs()
                     .inNamespace(namespace)
                     .withName(jobName)
@@ -81,7 +83,10 @@ public class K8sUtils {
     public Watch createBatchJobWatcher(String jobName, Watcher<Job> watcher) {
         try {
             return client.batch()
-                    .jobs().withName(jobName).watch(watcher);
+                    .v1()
+                    .jobs()
+                    .withName(jobName)
+                    .watch(watcher);
         } catch (Exception e) {
             throw new TaskException("fail to register batch job watcher", e);
         }