You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/11/29 12:01:17 UTC

[zeppelin] branch master updated: [ZEPPELIN-5146]. Query was cancelled incorrectly for hive on tez

This is an automated email from the ASF dual-hosted git repository.

zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git


The following commit(s) were added to refs/heads/master by this push:
     new fc36cbc   [ZEPPELIN-5146]. Query was cancelled incorrectly for hive on tez
fc36cbc is described below

commit fc36cbc425fab30f5bba94db729f3c09fc31a730
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Nov 27 14:12:29 2020 +0800

     [ZEPPELIN-5146]. Query was cancelled incorrectly for hive on tez
    
    ### What is this PR for?
    
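    The root cause is that for Hive on Tez, progress is reported via BeelineInPlaceUpdateStream instead of HiveStatement.getQueryLog(), so the query log stays empty and the job is wrongly treated as inactive and cancelled after the timeout.
    This PR also checks the last update time of BeelineInPlaceUpdateStream when updating `jobLastActiveTime`.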
    
    ### What type of PR is it?
    [Bug Fix]
    
    ### Todos
    * [ ] - Task
    
    ### What is the Jira issue?
    * https://issues.apache.org/jira/browse/ZEPPELIN-5146
    
    ### How should this be tested?
    * CI pass
    
    ### Screenshots (if appropriate)
    
    ### Questions:
    * Do the license files need updating? No
    * Are there breaking changes for older versions? No
    * Does this need documentation? No
    
    Author: Jeff Zhang <zj...@apache.org>
    
    Closes #3984 from zjffdu/ZEPPELIN-5146 and squashes the following commits:
    
    4e4168e93 [Jeff Zhang] save
    f6f021c28 [Jeff Zhang] [ZEPPELIN-5146]. Query was cancelled incorrectly for hive on tez
---
 .../jdbc/hive/BeelineInPlaceUpdateStream.java      |  6 ++++++
 .../org/apache/zeppelin/jdbc/hive/HiveUtils.java   | 25 ++++++++++++++--------
 .../org/apache/zeppelin/jdbc/hive/ProgressBar.java |  8 ++++++-
 3 files changed, 29 insertions(+), 10 deletions(-)

diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java
index cd7cbd1..c4367b1 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java
@@ -35,6 +35,7 @@ public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream {
 
   private InPlaceUpdate inPlaceUpdate;
   private EventNotifier notifier;
+  private long lastUpdateTimestamp;
 
   public BeelineInPlaceUpdateStream(PrintStream out,
                                     InPlaceUpdateStream.EventNotifier notifier) {
@@ -59,11 +60,16 @@ public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream {
         etc. have to remove these notifiers when the operation logs get merged into
         GetOperationStatus
       */
+      lastUpdateTimestamp = System.currentTimeMillis();
       LOGGER.info("update progress: " + response.getProgressedPercentage());
       inPlaceUpdate.render(new ProgressMonitorWrapper(response));
     }
   }
 
+  public long getLastUpdateTimestamp() {
+    return lastUpdateTimestamp;
+  }
+
   @Override
   public EventNotifier getEventNotifier() {
     return notifier;
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
index c7d47b9..026cbb8 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
@@ -102,17 +102,24 @@ public class HiveUtils {
           }
 
           if (jobLaunched) {
+            // Step 1. update jobLastActiveTime first
+            // Step 2. Check whether it is timeout.
             if (StringUtils.isNotBlank(logsOutput)) {
               jobLastActiveTime = System.currentTimeMillis();
-            } else {
-              if (((System.currentTimeMillis() - jobLastActiveTime) > timeoutThreshold)) {
-                String errorMessage = "Cancel this job as no more log is produced in the " +
-                        "last " + timeoutThreshold / 1000 + " seconds, " +
-                        "maybe it is because no yarn resources";
-                LOGGER.warn(errorMessage);
-                jdbcInterpreter.cancel(context, errorMessage);
-                break;
-              }
+            } else if (progressBar.getBeelineInPlaceUpdateStream() != null &&
+                    progressBar.getBeelineInPlaceUpdateStream().getLastUpdateTimestamp()
+                            > jobLastActiveTime) {
+              jobLastActiveTime = progressBar.getBeelineInPlaceUpdateStream()
+                      .getLastUpdateTimestamp();
+            }
+
+            if (((System.currentTimeMillis() - jobLastActiveTime) > timeoutThreshold)) {
+              String errorMessage = "Cancel this job as no more log is produced in the " +
+                      "last " + timeoutThreshold / 1000 + " seconds, " +
+                      "maybe it is because no yarn resources";
+              LOGGER.warn(errorMessage);
+              jdbcInterpreter.cancel(context, errorMessage);
+              break;
             }
           }
           // refresh logs every 1 second.
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java
index 577a1f1..adf2846 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java
@@ -27,6 +27,7 @@ import java.io.PrintStream;
  */
 public class ProgressBar {
   private InPlaceUpdateStream.EventNotifier eventNotifier;
+  private BeelineInPlaceUpdateStream beelineInPlaceUpdateStream;
 
   public ProgressBar() {
     this.eventNotifier = new InPlaceUpdateStream.EventNotifier();
@@ -37,10 +38,15 @@ public class ProgressBar {
   }
 
   public BeelineInPlaceUpdateStream getInPlaceUpdateStream(OutputStream out) {
-    return new BeelineInPlaceUpdateStream(
+    beelineInPlaceUpdateStream = new BeelineInPlaceUpdateStream(
             new PrintStream(out),
             eventNotifier
     );
+    return beelineInPlaceUpdateStream;
+  }
+
+  public BeelineInPlaceUpdateStream getBeelineInPlaceUpdateStream() {
+    return beelineInPlaceUpdateStream;
   }
 
   public void setInPlaceUpdateStream(HiveStatement hiveStmt, OutputStream out){