You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/04/03 15:10:17 UTC
[doris] 04/07: [Enhancement](spark load) Support spark timeout config (#17108)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit a59478a14146dc1e0e9f4e306b996d4193af0287
Author: liujinhui <96...@qq.com>
AuthorDate: Thu Mar 30 20:12:46 2023 +0800
[Enhancement](spark load) Support spark timeout config (#17108)
---
.../doris/load/loadv2/SparkEtlJobHandler.java | 5 ++++-
.../doris/load/loadv2/SparkLauncherMonitor.java | 22 ++++++++++++++++++----
.../load/loadv2/SparkLauncherMonitorTest.java | 7 ++++++-
3 files changed, 28 insertions(+), 6 deletions(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
index 57f44f41d5..57f0c58af7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkEtlJobHandler.java
@@ -117,6 +117,8 @@ public class SparkEtlJobHandler {
sparkConfigs.put("spark.yarn.stage.dir", jobStageHdfsPath);
}
+ LOG.info("submit etl spark job, sparkConfigs:{}", sparkConfigs);
+
try {
byte[] configData = etlJobConfig.configToJson().getBytes("UTF-8");
BrokerUtil.writeFile(configData, jobConfigHdfsPath, brokerDesc);
@@ -154,7 +156,8 @@ public class SparkEtlJobHandler {
Process process = launcher.launch();
handle.setProcess(process);
if (!FeConstants.runningUnitTest) {
- SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
+ SparkLauncherMonitor.LogMonitor logMonitor =
+ SparkLauncherMonitor.createLogMonitor(handle, sparkConfigs);
logMonitor.setSubmitTimeoutMs(GET_APPID_TIMEOUT_MS);
logMonitor.setRedirectLogPath(logFilePath);
logMonitor.start();
diff --git a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
index 012b7ad6d5..4cf388f53b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/load/loadv2/SparkLauncherMonitor.java
@@ -20,6 +20,8 @@ package org.apache.doris.load.loadv2;
import com.google.common.base.Preconditions;
import com.google.common.base.Splitter;
import com.google.common.base.Strings;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.logging.log4j.LogManager;
@@ -32,14 +34,15 @@ import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.util.List;
+import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class SparkLauncherMonitor {
private static final Logger LOG = LogManager.getLogger(SparkLauncherMonitor.class);
- public static LogMonitor createLogMonitor(SparkLoadAppHandle handle) {
- return new LogMonitor(handle);
+ public static LogMonitor createLogMonitor(SparkLoadAppHandle handle, Map<String, String> resourceSparkConfig) {
+ return new LogMonitor(handle, resourceSparkConfig);
}
private static SparkLoadAppHandle.State fromYarnState(YarnApplicationState yarnState) {
@@ -80,18 +83,29 @@ public class SparkLauncherMonitor {
// 5min
private static final long DEFAULT_SUBMIT_TIMEOUT_MS = 300000L;
+ private static final String SUBMIT_TIMEOUT_KEY = "spark.submit.timeout";
- public LogMonitor(SparkLoadAppHandle handle) {
+ public LogMonitor(SparkLoadAppHandle handle, Map<String, String> resourceSparkConfig) {
this.handle = handle;
this.process = handle.getProcess();
this.isStop = false;
- setSubmitTimeoutMs(DEFAULT_SUBMIT_TIMEOUT_MS);
+
+ if (MapUtils.isNotEmpty(resourceSparkConfig)
+ && StringUtils.isNotEmpty(resourceSparkConfig.get(SUBMIT_TIMEOUT_KEY))) {
+ setSubmitTimeoutMs(Long.parseLong(resourceSparkConfig.get(SUBMIT_TIMEOUT_KEY)));
+ } else {
+ setSubmitTimeoutMs(DEFAULT_SUBMIT_TIMEOUT_MS);
+ }
}
public void setSubmitTimeoutMs(long submitTimeoutMs) {
this.submitTimeoutMs = submitTimeoutMs;
}
+ public long getSubmitTimeoutMs() {
+ return submitTimeoutMs;
+ }
+
public void setRedirectLogPath(String redirectLogPath) throws IOException {
this.outputStream = new FileOutputStream(new File(redirectLogPath), false);
this.handle.setLogPath(redirectLogPath);
diff --git a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
index f9874c33c7..c9f28f188e 100644
--- a/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
+++ b/fe/fe-core/src/test/java/org/apache/doris/load/loadv2/SparkLauncherMonitorTest.java
@@ -17,6 +17,7 @@
package org.apache.doris.load.loadv2;
+import org.apache.commons.collections.map.HashedMap;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.junit.After;
import org.junit.Assert;
@@ -26,6 +27,7 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.net.URL;
+import java.util.Map;
public class SparkLauncherMonitorTest {
private String appId;
@@ -51,13 +53,16 @@ public class SparkLauncherMonitorTest {
@Test
public void testLogMonitorNormal() {
+ Map<String, String> config = new HashedMap();
+ config.put("spark.submit.timeout", "600000");
URL log = getClass().getClassLoader().getResource("spark_launcher_monitor.log");
String cmd = "cat " + log.getPath();
SparkLoadAppHandle handle = null;
try {
Process process = Runtime.getRuntime().exec(cmd);
handle = new SparkLoadAppHandle(process);
- SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle);
+ SparkLauncherMonitor.LogMonitor logMonitor = SparkLauncherMonitor.createLogMonitor(handle, config);
+ Assert.assertEquals(logMonitor.getSubmitTimeoutMs(), 600000L);
logMonitor.setRedirectLogPath(logPath);
logMonitor.start();
try {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org