Posted to commits@dolphinscheduler.apache.org by GitBox <gi...@apache.org> on 2022/12/24 04:35:47 UTC

[GitHub] [dolphinscheduler] ruanwenjun commented on a diff in pull request #13250: Feature better failover

ruanwenjun commented on code in PR #13250:
URL: https://github.com/apache/dolphinscheduler/pull/13250#discussion_r1056727269


##########
dolphinscheduler-dao/src/main/java/org/apache/dolphinscheduler/dao/entity/TaskInstance.java:
##########
@@ -398,9 +398,6 @@ public boolean taskCanRetry() {
         if (this.isSubProcess()) {
             return false;
         }
-        if (this.getState() == TaskExecutionStatus.NEED_FAULT_TOLERANCE) {
-            return true;
-        }

Review Comment:
   Why did you remove this? If you remove it, the failover task instance will not create a new task instance. When we do failover, some task information will be overridden if we don't create a new one.
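
   A minimal sketch of the concern, using simplified stand-in types rather than the real `TaskInstance` and `TaskExecutionStatus` classes. The `SketchTaskInstance` class, its fields, and the final retry rule are assumptions for illustration; only the sub-process check and the `NEED_FAULT_TOLERANCE` check come from the diff above.

```java
// Simplified stand-ins for illustration; not the real DolphinScheduler classes.
enum SketchTaskStatus { RUNNING, FAILURE, NEED_FAULT_TOLERANCE, SUCCESS }

final class SketchTaskInstance {
    final SketchTaskStatus state;
    final boolean subProcess;
    final int retryTimes;
    final int maxRetryTimes;

    SketchTaskInstance(SketchTaskStatus state, boolean subProcess, int retryTimes, int maxRetryTimes) {
        this.state = state;
        this.subProcess = subProcess;
        this.retryTimes = retryTimes;
        this.maxRetryTimes = maxRetryTimes;
    }

    boolean taskCanRetry() {
        if (subProcess) {
            return false;
        }
        // The check the PR removes: a task waiting for failover is always
        // treated as retryable, so a fresh task instance gets created for it.
        if (state == SketchTaskStatus.NEED_FAULT_TOLERANCE) {
            return true;
        }
        // Assumed ordinary retry rule: failed and retries not yet exhausted.
        return state == SketchTaskStatus.FAILURE && retryTimes < maxRetryTimes;
    }
}
```

   With the check in place, a task in `NEED_FAULT_TOLERANCE` is retryable even when its retry budget is used up. Without it, such a task would fall through to the ordinary rule and be reported as not retryable.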



##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/utils/ProcessUtils.java:
##########
@@ -186,6 +188,42 @@ public static String getPidsStr(int processId) throws Exception {
         return String.join(" ", pidList).trim();
     }
 
+    /**
+     * get remote host pids str
+     * @param host
+     * @param processId
+     * @return
+     * @throws Exception
+     */
+    public static String getHostPidsStr(String host, int processId) throws Exception {

Review Comment:
   Why do we need to add this method?



##########
dolphinscheduler-service/src/main/java/org/apache/dolphinscheduler/service/storage/impl/HadoopUtils.java:
##########
@@ -650,6 +652,42 @@ public TaskExecutionStatus getApplicationStatus(String applicationId) throws Bas
         return getExecutionStatus(result);
     }
 
+    public TaskExecutionStatus waitApplicationAccepted(String applicationId) throws BaseException {
+        if (StringUtils.isEmpty(applicationId)) {
+            return null;
+        }
+
+        String result;
+        String applicationUrl = getApplicationUrl(applicationId);
+        logger.debug("generate yarn application url, applicationUrl={}", applicationUrl);
+        long startTime = System.currentTimeMillis();
+        while (System.currentTimeMillis() - startTime < 60 * 1000) {
+            String responseContent = Boolean.TRUE
+                    .equals(PropertyUtils.getBoolean(Constants.HADOOP_SECURITY_AUTHENTICATION_STARTUP_STATE, false))
+                            ? KerberosHttpClient.get(applicationUrl)
+                            : HttpUtils.get(applicationUrl);
+            if (responseContent != null) {
+                ObjectNode jsonObject = JSONUtils.parseObject(responseContent);

Review Comment:
   Please don't use `ObjectNode`; this is not a good practice.



##########
dolphinscheduler-task-plugin/dolphinscheduler-task-dms/src/main/java/org/apache/dolphinscheduler/plugin/task/dms/DmsTask.java:
##########
@@ -77,8 +77,8 @@ public void init() throws TaskException {
     }
 
     @Override
-    public List<String> getApplicationIds() throws TaskException {
-        return Collections.emptyList();
+    public Set<String> getApplicationIds() throws TaskException {

Review Comment:
   You don't need to make this change here; the returned result should be distinct per task.



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/builder/TaskExecutionContextBuilder.java:
##########
@@ -79,6 +79,7 @@ public TaskExecutionContextBuilder buildTaskInstanceRelatedInfo(TaskInstance tas
         taskExecutionContext.setCpuQuota(taskInstance.getCpuQuota());
         taskExecutionContext.setMemoryMax(taskInstance.getMemoryMax());
         taskExecutionContext.setAppIds(taskInstance.getAppLink());
+        taskExecutionContext.setProcessId(taskInstance.getPid());

Review Comment:
   It's not a good practice to set the process id here. When the worker fails over, the pid is useless; when the master fails over, the pid can be obtained from the worker by task instance id.



##########
dolphinscheduler-remote/src/main/java/org/apache/dolphinscheduler/remote/command/HostUpdateResponseCommand.java:
##########
@@ -72,6 +75,14 @@ public Command convert2Command() {
         return command;
     }
 
+    public Command convert2ResponseCommand(long opaque) {

Review Comment:
   Could you please split this into request and response? Don't use one class to represent both the request and the response.
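
   A rough sketch of what the split could look like, under stated assumptions. `SketchCommand`, `SketchHostUpdateRequest`, `SketchHostUpdateResponse`, their fields, the type strings, and the body encoding are all hypothetical; the real `Command` class and serialization in the remote module differ.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, simplified wire command; the real Command class carries more fields.
final class SketchCommand {
    final String type;
    final long opaque;
    final byte[] body;

    SketchCommand(String type, long opaque, byte[] body) {
        this.type = type;
        this.opaque = opaque;
        this.body = body;
    }
}

// Request side only (field names are assumptions).
final class SketchHostUpdateRequest {
    final int taskInstanceId;
    final String processHost;

    SketchHostUpdateRequest(int taskInstanceId, String processHost) {
        this.taskInstanceId = taskInstanceId;
        this.processHost = processHost;
    }

    SketchCommand convert2Command(long opaque) {
        byte[] body = (taskInstanceId + "," + processHost).getBytes(StandardCharsets.UTF_8);
        return new SketchCommand("HOST_UPDATE_REQUEST", opaque, body);
    }
}

// Response side only; it reuses the request's opaque so the caller can match the two.
final class SketchHostUpdateResponse {
    final int taskInstanceId;
    final boolean success;

    SketchHostUpdateResponse(int taskInstanceId, boolean success) {
        this.taskInstanceId = taskInstanceId;
        this.success = success;
    }

    SketchCommand convert2Command(long requestOpaque) {
        byte[] body = (taskInstanceId + "," + success).getBytes(StandardCharsets.UTF_8);
        return new SketchCommand("HOST_UPDATE_RESPONSE", requestOpaque, body);
    }
}
```

   Each class then has a single `convert2Command` method, and there is no need for a separate `convert2ResponseCommand` on a shared class.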



##########
dolphinscheduler-master/src/main/java/org/apache/dolphinscheduler/server/master/runner/WorkflowExecuteRunnable.java:
##########
@@ -959,9 +966,15 @@ private Optional<TaskInstance> submitTaskExec(TaskInstance taskInstance) {
             ITaskProcessor taskProcessor = TaskProcessorFactory.getTaskProcessor(taskInstance.getTaskType());
             taskProcessor.init(taskInstance, processInstance);
 
-            if (taskInstance.getState().isRunning()
+            // rebuild channel when master crashes
+            if (taskInstance.getState().isNeedFaultTolerance()

Review Comment:
   It's better to use another flag (we know this when we create a failover workflow) to judge whether the current workflow is executing because of a master crash, rather than using isNeedFaultTolerance.
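
   One possible shape for such a flag, as a sketch only. Every name here (`SketchCommandType`, `SketchWorkflowContext`, `SketchFailoverCheck`) is hypothetical, and deriving the flag from the command type is an assumption about where the information could come from, not how the master currently does it.

```java
// Hypothetical stand-ins; not the real DolphinScheduler enums or classes.
enum SketchCommandType { START_PROCESS, RECOVER_TOLERANCE_FAULT_PROCESS }

enum SketchTaskState { SUBMITTED, RUNNING, NEED_FAULT_TOLERANCE }

final class SketchWorkflowContext {
    // Set once, when the workflow is created, and never derived from task state.
    final boolean createdByMasterFailover;

    SketchWorkflowContext(SketchCommandType commandType) {
        this.createdByMasterFailover =
                commandType == SketchCommandType.RECOVER_TOLERANCE_FAULT_PROCESS;
    }
}

final class SketchFailoverCheck {
    private SketchFailoverCheck() {
    }

    // The decision depends on how the workflow was created; the task state
    // is accepted only to show that it is deliberately ignored.
    static boolean shouldRebuildChannel(SketchWorkflowContext context, SketchTaskState ignoredTaskState) {
        return context.createdByMasterFailover;
    }
}
```

   The point of the sketch is that the task state no longer doubles as a signal for "this workflow is recovering from a master crash".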



##########
dolphinscheduler-worker/src/main/java/org/apache/dolphinscheduler/server/worker/runner/WorkerTaskExecuteRunnable.java:
##########
@@ -110,9 +118,24 @@ protected void afterExecute() throws TaskException {
         if (task == null) {
             throw new TaskException("The current task instance is null");
         }
+        TaskExecutionStatus taskExecutionStatus = task.getExitStatus();
+
+        if (task.getExitStatus() == TaskExecutionStatus.SUCCESS && StringUtils.isNotEmpty(task.getAppIds())) {
+            // monitor task submitted before
+            logger.info("monitor task by appId {}, maybe has process id {}", task.getAppIds(), task.getProcessId());
+
+            taskExecutionStatus = waitApplicationEnd(task.getAppIds());
+        } else if (task.getExitStatus() == TaskExecutionStatus.SUCCESS && task.getProcessId() > 0) {
+            // monitor task by process id
+            logger.info("monitor task by process id {}, maybe has appId {}", task.getProcessId(), task.getAppIds());
+
+            taskExecutionStatus = waitProcessEnd(task.getProcess());
+
+        }

Review Comment:
   Why do we need to wait for the status here? Right now all tasks are sync tasks, so the process should already exist here. And the remote status should be generated by the task plugin rather than by this runnable.
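
   To illustrate the division of responsibility suggested above, here is a small sketch in which the plugin resolves the remote status inside its own synchronous `handle()` and the runnable only reads the result. `SketchTask`, `SketchRemoteTask`, `SketchRunnable`, and the `Supplier`-based remote lookup are hypothetical and stand in for the real task plugin API.

```java
import java.util.function.Supplier;

// Hypothetical stand-ins; not the real task plugin interfaces.
enum SketchExitStatus { SUCCESS, FAILURE, KILL }

interface SketchTask {
    // Runs synchronously; any remote tracking happens before this returns.
    void handle();

    SketchExitStatus getExitStatus();
}

final class SketchRemoteTask implements SketchTask {
    // Assumed hook that blocks until the remote application has finished.
    private final Supplier<SketchExitStatus> remoteStatusLookup;
    private SketchExitStatus exitStatus = SketchExitStatus.FAILURE;

    SketchRemoteTask(Supplier<SketchExitStatus> remoteStatusLookup) {
        this.remoteStatusLookup = remoteStatusLookup;
    }

    @Override
    public void handle() {
        // The plugin, not the runnable, decides the final status.
        exitStatus = remoteStatusLookup.get();
    }

    @Override
    public SketchExitStatus getExitStatus() {
        return exitStatus;
    }
}

final class SketchRunnable {
    private SketchRunnable() {
    }

    // The runnable stays generic: no appId or pid handling, no waiting loop.
    static SketchExitStatus execute(SketchTask task) {
        task.handle();
        return task.getExitStatus();
    }
}
```

   Under this arrangement `afterExecute()` would not need the extra `waitApplicationEnd` or `waitProcessEnd` branches.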


