You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2021/08/05 02:26:31 UTC

[zeppelin] branch master updated: [ZEPPELIN-5439] Improve the logic of extract hive job url in JdbcInterpreter

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new aa96e31  [ZEPPELIN-5439] Improve the logic of extract hive job url in JdbcInterpreter
aa96e31 is described below

commit aa96e31807f5e384b5475ab6a017ca006790c46d
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Thu Jul 1 18:00:33 2021 +0800

    [ZEPPELIN-5439] Improve the logic of extract hive job url in JdbcInterpreter
    
    ### What is this PR for?
    
    Previously we extract hive job url from hive jdbc job log, but it seems the in the latest hive, there's no job url info in log.
    This PR is use another approach to do that, we use the paragraphId as the hive job yarn tag, after hive sql execution is started, we look at the yarn app that has this tag.
    
    ### What type of PR is it?
    [Improvement]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5439
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Does the licenses files need update? No
    * Is there breaking changes for older versions? No
    * Does this needs documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #4166 from zjffdu/ZEPPELIN-5439 and squashes the following commits:
    
    c88c528299 [Jeff Zhang] [ZEPPELIN-5439] Improve the logic of extract hive job url in JdbcInterpreter
---
 jdbc/pom.xml                                       |  11 +-
 .../org/apache/zeppelin/jdbc/JDBCInterpreter.java  |  24 +++-
 .../org/apache/zeppelin/jdbc/hive/HiveUtils.java   | 145 +++++++++++++++------
 .../org/apache/zeppelin/jdbc/hive/YarnUtil.java    |  78 +++++++++++
 .../apache/zeppelin/jdbc/hive/HiveUtilsTest.java   |  12 ++
 5 files changed, 227 insertions(+), 43 deletions(-)

diff --git a/jdbc/pom.xml b/jdbc/pom.xml
index 24d861d..cdc3029 100644
--- a/jdbc/pom.xml
+++ b/jdbc/pom.xml
@@ -35,7 +35,7 @@
     <!--library versions-->
     <interpreter.name>jdbc</interpreter.name>
     <postgresql.version>9.4-1201-jdbc41</postgresql.version>
-    <hadoop.common.version>${hadoop2.7.version}</hadoop.common.version>
+    <hadoop.version>${hadoop2.7.version}</hadoop.version>
     <h2.version>1.4.190</h2.version>
     <commons.dbcp2.version>2.0.1</commons.dbcp2.version>
     <hive2.version>2.3.4</hive2.version>
@@ -85,8 +85,15 @@
 
     <dependency>
       <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+      <version>${hadoop.version}</version>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-common</artifactId>
-      <version>${hadoop.common.version}</version>
+      <version>${hadoop.version}</version>
       <scope>provided</scope>
       <exclusions>
         <exclusion>
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 1882f1d..3af326d 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -529,7 +529,8 @@ public class JDBCInterpreter extends KerberosInterpreter {
 
     final Properties properties = jdbcUserConfigurations.getPropertyMap(dbPrefix);
     String url = properties.getProperty(URL_KEY);
-    String connectionUrl = appendProxyUserToURL(url, user, dbPrefix);
+    url = appendProxyUserToURL(url, user, dbPrefix);
+    String connectionUrl = appendTagsToURL(url, context);
 
     String authType = getProperty("zeppelin.jdbc.auth.type", "SIMPLE")
             .trim().toUpperCase();
@@ -559,9 +560,10 @@ public class JDBCInterpreter extends KerberosInterpreter {
           }
 
           final String poolKey = dbPrefix;
+          final String finalUser = user;
           try {
             connection = ugi.doAs((PrivilegedExceptionAction<Connection>) () ->
-                    getConnectionFromPool(connectionUrl, user, poolKey, properties));
+                    getConnectionFromPool(connectionUrl, finalUser, poolKey, properties));
           } catch (Exception e) {
             LOGGER.error("Error in doAs", e);
             throw new InterpreterException("Error in doAs", e);
@@ -596,6 +598,24 @@ public class JDBCInterpreter extends KerberosInterpreter {
     return connectionUrl.toString();
   }
 
+  // only add tags for hive jdbc
+  private String appendTagsToURL(String url, InterpreterContext context) {
+    StringBuilder builder = new StringBuilder(url);
+    if (url.startsWith("jdbc:hive2:")) {
+      Integer lastIndexOfQMark = builder.indexOf("?");
+      if (lastIndexOfQMark == -1) {
+        builder.append("?");
+        lastIndexOfQMark = builder.length();
+      } else {
+        lastIndexOfQMark++;
+      }
+      builder.insert(lastIndexOfQMark, "mapreduce.job.tags=" + context.getParagraphId() + ";");
+      builder.insert(lastIndexOfQMark, "tez.application.tags=" + context.getParagraphId() + ";");
+    }
+    return builder.toString();
+  }
+
+
   private String getPassword(Properties properties) throws IOException, InterpreterException {
     if (isNotEmpty(properties.getProperty(PASSWORD_KEY))) {
       return properties.getProperty(PASSWORD_KEY);
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
index 8930522..ddf35f3 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
@@ -45,6 +45,9 @@ public class HiveUtils {
   private static final Pattern JOBURL_PATTERN =
           Pattern.compile(".*Tracking URL = (\\S*).*", Pattern.DOTALL);
 
+  private static final Pattern APPID_PATTERN =
+          Pattern.compile(".*with App id (\\S*)\\).*", Pattern.DOTALL);
+
   /**
    * Display hive job execution info, and progress info for hive >= 2.3
    *
@@ -68,14 +71,13 @@ public class HiveUtils {
     }
     // need to use final variable progressBar in thread, so need progressBarTemp here.
     final ProgressBar progressBar = progressBarTemp;
-    final long timeoutThreshold = Long.parseLong(
-            jdbcInterpreter.getProperty("zeppelin.jdbc.hive.timeout.threshold", "" + 60 * 1000));
     final long queryInterval = Long.parseLong(
-        jdbcInterpreter.getProperty("zeppelin.jdbc.hive.monitor.query_interval",
-            DEFAULT_QUERY_PROGRESS_INTERVAL + ""));
+            jdbcInterpreter.getProperty("zeppelin.jdbc.hive.monitor.query_interval",
+                    DEFAULT_QUERY_PROGRESS_INTERVAL + ""));
     Thread thread = new Thread(() -> {
-      boolean jobLaunched = false;
-      long jobLastActiveTime = System.currentTimeMillis();
+      String jobUrlTemplate = jdbcInterpreter.getProperty("zeppelin.jdbc.hive.jobUrl.template");
+      boolean jobUrlExtracted = false;
+
       try {
         while (hiveStmt.hasMoreLogs() && !hiveStmt.isClosed() && !Thread.interrupted()) {
           Thread.sleep(queryInterval);
@@ -90,40 +92,9 @@ public class HiveUtils {
           if (!StringUtils.isBlank(logsOutput) && progressBar != null && displayLogProperty) {
             progressBar.operationLogShowedToUser();
           }
-          Optional<String> jobURL = extractMRJobURL(logsOutput);
-          if (jobURL.isPresent()) {
-            Map<String, String> infos = new HashMap<>();
-            infos.put("jobUrl", jobURL.get());
-            infos.put("label", "HIVE JOB");
-            infos.put("tooltip", "View in YARN WEB UI");
-            infos.put("noteId", context.getNoteId());
-            infos.put("paraId", context.getParagraphId());
-            context.getIntpEventClient().onParaInfosReceived(infos);
-          }
-          if (logsOutput.contains("Launching Job")) {
-            jobLaunched = true;
-          }
 
-          if (jobLaunched) {
-            // Step 1. update jobLastActiveTime first
-            // Step 2. Check whether it is timeout.
-            if (StringUtils.isNotBlank(logsOutput)) {
-              jobLastActiveTime = System.currentTimeMillis();
-            } else if (progressBar.getBeelineInPlaceUpdateStream() != null &&
-                    progressBar.getBeelineInPlaceUpdateStream().getLastUpdateTimestamp()
-                            > jobLastActiveTime) {
-              jobLastActiveTime = progressBar.getBeelineInPlaceUpdateStream()
-                      .getLastUpdateTimestamp();
-            }
-
-            if (((System.currentTimeMillis() - jobLastActiveTime) > timeoutThreshold)) {
-              String errorMessage = "Cancel this job as no more log is produced in the " +
-                      "last " + timeoutThreshold / 1000 + " seconds, " +
-                      "maybe it is because no yarn resources";
-              LOGGER.warn(errorMessage);
-              jdbcInterpreter.cancel(context, errorMessage);
-              break;
-            }
+          if (!jobUrlExtracted) {
+            jobUrlExtracted = extractJobURL(logsOutput, jobUrlTemplate, context);
           }
         }
       } catch (InterruptedException e) {
@@ -149,6 +120,83 @@ public class HiveUtils {
     }
   }
 
+  /**
+   * Extract hive job url in the following order:
+   * 1. Extract job url from hive logs when hive use mr engine
+   * 2. Extract job url from hive logs when hive use tez engine
+   * 3. Extract job url via yarn tags when the above 2 methods doesn't work
+   * @param logsOutput
+   * @param jobUrlTemplate
+   * @param context
+   * @return
+   */
+  private static boolean extractJobURL(String logsOutput,
+                                       String jobUrlTemplate,
+                                       InterpreterContext context) {
+    String jobUrl = null;
+    Optional<String> mrJobURLOption = extractMRJobURL(logsOutput);
+    Optional<String> tezAppIdOption = extractTezAppId(logsOutput);
+    if (mrJobURLOption.isPresent()) {
+      jobUrl = mrJobURLOption.get();
+      LOGGER.info("Extract MR jobUrl: {} from logs", mrJobURLOption.get());
+      if (StringUtils.isNotBlank(jobUrlTemplate)) {
+        Optional<String> yarnAppId = extractYarnAppId(jobUrl);
+        if (yarnAppId.isPresent()) {
+          LOGGER.info("Extract yarn app id: {} from MR jobUrl", yarnAppId);
+          jobUrl = jobUrlTemplate.replace("{{applicationId}}", yarnAppId.get());
+        } else {
+          LOGGER.warn("Unable to extract yarn App Id from jobURL: {}", jobUrl);
+        }
+      }
+    } else if (tezAppIdOption.isPresent()) {
+      String yarnAppId = tezAppIdOption.get();
+      LOGGER.info("Extract Tez job yarn appId: {} from logs", yarnAppId);
+      if (StringUtils.isNotBlank(jobUrlTemplate)) {
+        jobUrl = jobUrlTemplate.replace("{{applicationId}}", yarnAppId);
+      } else {
+        LOGGER.warn("Unable to set JobUrl because zeppelin.jdbc.hive.jobUrl.template is not set");
+        jobUrl = yarnAppId;
+      }
+    } else {
+      if (isHadoopJarAvailable()) {
+        String yarnAppId = YarnUtil.getYarnAppIdByTag(context.getParagraphId());
+        if (StringUtils.isNotBlank(yarnAppId)) {
+          LOGGER.info("Extract yarn appId: {} by tag", yarnAppId);
+          if (StringUtils.isNotBlank(jobUrlTemplate)) {
+            jobUrl = jobUrlTemplate.replace("{{applicationId}}", yarnAppId);
+          } else {
+            jobUrl = yarnAppId;
+          }
+        }
+      } else {
+        LOGGER.warn("Hadoop jar is not available, unable to use tags to fetch yarn app url");
+      }
+    }
+
+    if (jobUrl != null) {
+      LOGGER.info("Detected hive jobUrl: {}", jobUrl);
+      Map<String, String> infos = new HashMap<>();
+      infos.put("jobUrl", jobUrl);
+      infos.put("label", "HIVE JOB");
+      infos.put("tooltip", "View in YARN WEB UI");
+      infos.put("noteId", context.getNoteId());
+      infos.put("paraId", context.getParagraphId());
+      context.getIntpEventClient().onParaInfosReceived(infos);
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  private static boolean isHadoopJarAvailable() {
+    try {
+      Class.forName("org.apache.hadoop.conf.Configuration");
+      return true;
+    } catch (ClassNotFoundException e) {
+      return false;
+    }
+  }
+
   // Hive progress bar is supported from hive 2.3 (HIVE-16045)
   private static boolean isProgressBarSupported(String hiveVersion) {
     String[] tokens = hiveVersion.split("\\.");
@@ -166,4 +214,23 @@ public class HiveUtils {
     }
     return Optional.empty();
   }
+
+  // extract yarn appId from logs, it only works for Tez engine.
+  static Optional<String> extractTezAppId(String log) {
+    Matcher matcher = APPID_PATTERN.matcher(log);
+    if (matcher.matches()) {
+      String appId = matcher.group(1);
+      return Optional.of(appId);
+    }
+    return Optional.empty();
+  }
+
+  // extract yarn appId from jobURL
+  static Optional<String> extractYarnAppId(String jobURL) {
+    int pos = jobURL.indexOf("application_");
+    if (pos != -1) {
+      return Optional.of(jobURL.substring(pos));
+    }
+    return Optional.empty();
+  }
 }
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/YarnUtil.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/YarnUtil.java
new file mode 100644
index 0000000..87ca420
--- /dev/null
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/YarnUtil.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.zeppelin.jdbc.hive;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Set;
+
+public class YarnUtil {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(YarnUtil.class);
+
+  private static YarnClient yarnClient;
+
+  static {
+    try {
+      yarnClient = YarnClient.createYarnClient();
+      YarnConfiguration yarnConf = new YarnConfiguration();
+      // disable timeline service as we only query yarn app here.
+      // Otherwise we may hit this kind of ERROR:
+      // java.lang.ClassNotFoundException: com.sun.jersey.api.client.config.ClientConfig
+      yarnConf.set("yarn.timeline-service.enabled", "false");
+      yarnClient.init(yarnConf);
+      yarnClient.start();
+    } catch (Exception e) {
+      LOGGER.warn("Fail to start yarnClient", e);
+    }
+  }
+
+
+  public static String getYarnAppIdByTag(String paragraphId) {
+    if (yarnClient == null) {
+      return null;
+    }
+    Set<String> applicationTypes = Sets.newHashSet("MAPREDUCE", "TEZ");
+    EnumSet<YarnApplicationState> yarnStates =
+            Sets.newEnumSet(Lists.newArrayList(YarnApplicationState.RUNNING),
+            YarnApplicationState.class);
+
+    try {
+      List<ApplicationReport> apps =
+              yarnClient.getApplications(applicationTypes, yarnStates);
+      for (ApplicationReport appReport : apps) {
+        if (appReport.getApplicationTags().contains(paragraphId)) {
+          return appReport.getApplicationId().toString();
+        }
+      }
+      return null;
+    } catch (YarnException | IOException e) {
+      LOGGER.warn("Fail to get yarn apps", e);
+      return null;
+    }
+  }
+}
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
index 058f7eb..a3db8c8 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
@@ -38,4 +38,16 @@ public class HiveUtilsTest {
     assertTrue(jobURL.isPresent());
     assertEquals("http://localhost:8088/proxy/application_1591195707498_0064/", jobURL.get());
   }
+
+  @Test
+  public void testTezAppId() {
+    Optional<String> appId = HiveUtils.extractTezAppId(
+            "Query ID = hadoop_20210514105011_6f620c39-f557-4fc6-a899-ecd892be2652\n" +
+                    "Total jobs = 1\n" +
+                    "Launching Job 1 out of 1\n" +
+                    "Status: Running " +
+                    "(Executing on YARN cluster with App id application_1612885840821_260263)");
+    assertTrue(appId.isPresent());
+    assertEquals("application_1612885840821_260263", appId.get());
+  }
 }