Posted to commits@hive.apache.org by st...@apache.org on 2018/07/02 18:37:45 UTC
hive git commit: HIVE-18916: SparkClientImpl doesn't error out if spark-submit fails (Sahil Takiar, reviewed by Aihua Xu)
Repository: hive
Updated Branches:
refs/heads/master 88da0e8b7 -> e19b861cf
HIVE-18916: SparkClientImpl doesn't error out if spark-submit fails (Sahil Takiar, reviewed by Aihua Xu)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e19b861c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e19b861c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e19b861c
Branch: refs/heads/master
Commit: e19b861cfbcb15166f9255f8b375ff5d8056b417
Parents: 88da0e8
Author: Sahil Takiar <ta...@gmail.com>
Authored: Mon Jul 2 11:30:14 2018 -0700
Committer: Sahil Takiar <st...@cloudera.com>
Committed: Mon Jul 2 11:30:14 2018 -0700
----------------------------------------------------------------------
.../test/resources/testconfiguration.properties | 4 +-
.../apache/hadoop/hive/ql/QOutProcessor.java | 2 +
.../hadoop/hive/ql/exec/spark/SparkTask.java | 15 ++--
.../ql/exec/spark/session/SparkSessionImpl.java | 10 ++-
.../spark/status/RemoteSparkJobMonitor.java | 1 -
.../exec/spark/TestSparkInvalidFileFormat.java | 81 ++++++++++++++++++++
.../spark_submit_negative_executor_cores.q | 5 ++
.../spark_submit_negative_executor_memory.q | 5 ++
.../spark_submit_negative_executor_cores.q.out | 5 ++
.../spark_submit_negative_executor_memory.q.out | 5 ++
.../hive/spark/client/AbstractSparkClient.java | 25 +++---
.../spark/client/SparkSubmitSparkClient.java | 22 +++---
.../apache/hive/spark/client/rpc/RpcServer.java | 21 +++--
13 files changed, 159 insertions(+), 42 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/itests/src/test/resources/testconfiguration.properties
----------------------------------------------------------------------
diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties
index d415b7d..385b71e 100644
--- a/itests/src/test/resources/testconfiguration.properties
+++ b/itests/src/test/resources/testconfiguration.properties
@@ -1684,7 +1684,9 @@ spark.query.negative.files=groupby2_map_skew_multi_distinct.q,\
spark.only.query.negative.files=spark_job_max_tasks.q,\
spark_stage_max_tasks.q,\
- spark_task_failure.q
+ spark_task_failure.q,\
+ spark_submit_negative_executor_cores.q,\
+ spark_submit_negative_executor_memory.q
spark.perf.disabled.query.files=query14.q,\
query64.q
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java
----------------------------------------------------------------------
diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java
index 359f027..1e4cddd 100644
--- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java
+++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QOutProcessor.java
@@ -273,6 +273,8 @@ public class QOutProcessor {
ppm.add(new PatternReplacementPair(Pattern.compile("attempt_[0-9_]+"), "attempt_#ID#"));
ppm.add(new PatternReplacementPair(Pattern.compile("vertex_[0-9_]+"), "vertex_#ID#"));
ppm.add(new PatternReplacementPair(Pattern.compile("task_[0-9_]+"), "task_#ID#"));
+ ppm.add(new PatternReplacementPair(Pattern.compile("for Spark session.*?:"),
+ "#SPARK_SESSION_ID#:"));
partialPlanMask = ppm.toArray(new PatternReplacementPair[ppm.size()]);
}
/* This list may be modified by specific cli drivers to mask strings that change on every test */
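The masking rule added above keeps the nondeterministic Spark session ID out of golden .q.out files: any "for Spark session <id>:" fragment in an error message collapses to the stable token #SPARK_SESSION_ID#:. A minimal standalone sketch of the pattern's behavior (hypothetical class name and sample message, not the QOutProcessor API itself):

import java.util.regex.Pattern;

public class SparkSessionMaskSketch {
  public static void main(String[] args) {
    Pattern p = Pattern.compile("for Spark session.*?:");
    String line = "Failed to create Spark client for Spark session b2a9e0f7-55f6:"
        + " java.lang.RuntimeException: spark-submit process failed";
    // Non-greedy .*? stops at the first colon, i.e. right after the session ID.
    System.out.println(p.matcher(line).replaceAll("#SPARK_SESSION_ID#:"));
    // Prints: Failed to create Spark client #SPARK_SESSION_ID#: java.lang...
  }
}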
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
index 02613f2..ad5049a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkTask.java
@@ -180,7 +180,7 @@ public class SparkTask extends Task<SparkWork> {
killJob();
} else if (rc == 4) {
LOG.info("The Spark job or one stage of it has too many tasks" +
- ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID );
+ ". Cancelling Spark job " + sparkJobID + " with application ID " + jobID);
killJob();
}
@@ -189,12 +189,7 @@ public class SparkTask extends Task<SparkWork> {
}
sparkJobStatus.cleanup();
} catch (Exception e) {
- String msg = "Failed to execute Spark task " + getId() + ", with exception '" + Utilities.getNameMessage(e) + "'";
-
- // Has to use full name to make sure it does not conflict with
- // org.apache.commons.lang.StringUtils
- console.printError(msg, "\n" + org.apache.hadoop.util.StringUtils.stringifyException(e));
- LOG.error(msg, e);
+ LOG.error("Failed to execute Spark task \"" + getId() + "\"", e);
setException(e);
if (e instanceof HiveException) {
HiveException he = (HiveException) e;
@@ -609,7 +604,7 @@ public class SparkTask extends Task<SparkWork> {
private boolean isTaskFailure(Throwable error) {
Pattern taskFailedPattern = Pattern.compile("Task.*in stage.*failed.*times");
while (error != null) {
- if (taskFailedPattern.matcher(error.getMessage()).find()) {
+ if (error.getMessage() != null && taskFailedPattern.matcher(error.getMessage()).find()) {
return true;
}
error = error.getCause();
@@ -621,8 +616,8 @@ public class SparkTask extends Task<SparkWork> {
while (error != null) {
if (error instanceof OutOfMemoryError) {
return true;
- } else if (error.getMessage().contains("Container killed by YARN for exceeding memory " +
- "limits")) {
+ } else if (error.getMessage() != null && error.getMessage().contains("Container killed by " +
+ "YARN for exceeding memory limits")) {
return true;
}
error = error.getCause();
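Both classifiers above now tolerate Throwables whose getMessage() returns null, which is legal per the Throwable contract and common for wrapped errors; previously the matcher/contains call itself threw a NullPointerException and masked the real failure. A hedged standalone sketch of the guarded cause-chain walk (hypothetical helper class, not the SparkTask method itself):

import java.util.regex.Pattern;

public final class FailureClassifierSketch {
  private static final Pattern TASK_FAILED =
      Pattern.compile("Task.*in stage.*failed.*times");

  static boolean isTaskFailure(Throwable error) {
    while (error != null) {
      String msg = error.getMessage();      // may legitimately be null
      if (msg != null && TASK_FAILED.matcher(msg).find()) {
        return true;
      }
      error = error.getCause();             // walk the full cause chain
    }
    return false;
  }

  public static void main(String[] args) {
    // Null message: must classify as "not a task failure", not throw an NPE.
    System.out.println(isTaskFailure(new RuntimeException((String) null)));
    System.out.println(isTaskFailure(new RuntimeException(
        "Task 3 in stage 1.0 failed 4 times")));   // true
  }
}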
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
index c8cb1ac..6e37d93 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/session/SparkSessionImpl.java
@@ -214,13 +214,19 @@ public class SparkSessionImpl implements SparkSession {
return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_INVALID_RESOURCE_REQUEST,
sessionId, matchedString.toString());
} else {
- return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(e).getMessage());
+ return new HiveException(e, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId,
+ getRootCause(oe));
}
}
e = e.getCause();
}
- return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, Throwables.getRootCause(oe).getMessage());
+ return new HiveException(oe, ErrorMsg.SPARK_CREATE_CLIENT_ERROR, sessionId, getRootCause(oe));
+ }
+
+ private String getRootCause(Throwable e) {
+ Throwable rootCause = Throwables.getRootCause(e);
+ return rootCause.getClass().getName() + ": " + rootCause.getMessage();
}
private boolean matches(String input, String regex, StringBuilder matchedString) {
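The getRootCause helper introduced above prefixes the root cause's class name, so the HiveException message stays informative even when the root cause carries a null message. A small sketch of the formatting under that assumption (uses Guava's Throwables, which SparkSessionImpl already imports; the sample exception is illustrative):

import com.google.common.base.Throwables;

public class RootCauseSketch {
  static String getRootCause(Throwable e) {
    Throwable rootCause = Throwables.getRootCause(e);
    return rootCause.getClass().getName() + ": " + rootCause.getMessage();
  }

  public static void main(String[] args) {
    Throwable wrapped = new RuntimeException("client creation failed",
        new IllegalStateException("spark-submit process failed with exit code 1"));
    // Prints: java.lang.IllegalStateException: spark-submit process failed ...
    System.out.println(getRootCause(wrapped));
  }
}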
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
index 004b50b..560fb58 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/status/RemoteSparkJobMonitor.java
@@ -76,7 +76,6 @@ public class RemoteSparkJobMonitor extends SparkJobMonitor {
if ((timeCount > monitorTimeoutInterval)) {
HiveException he = new HiveException(ErrorMsg.SPARK_JOB_MONITOR_TIMEOUT,
Long.toString(timeCount));
- console.printError(he.getMessage());
sparkJobStatus.setMonitorError(he);
running = false;
done = true;
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java
new file mode 100644
index 0000000..bcc0924
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/spark/TestSparkInvalidFileFormat.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.exec.spark;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.DriverFactory;
+import org.apache.hadoop.hive.ql.IDriver;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory;
+import org.apache.hadoop.hive.ql.session.SessionState;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.IOException;
+
+public class TestSparkInvalidFileFormat {
+
+ @Test
+ public void readTextFileAsParquet() throws IOException {
+ HiveConf conf = new HiveConf();
+ conf.setVar(HiveConf.ConfVars.HIVE_AUTHORIZATION_MANAGER,
+ SQLStdHiveAuthorizerFactory.class.getName());
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_SUPPORT_CONCURRENCY, false);
+ conf.setVar(HiveConf.ConfVars.HIVE_EXECUTION_ENGINE, "spark");
+ conf.set("spark.master", "local");
+
+ FileSystem fs = FileSystem.getLocal(conf);
+ Path tmpDir = new Path("TestSparkInvalidFileFormat-tmp");
+
+ File testFile = new File(conf.get("test.data.files"), "kv1.txt");
+
+ SessionState.start(conf);
+
+ IDriver driver = null;
+
+ try {
+ driver = DriverFactory.newDriver(conf);
+ Assert.assertEquals(0,
+ driver.run("CREATE TABLE test_table (key STRING, value STRING)").getResponseCode());
+ Assert.assertEquals(0, driver.run(
+ "LOAD DATA LOCAL INPATH '" + testFile + "' INTO TABLE test_table").getResponseCode());
+ Assert.assertEquals(0,
+ driver.run("ALTER TABLE test_table SET FILEFORMAT parquet").getResponseCode());
+ Throwable exception = driver.run(
+ "SELECT * FROM test_table ORDER BY key LIMIT 10").getException();
+ Assert.assertTrue(exception instanceof HiveException);
+ Assert.assertTrue(exception.getMessage().contains("Spark job failed due to task failures"));
+ Assert.assertTrue(exception.getMessage().contains("kv1.txt is not a Parquet file. expected " +
+ "magic number at tail [80, 65, 82, 49] but found [95, 57, 55, 10]"));
+ } finally {
+ if (driver != null) {
+ Assert.assertEquals(0, driver.run("DROP TABLE IF EXISTS test_table").getResponseCode());
+ driver.destroy();
+ }
+ if (fs.exists(tmpDir)) {
+ fs.delete(tmpDir, true);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q
new file mode 100644
index 0000000..5a92390
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_cores.q
@@ -0,0 +1,5 @@
+--! qt:dataset:src
+
+set spark.executor.cores=-1;
+
+select * from src order by key limit 10;
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q
new file mode 100644
index 0000000..55bc4b8
--- /dev/null
+++ b/ql/src/test/queries/clientnegative/spark_submit_negative_executor_memory.q
@@ -0,0 +1,5 @@
+--! qt:dataset:src
+
+set spark.executor.memory=-1;
+
+select * from src order by key limit 10;
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out
new file mode 100644
index 0000000..47ac8b2
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_cores.q.out
@@ -0,0 +1,5 @@
+PREHOOK: query: select * from src order by key limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client #SPARK_SESSION_ID#: java.lang.RuntimeException: spark-submit process failed with exit code 1 and error "Error: Executor cores must be a positive number"
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out
----------------------------------------------------------------------
diff --git a/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out
new file mode 100644
index 0000000..4aa1337
--- /dev/null
+++ b/ql/src/test/results/clientnegative/spark/spark_submit_negative_executor_memory.q.out
@@ -0,0 +1,5 @@
+PREHOOK: query: select * from src order by key limit 10
+PREHOOK: type: QUERY
+PREHOOK: Input: default@src
+#### A masked pattern was here ####
+FAILED: Execution Error, return code 30041 from org.apache.hadoop.hive.ql.exec.spark.SparkTask. Failed to create Spark client #SPARK_SESSION_ID#: java.lang.RuntimeException: spark-submit process failed with exit code 1 and error "Error: Executor Memory cores must be a positive number"
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
index ed9222c..b2b5201 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/AbstractSparkClient.java
@@ -24,7 +24,6 @@ import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Strings;
import com.google.common.base.Preconditions;
-import com.google.common.base.Throwables;
import com.google.common.collect.Maps;
import com.google.common.io.Resources;
@@ -41,8 +40,6 @@ import java.io.Serializable;
import java.io.Writer;
import java.net.URI;
import java.net.URL;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -125,17 +122,19 @@ abstract class AbstractSparkClient implements SparkClient {
} else {
errorMsg = "Error while waiting for Remote Spark Driver to connect back to HiveServer2.";
}
- LOG.error(errorMsg, e);
- driverFuture.cancel(true);
- try {
- driverFuture.get();
- } catch (InterruptedException ie) {
- // Give up.
- LOG.warn("Interrupted before driver thread was finished.", ie);
- } catch (ExecutionException ee) {
- LOG.error("Driver thread failed", ee);
+ if (driverFuture.isDone()) {
+ try {
+ driverFuture.get();
+ } catch (InterruptedException ie) {
+ // Give up.
+ LOG.warn("Interrupted before driver thread was finished.", ie);
+ } catch (ExecutionException ee) {
+ LOG.error("Driver thread failed", ee);
+ }
+ } else {
+ driverFuture.cancel(true);
}
- throw Throwables.propagate(e);
+ throw new RuntimeException(errorMsg, e);
}
LOG.info("Successfully connected to Remote Spark Driver at: " + this.driverRpc.getRemoteAddress());
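The rewritten error path above no longer unconditionally cancels the driver future: get() is called only when isDone() is true, so a spark-submit failure already recorded by the future is surfaced in the logs rather than discarded by cancel(true), and the original exception is rethrown with context instead of via Throwables.propagate. A minimal sketch of that pattern with plain java.util.concurrent (hypothetical names, not the AbstractSparkClient internals):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class DriverFutureSketch {
  static void handleConnectFailure(Future<Void> driverFuture, Exception e,
      String errorMsg) {
    if (driverFuture.isDone()) {
      // Driver thread already finished: surface its outcome instead of
      // cancelling, which would have swallowed the spark-submit failure.
      try {
        driverFuture.get();
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();   // give up, restore the flag
      } catch (ExecutionException ee) {
        System.err.println("Driver thread failed: " + ee.getCause());
      }
    } else {
      driverFuture.cancel(true);              // still running: stop it
    }
    // Rethrow with a descriptive message, keeping the original cause.
    throw new RuntimeException(errorMsg, e);
  }
}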
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
index 31e89b8..7a6e77b 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkSubmitSparkClient.java
@@ -32,6 +32,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
+import org.apache.commons.lang3.StringUtils;
+
import org.apache.hadoop.hive.common.log.LogRedirector;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
@@ -200,19 +202,19 @@ class SparkSubmitSparkClient extends AbstractSparkClient {
try {
int exitCode = child.waitFor();
if (exitCode != 0) {
- StringBuilder errStr = new StringBuilder();
- synchronized(childErrorLog) {
- for (Object aChildErrorLog : childErrorLog) {
- errStr.append(aChildErrorLog);
- errStr.append('\n');
+ List<String> errorMessages = new ArrayList<>();
+ synchronized (childErrorLog) {
+ for (String line : childErrorLog) {
+ if (StringUtils.containsIgnoreCase(line, "Error")) {
+ errorMessages.add("\"" + line + "\"");
+ }
}
}
- LOG.warn("Child process exited with code {}", exitCode);
- rpcServer.cancelClient(clientId,
- "Child process (spark-submit) exited before connecting back with error log " + errStr.toString());
- } else {
- LOG.info("Child process (spark-submit) exited successfully.");
+ String errStr = errorMessages.isEmpty() ? "?" : Joiner.on(',').join(errorMessages);
+
+ rpcServer.cancelClient(clientId, new RuntimeException("spark-submit process failed " +
+ "with exit code " + exitCode + " and error " + errStr));
}
} catch (InterruptedException ie) {
LOG.warn("Thread waiting on the child process (spark-submit) is interrupted, killing the child process.");
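Instead of dumping the child's entire stderr, the new code above keeps only the lines that mention an error and hands them to the RPC server inside a RuntimeException, which is what ultimately reaches the client's promise and the .q.out files. A self-contained sketch of the filtering and message format (assumes commons-lang3 and Guava on the classpath, both used by the patch; class and sample lines are illustrative):

import com.google.common.base.Joiner;
import org.apache.commons.lang3.StringUtils;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SparkSubmitErrorSketch {
  static String summarize(List<String> childErrorLog, int exitCode) {
    List<String> errorMessages = new ArrayList<>();
    for (String line : childErrorLog) {
      if (StringUtils.containsIgnoreCase(line, "Error")) {
        errorMessages.add("\"" + line + "\"");   // quote each offending line
      }
    }
    // "?" stands in when spark-submit printed nothing recognizable.
    String errStr = errorMessages.isEmpty()
        ? "?" : Joiner.on(',').join(errorMessages);
    return "spark-submit process failed with exit code " + exitCode
        + " and error " + errStr;
  }

  public static void main(String[] args) {
    System.out.println(summarize(Arrays.asList(
        "Warning: Ignoring non-Spark config property",
        "Error: Executor cores must be a positive number"), 1));
  }
}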
http://git-wip-us.apache.org/repos/asf/hive/blob/e19b861c/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
index babcb54..0c67ffd 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/rpc/RpcServer.java
@@ -202,11 +202,12 @@ public class RpcServer implements Closeable {
}
/**
- * Tells the RPC server to cancel the connection from an existing pending client
+ * Tells the RPC server to cancel the connection from an existing pending client.
+ *
* @param clientId The identifier for the client
- * @param msg The error message about why the connection should be canceled
+ * @param failure The error about why the connection should be canceled
*/
- public void cancelClient(final String clientId, final String msg) {
+ public void cancelClient(final String clientId, final Throwable failure) {
final ClientInfo cinfo = pendingClients.remove(clientId);
if (cinfo == null) {
// Nothing to be done here.
@@ -214,12 +215,22 @@ public class RpcServer implements Closeable {
}
cinfo.timeoutFuture.cancel(true);
if (!cinfo.promise.isDone()) {
- cinfo.promise.setFailure(new RuntimeException(
- String.format("Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId)));
+ cinfo.promise.setFailure(failure);
}
}
/**
+ * Tells the RPC server to cancel the connection from an existing pending client.
+ *
+ * @param clientId The identifier for the client
+ * @param msg The error message about why the connection should be canceled
+ */
+ public void cancelClient(final String clientId, final String msg) {
+ cancelClient(clientId, new RuntimeException(String.format(
+ "Cancelling Remote Spark Driver client connection '%s' with error: " + msg, clientId)));
+ }
+
+ /**
* Creates a secret for identifying a client connection.
*/
public String createSecret() {
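The overload pair above lets new call sites pass the real Throwable, so the pending client's promise fails with a typed cause, while the retained String overload delegates and preserves the old wrapped-message behavior for existing callers. A standalone sketch of the delegation (hypothetical class, not the real RpcServer; the message is still built with String.format, mirroring the patch):

public class CancelClientSketch {
  void cancelClient(String clientId, Throwable failure) {
    // Real code fails the pending client's promise with the typed cause:
    //   cinfo.promise.setFailure(failure);
    System.err.println("Failing client '" + clientId + "': " + failure);
  }

  void cancelClient(String clientId, String msg) {
    // Legacy String callers get the same wrapped message as before.
    cancelClient(clientId, new RuntimeException(String.format(
        "Cancelling Remote Spark Driver client connection '%s' with error: "
            + msg, clientId)));
  }

  public static void main(String[] args) {
    new CancelClientSketch().cancelClient("client-1",
        "Timed out waiting for the Remote Spark Driver to connect back.");
  }
}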