You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text rendering.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2022/07/19 09:52:27 UTC

[GitHub] [rocketmq-connect] sunxiaojian commented on a diff in pull request #201: [ISSUE #183] Optimize worker task

sunxiaojian commented on code in PR #201:
URL: https://github.com/apache/rocketmq-connect/pull/201#discussion_r924298054


##########
rocketmq-connect-runtime/src/main/java/org/apache/rocketmq/connect/runtime/connectorwrapper/Worker.java:
##########
@@ -326,83 +312,245 @@ public void maintainConnectorState() {
 
     }
 
+    /**
+     * maintain task state
+     *
+     * @throws Exception
+     */
     public void maintainTaskState() throws Exception {
-
-        Map<String, List<ConnectKeyValue>> taskConfigs = new HashMap<>();
+        Map<String, List<ConnectKeyValue>> connectorConfig = new HashMap<>();
         synchronized (latestTaskConfigs) {
-            taskConfigs.putAll(latestTaskConfigs);
+            connectorConfig.putAll(latestTaskConfigs);
         }
 
-        boolean needCommitPosition = false;
+        //  STEP 1: check running tasks and put to error status
+        checkRunningTasks(connectorConfig);
+
+        // get new Tasks
+        Map<String, List<ConnectKeyValue>> newTasks = newTasks(connectorConfig);
+
+        //  STEP 2: try to create new tasks
+        startTask(newTasks);
+
+        //  STEP 3: check all pending state
+        checkPendingTask();
+
+        //  STEP 4 check stopping tasks
+        checkStoppingTasks();
+
+        //  STEP 5 check error tasks
+        checkErrorTasks();
+
+        //  STEP 6 check errorTasks and stopped tasks
+        checkStoppedTasks();
+    }
+
+    /**
+     * check running task
+     *
+     * @param connectorConfig
+     */
+    private void checkRunningTasks(Map<String, List<ConnectKeyValue>> connectorConfig) {
         //  STEP 1: check running tasks and put to error status
         for (Runnable runnable : runningTasks) {
             WorkerTask workerTask = (WorkerTask) runnable;
-            String connectorName = workerTask.getConnectorName();
-            ConnectKeyValue taskConfig = workerTask.getTaskConfig();
-            List<ConnectKeyValue> keyValues = taskConfigs.get(connectorName);
+            String connectorName = workerTask.id().connector();
+            ConnectKeyValue taskConfig = workerTask.currentTaskConfig();
+            List<ConnectKeyValue> taskConfigs = connectorConfig.get(connectorName);
             WorkerTaskState state = ((WorkerTask) runnable).getState();
-
-            if (WorkerTaskState.ERROR == state) {
-                errorTasks.add(runnable);
-                runningTasks.remove(runnable);
-            } else if (WorkerTaskState.RUNNING == state) {
-                boolean needStop = true;
-                if (null != keyValues && keyValues.size() > 0) {
-                    for (ConnectKeyValue keyValue : keyValues) {
-                        if (keyValue.equals(taskConfig)) {
-                            needStop = false;
-                            break;
+            switch (state) {
+                case ERROR:
+                    errorTasks.add(runnable);
+                    runningTasks.remove(runnable);
+                    break;
+                case RUNNING:
+                    if (isNeedStop(taskConfig, taskConfigs)) {
+                        try {
+                            // remove committer offset
+                            sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
+                            workerTask.doClose();
+                        } catch (Exception e) {
+                            log.error("workerTask stop exception, workerTask: " + workerTask.currentTaskConfig(), e);
                         }
+                        log.info("Task stopping, connector name {}, config {}", workerTask.id().connector(), workerTask.currentTaskConfig());
+                        runningTasks.remove(runnable);
+                        stoppingTasks.put(runnable, System.currentTimeMillis());
                     }
+                    break;
+                default:
+                    log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state",
+                            ((WorkerTask) runnable).id().connector(), state);
+                    break;
+            }
+        }
+    }
+
+    /**
+     * check is need stop
+     *
+     * @param taskConfig
+     * @param keyValues
+     * @return
+     */
+    private boolean isNeedStop(ConnectKeyValue taskConfig, List<ConnectKeyValue> keyValues) {
+        if (CollectionUtils.isEmpty(keyValues)) {
+            return true;
+        }
+        for (ConnectKeyValue keyValue : keyValues) {
+            if (keyValue.equals(taskConfig)) {
+                // not stop
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * check stopped tasks
+     */
+    private void checkStoppedTasks() {
+        for (Runnable runnable : stoppedTasks) {
+            WorkerTask workerTask = (WorkerTask) runnable;
+            workerTask.cleanup();
+            Future future = taskToFutureMap.get(runnable);
+            try {
+                if (null != future) {
+                    future.get(workerConfig.getMaxStartTimeoutMills(), TimeUnit.MILLISECONDS);
+                } else {
+                    log.error("[BUG] stopped Tasks reference not found in taskFutureMap");
                 }
+            } catch (ExecutionException e) {
+                Throwable t = e.getCause();
+                log.info("[BUG] Stopped Tasks should not throw any exception");
+                t.printStackTrace();
+            } catch (CancellationException e) {
+                log.info("[BUG] Stopped Tasks throws PrintStackTrace");
+                e.printStackTrace();
+            } catch (TimeoutException e) {
+                log.info("[BUG] Stopped Tasks should not throw any exception");
+                e.printStackTrace();
+            } catch (InterruptedException e) {
+                log.info("[BUG] Stopped Tasks should not throw any exception");
+                e.printStackTrace();
+            } finally {
+                // remove committer offset
+                sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
+                future.cancel(true);
+                taskToFutureMap.remove(runnable);
+                stoppedTasks.remove(runnable);
+                cleanedStoppedTasks.add(runnable);
+            }
+        }
+    }
 
-                if (needStop) {
-                    try {
-                        workerTask.stop();
-                    } catch (Exception e) {
-                        log.error("workerTask stop exception, workerTask: " + workerTask.getTaskConfig(), e);
-                    }
-                    log.info("Task stopping, connector name {}, config {}", workerTask.getConnectorName(), workerTask.getTaskConfig());
-                    runningTasks.remove(runnable);
-                    stoppingTasks.put(runnable, System.currentTimeMillis());
-                    needCommitPosition = true;
+    private void checkErrorTasks() {
+        for (Runnable runnable : errorTasks) {
+            WorkerTask workerTask = (WorkerTask) runnable;
+            Future future = taskToFutureMap.get(runnable);
+            try {
+                if (null != future) {
+                    future.get(workerConfig.getMaxStopTimeoutMills(), TimeUnit.MILLISECONDS);
+                } else {
+                    log.error("[BUG] errorTasks reference not found in taskFutureMap");
                 }
-            } else {
-                log.error("[BUG] Illegal State in when checking running tasks, {} is in {} state",
-                    ((WorkerTask) runnable).getConnectorName(), state.toString());
+            } catch (ExecutionException e) {
+                log.error("Execution exception , {}", e);
+            } catch (CancellationException | TimeoutException | InterruptedException e) {
+                log.error("error, {}", e);
+            } finally {
+                // remove committer offset
+                sourceTaskOffsetCommitter.ifPresent(commiter -> commiter.remove(workerTask.id()));
+
+                future.cancel(true);
+                workerTask.cleanup();
+                taskToFutureMap.remove(runnable);
+                errorTasks.remove(runnable);
+                cleanedErrorTasks.add(runnable);
             }
         }
+    }
 
-        //If some tasks are closed, synchronize the position.
-        if (needCommitPosition) {
-            taskPositionCommitService.commitTaskPosition();
+    private void checkStoppingTasks() {
+        for (Map.Entry<Runnable, Long> entry : stoppingTasks.entrySet()) {
+            Runnable runnable = entry.getKey();
+            Long stopTimestamp = entry.getValue();
+            Long currentTimeMillis = System.currentTimeMillis();
+            Future future = taskToFutureMap.get(runnable);
+            WorkerTaskState state = ((WorkerTask) runnable).getState();
+            // exited normally
+            switch (state) {
+                case STOPPED:
+                    // concurrent modification Exception ? Will it pop that in the
+                    if (null == future || !future.isDone()) {
+                        log.error("[BUG] future is null or Stopped task should have its Future.isDone() true, but false");
+                    }
+                    stoppingTasks.remove(runnable);
+                    stoppedTasks.add(runnable);
+                    break;
+                case ERROR:
+                    stoppingTasks.remove(runnable);
+                    errorTasks.add(runnable);
+                    break;
+                case STOPPING:
+                    if (currentTimeMillis - stopTimestamp > workerConfig.getMaxStopTimeoutMills()) {
+                        ((WorkerTask) runnable).timeout();
+                        stoppingTasks.remove(runnable);
+                        errorTasks.add(runnable);
+                    }
+                    break;
+                default:
+                    log.error("[BUG] Illegal State in when checking stopping tasks, {} is in {} state",
+                            ((WorkerTask) runnable).id().connector(), state.toString());
+            }
         }
+    }
 
-        // get new Tasks
-        Map<String, List<ConnectKeyValue>> newTasks = new HashMap<>();
-        for (String connectorName : taskConfigs.keySet()) {
-            for (ConnectKeyValue keyValue : taskConfigs.get(connectorName)) {
-                boolean isNewTask = true;
-                if (isConfigInSet(keyValue, runningTasks) || isConfigInSet(keyValue, pendingTasks.keySet()) || isConfigInSet(keyValue, errorTasks)) {
-                    isNewTask = false;
-                }
-                if (isNewTask) {
-                    if (!newTasks.containsKey(connectorName)) {
-                        newTasks.put(connectorName, new ArrayList<>());
+    private void checkPendingTask() {
+        for (Map.Entry<Runnable, Long> entry : pendingTasks.entrySet()) {
+            Runnable runnable = entry.getKey();
+            Long startTimestamp = entry.getValue();
+            Long currentTimeMillis = System.currentTimeMillis();
+            WorkerTaskState state = ((WorkerTask) runnable).getState();
+            switch (state) {
+                case ERROR:
+                    errorTasks.add(runnable);
+                    pendingTasks.remove(runnable);
+                    break;
+                case RUNNING:
+                    runningTasks.add(runnable);
+                    pendingTasks.remove(runnable);
+                    break;
+                case NEW:
+                    log.info("[RACE CONDITION] we checked the pending tasks before state turns to PENDING");
+                    break;
+                case PENDING:
+                    if (currentTimeMillis - startTimestamp > workerConfig.getMaxStartTimeoutMills()) {
+                        ((WorkerTask) runnable).timeout();
+                        pendingTasks.remove(runnable);
+                        errorTasks.add(runnable);
                     }
-                    log.info("Add new tasks,connector name {}, config {}", connectorName, keyValue);
-                    newTasks.get(connectorName).add(keyValue);
-                }
+                default:
+                    log.error("[BUG] Illegal State in when checking pending tasks, {} is in {} state",
+                            ((WorkerTask) runnable).id().connector(), state.toString());
+                    break;
             }
         }
+    }
 
-        //  STEP 2: try to create new tasks
-        int taskId = 0;
+    /**
+     * start task
+     *
+     * @param newTasks
+     * @throws Exception
+     */
+    private void startTask(Map<String, List<ConnectKeyValue>> newTasks) throws Exception {
         for (String connectorName : newTasks.keySet()) {
+            AtomicInteger taskId = new AtomicInteger(0);

Review Comment:
   > Whether the same task is assigned to different worker nodes with different connectorTaskId
   
   I will fix this as soon as possible.
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@rocketmq.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org