Posted to commits@dolphinscheduler.apache.org by ki...@apache.org on 2021/07/19 08:28:24 UTC

[dolphinscheduler] branch 1.3.7-prepare_#5783 created (now a05ba81)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a change to branch 1.3.7-prepare_#5783
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git.


      at a05ba81  [1.3.7-prepare#5775][Improvement][Worker] Task log may be lost Issue #5775 Pr #5783

This branch includes the following new commits:

     new a05ba81  [1.3.7-prepare#5775][Improvement][Worker] Task log may be lost Issue #5775 Pr #5783

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email. Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[dolphinscheduler] 01/01: [1.3.7-prepare#5775][Improvement][Worker] Task log may be lost Issue #5775 Pr #5783

Posted by ki...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch 1.3.7-prepare_#5783
in repository https://gitbox.apache.org/repos/asf/dolphinscheduler.git

commit a05ba8150bc9a92ebcc7649349c32d09273e8fc3
Author: CalvinKirs <ki...@apache.org>
AuthorDate: Mon Jul 19 16:28:07 2021 +0800

    [1.3.7-prepare#5775][Improvement][Worker] Task log may be lost
    Issue #5775
    Pr #5783
---
 .../worker/task/AbstractCommandExecutor.java       | 59 ++++++----------------
 .../server/worker/task/AbstractTask.java           | 11 +++-
 .../server/worker/task/PythonCommandExecutor.java  |  3 +-
 .../server/worker/task/ShellCommandExecutor.java   |  3 +-
 .../server/worker/task/sqoop/SqoopTaskTest.java    | 40 ++++++++++++---
 5 files changed, 61 insertions(+), 55 deletions(-)
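
For context, a minimal sketch (illustrative only, not DolphinScheduler code) of the race this patch removes: the old flush path handed the synchronized log list to the handler and then called clear(), so any line the output-reader thread appended between those two calls was silently dropped. Draining a LinkedBlockingQueue with poll() instead leaves late-arriving lines in the queue for the next flush. All names below are invented for the sketch.

import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue;

public class LogBufferSketch {

    // Shared buffer: the reader thread produces, the flush loop consumes.
    private final LinkedBlockingQueue<String> logBuffer = new LinkedBlockingQueue<>();

    // Producer side: the process-output reader simply appends lines.
    void onLine(String line) {
        logBuffer.add(line);
    }

    // Consumer side: drain with poll() instead of iterate-then-clear().
    // A line added while draining is either joined now or kept for the next flush,
    // so nothing is lost between the handler call and a clear().
    void flush() {
        StringJoiner joiner = new StringJoiner("\n\t");
        String line;
        while ((line = logBuffer.poll()) != null) {
            joiner.add(line);
        }
        if (joiner.length() > 0) {
            System.out.println(" -> " + joiner);
        }
    }
}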

diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
index 4baf8af..a0d1016 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractCommandExecutor.java
@@ -41,6 +41,7 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
 import java.util.regex.Matcher;
@@ -67,7 +68,7 @@ public abstract class AbstractCommandExecutor {
     /**
      *  log handler
      */
-    protected Consumer<List<String>> logHandler;
+    protected Consumer<LinkedBlockingQueue<String>> logHandler;
 
     /**
      *  logger
@@ -75,9 +76,11 @@ public abstract class AbstractCommandExecutor {
     protected Logger logger;
 
     /**
-     *  log list
+     * log collection
      */
-    protected final List<String> logBuffer;
+    protected final LinkedBlockingQueue<String> logBuffer;
+
+    protected boolean logOutputIsSuccess = false;
 
     /**
      * taskExecutionContext
@@ -92,13 +95,13 @@ public abstract class AbstractCommandExecutor {
      */
     private TaskExecutionContextCacheManager taskExecutionContextCacheManager;
 
-    public AbstractCommandExecutor(Consumer<List<String>> logHandler,
+    public AbstractCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
                                    TaskExecutionContext taskExecutionContext ,
                                    Logger logger){
         this.logHandler = logHandler;
         this.taskExecutionContext = taskExecutionContext;
         this.logger = logger;
-        this.logBuffer = Collections.synchronizedList(new ArrayList<>());
+        this.logBuffer = new LinkedBlockingQueue<>();
         this.taskExecutionContextCacheManager = SpringApplicationContext.getBean(TaskExecutionContextCacheManagerImpl.class);
     }
 
@@ -335,15 +338,14 @@ public abstract class AbstractCommandExecutor {
      */
     private void clear() {
 
-        List<String> markerList = new ArrayList<>();
-        markerList.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
+        LinkedBlockingQueue<String> markerLog = new LinkedBlockingQueue<>();
+        markerLog.add(ch.qos.logback.classic.ClassicConstants.FINALIZE_SESSION_MARKER.toString());
 
         if (!logBuffer.isEmpty()) {
             // log handle
             logHandler.accept(logBuffer);
-            logBuffer.clear();
         }
-        logHandler.accept(markerList);
+        logHandler.accept(markerLog);
     }
 
     /**
@@ -354,9 +356,7 @@ public abstract class AbstractCommandExecutor {
         String threadLoggerInfoName = String.format(LoggerUtils.TASK_LOGGER_THREAD_NAME + "-%s", taskExecutionContext.getTaskAppId());
         ExecutorService getOutputLogService = ThreadUtils.newDaemonSingleThreadExecutor(threadLoggerInfoName + "-" + "getOutputLogService");
         getOutputLogService.submit(() -> {
-            BufferedReader inReader = null;
-            try {
-                inReader = new BufferedReader(new InputStreamReader(process.getInputStream()));
+            try (BufferedReader inReader = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                 String line;
                 while ((line = inReader.readLine()) != null) {
                     logBuffer.add(line);
@@ -364,8 +364,7 @@ public abstract class AbstractCommandExecutor {
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
             } finally {
-                logOutputIsSuccess = true;
-                close(inReader);
+                logOutputIsSuccess = true;
             }
         });
         getOutputLogService.shutdown();
@@ -454,31 +453,20 @@ public abstract class AbstractCommandExecutor {
      * @return line list
      */
     private List<String> convertFile2List(String filename) {
-        List lineList = new ArrayList<String>(100);
+        List<String> lineList = new ArrayList<>(100);
         File file=new File(filename);
 
         if (!file.exists()){
             return lineList;
         }
 
-        BufferedReader br = null;
-        try {
-            br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8));
+        try (BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(filename), StandardCharsets.UTF_8))) {
             String line = null;
             while ((line = br.readLine()) != null) {
                 lineList.add(line);
             }
         } catch (Exception e) {
             logger.error(String.format("read file: %s failed : ",filename),e);
-        } finally {
-            if(br != null){
-                try {
-                    br.close();
-                } catch (IOException e) {
-                    logger.error(e.getMessage(),e);
-                }
-            }
-
         }
         return lineList;
     }
@@ -555,27 +543,10 @@ public abstract class AbstractCommandExecutor {
             lastFlushTime = now;
             /** log handle */
             logHandler.accept(logBuffer);
-
-            logBuffer.clear();
         }
         return lastFlushTime;
     }
 
-    /**
-     * close buffer reader
-     *
-     * @param inReader in reader
-     */
-    private void close(BufferedReader inReader) {
-        if (inReader != null) {
-            try {
-                inReader.close();
-            } catch (IOException e) {
-                logger.error(e.getMessage(), e);
-            }
-        }
-    }
-
     protected List<String> commandOptions() {
         return Collections.emptyList();
     }
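
The hunk above that reads process.getInputStream() also swaps the manual close(inReader) helper for try-with-resources. A stand-alone sketch of that reader-thread shape, under the assumption of a volatile completion flag and invented names (this is not the project's code):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class OutputReaderSketch {

    private final LinkedBlockingQueue<String> logBuffer = new LinkedBlockingQueue<>();
    private volatile boolean logOutputFinished = false; // completion flag for the flush loop

    void parseProcessOutput(Process process) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        service.submit(() -> {
            // try-with-resources closes the stream even on error,
            // so no separate close(...) helper is needed.
            try (BufferedReader inReader =
                         new BufferedReader(new InputStreamReader(process.getInputStream()))) {
                String line;
                while ((line = inReader.readLine()) != null) {
                    logBuffer.add(line);
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                logOutputFinished = true;
            }
        });
        service.shutdown();
    }
}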
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
index 84c5052..a85acdc 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/AbstractTask.java
@@ -44,6 +44,8 @@ import org.apache.commons.lang.StringUtils;
 
 import java.util.List;
 import java.util.Map;
+import java.util.StringJoiner;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.slf4j.Logger;
 
@@ -119,14 +121,19 @@ public abstract class AbstractTask {
 
     /**
      * log handle
+     *
      * @param logs log list
      */
-    public void logHandle(List<String> logs) {
+    public void logHandle(LinkedBlockingQueue<String> logs) {
         // note that the "new line" is added here to facilitate log parsing
         if (logs.contains(FINALIZE_SESSION_MARKER.toString())) {
             logger.info(FINALIZE_SESSION_MARKER, FINALIZE_SESSION_MARKER.toString());
         } else {
-            logger.info(" -> {}", String.join("\n\t", logs));
+            StringJoiner joiner = new StringJoiner("\n\t");
+            while (!logs.isEmpty()) {
+                joiner.add(logs.poll());
+            }
+            logger.info(" -> {}", joiner);
         }
     }
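
Because logHandle(...) now accepts a LinkedBlockingQueue<String>, it still matches the Consumer<LinkedBlockingQueue<String>> parameter of the executor constructors in the next two diffs when passed as a method reference. A hedged, self-contained sketch of that wiring (the class name and main method are invented for illustration; the handler body mirrors the hunk above):

import java.util.StringJoiner;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class LogHandleWiringSketch {

    static void logHandle(LinkedBlockingQueue<String> logs) {
        StringJoiner joiner = new StringJoiner("\n\t");
        while (!logs.isEmpty()) {
            joiner.add(logs.poll());
        }
        System.out.println(" -> " + joiner);
    }

    public static void main(String[] args) {
        // The method reference matches Consumer<LinkedBlockingQueue<String>>,
        // which is the type the executor constructors now expect.
        Consumer<LinkedBlockingQueue<String>> handler = LogHandleWiringSketch::logHandle;

        LinkedBlockingQueue<String> logs = new LinkedBlockingQueue<>();
        logs.add("first line");
        logs.add("second line");
        handler.accept(logs);
    }
}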
 
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
index 344d00f..2ea5710 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/PythonCommandExecutor.java
@@ -30,6 +30,7 @@ import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 
 /**
@@ -54,7 +55,7 @@ public class PythonCommandExecutor extends AbstractCommandExecutor {
      * @param taskExecutionContext       taskExecutionContext
      * @param logger        logger
      */
-    public PythonCommandExecutor(Consumer<List<String>> logHandler,
+    public PythonCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
                                  TaskExecutionContext taskExecutionContext,
                                  Logger logger) {
         super(logHandler,taskExecutionContext,logger);
diff --git a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
index 5e297ab..7a552c4 100644
--- a/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
+++ b/dolphinscheduler-server/src/main/java/org/apache/dolphinscheduler/server/worker/task/ShellCommandExecutor.java
@@ -27,6 +27,7 @@ import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.function.Consumer;
 
 /**
@@ -50,7 +51,7 @@ public class ShellCommandExecutor extends AbstractCommandExecutor {
      * @param taskExecutionContext taskExecutionContext
      * @param logger logger
      */
-    public ShellCommandExecutor(Consumer<List<String>> logHandler,
+    public ShellCommandExecutor(Consumer<LinkedBlockingQueue<String>> logHandler,
                                 TaskExecutionContext taskExecutionContext,
                                 Logger logger) {
         super(logHandler,taskExecutionContext,logger);
diff --git a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
index 5a15a21..fecb31e 100644
--- a/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
+++ b/dolphinscheduler-server/src/test/java/org/apache/dolphinscheduler/server/worker/task/sqoop/SqoopTaskTest.java
@@ -25,7 +25,11 @@ import org.apache.dolphinscheduler.server.worker.task.sqoop.generator.SqoopJobGe
 import org.apache.dolphinscheduler.service.bean.SpringApplicationContext;
 import org.apache.dolphinscheduler.service.process.ProcessService;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Date;
+import java.util.List;
+import java.util.concurrent.LinkedBlockingQueue;
 
 import org.junit.Assert;
 import org.junit.Before;
@@ -137,18 +141,19 @@ public class SqoopTaskTest {
         //import mysql to hive
         String mysqlToHive =
             "{\"jobName\":\"sqoop_import\",\"jobType\":\"TEMPLATE\",\"concurrency\":1,\"modelType\":\"import\",\"sourceType\":\"MYSQL\",\"targetType\":\"HIVE\","
-                + "\"hadoopCustomParams\":[{\"prop\":\"mapreduce.map.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"2048\"},{\"prop\":\"mapreduce.reduce.memory.mb\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"2048\"}],"
-                + "\"sqoopAdvancedParams\":[{\"prop\":\"--delete-target-dir\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"},{\"prop\":\"--direct\",\"direct\":\"IN\",\"type\":\"VARCHAR\",\"value\":\"\"}],"
                 + "\"sourceParams\":\"{\\\"srcDatasource\\\":2,\\\"srcTable\\\":\\\"person_2\\\",\\\"srcQueryType\\\":\\\"1\\\","
                 + "\\\"srcQuerySql\\\":\\\"SELECT * FROM person_2\\\",\\\"srcColumnType\\\":\\\"0\\\",\\\"srcColumns\\\":\\\"\\\",\\\"srcConditionList\\\":[],"
-                + "\\\"mapColumnHive\\\":[{\\\"prop\\\":\\\"create_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"string\\\"},{\\\"prop\\\":\\\"update_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"string\\\"}],"
-                + "\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"create_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"java.sql.Date\\\"},{\\\"prop\\\":\\\"update_time\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"java.sql.Date\\\"}]}\","
+                + "\\\"mapColumnHive\\\":[],\\\"mapColumnJava\\\":[{\\\"prop\\\":\\\"id\\\",\\\"direct\\\":\\\"IN\\\",\\\"type\\\":\\\"VARCHAR\\\",\\\"value\\\":\\\"Integer\\\"}]}\","
                 + "\"targetParams\":\"{\\\"hiveDatabase\\\":\\\"stg\\\",\\\"hiveTable\\\":\\\"person_internal_2\\\",\\\"createHiveTable\\\":true,\\\"dropDelimiter\\\":false,"
-                + "\\\"hiveOverWrite\\\":true,\\\"hiveTargetDir\\\":\\\"/tmp/sqoop_import_hive\\\",\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
+                + "\\\"hiveOverWrite\\\":true,\\\"replaceDelimiter\\\":\\\"\\\",\\\"hivePartitionKey\\\":\\\"date\\\",\\\"hivePartitionValue\\\":\\\"2020-02-16\\\"}\",\"localParams\":[]}";
         SqoopParameters mysqlToHiveParams = JSONUtils.parseObject(mysqlToHive, SqoopParameters.class);
         String mysqlToHiveScript = generator.generateSqoopJob(mysqlToHiveParams, mysqlTaskExecutionContext);
-        String mysqlToHiveExpected ="sqoop import -D mapred.job.name=sqoop_import -D mapreduce.map.memory.mb=2048 -D mapreduce.reduce.memory.mb=2048 --delete-target-dir  --direct  -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/test?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\" --username kylo --password \"123456\" --query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-hive create_time=string,update_time=string --map-c [...]
-
+        String mysqlToHiveExpected =
+            "sqoop import -D mapred.job.name=sqoop_import -m 1 --connect \"jdbc:mysql://192.168.0.111:3306/"
+                    + "test?allowLoadLocalInfile=false&autoDeserialize=false&allowLocalInfile=false&allowUrlInLocalInfile=false\" "
+                    + "--username kylo --password \"123456\" "
+                + "--query \"SELECT * FROM person_2 WHERE \\$CONDITIONS\" --map-column-java id=Integer --hive-import --hive-database stg --hive-table person_internal_2 "
+                + "--create-hive-table --hive-overwrite --delete-target-dir --hive-partition-key date --hive-partition-value 2020-02-16";
                       Assert.assertEquals(mysqlToHiveExpected, mysqlToHiveScript);
 
         //sqoop CUSTOM job
@@ -199,4 +204,25 @@ public class SqoopTaskTest {
         }
     }
 
+    @Test
+    public void testLogHandler() throws InterruptedException {
+        LinkedBlockingQueue<String> loggerBuffer = new LinkedBlockingQueue<>();
+        Thread thread1 = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                loggerBuffer.add("test add log");
+            }
+        });
+        Thread thread2 = new Thread(() -> {
+            for (int i = 0; i < 10; i++) {
+                sqoopTask.logHandle(loggerBuffer);
+            }
+        });
+        thread1.start();
+        thread2.start();
+        thread1.join();
+        thread2.join();
+        // if no exception is thrown, assert true
+        Assert.assertTrue(true);
+    }
+
 }
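
For local verification, the touched test class can be run on its own. A hedged example of the Maven invocation (module name taken from the paths above; exact flags may vary with the build setup):

    mvn -pl dolphinscheduler-server -am test -Dtest=SqoopTaskTest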