You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dolphinscheduler.apache.org by we...@apache.org on 2022/07/19 04:30:45 UTC
[dolphinscheduler] branch 3.0.0-prepare updated: Fix compile error
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch 3.0.0-prepare
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git
The following commit(s) were added to refs/heads/3.0.0-prepare by this push:
new 5a2ea0b76b Fix compile error
5a2ea0b76b is described below
commit 5a2ea0b76bb9db0297881fbbb425e4ce3e819e8a
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Tue Jul 19 12:30:26 2022 +0800
Fix compile error
---
.../dao/utils/TaskInstanceUtils.java | 2 -
.../event/TaskRejectByWorkerEventHandler.java | 84 ----------------------
.../remote/command/TaskRejectAckCommand.java | 59 ---------------
.../remote/command/TaskRejectCommand.java | 0
.../worker/message/TaskRejectMessageSender.java | 59 ---------------
.../server/worker/rpc/WorkerRpcServer.java | 5 --
6 files changed, 209 deletions(-)
diff --git a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
index fa3bfec0ca..7351f972b4 100644
--- a/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
+++ b/dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/utils/TaskInstanceUtils.java
@@ -72,8 +72,6 @@ public class TaskInstanceUtils {
target.setDelayTime(source.getDelayTime());
target.setDryRun(source.getDryRun());
target.setTaskGroupId(source.getTaskGroupId());
- target.setCpuQuota(source.getCpuQuota());
- target.setMemoryMax(source.getMemoryMax());
}
}
diff --git a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java b/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
deleted file mode 100644
index ac4dab50f2..0000000000
--- a/dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/event/TaskRejectByWorkerEventHandler.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.master.event;
-
-import org.apache.dolphinscheduler.common.enums.TaskEventType;
-import org.apache.dolphinscheduler.dao.entity.TaskInstance;
-import org.apache.dolphinscheduler.plugin.task.api.enums.ExecutionStatus;
-import org.apache.dolphinscheduler.remote.command.TaskRejectAckCommand;
-import org.apache.dolphinscheduler.server.master.cache.ProcessInstanceExecCacheManager;
-import org.apache.dolphinscheduler.server.master.config.MasterConfig;
-import org.apache.dolphinscheduler.server.master.processor.queue.TaskEvent;
-import org.apache.dolphinscheduler.server.master.runner.WorkflowExecuteRunnable;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class TaskRejectByWorkerEventHandler implements TaskEventHandler {
-
- @Autowired
- private ProcessInstanceExecCacheManager processInstanceExecCacheManager;
-
- @Autowired
- private MasterConfig masterConfig;
-
- @Override
- public void handleTaskEvent(TaskEvent taskEvent) throws TaskEventHandleError {
- int taskInstanceId = taskEvent.getTaskInstanceId();
- int processInstanceId = taskEvent.getProcessInstanceId();
-
- WorkflowExecuteRunnable workflowExecuteRunnable = this.processInstanceExecCacheManager.getByProcessInstanceId(
- processInstanceId);
- if (workflowExecuteRunnable == null) {
- sendAckToWorker(taskEvent);
- throw new TaskEventHandleError(
- "Handle task reject event error, cannot find related workflow instance from cache, will discard this event");
- }
- TaskInstance taskInstance = workflowExecuteRunnable.getTaskInstance(taskInstanceId).orElseThrow(() -> {
- sendAckToWorker(taskEvent);
- return new TaskEventHandleError(
- "Handle task reject event error, cannot find the taskInstance from cache, will discord this event");
- });
- try {
- // todo: If the worker submit multiple reject response to master, the task instance may be dispatch multiple,
- // we need to control the worker overload by master rather than worker
- // if the task resubmit and the worker failover, this task may be dispatch twice?
- // todo: we need to clear the taskInstance host and rollback the status to submit.
- workflowExecuteRunnable.resubmit(taskInstance.getTaskCode());
- sendAckToWorker(taskEvent);
- } catch (Exception ex) {
- throw new TaskEventHandleError("Handle task reject event error", ex);
- }
-
- }
-
- public void sendAckToWorker(TaskEvent taskEvent) {
- TaskRejectAckCommand taskRejectAckMessage = new TaskRejectAckCommand(ExecutionStatus.SUCCESS.getCode(),
- taskEvent.getTaskInstanceId(),
- masterConfig.getMasterAddress(),
- taskEvent.getWorkerAddress(),
- System.currentTimeMillis());
- taskEvent.getChannel().writeAndFlush(taskRejectAckMessage.convert2Command());
- }
-
- @Override
- public TaskEventType getHandleEventType() {
- return TaskEventType.WORKER_REJECT;
- }
-}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
deleted file mode 100644
index aed647bb4c..0000000000
--- a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectAckCommand.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.remote.command;
-
-import org.apache.dolphinscheduler.common.utils.JSONUtils;
-
-import lombok.Data;
-import lombok.EqualsAndHashCode;
-import lombok.NoArgsConstructor;
-import lombok.ToString;
-
-@Data
-@NoArgsConstructor
-@ToString(callSuper = true)
-@EqualsAndHashCode(callSuper = true)
-public class TaskRejectAckCommand extends BaseCommand {
-
- private int taskInstanceId;
- private int status;
-
- public TaskRejectAckCommand(int status,
- int taskInstanceId,
- String messageSenderAddress,
- String messageReceiverAddress,
- long messageSendTime) {
- super(messageSenderAddress, messageReceiverAddress, messageSendTime);
- this.status = status;
- this.taskInstanceId = taskInstanceId;
- }
-
- /**
- * package response command
- *
- * @return command
- */
- public Command convert2Command() {
- Command command = new Command();
- command.setType(CommandType.TASK_REJECT_ACK);
- byte[] body = JSONUtils.toJsonByteArray(this);
- command.setBody(body);
- return command;
- }
-
-}
diff --git a/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java b/dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/TaskRejectCommand.java
deleted file mode 100644
index e69de29bb2..0000000000
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java
deleted file mode 100644
index d50c5d8997..0000000000
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/message/TaskRejectMessageSender.java
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.dolphinscheduler.server.worker.message;
-
-import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
-import org.apache.dolphinscheduler.remote.command.CommandType;
-import org.apache.dolphinscheduler.remote.command.TaskRejectCommand;
-import org.apache.dolphinscheduler.remote.exceptions.RemotingException;
-import org.apache.dolphinscheduler.remote.utils.Host;
-import org.apache.dolphinscheduler.server.worker.config.WorkerConfig;
-import org.apache.dolphinscheduler.server.worker.rpc.WorkerRpcClient;
-
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.stereotype.Component;
-
-@Component
-public class TaskRejectMessageSender implements MessageSender<TaskRejectCommand> {
-
- @Autowired
- private WorkerRpcClient workerRpcClient;
-
- @Autowired
- private WorkerConfig workerConfig;
-
- @Override
- public void sendMessage(TaskRejectCommand message) throws RemotingException {
- workerRpcClient.send(Host.of(message.getMessageReceiverAddress()), message.convert2Command());
- }
-
- public TaskRejectCommand buildMessage(TaskExecutionContext taskExecutionContext, String masterAddress) {
- TaskRejectCommand taskRejectMessage = new TaskRejectCommand(workerConfig.getWorkerAddress(),
- masterAddress,
- System.currentTimeMillis());
- taskRejectMessage.setTaskInstanceId(taskExecutionContext.getTaskInstanceId());
- taskRejectMessage.setProcessInstanceId(taskExecutionContext.getProcessInstanceId());
- taskRejectMessage.setHost(taskExecutionContext.getHost());
- return taskRejectMessage;
- }
-
- @Override
- public CommandType getMessageType() {
- return CommandType.TASK_REJECT;
- }
-}
diff --git a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
index e599046c57..8fcf6966f8 100644
--- a/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
+++ b/dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/rpc/WorkerRpcServer.java
@@ -27,7 +27,6 @@ import org.apache.dolphinscheduler.server.worker.processor.TaskDispatchProcessor
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteResultAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskExecuteRunningAckProcessor;
import org.apache.dolphinscheduler.server.worker.processor.TaskKillProcessor;
-import org.apache.dolphinscheduler.server.worker.processor.TaskRejectAckProcessor;
import java.io.Closeable;
@@ -47,9 +46,6 @@ public class WorkerRpcServer implements Closeable {
@Autowired
private TaskKillProcessor taskKillProcessor;
- @Autowired
- private TaskRejectAckProcessor taskRejectAckProcessor;
-
@Autowired
private TaskExecuteRunningAckProcessor taskExecuteRunningAckProcessor;
@@ -76,7 +72,6 @@ public class WorkerRpcServer implements Closeable {
this.nettyRemotingServer.registerProcessor(CommandType.TASK_KILL_REQUEST, taskKillProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.TASK_EXECUTE_RESULT_ACK, taskExecuteResultAckProcessor);
- this.nettyRemotingServer.registerProcessor(CommandType.TASK_REJECT_ACK, taskRejectAckProcessor);
this.nettyRemotingServer.registerProcessor(CommandType.PROCESS_HOST_UPDATE_REQUEST, hostUpdateProcessor);
// logger server
this.nettyRemotingServer.registerProcessor(CommandType.GET_LOG_BYTES_REQUEST, loggerRequestProcessor);