You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:30:45 UTC

[dolphinscheduler] branch 3.0.0-prepare updated: Fix compile error

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git


The following commit(s) were added to refs/heads/3.0.0-prepare by this push:
     new 5a2ea0b76b Fix compile error
5a2ea0b76b is described below

commit 5a2ea0b76bb9db0297881fbbb425e4ce3e819e8a
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Jul 19 12:30:26 2022 +0800

    Fix compile error
---
 .../dao/utils/TaskInstanceUtils.java               |  2 -
 .../event/TaskRejectByWorkerEventHandler.java      | 84 ----------------------
 .../remote/command/TaskRejectAckCommand.java       | 59 ---------------
 .../remote/command/TaskRejectCommand.java          |  0
 .../worker/message/TaskRejectMessageSender.java    | 59 ---------------
 .../server/worker/rpc/WorkerRpcServer.java         |  5 --
 6 files changed, 209 deletions(-)

diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
index fa3bfec0ca..7351f972b4 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
@@ -72,8 +72,6 @@ public class TaskInstanceUtils {
         target.setDelayTime(source.getDelayTime());
         target.setDryRun(source.getDryRun());
         target.setTaskGroupId(source.getTaskGroupId());
-        target.setCpuQuota(source.getCpuQuota());
-        target.setMemoryMax(source.getMemoryMax());
     }
 
 }
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
deleted file mode 100644
index ac4dab50f2..0000000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.event;
-
-import org.apache.dolphinscheduler.common.enums.TaskEventType;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
-
-    @Autowired
-    private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
-
-    @Autowired
-    private MasterConfig masterConfig;
-
-    @Override
-    public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
-        int taskInstanceId = taskEvent.getTaskInstanceId();
-        int processInstanceId = taskEvent.getProcessInstanceId();
-
-        WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
-            processInstanceId);
-        if (workflowExecuteRunnable == null) {
-            sendAckToWorker(taskEvent);
-            throw new TaskEventHandleError(
-                "Handle task reject event error, cannot find related workflow instance from cache, will discard this event");
-        }
-        TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId).orElseThrow(() -> {
-            sendAckToWorker(taskEvent);
-            return new TaskEventHandleError(
-                "Handle task reject event error, cannot find the taskInstance from cache, will discord this event");
-        });
-        try {
-            // todo: If the worker submit multiple reject response to master, the task instance may be dispatch multiple,
-            // we need to control the worker overload by master rather than worker
-            // if the task resubmit and the worker failover, this task may be dispatch twice?
-            // todo: we need to clear the taskInstance host and rollback the status to submit.
-            workflowExecuteRunnable.resubmit(taskInstance.getTaskCode());
-            sendAckToWorker(taskEvent);
-        } catch (Exception ex) {
-            throw new TaskEventHandleError("Handle task reject event error", ex);
-        }
-
-    }
-
-    public void sendAckToWorker(TaskEvent taskEvent) {
-        TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(),
-                                                                             taskEvent.getTaskInstanceId(),
-                                                                             masterConfig.getMasterAddress(),
-                                                                             taskEvent.getWorkerAddress(),
-                                                                             System.currentTimeMillis());
-        taskEvent.getChannel().writeAndFlush(taskRejectAckMessage.convert2Command());
-    }
-
-    @Override
-    public TaskEventType getHandleEventType() {
-        return TaskEventType.WORKER_REJECT;
-    }
-}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
deleted file mode 100644
index aed647bb4c..0000000000
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.remote.command;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import lombok.ToString;
-
-@Data
-@NoArgsConstructor
-@ToString(callSuper = true)
-@EqualsAndHashCode(callSuper = true)
-public class TaskRejectAckCommand extends BaseCommand {
-
-    private int taskInstanceId;
-    private int status;
-
-    public TaskRejectAckCommand(int status,
-                                int taskInstanceId,
-                                String messageSenderAddress,
-                                String messageReceiverAddress,
-                                long messageSendTime) {
-        super(messageSenderAddress, messageReceiverAddress, messageSendTime);
-        this.status = status;
-        this.taskInstanceId = taskInstanceId;
-    }
-
-    /**
-     * package response command
-     *
-     * @return command
-     */
-    public Command convert2Command() {
-        Command command = new Command();
-        command.setType(CommandType.TASK_REJECT_ACK);
-        byte[] body = JSONUtils.toJsonByteArray(this);
-        command.setBody(body);
-        return command;
-    }
-
-}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java
deleted file mode 100644
index d50c5d8997..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.message;
-
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
-import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class TaskRejectMessageSender implements MessageSender<TaskRejectCommand> {
-
-    @Autowired
-    private WorkerRpcClient workerRpcClient;
-
-    @Autowired
-    private WorkerConfig workerConfig;
-
-    @Override
-    public void sendMessage(TaskRejectCommand message) throws RemotingException {
-        workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
-    }
-
-    public TaskRejectCommand buildMessage(TaskExecutionContext taskExecutionContext, String masterAddress) {
-        TaskRejectCommand taskRejectMessage = new TaskRejectCommand(workerConfig.getWorkerAddress(),
-                                                                    masterAddress,
-                                                                    System.currentTimeMillis());
-        taskRejectMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
-        taskRejectMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
-        taskRejectMessage.setHost(taskExecutionContext.getHost());
-        return taskRejectMessage;
-    }
-
-    @Override
-    public CommandType getMessageType() {
-        return CommandType.TASK_REJECT;
-    }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index e599046c57..8fcf6966f8 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor
 import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
 import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
 
 import java.io.Closeable;
 
@@ -47,9 +46,6 @@ public class WorkerRpcServer implements Closeable {
     @Autowired
     private TaskKillProcessor taskKillProcessor;
 
-    @Autowired
-    private TaskRejectAckProcessor taskRejectAckProcessor;
-
     @Autowired
     private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
 
@@ -76,7 +72,6 @@ public class WorkerRpcServer implements Closeable {
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
-        this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
         this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
         // logger server
         this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);