You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/11/29 12:01:17 UTC
[zeppelin] branch master updated: [ZEPPELIN-5146]. Query was
cancelled incorrectly for hive on tez
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/master by this push:
new fc36cbc [ZEPPELIN-5146]. Query was cancelled incorrectly for hive on tez
fc36cbc is described below
commit fc36cbc425fab30f5bba94db729f3c09fc31a730
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Fri Nov 27 14:12:29 2020 +0800
[ZEPPELIN-5146]. Query was cancelled incorrectly for hive on tez
### What is this PR for?
The root cause is that, for Hive on Tez, the log is updated via BeelineInPlaceUpdateStream instead of HiveStatement.getQueryLog().
This PR also checks BeelineInPlaceUpdateStream when updating `jobLastActiveTime`, so that progress updates count as job activity.
### What type of PR is it?
[Bug Fix]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5146
### How should this be tested?
* CI pass
### Screenshots (if appropriate)
### Questions:
* Do the license files need an update? No
* Are there breaking changes for older versions? No
* Does this need documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3984 from zjffdu/ZEPPELIN-5146 and squashes the following commits:
4e4168e93 [Jeff Zhang] save
f6f021c28 [Jeff Zhang] [ZEPPELIN-5146]. Query was cancelled incorrectly for hive on tez
---
.../jdbc/hive/BeelineInPlaceUpdateStream.java | 6 ++++++
.../org/apache/zeppelin/jdbc/hive/HiveUtils.java | 25 ++++++++++++++--------
.../org/apache/zeppelin/jdbc/hive/ProgressBar.java | 8 ++++++-
3 files changed, 29 insertions(+), 10 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java
index cd7cbd1..c4367b1 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/BeelineInPlaceUpdateStream.java
@@ -35,6 +35,7 @@ public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream {
private InPlaceUpdate inPlaceUpdate;
private EventNotifier notifier;
+ private long lastUpdateTimestamp;
public BeelineInPlaceUpdateStream(PrintStream out,
InPlaceUpdateStream.EventNotifier notifier) {
@@ -59,11 +60,16 @@ public class BeelineInPlaceUpdateStream implements InPlaceUpdateStream {
etc. have to remove these notifiers when the operation logs get merged into
GetOperationStatus
*/
+ lastUpdateTimestamp = System.currentTimeMillis();
LOGGER.info("update progress: " + response.getProgressedPercentage());
inPlaceUpdate.render(new ProgressMonitorWrapper(response));
}
}
+ public long getLastUpdateTimestamp() {
+ return lastUpdateTimestamp;
+ }
+
@Override
public EventNotifier getEventNotifier() {
return notifier;
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
index c7d47b9..026cbb8 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
@@ -102,17 +102,24 @@ public class HiveUtils {
}
if (jobLaunched) {
+ // Step 1. update jobLastActiveTime first
+ // Step 2. Check whether it is timeout.
if (StringUtils.isNotBlank(logsOutput)) {
jobLastActiveTime = System.currentTimeMillis();
- } else {
- if (((System.currentTimeMillis() - jobLastActiveTime) > timeoutThreshold)) {
- String errorMessage = "Cancel this job as no more log is produced in the " +
- "last " + timeoutThreshold / 1000 + " seconds, " +
- "maybe it is because no yarn resources";
- LOGGER.warn(errorMessage);
- jdbcInterpreter.cancel(context, errorMessage);
- break;
- }
+ } else if (progressBar.getBeelineInPlaceUpdateStream() != null &&
+ progressBar.getBeelineInPlaceUpdateStream().getLastUpdateTimestamp()
+ > jobLastActiveTime) {
+ jobLastActiveTime = progressBar.getBeelineInPlaceUpdateStream()
+ .getLastUpdateTimestamp();
+ }
+
+ if (((System.currentTimeMillis() - jobLastActiveTime) > timeoutThreshold)) {
+ String errorMessage = "Cancel this job as no more log is produced in the " +
+ "last " + timeoutThreshold / 1000 + " seconds, " +
+ "maybe it is because no yarn resources";
+ LOGGER.warn(errorMessage);
+ jdbcInterpreter.cancel(context, errorMessage);
+ break;
}
}
// refresh logs every 1 second.
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java
index 577a1f1..adf2846 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/ProgressBar.java
@@ -27,6 +27,7 @@ import java.io.PrintStream;
*/
public class ProgressBar {
private InPlaceUpdateStream.EventNotifier eventNotifier;
+ private BeelineInPlaceUpdateStream beelineInPlaceUpdateStream;
public ProgressBar() {
this.eventNotifier = new InPlaceUpdateStream.EventNotifier();
@@ -37,10 +38,15 @@ public class ProgressBar {
}
public BeelineInPlaceUpdateStream getInPlaceUpdateStream(OutputStream out) {
- return new BeelineInPlaceUpdateStream(
+ beelineInPlaceUpdateStream = new BeelineInPlaceUpdateStream(
new PrintStream(out),
eventNotifier
);
+ return beelineInPlaceUpdateStream;
+ }
+
+ public BeelineInPlaceUpdateStream getBeelineInPlaceUpdateStream() {
+ return beelineInPlaceUpdateStream;
}
public void setInPlaceUpdateStream(HiveStatement hiveStmt, OutputStream out){