You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/11/27 07:22:53 UTC
[kylin] 06/06: KYLIN-4825 Add yarn tracking url on job step page
This is an automated email from the ASF dual-hosted git repository.
xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 51ff5ec9c6959bd5238c579573494bc4b07c2095
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed Nov 25 20:00:08 2020 +0800
KYLIN-4825 Add yarn tracking url on job step page
---
.../org/apache/kylin/common/KylinConfigBase.java | 4 +
.../kylin/job/execution/ExecutableManager.java | 4 +-
.../engine/spark/application/SparkApplication.java | 106 +++++++++++++++++++++
.../kylin/engine/spark/job/NSparkExecutable.java | 15 +--
.../kylin/rest/controller/JobController.java | 18 +++-
.../kylin/rest/request/SparkJobUpdateRequest.java | 54 +++++++++++
.../org/apache/kylin/rest/service/JobService.java | 13 +++
server/src/main/resources/kylinSecurity.xml | 1 +
8 files changed, 198 insertions(+), 17 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 964eb64..8f2bf8e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2648,6 +2648,10 @@ public abstract class KylinConfigBase implements Serializable {
return Integer.parseInt(getOptional("kylin.engine.driver-memory-base", "1024"));
}
+ public boolean isTrackingUrlIpAddressEnabled() {
+ return Boolean.valueOf(this.getOptional("kylin.job.tracking-url-ip-address-enabled", TRUE));
+ }
+
//Auto adjust the memory of driver
public int[] getSparkEngineDriverMemoryStrategy() {
String[] dft = {"2", "20", "100"};
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 42a9c99..0bd7b6e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -672,6 +672,7 @@ public class ExecutableManager {
+ newStatus + ", job id: " + jobId);
}
jobOutput.setStatus(newStatus.toString());
+ logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
}
if (info != null) {
jobOutput.setInfo(info);
@@ -683,7 +684,6 @@ public class ExecutableManager {
jobOutput.setContent(output);
}
executableDao.updateJobOutput(jobOutput);
- logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
if (needDestroyProcess(oldStatus, newStatus)) {
logger.debug("need kill {}, from {} to {}", jobId, oldStatus, newStatus);
@@ -695,7 +695,7 @@ public class ExecutableManager {
throw new RuntimeException(e);
}
- if (project != null) {
+ if (project != null && logPath != null) {
updateJobOutputToHDFS(project, jobId, output, logPath);
}
}
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 84737cb..98dde2b 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -18,6 +18,14 @@
package org.apache.kylin.engine.spark.application;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.kylin.shaded.com.google.common.collect.Maps;
import org.apache.kylin.engine.spark.job.BuildJobInfos;
import org.apache.kylin.engine.spark.job.KylinBuildEnv;
@@ -26,8 +34,15 @@ import org.apache.kylin.engine.spark.job.UdfManager;
import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
import org.apache.kylin.engine.spark.utils.SparkConfHelper;
import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Locale;
import java.util.Map;
import org.apache.hadoop.fs.FileSystem;
@@ -111,6 +126,93 @@ public abstract class SparkApplication {
return YarnInfoFetcherUtils.getTrackingUrl(yarnAppId);
}
+ /**
+ * http request the spark job controller
+ */
+ public Boolean updateSparkJobInfo(String url, String json) {
+ String serverIp = System.getProperty("spark.driver.rest.server.ip", "127.0.0.1");
+ String port = System.getProperty("spark.driver.rest.server.port", "7070");
+ String requestApi = String.format(Locale.ROOT, "http://%s:%s" + url, serverIp, port);
+
+ try {
+ DefaultHttpClient httpClient = new DefaultHttpClient();
+ HttpPut httpPut = new HttpPut(requestApi);
+ httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+ httpPut.setEntity(new StringEntity(json, StandardCharsets.UTF_8));
+
+ HttpResponse response = httpClient.execute(httpPut);
+ int code = response.getStatusLine().getStatusCode();
+ if (code == HttpStatus.SC_OK) {
+ return true;
+ } else {
+ InputStream inputStream = response.getEntity().getContent();
+ String responseContent = IOUtils.toString(inputStream);
+ logger.warn("update spark job failed, info: {}", responseContent);
+ }
+ } catch (IOException e) {
+ logger.error("http request {} failed!", requestApi, e);
+ }
+ return false;
+ }
+
+ /**
+ * update spark job extra info, link yarn_application_tracking_url
+ */
+ public Boolean updateSparkJobExtraInfo(String url, String project, String jobId, Map<String, String> extraInfo) {
+ Map<String, String> payload = new HashMap<>(5);
+ payload.put("project", project);
+ payload.put("taskId", System.getProperty("spark.driver.param.taskId", jobId));
+ payload.putAll(extraInfo);
+
+ try {
+ String payloadJson = JsonUtil.writeValueAsString(payload);
+ int retry = 3;
+ for (int i = 0; i < retry; i++) {
+ if (updateSparkJobInfo(url, payloadJson)) {
+ return Boolean.TRUE;
+ }
+ Thread.sleep(3000);
+ logger.warn("retry request rest api update spark extra job info");
+ }
+ } catch (Exception e) {
+ logger.error("update spark job extra info failed!", e);
+ }
+
+ return Boolean.FALSE;
+ }
+
+ private String tryReplaceHostAddress(String url) {
+ String originHost = null;
+ try {
+ URI uri = URI.create(url);
+ originHost = uri.getHost();
+ String hostAddress = InetAddress.getByName(originHost).getHostAddress();
+ return url.replace(originHost, hostAddress);
+ } catch (UnknownHostException uhe) {
+ logger.error("failed to get the ip address of " + originHost + ", step back to use the origin tracking url.", uhe);
+ return url;
+ }
+ }
+
+ private Map<String, String> getTrackingInfo(boolean ipAddressPreferred) {
+ String applicationId = ss.sparkContext().applicationId();
+ Map<String, String> extraInfo = new HashMap<>();
+ try {
+ String trackingUrl = getTrackingUrl(applicationId);
+ if (StringUtils.isBlank(trackingUrl)) {
+ logger.warn("Get tracking url of application {}, but empty url found.", applicationId);
+ return extraInfo;
+ }
+ if (ipAddressPreferred) {
+ trackingUrl = tryReplaceHostAddress(trackingUrl);
+ }
+ extraInfo.put("yarnAppUrl", trackingUrl);
+ } catch (Exception e) {
+ logger.error("get tracking url failed!", e);
+ }
+ return extraInfo;
+ }
+
final protected void execute() throws Exception {
String hdfsMetalUrl = getParam(MetadataConstants.P_DIST_META_URL);
@@ -180,6 +282,10 @@ public abstract class SparkApplication {
}).enableHiveSupport().config(sparkConf).config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
.getOrCreate();
+ if (isJobOnCluster(sparkConf)) {
+ updateSparkJobExtraInfo("/kylin/api/jobs/spark", project, jobId,
+ getTrackingInfo(config.isTrackingUrlIpAddressEnabled()));
+ }
// for spark metrics
//JobMetricsUtils.registerListener(ss);
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 7cd178c..393c10d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -59,7 +59,6 @@ import org.apache.kylin.common.util.CliCommandExecutor;
import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.JsonUtil;
import org.apache.kylin.job.common.PatternedLogger;
-import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.exception.ExecuteException;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.ExecutableContext;
@@ -254,24 +253,12 @@ public class NSparkExecutable extends AbstractExecutable {
String kylinJobJar, String appArgs, String jobId) {
PatternedLogger patternedLogger;
if (config.isJobLogPrintEnabled()) {
- patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
- @Override
- public void onLogEvent(String infoKey, Map<String, String> info) {
- // only care three properties here
- if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey)
- || ExecutableConstants.YARN_APP_ID.equals(infoKey)
- || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
- getManager().addJobInfo(getId(), info);
- }
- }
- });
+ patternedLogger = new PatternedLogger(logger);
} else {
patternedLogger = new PatternedLogger(null);
}
try {
String cmd = generateSparkCmd(config, hadoopConf, jars, kylinJobJar, appArgs);
- patternedLogger.log("cmd: ");
- patternedLogger.log(cmd);
CliCommandExecutor exec = new CliCommandExecutor();
exec.execute(cmd, patternedLogger, jobId);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index cd63ac7..6b60326 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -34,6 +34,7 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum;
import org.apache.kylin.rest.exception.BadRequestException;
import org.apache.kylin.rest.exception.InternalErrorException;
import org.apache.kylin.rest.request.JobListRequest;
+import org.apache.kylin.rest.request.SparkJobUpdateRequest;
import org.apache.kylin.rest.response.EnvelopeResponse;
import org.apache.kylin.rest.response.ResponseCode;
import org.apache.kylin.rest.service.JobService;
@@ -45,9 +46,11 @@ import org.springframework.http.MediaType;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
-import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.PutMapping;
import javax.servlet.http.HttpServletResponse;
@@ -199,6 +202,19 @@ public class JobController extends BasicController {
}
/**
+ * RPC Call
+ */
+ @PutMapping(value = "/spark")
+ @ResponseBody
+ public EnvelopeResponse<String> updateSparkJobInfo(@RequestBody SparkJobUpdateRequest sparkJobUpdateRequest) {
+ jobService.updateSparkJobInfo(sparkJobUpdateRequest.getProject(),
+ sparkJobUpdateRequest.getTaskId(),
+ sparkJobUpdateRequest.getYarnAppUrl());
+
+ return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", "");
+ }
+
+ /**
* Resume a cube job
*
* @return
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java
new file mode 100644
index 0000000..ce4147f
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.request;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class SparkJobUpdateRequest {
+ @JsonProperty
+ private String project;
+ @JsonProperty
+ private String taskId;
+ @JsonProperty
+ private String yarnAppUrl;
+
+ public String getProject() {
+ return project;
+ }
+
+ public void setProject(String project) {
+ this.project = project;
+ }
+
+ public String getTaskId() {
+ return taskId;
+ }
+
+ public void setTaskId(String taskId) {
+ this.taskId = taskId;
+ }
+
+ public String getYarnAppUrl() {
+ return yarnAppUrl;
+ }
+
+ public void setYarnAppUrl(String yarnAppUrl) {
+ this.yarnAppUrl = yarnAppUrl;
+ }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 90ee782..e543c22 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -33,6 +33,7 @@ import java.util.TimeZone;
import javax.annotation.Nullable;
+import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.lock.DistributedLock;
@@ -54,6 +55,7 @@ import org.apache.kylin.job.JobInstance;
import org.apache.kylin.job.JobSearchResult;
import org.apache.kylin.job.Scheduler;
import org.apache.kylin.job.SchedulerFactory;
+import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.job.constant.JobStatusEnum;
import org.apache.kylin.job.constant.JobTimeFilterEnum;
import org.apache.kylin.job.dao.ExecutableOutputPO;
@@ -467,6 +469,17 @@ public class JobService extends BasicService implements InitializingBean {
}
}
+ /**
+ * update the spark job yarnAppUrl.
+ */
+ public void updateSparkJobInfo(String project, String taskId, String yarnAppUrl) {
+ ExecutableManager executableManager = getExecutableManager();
+ Map<String, String> extraInfo = Maps.newHashMap();
+ extraInfo.put(ExecutableConstants.YARN_APP_URL, yarnAppUrl);
+
+ executableManager.updateJobOutput(project, taskId, null, extraInfo, null, null);
+ }
+
public JobInstance getJobInstance(String uuid) {
AbstractExecutable job = getExecutableManager().getJob(uuid);
if (job instanceof CheckpointExecutable) {
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index 0003a5f..f73c595 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -249,6 +249,7 @@
<scr:intercept-url pattern="/api/cubes*/**" access="isAuthenticated()"/>
<scr:intercept-url pattern="/api/models*/**" access="isAuthenticated()"/>
<scr:intercept-url pattern="/api/streaming*/**" access="isAuthenticated()"/>
+ <!-- NOTE(review): permitAll leaves this job-update rpc endpoint unauthenticated;
+      confirm it is only reachable from cluster-internal spark drivers. -->
+ <scr:intercept-url pattern="/api/jobs/spark" access="permitAll"/>
<scr:intercept-url pattern="/api/job*/**" access="isAuthenticated()"/>
<scr:intercept-url pattern="/api/admin/public_config" access="permitAll"/>
<scr:intercept-url pattern="/api/admin/version" access="permitAll"/>