You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kylin.apache.org by xx...@apache.org on 2020/11/27 07:22:53 UTC

[kylin] 06/06: KYLIN-4825 Add yarn tracking url on job step page

This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch kylin-on-parquet-v2
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 51ff5ec9c6959bd5238c579573494bc4b07c2095
Author: yaqian.zhang <59...@qq.com>
AuthorDate: Wed Nov 25 20:00:08 2020 +0800

    KYLIN-4825 Add yarn tracking url on job step page
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../kylin/job/execution/ExecutableManager.java     |   4 +-
 .../engine/spark/application/SparkApplication.java | 106 +++++++++++++++++++++
 .../kylin/engine/spark/job/NSparkExecutable.java   |  15 +--
 .../kylin/rest/controller/JobController.java       |  18 +++-
 .../kylin/rest/request/SparkJobUpdateRequest.java  |  54 +++++++++++
 .../org/apache/kylin/rest/service/JobService.java  |  13 +++
 server/src/main/resources/kylinSecurity.xml        |   1 +
 8 files changed, 198 insertions(+), 17 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 964eb64..8f2bf8e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2648,6 +2648,10 @@ public abstract class KylinConfigBase implements Serializable {
         return Integer.parseInt(getOptional("kylin.engine.driver-memory-base", "1024"));
     }
 
+    public boolean isTrackingUrlIpAddressEnabled() {
+        return Boolean.valueOf(this.getOptional("kylin.job.tracking-url-ip-address-enabled", TRUE));
+    }
+
     //Auto adjust the memory of driver
     public int[] getSparkEngineDriverMemoryStrategy() {
         String[] dft = {"2", "20", "100"};
diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
index 42a9c99..0bd7b6e 100644
--- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
+++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java
@@ -672,6 +672,7 @@ public class ExecutableManager {
                             + newStatus + ", job id: " + jobId);
                 }
                 jobOutput.setStatus(newStatus.toString());
+                logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
             }
             if (info != null) {
                 jobOutput.setInfo(info);
@@ -683,7 +684,6 @@ public class ExecutableManager {
                 jobOutput.setContent(output);
             }
             executableDao.updateJobOutput(jobOutput);
-            logger.info("job id:" + jobId + " from " + oldStatus + " to " + newStatus);
 
             if (needDestroyProcess(oldStatus, newStatus)) {
                 logger.debug("need kill {}, from {} to {}", jobId, oldStatus, newStatus);
@@ -695,7 +695,7 @@ public class ExecutableManager {
             throw new RuntimeException(e);
         }
 
-        if (project != null) {
+        if (project != null && logPath != null) {
             updateJobOutputToHDFS(project, jobId, output, logPath);
         }
     }
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
index 84737cb..98dde2b 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/application/SparkApplication.java
@@ -18,6 +18,14 @@
 
 package org.apache.kylin.engine.spark.application;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.entity.StringEntity;
+import org.apache.http.impl.client.DefaultHttpClient;
 import org.apache.kylin.shaded.com.google.common.collect.Maps;
 import org.apache.kylin.engine.spark.job.BuildJobInfos;
 import org.apache.kylin.engine.spark.job.KylinBuildEnv;
@@ -26,8 +34,15 @@ import org.apache.kylin.engine.spark.job.UdfManager;
 import org.apache.kylin.engine.spark.utils.MetaDumpUtil;
 import org.apache.kylin.engine.spark.utils.SparkConfHelper;
 import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.URI;
+import java.net.UnknownHostException;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Files;
 import java.nio.file.Paths;
+import java.util.HashMap;
+import java.util.Locale;
 import java.util.Map;
 
 import org.apache.hadoop.fs.FileSystem;
@@ -111,6 +126,93 @@ public abstract class SparkApplication {
         return YarnInfoFetcherUtils.getTrackingUrl(yarnAppId);
     }
 
+    /**
+     * Send an HTTP request to the spark job controller.
+     */
+    public Boolean updateSparkJobInfo(String url, String json) {
+        String serverIp = System.getProperty("spark.driver.rest.server.ip", "127.0.0.1");
+        String port = System.getProperty("spark.driver.rest.server.port", "7070");
+        String requestApi = String.format(Locale.ROOT, "http://%s:%s" + url, serverIp, port);
+
+        try {
+            DefaultHttpClient httpClient = new DefaultHttpClient();
+            HttpPut httpPut = new HttpPut(requestApi);
+            httpPut.addHeader(HttpHeaders.CONTENT_TYPE, "application/json");
+            httpPut.setEntity(new StringEntity(json, StandardCharsets.UTF_8));
+
+            HttpResponse response = httpClient.execute(httpPut);
+            int code = response.getStatusLine().getStatusCode();
+            if (code == HttpStatus.SC_OK) {
+                return true;
+            } else {
+                InputStream inputStream = response.getEntity().getContent();
+                String responseContent = IOUtils.toString(inputStream);
+                logger.warn("update spark job failed, info: {}", responseContent);
+            }
+        } catch (IOException e) {
+            logger.error("http request {} failed!", requestApi, e);
+        }
+        return false;
+    }
+
+    /**
+     * Update spark job extra info, like yarn_application_tracking_url.
+     */
+    public Boolean updateSparkJobExtraInfo(String url, String project, String jobId, Map<String, String> extraInfo) {
+        Map<String, String> payload = new HashMap<>(5);
+        payload.put("project", project);
+        payload.put("taskId", System.getProperty("spark.driver.param.taskId", jobId));
+        payload.putAll(extraInfo);
+
+        try {
+            String payloadJson = JsonUtil.writeValueAsString(payload);
+            int retry = 3;
+            for (int i = 0; i < retry; i++) {
+                if (updateSparkJobInfo(url, payloadJson)) {
+                    return Boolean.TRUE;
+                }
+                Thread.sleep(3000);
+                logger.warn("retry request rest api update spark extra job info");
+            }
+        } catch (Exception e) {
+            logger.error("update spark job extra info failed!", e);
+        }
+
+        return Boolean.FALSE;
+    }
+
+    private String tryReplaceHostAddress(String url) {
+        String originHost = null;
+        try {
+            URI uri = URI.create(url);
+            originHost = uri.getHost();
+            String hostAddress = InetAddress.getByName(originHost).getHostAddress();
+            return url.replace(originHost, hostAddress);
+        } catch (UnknownHostException uhe) {
+            logger.error("failed to get the ip address of " + originHost + ", step back to use the origin tracking url.", uhe);
+            return url;
+        }
+    }
+
+    private Map<String, String> getTrackingInfo(boolean ipAddressPreferred) {
+        String applicationId = ss.sparkContext().applicationId();
+        Map<String, String> extraInfo = new HashMap<>();
+        try {
+            String trackingUrl = getTrackingUrl(applicationId);
+            if (StringUtils.isBlank(trackingUrl)) {
+                logger.warn("Get tracking url of application {}, but empty url found.", applicationId);
+                return extraInfo;
+            }
+            if (ipAddressPreferred) {
+                trackingUrl = tryReplaceHostAddress(trackingUrl);
+            }
+            extraInfo.put("yarnAppUrl", trackingUrl);
+        } catch (Exception e) {
+            logger.error("get tracking url failed!", e);
+        }
+        return extraInfo;
+    }
+
 
     final protected void execute() throws Exception {
         String hdfsMetalUrl = getParam(MetadataConstants.P_DIST_META_URL);
@@ -180,6 +282,10 @@ public abstract class SparkApplication {
             }).enableHiveSupport().config(sparkConf).config("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
                     .getOrCreate();
 
+            if (isJobOnCluster(sparkConf)) {
+                updateSparkJobExtraInfo("/kylin/api/jobs/spark", project, jobId,
+                        getTrackingInfo(config.isTrackingUrlIpAddressEnabled()));
+            }
             // for spark metrics
             //JobMetricsUtils.registerListener(ss);
 
diff --git a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
index 7cd178c..393c10d 100644
--- a/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
+++ b/kylin-spark-project/kylin-spark-engine/src/main/java/org/apache/kylin/engine/spark/job/NSparkExecutable.java
@@ -59,7 +59,6 @@ import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.job.common.PatternedLogger;
-import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
@@ -254,24 +253,12 @@ public class NSparkExecutable extends AbstractExecutable {
                                          String kylinJobJar, String appArgs, String jobId) {
         PatternedLogger patternedLogger;
         if (config.isJobLogPrintEnabled()) {
-            patternedLogger = new PatternedLogger(logger, new PatternedLogger.ILogListener() {
-                @Override
-                public void onLogEvent(String infoKey, Map<String, String> info) {
-                    // only care three properties here
-                    if (ExecutableConstants.SPARK_JOB_ID.equals(infoKey)
-                            || ExecutableConstants.YARN_APP_ID.equals(infoKey)
-                            || ExecutableConstants.YARN_APP_URL.equals(infoKey)) {
-                        getManager().addJobInfo(getId(), info);
-                    }
-                }
-            });
+            patternedLogger = new PatternedLogger(logger);
         } else {
             patternedLogger = new PatternedLogger(null);
         }
         try {
             String cmd = generateSparkCmd(config, hadoopConf, jars, kylinJobJar, appArgs);
-            patternedLogger.log("cmd: ");
-            patternedLogger.log(cmd);
 
             CliCommandExecutor exec = new CliCommandExecutor();
             exec.execute(cmd, patternedLogger, jobId);
diff --git a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
index cd63ac7..6b60326 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/controller/JobController.java
@@ -34,6 +34,7 @@ import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.rest.exception.BadRequestException;
 import org.apache.kylin.rest.exception.InternalErrorException;
 import org.apache.kylin.rest.request.JobListRequest;
+import org.apache.kylin.rest.request.SparkJobUpdateRequest;
 import org.apache.kylin.rest.response.EnvelopeResponse;
 import org.apache.kylin.rest.response.ResponseCode;
 import org.apache.kylin.rest.service.JobService;
@@ -45,9 +46,11 @@ import org.springframework.http.MediaType;
 import org.springframework.stereotype.Controller;
 import org.springframework.web.bind.annotation.PathVariable;
 import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RequestBody;
 import org.springframework.web.bind.annotation.RequestMethod;
 import org.springframework.web.bind.annotation.ResponseBody;
-import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.PutMapping;
 
 import javax.servlet.http.HttpServletResponse;
 
@@ -199,6 +202,19 @@ public class JobController extends BasicController {
     }
 
     /**
+     * RPC Call
+     */
+    @PutMapping(value = "/spark")
+    @ResponseBody
+    public EnvelopeResponse<String> updateSparkJobInfo(@RequestBody SparkJobUpdateRequest sparkJobUpdateRequest) {
+        jobService.updateSparkJobInfo(sparkJobUpdateRequest.getProject(),
+                sparkJobUpdateRequest.getTaskId(),
+                sparkJobUpdateRequest.getYarnAppUrl());
+
+        return new EnvelopeResponse<>(ResponseCode.CODE_SUCCESS, "", "");
+    }
+
+    /**
      * Resume a cube job
      * 
      * @return
diff --git a/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java b/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java
new file mode 100644
index 0000000..ce4147f
--- /dev/null
+++ b/server-base/src/main/java/org/apache/kylin/rest/request/SparkJobUpdateRequest.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.rest.request;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+public class SparkJobUpdateRequest {
+    @JsonProperty
+    private String project;
+    @JsonProperty
+    private String taskId;
+    @JsonProperty
+    private String yarnAppUrl;
+
+    public String getProject() {
+        return project;
+    }
+
+    public void setProject(String project) {
+        this.project = project;
+    }
+
+    public String getTaskId() {
+        return taskId;
+    }
+
+    public void setTaskId(String taskId) {
+        this.taskId = taskId;
+    }
+
+    public String getYarnAppUrl() {
+        return yarnAppUrl;
+    }
+
+    public void setYarnAppUrl(String yarnAppUrl) {
+        this.yarnAppUrl = yarnAppUrl;
+    }
+}
diff --git a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
index 90ee782..e543c22 100644
--- a/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
+++ b/server-base/src/main/java/org/apache/kylin/rest/service/JobService.java
@@ -33,6 +33,7 @@ import java.util.TimeZone;
 
 import javax.annotation.Nullable;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.lock.DistributedLock;
@@ -54,6 +55,7 @@ import org.apache.kylin.job.JobInstance;
 import org.apache.kylin.job.JobSearchResult;
 import org.apache.kylin.job.Scheduler;
 import org.apache.kylin.job.SchedulerFactory;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.constant.JobStatusEnum;
 import org.apache.kylin.job.constant.JobTimeFilterEnum;
 import org.apache.kylin.job.dao.ExecutableOutputPO;
@@ -467,6 +469,17 @@ public class JobService extends BasicService implements InitializingBean {
         }
     }
 
+    /**
+     * update the spark job yarnAppUrl.
+     */
+    public void updateSparkJobInfo(String project, String taskId, String yarnAppUrl) {
+        ExecutableManager executableManager = getExecutableManager();
+        Map<String, String> extraInfo = Maps.newHashMap();
+        extraInfo.put(ExecutableConstants.YARN_APP_URL, yarnAppUrl);
+
+        executableManager.updateJobOutput(project, taskId, null, extraInfo, null, null);
+    }
+
     public JobInstance getJobInstance(String uuid) {
         AbstractExecutable job = getExecutableManager().getJob(uuid);
         if (job instanceof CheckpointExecutable) {
diff --git a/server/src/main/resources/kylinSecurity.xml b/server/src/main/resources/kylinSecurity.xml
index 0003a5f..f73c595 100644
--- a/server/src/main/resources/kylinSecurity.xml
+++ b/server/src/main/resources/kylinSecurity.xml
@@ -249,6 +249,7 @@
             <scr:intercept-url pattern="/api/cubes*/**" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/models*/**" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/streaming*/**" access="isAuthenticated()"/>
+            <scr:intercept-url pattern="/api/jobs/spark" access="permitAll"/>
             <scr:intercept-url pattern="/api/job*/**" access="isAuthenticated()"/>
             <scr:intercept-url pattern="/api/admin/public_config" access="permitAll"/>
             <scr:intercept-url pattern="/api/admin/version" access="permitAll"/>