You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/03 15:10:17 UTC

[doris] 04/07: [Enhancement](spark load) Support Spark submit timeout config (#17108)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a59478a14146dc1e0e9f4e306b996d4193af0287
Author: liujinhui <96...@qq.com>
AuthorDate: Thu Mar 30 20:12:46 2023 +0800

    [Enhancement](spark load) Support Spark submit timeout config (#17108)
---
 .../doris/load/loadv2/SparkEtlJobHandler.java      |  5 ++++-
 .../doris/load/loadv2/SparkLauncherMonitor.java    | 22 ++++++++++++++++++----
 .../load/loadv2/SparkLauncherMonitorTest.java      |  7 ++++++-
 3 files changed, 28 insertions(+), 6 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 57f44f41d5..57f0c58af7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -117,6 +117,8 @@ public class SparkEtlJobHandler {
             sparkConfigs.put("spark.yarn.stage.dir", jobStageHdfsPath);
         }
 
+        LOG.info("submit etl spark job, sparkConfigs:{}", sparkConfigs);
+
         try {
             byte[] configData = etlJobConfig.configToJson().getBytes("UTF-8");
             BrokerUtil.writeFile(configData, jobConfigHdfsPath, brokerDesc);
@@ -154,7 +156,8 @@ public class SparkEtlJobHandler {
             Process process = launcher.launch();
             handle.setProcess(process);
             if (!FeConstants.runningUnitTest) {
-                SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
+                SparkLauncherMonitor.LogMonitor logMonitor =
+                        SparkLauncherMonitor.createLogMonitor(handle, sparkConfigs);
                 logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
                 logMonitor.setRedirectLogPath(logFilePath);
                 logMonitor.start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
index 012b7ad6d5..4cf388f53b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
@@ -20,6 +20,8 @@ package org.apache.doris.load.loadv2;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Splitter;
 import com.google.common.base.Strings;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.logging.log4j.LogManager;
@@ -32,14 +34,15 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.io.OutputStream;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 public class SparkLauncherMonitor {
     private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitor.class);
 
-    public static LogMonitor createLogMonitor(SparkLoadAppHandle handle) {
-        return new LogMonitor(handle);
+    public static LogMonitor createLogMonitor(SparkLoadAppHandle handle, Map<String, String> resourceSparkConfig) {
+        return new LogMonitor(handle, resourceSparkConfig);
     }
 
     private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) {
@@ -80,18 +83,29 @@ public class SparkLauncherMonitor {
 
         // 5min
         private static final long DEFAULT_SUBMIT_TIMEOUT_MS = 300000L;
+        private static final String SUBMIT_TIMEOUT_KEY = "spark.submit.timeout";
 
-        public LogMonitor(SparkLoadAppHandle handle) {
+        public LogMonitor(SparkLoadAppHandle handle, Map<String, String> resourceSparkConfig) {
             this.handle = handle;
             this.process = handle.getProcess();
             this.isStop = false;
-            setSubmitTimeoutMs(DEFAULT_SUBMIT_TIMEOUT_MS);
+
+            if (MapUtils.isNotEmpty(resourceSparkConfig)
+                    && StringUtils.isNotEmpty(resourceSparkConfig.get(SUBMIT_TIMEOUT_KEY))) {
+                setSubmitTimeoutMs(Long.parseLong(resourceSparkConfig.get(SUBMIT_TIMEOUT_KEY)));
+            } else {
+                setSubmitTimeoutMs(DEFAULT_SUBMIT_TIMEOUT_MS);
+            }
         }
 
         public void setSubmitTimeoutMs(long submitTimeoutMs) {
             this.submitTimeoutMs = submitTimeoutMs;
         }
 
+        public long getSubmitTimeoutMs() {
+            return submitTimeoutMs;
+        }
+
         public void setRedirectLogPath(String redirectLogPath) throws IOException {
             this.outputStream = new FileOutputStream(new File(redirectLogPath), false);
             this.handle.setLogPath(redirectLogPath);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
index f9874c33c7..c9f28f188e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.load.loadv2;
 
+import org.apache.commons.collections.map.HashedMap;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.junit.After;
 import org.junit.Assert;
@@ -26,6 +27,7 @@ import org.junit.Test;
 import java.io.File;
 import java.io.IOException;
 import java.net.URL;
+import java.util.Map;
 
 public class SparkLauncherMonitorTest {
     private String appId;
@@ -51,13 +53,16 @@ public class SparkLauncherMonitorTest {
 
     @Test
     public void testLogMonitorNormal() {
+        Map<String, String> config = new HashedMap();
+        config.put("spark.submit.timeout", "600000");
         URL log = getClass().getClassLoader().getResource("spark_launcher_monitor.log");
         String cmd = "cat " + log.getPath();
         SparkLoadAppHandle handle = null;
         try {
             Process process = Runtime.getRuntime().exec(cmd);
             handle = new SparkLoadAppHandle(process);
-            SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
+            SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle, config);
+            Assert.assertEquals(logMonitor.getSubmitTimeoutMs(), 600000L);
             logMonitor.setRedirectLogPath(logPath);
             logMonitor.start();
             try {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org