You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/07/12 04:14:37 UTC

[GitHub] [dolphinscheduler] caishunfeng commented on a diff in pull request #10886: [Fix-10827] Fix network error cause worker cannot send message to master

caishunfeng commented on code in PR #10886:
URL: https://github.com/apache/dolphinscheduler/pull/10886#discussion_r918524582


##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/processor/TaskExecuteRunningProcessor.java:
##########
@@ -54,10 +54,10 @@ public class TaskExecuteRunningProcessor implements NettyRequestProcessor {
     @Override
     public void process(Channel channel, Command command) {
         Preconditions.checkArgument(CommandType.TASK_EXECUTE_RUNNING == command.getType(), String.format("invalid command type : %s", command.getType()));
-        TaskExecuteRunningCommand taskExecuteRunningCommand = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);
-        logger.info("taskExecuteRunningCommand: {}", taskExecuteRunningCommand);
+        TaskExecuteRunningCommand taskExecuteRunningMessage = JSONUtils.parseObject(command.getBody(), TaskExecuteRunningCommand.class);

Review Comment:
   nit: revert the variable name.



##########
dolphinscheduler-master/src/test/java/org/apache/dolphinscheduler/server/master/processor/TaskAckProcessorTest.java:
##########
@@ -44,7 +44,7 @@ public class TaskAckProcessorTest {
     private TaskExecuteRunningProcessor taskExecuteRunningProcessor;
     private TaskEventService taskEventService;
     private ProcessService processService;
-    private TaskExecuteRunningCommand taskExecuteRunningCommand;
+    private TaskExecuteRunningCommand taskExecuteRunningMessage;

Review Comment:
   Same here: revert the variable name.



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/processor/TaskCallbackService.java:
##########
@@ -76,201 +59,201 @@ public TaskCallbackService() {
         final NettyClientConfig clientConfig = new NettyClientConfig();
         this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
         this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
-        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
-    }
-
-    /**
-     * add callback channel
-     *
-     * @param taskInstanceId taskInstanceId
-     * @param channel channel
-     */
-    public void addRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
-        REMOTE_CHANNELS.put(taskInstanceId, channel);
-    }
-
-    /**
-     * change remote channel
-     */
-    public void changeRemoteChannel(int taskInstanceId, NettyRemoteChannel channel) {
-        REMOTE_CHANNELS.put(taskInstanceId, channel);
-    }
-
-    /**
-     * get callback channel
-     *
-     * @param taskInstanceId taskInstanceId
-     * @return callback channel
-     */
-    private Optional<NettyRemoteChannel> getRemoteChannel(int taskInstanceId) {
-        Channel newChannel;
-        NettyRemoteChannel nettyRemoteChannel = REMOTE_CHANNELS.get(taskInstanceId);
-        if (nettyRemoteChannel != null) {
-            if (nettyRemoteChannel.isActive()) {
-                return Optional.of(nettyRemoteChannel);
-            }
-            newChannel = nettyRemotingClient.getChannel(nettyRemoteChannel.getHost());
-            if (newChannel != null) {
-                return Optional.of(getRemoteChannel(newChannel, nettyRemoteChannel.getOpaque(), taskInstanceId));
-            }
-        }
-        return Optional.empty();
-    }
-
-    public long pause(int ntries) {
-        return SLEEP_TIME_MILLIS * RETRY_BACKOFF[ntries % RETRY_BACKOFF.length];
-    }
-
-    private NettyRemoteChannel getRemoteChannel(Channel newChannel, long opaque, int taskInstanceId) {
-        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel, opaque);
-        addRemoteChannel(taskInstanceId, remoteChannel);
-        return remoteChannel;
-    }
-
-    private NettyRemoteChannel getRemoteChannel(Channel newChannel, int taskInstanceId) {
-        NettyRemoteChannel remoteChannel = new NettyRemoteChannel(newChannel);
-        addRemoteChannel(taskInstanceId, remoteChannel);
-        return remoteChannel;
+        this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
     }
 
-    /**
-     * remove callback channels
-     *
-     * @param taskInstanceId taskInstanceId
-     */
-    public static void remove(int taskInstanceId) {
-        REMOTE_CHANNELS.remove(taskInstanceId);
-    }
-
-    /**
-     * send result
-     *
-     * @param taskInstanceId taskInstanceId
-     * @param command command
-     */
-    public void send(int taskInstanceId, Command command) {
-        Optional<NettyRemoteChannel> nettyRemoteChannel = getRemoteChannel(taskInstanceId);
-        if (nettyRemoteChannel.isPresent()) {
-            nettyRemoteChannel.get().writeAndFlush(command).addListener(new ChannelFutureListener() {
-                @Override
-                public void operationComplete(ChannelFuture future) {
-                    if (!future.isSuccess()) {
-                        logger.error("Send callback command error, taskInstanceId: {}, command: {}", taskInstanceId, command);
-                    }
-                }
-            });
-        } else {
-            logger.warn("Remote channel of taskInstanceId is null: {}, cannot send command: {}", taskInstanceId, command);
-        }
-    }
-
-    /**
-     * build task execute running command
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return TaskExecuteAckCommand
-     */
-    private TaskExecuteRunningCommand buildTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteRunningCommand command = new TaskExecuteRunningCommand();
-        command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        command.setLogPath(taskExecutionContext.getLogPath());
-        command.setHost(taskExecutionContext.getHost());
-        command.setStartTime(taskExecutionContext.getStartTime());
-        command.setExecutePath(taskExecutionContext.getExecutePath());
-        return command;
-    }
-
-    /**
-     * build task execute response command
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return TaskExecuteResponseCommand
-     */
-    private TaskExecuteResponseCommand buildTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteResponseCommand command = new TaskExecuteResponseCommand();
-        command.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        command.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        command.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        command.setLogPath(taskExecutionContext.getLogPath());
-        command.setExecutePath(taskExecutionContext.getExecutePath());
-        command.setAppIds(taskExecutionContext.getAppIds());
-        command.setProcessId(taskExecutionContext.getProcessId());
-        command.setHost(taskExecutionContext.getHost());
-        command.setStartTime(taskExecutionContext.getStartTime());
-        command.setEndTime(taskExecutionContext.getEndTime());
-        command.setVarPool(taskExecutionContext.getVarPool());
-        command.setExecutePath(taskExecutionContext.getExecutePath());
-        return command;
-    }
-
-    /**
-     * build TaskKillResponseCommand
-     *
-     * @param taskExecutionContext taskExecutionContext
-     * @return build TaskKillResponseCommand
-     */
-    private TaskKillResponseCommand buildKillTaskResponseCommand(TaskExecutionContext taskExecutionContext) {
-        TaskKillResponseCommand taskKillResponseCommand = new TaskKillResponseCommand();
-        taskKillResponseCommand.setStatus(taskExecutionContext.getCurrentExecutionStatus().getCode());
-        taskKillResponseCommand.setAppIds(Arrays.asList(taskExecutionContext.getAppIds().split(TaskConstants.COMMA)));
-        taskKillResponseCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        taskKillResponseCommand.setHost(taskExecutionContext.getHost());
-        taskKillResponseCommand.setProcessId(taskExecutionContext.getProcessId());
-        return taskKillResponseCommand;
-    }
-
-    private TaskRecallCommand buildRecallCommand(TaskExecutionContext taskExecutionContext) {
-        TaskRecallCommand taskRecallCommand = new TaskRecallCommand();
-        taskRecallCommand.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        taskRecallCommand.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        taskRecallCommand.setHost(taskExecutionContext.getHost());
-        return taskRecallCommand;
-    }
-
-    /**
-     * send task execute running command
-     * todo unified callback command
-     */
-    public void sendTaskExecuteRunningCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
-        // add response cache
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RUNNING);
-        send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
-    }
-
-    /**
-     * send task execute delay command
-     * todo unified callback command
-     */
-    public void sendTaskExecuteDelayCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteRunningCommand command = buildTaskExecuteRunningCommand(taskExecutionContext);
-        send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
-    }
-
-    /**
-     * send task execute response command
-     * todo unified callback command
-     */
-    public void sendTaskExecuteResponseCommand(TaskExecutionContext taskExecutionContext) {
-        TaskExecuteResponseCommand command = buildTaskExecuteResponseCommand(taskExecutionContext);
-        // add response cache
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), command.convert2Command(), TaskEventType.RESULT);
-        send(taskExecutionContext.getTaskInstanceId(), command.convert2Command());
-    }
-
-    public void sendTaskKillResponseCommand(TaskExecutionContext taskExecutionContext) {
-        TaskKillResponseCommand taskKillResponseCommand = buildKillTaskResponseCommand(taskExecutionContext);
-        send(taskExecutionContext.getTaskInstanceId(), taskKillResponseCommand.convert2Command());
-    }
-
-    /**
-     * send task execute response command
-     */
-    public void sendRecallCommand(TaskExecutionContext taskExecutionContext) {
-        TaskRecallCommand taskRecallCommand = buildRecallCommand(taskExecutionContext);
-        ResponseCache.get().cache(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command(), TaskEventType.WORKER_REJECT);
-        send(taskExecutionContext.getTaskInstanceId(), taskRecallCommand.convert2Command());
-    }
+    //    /**

Review Comment:
   Maybe this commented-out code should be removed.



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/MessageSender.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.message;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.BaseCommand;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+
+public interface MessageSender<T extends BaseCommand> {

Review Comment:
   :+1: 



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerMessageSender.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.dolphinscheduler.server.worker.rpc;
+
+import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
+import org.apache.dolphinscheduler.remote.command.BaseCommand;
+import org.apache.dolphinscheduler.remote.command.CommandType;
+import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
+import org.apache.dolphinscheduler.server.worker.message.MessageRetryRunner;
+import org.apache.dolphinscheduler.server.worker.message.MessageSender;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.annotation.PostConstruct;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.stereotype.Component;
+
+import lombok.NonNull;
+
+@Component
+public class WorkerMessageSender {
+
+    private final Logger logger = LoggerFactory.getLogger(WorkerMessageSender.class);
+
+    @Autowired
+    private MessageRetryRunner messageRetryRunner;
+
+    @Autowired
+    private ApplicationContext applicationContext;
+
+    private Map<CommandType, MessageSender> messageSenderMap = new HashMap<>();
+
+    @PostConstruct
+    public void init() {
+        Map<String, MessageSender> messageSenders = applicationContext.getBeansOfType(MessageSender.class);
+        messageSenders.values().forEach(messageSender -> messageSenderMap.put(messageSender.getMessageType(),
+                                                                              messageSender));
+    }
+
+    // todo: use message rather than context
+    public void sendMessageNeedAck(@NonNull TaskExecutionContext taskExecutionContext,

Review Comment:
   ```suggestion
       public void sendMessageWithRetry(@NonNull TaskExecutionContext taskExecutionContext,
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@dolphinscheduler.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org