You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/11/18 01:56:50 UTC

[zeppelin] branch master updated: [ZEPPELIN-5100]. Cancel paragraph when no sufficient resources for hive job

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0202273  [ZEPPELIN-5100]. Cancel paragraph when no sufficient resources for hive job
0202273 is described below

commit 02022732c8b73050576c1b21395755ffafe50c77
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Oct 21 10:38:51 2020 +0800

    [ZEPPELIN-5100]. Cancel paragraph when no sufficient resources for hive job
    
    ### What is this PR for?
    
    This PR cancels the paragraph when there are not sufficient resources for Hive jobs in the JDBC interpreter.
    
    ### What type of PR is it?
    [Improvement ]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5100
    
    ### How should this be tested?
    * Manually tested
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need an update? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3948 from zjffdu/ZEPPELIN-5100 and squashes the following commits:
    
    f0542ee29 [Jeff Zhang] [ZEPPELIN-5100]. Cancel paragraph when no sufficient resources for hive job
---
 .../org/apache/zeppelin/jdbc/JDBCInterpreter.java  | 17 ++++++++++--
 .../org/apache/zeppelin/jdbc/hive/HiveUtils.java   | 31 +++++++++++++++++++---
 jdbc/src/main/resources/interpreter-setting.json   |  7 +++++
 .../apache/zeppelin/jdbc/hive/HiveUtilsTest.java   |  2 +-
 4 files changed, 50 insertions(+), 7 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 5d792ec..e72481e 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -125,6 +125,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
   private static final char TAB = '\t';
   private static final String TABLE_MAGIC_TAG = "%table ";
   private static final String EXPLAIN_PREDICATE = "EXPLAIN ";
+  private static final String CANCEL_REASON = "cancel_reason";
 
   static final String COMMON_MAX_LINE = COMMON_KEY + DOT + MAX_LINE_KEY;
 
@@ -160,7 +161,6 @@ public class JDBCInterpreter extends KerberosInterpreter {
 
   private int maxLineResults;
   private int maxRows;
-
   private SqlSplitter sqlSplitter;
 
   private Map<String, ScheduledExecutorService> refreshExecutorServices = new HashMap<>();
@@ -735,7 +735,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
           String jdbcURL = getJDBCConfiguration(user).getPropertyMap(dbPrefix).getProperty(URL_KEY);
           if (jdbcURL != null && jdbcURL.startsWith("jdbc:hive2://")) {
             HiveUtils.startHiveMonitorThread(statement, context,
-                    Boolean.parseBoolean(getProperty("hive.log.display", "true")));
+                    Boolean.parseBoolean(getProperty("hive.log.display", "true")), this);
           }
           boolean isResultSetAvailable = statement.execute(sqlToExecute);
           getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix);
@@ -932,8 +932,21 @@ public class JDBCInterpreter extends KerberosInterpreter {
     } catch (SQLException e) {
       LOGGER.error("Error while cancelling...", e);
     }
+
+    String cancelReason = context.getLocalProperties().get(CANCEL_REASON);
+    if (StringUtils.isNotBlank(cancelReason)) {
+      try {
+        context.out.write(cancelReason);
+      } catch (IOException e) {
+        LOGGER.error("Fail to write cancel reason");
+      }
+    }
   }
 
+  public void cancel(InterpreterContext context, String errorMessage) {
+    context.getLocalProperties().put(CANCEL_REASON, errorMessage);
+    cancel(context);
+  }
   /**
    *
    *
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
index 58fc2f0..ad253f0 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hive.common.util.HiveVersionInfo;
 import org.apache.hive.jdbc.HiveStatement;
 import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.jdbc.JDBCInterpreter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -53,7 +54,8 @@ public class HiveUtils {
    */
   public static void startHiveMonitorThread(Statement stmt,
                                             InterpreterContext context,
-                                            boolean displayLog) {
+                                            boolean displayLog,
+                                            JDBCInterpreter jdbcInterpreter) {
     HiveStatement hiveStmt = (HiveStatement)
             ((DelegatingStatement) ((DelegatingStatement) stmt).getDelegate()).getDelegate();
     String hiveVersion = HiveVersionInfo.getVersion();
@@ -66,8 +68,11 @@ public class HiveUtils {
     }
     // need to use final variable progressBar in thread, so need progressBarTemp here.
     final ProgressBar progressBar = progressBarTemp;
-
+    final long timeoutThreshold = Long.parseLong(
+            jdbcInterpreter.getProperty("zeppelin.jdbc.hive.timeout.threshold", "" + 60 * 1000));
     Thread thread = new Thread(() -> {
+      boolean jobLaunched = false;
+      long jobLastActiveTime = System.currentTimeMillis();
       while (hiveStmt.hasMoreLogs() && !Thread.interrupted()) {
         try {
           List<String> logs = hiveStmt.getQueryLog();
@@ -81,7 +86,7 @@ public class HiveUtils {
           if (!StringUtils.isBlank(logsOutput) && progressBar != null && displayLogProperty) {
             progressBar.operationLogShowedToUser();
           }
-          Optional<String> jobURL = extractJobURL(logsOutput);
+          Optional<String> jobURL = extractMRJobURL(logsOutput);
           if (jobURL.isPresent()) {
             Map<String, String> infos = new HashMap<>();
             infos.put("jobUrl", jobURL.get());
@@ -91,6 +96,24 @@ public class HiveUtils {
             infos.put("paraId", context.getParagraphId());
             context.getIntpEventClient().onParaInfosReceived(infos);
           }
+          if (logsOutput.contains("Launching Job")) {
+            jobLaunched = true;
+          }
+
+          if (jobLaunched) {
+            if (StringUtils.isNotBlank(logsOutput)) {
+              jobLastActiveTime = System.currentTimeMillis();
+            } else {
+              if (((System.currentTimeMillis() - jobLastActiveTime) > timeoutThreshold)) {
+                String errorMessage = "Cancel this job as no more log is produced in the " +
+                        "last " + timeoutThreshold / 1000 + " seconds, " +
+                        "maybe it is because no yarn resources";
+                LOGGER.warn(errorMessage);
+                jdbcInterpreter.cancel(context, errorMessage);
+                break;
+              }
+            }
+          }
           // refresh logs every 1 second.
           Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
         } catch (Exception e) {
@@ -118,7 +141,7 @@ public class HiveUtils {
   }
 
   // extract hive job url from logs, it only works for MR engine.
-  static Optional<String> extractJobURL(String log) {
+  static Optional<String> extractMRJobURL(String log) {
     Matcher matcher = JOBURL_PATTERN.matcher(log);
     if (matcher.matches()) {
       String jobURL = matcher.group(1);
diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json
index 5aac46e..f203ac1 100644
--- a/jdbc/src/main/resources/interpreter-setting.json
+++ b/jdbc/src/main/resources/interpreter-setting.json
@@ -122,6 +122,13 @@
         "defaultValue": "1000",
         "description": "Maximum number of rows fetched from the query.",
         "type": "number"
+      },
+      "zeppelin.jdbc.hive.timeout.threshold": {
+        "envName": null,
+        "propertyName": "zeppelin.jdbc.hive.timeout.threshold",
+        "defaultValue": "60000",
+        "description": "Timeout for hive job timeout",
+        "type": "number"
       }
     },
     "editor": {
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
index f0f6269..058f7eb 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
@@ -28,7 +28,7 @@ public class HiveUtilsTest {
 
   @Test
   public void testJobURL() {
-    Optional<String> jobURL = HiveUtils.extractJobURL(
+    Optional<String> jobURL = HiveUtils.extractMRJobURL(
             "INFO  : The url to track the job: " +
             "http://localhost:8088/proxy/application_1591195707498_0064/\n" +
             "INFO  : Starting Job = job_1591195707498_0064, " +