You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zeppelin.apache.org by zj...@apache.org on 2020/11/18 01:57:35 UTC
[zeppelin] branch branch-0.9 updated: [ZEPPELIN-5100]. Cancel
paragraph when no sufficient resources for hive job
This is an automated email from the ASF dual-hosted git repository.
zjffdu pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/zeppelin.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 3fbc9d7 [ZEPPELIN-5100]. Cancel paragraph when no sufficient resources for hive job
3fbc9d7 is described below
commit 3fbc9d744dfeaa0ba137c6b033037229703211c7
Author: Jeff Zhang <zj...@apache.org>
AuthorDate: Wed Oct 21 10:38:51 2020 +0800
[ZEPPELIN-5100]. Cancel paragraph when no sufficient resources for hive job
### What is this PR for?
This PR is to cancel the paragraph when there are insufficient resources for hive jobs in the jdbc interpreter.
### What type of PR is it?
[Improvement]
### Todos
* [ ] - Task
### What is the Jira issue?
* https://issues.apache.org/jira/browse/ZEPPELIN-5100
### How should this be tested?
* Manually tested
### Screenshots (if appropriate)
### Questions:
* Does the licenses files need update? No
* Is there breaking changes for older versions? No
* Does this needs documentation? No
Author: Jeff Zhang <zj...@apache.org>
Closes #3948 from zjffdu/ZEPPELIN-5100 and squashes the following commits:
f0542ee29 [Jeff Zhang] [ZEPPELIN-5100]. Cancel paragraph when no sufficient resources for hive job
(cherry picked from commit 02022732c8b73050576c1b21395755ffafe50c77)
Signed-off-by: Jeff Zhang <zj...@apache.org>
---
.../org/apache/zeppelin/jdbc/JDBCInterpreter.java | 17 ++++++++++--
.../org/apache/zeppelin/jdbc/hive/HiveUtils.java | 31 +++++++++++++++++++---
jdbc/src/main/resources/interpreter-setting.json | 7 +++++
.../apache/zeppelin/jdbc/hive/HiveUtilsTest.java | 2 +-
4 files changed, 50 insertions(+), 7 deletions(-)
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
index 5d792ec..e72481e 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/JDBCInterpreter.java
@@ -125,6 +125,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
private static final char TAB = '\t';
private static final String TABLE_MAGIC_TAG = "%table ";
private static final String EXPLAIN_PREDICATE = "EXPLAIN ";
+ private static final String CANCEL_REASON = "cancel_reason";
static final String COMMON_MAX_LINE = COMMON_KEY + DOT + MAX_LINE_KEY;
@@ -160,7 +161,6 @@ public class JDBCInterpreter extends KerberosInterpreter {
private int maxLineResults;
private int maxRows;
-
private SqlSplitter sqlSplitter;
private Map<String, ScheduledExecutorService> refreshExecutorServices = new HashMap<>();
@@ -735,7 +735,7 @@ public class JDBCInterpreter extends KerberosInterpreter {
String jdbcURL = getJDBCConfiguration(user).getPropertyMap(dbPrefix).getProperty(URL_KEY);
if (jdbcURL != null && jdbcURL.startsWith("jdbc:hive2://")) {
HiveUtils.startHiveMonitorThread(statement, context,
- Boolean.parseBoolean(getProperty("hive.log.display", "true")));
+ Boolean.parseBoolean(getProperty("hive.log.display", "true")), this);
}
boolean isResultSetAvailable = statement.execute(sqlToExecute);
getJDBCConfiguration(user).setConnectionInDBDriverPoolSuccessful(dbPrefix);
@@ -932,8 +932,21 @@ public class JDBCInterpreter extends KerberosInterpreter {
} catch (SQLException e) {
LOGGER.error("Error while cancelling...", e);
}
+
+ String cancelReason = context.getLocalProperties().get(CANCEL_REASON);
+ if (StringUtils.isNotBlank(cancelReason)) {
+ try {
+ context.out.write(cancelReason);
+ } catch (IOException e) {
+ LOGGER.error("Fail to write cancel reason");
+ }
+ }
}
+ public void cancel(InterpreterContext context, String errorMessage) {
+ context.getLocalProperties().put(CANCEL_REASON, errorMessage);
+ cancel(context);
+ }
/**
*
*
diff --git a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
index 58fc2f0..ad253f0 100644
--- a/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
+++ b/jdbc/src/main/java/org/apache/zeppelin/jdbc/hive/HiveUtils.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.hive.common.util.HiveVersionInfo;
import org.apache.hive.jdbc.HiveStatement;
import org.apache.zeppelin.interpreter.InterpreterContext;
+import org.apache.zeppelin.jdbc.JDBCInterpreter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,7 +54,8 @@ public class HiveUtils {
*/
public static void startHiveMonitorThread(Statement stmt,
InterpreterContext context,
- boolean displayLog) {
+ boolean displayLog,
+ JDBCInterpreter jdbcInterpreter) {
HiveStatement hiveStmt = (HiveStatement)
((DelegatingStatement) ((DelegatingStatement) stmt).getDelegate()).getDelegate();
String hiveVersion = HiveVersionInfo.getVersion();
@@ -66,8 +68,11 @@ public class HiveUtils {
}
// need to use final variable progressBar in thread, so need progressBarTemp here.
final ProgressBar progressBar = progressBarTemp;
-
+ final long timeoutThreshold = Long.parseLong(
+ jdbcInterpreter.getProperty("zeppelin.jdbc.hive.timeout.threshold", "" + 60 * 1000));
Thread thread = new Thread(() -> {
+ boolean jobLaunched = false;
+ long jobLastActiveTime = System.currentTimeMillis();
while (hiveStmt.hasMoreLogs() && !Thread.interrupted()) {
try {
List<String> logs = hiveStmt.getQueryLog();
@@ -81,7 +86,7 @@ public class HiveUtils {
if (!StringUtils.isBlank(logsOutput) && progressBar != null && displayLogProperty) {
progressBar.operationLogShowedToUser();
}
- Optional<String> jobURL = extractJobURL(logsOutput);
+ Optional<String> jobURL = extractMRJobURL(logsOutput);
if (jobURL.isPresent()) {
Map<String, String> infos = new HashMap<>();
infos.put("jobUrl", jobURL.get());
@@ -91,6 +96,24 @@ public class HiveUtils {
infos.put("paraId", context.getParagraphId());
context.getIntpEventClient().onParaInfosReceived(infos);
}
+ if (logsOutput.contains("Launching Job")) {
+ jobLaunched = true;
+ }
+
+ if (jobLaunched) {
+ if (StringUtils.isNotBlank(logsOutput)) {
+ jobLastActiveTime = System.currentTimeMillis();
+ } else {
+ if (((System.currentTimeMillis() - jobLastActiveTime) > timeoutThreshold)) {
+ String errorMessage = "Cancel this job as no more log is produced in the " +
+ "last " + timeoutThreshold / 1000 + " seconds, " +
+ "maybe it is because no yarn resources";
+ LOGGER.warn(errorMessage);
+ jdbcInterpreter.cancel(context, errorMessage);
+ break;
+ }
+ }
+ }
// refresh logs every 1 second.
Thread.sleep(DEFAULT_QUERY_PROGRESS_INTERVAL);
} catch (Exception e) {
@@ -118,7 +141,7 @@ public class HiveUtils {
}
// extract hive job url from logs, it only works for MR engine.
- static Optional<String> extractJobURL(String log) {
+ static Optional<String> extractMRJobURL(String log) {
Matcher matcher = JOBURL_PATTERN.matcher(log);
if (matcher.matches()) {
String jobURL = matcher.group(1);
diff --git a/jdbc/src/main/resources/interpreter-setting.json b/jdbc/src/main/resources/interpreter-setting.json
index 5aac46e..f203ac1 100644
--- a/jdbc/src/main/resources/interpreter-setting.json
+++ b/jdbc/src/main/resources/interpreter-setting.json
@@ -122,6 +122,13 @@
"defaultValue": "1000",
"description": "Maximum number of rows fetched from the query.",
"type": "number"
+ },
+ "zeppelin.jdbc.hive.timeout.threshold": {
+ "envName": null,
+ "propertyName": "zeppelin.jdbc.hive.timeout.threshold",
+ "defaultValue": "60000",
+ "description": "Timeout for hive job timeout",
+ "type": "number"
}
},
"editor": {
diff --git a/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java b/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
index f0f6269..058f7eb 100644
--- a/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
+++ b/jdbc/src/test/java/org/apache/zeppelin/jdbc/hive/HiveUtilsTest.java
@@ -28,7 +28,7 @@ public class HiveUtilsTest {
@Test
public void testJobURL() {
- Optional<String> jobURL = HiveUtils.extractJobURL(
+ Optional<String> jobURL = HiveUtils.extractMRJobURL(
"INFO : The url to track the job: " +
"http://localhost:8088/proxy/application_1591195707498_0064/\n" +
"INFO : Starting Job = job_1591195707498_0064, " +