You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2016/05/20 16:17:37 UTC
hive git commit: HIVE-13657: Spark driver stderr logs should appear
in hive client logs (Mohit Sabharwal, reviewed by Sergio Pena)
Repository: hive
Updated Branches:
refs/heads/master e5ba2690f -> 360dfa0ff
HIVE-13657: Spark driver stderr logs should appear in hive client logs (Mohit Sabharwal, reviewed by Sergio Pena)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/360dfa0f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/360dfa0f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/360dfa0f
Branch: refs/heads/master
Commit: 360dfa0ffd0d0500db016861ef24299f1596274d
Parents: e5ba269
Author: Mohit Sabharwal <mo...@cloudera.com>
Authored: Fri May 20 11:16:43 2016 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Fri May 20 11:16:43 2016 -0500
----------------------------------------------------------------------
.../hive/spark/client/SparkClientImpl.java | 35 ++++++++++++++++----
1 file changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hive/blob/360dfa0f/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index ae78bc3..dfe263f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -42,6 +42,7 @@ import java.io.Serializable;
import java.io.Writer;
import java.net.URI;
import java.net.URL;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -68,6 +69,7 @@ class SparkClientImpl implements SparkClient {
private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class);
private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
+ private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000;
private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
private static final String SPARK_HOME_ENV = "SPARK_HOME";
@@ -391,7 +393,6 @@ class SparkClientImpl implements SparkClient {
argv.add(numOfExecutors);
}
}
-
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
try {
String currentUser = Utils.getUGI().getShortUserName();
@@ -445,8 +446,9 @@ class SparkClientImpl implements SparkClient {
final Process child = pb.start();
int childId = childIdGenerator.incrementAndGet();
- redirect("stdout-redir-" + childId, child.getInputStream());
- redirect("stderr-redir-" + childId, child.getErrorStream());
+ final List<String> childErrorLog = new ArrayList<String>();
+ redirect("stdout-redir-" + childId, new Redirector(child.getInputStream()));
+ redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog));
runnable = new Runnable() {
@Override
@@ -454,8 +456,15 @@ class SparkClientImpl implements SparkClient {
try {
int exitCode = child.waitFor();
if (exitCode != 0) {
- rpcServer.cancelClient(clientId, "Child process exited before connecting back");
- LOG.warn("Child process exited with code {}.", exitCode);
+ StringBuilder errStr = new StringBuilder();
+ for (String s : childErrorLog) {
+ errStr.append(s);
+ errStr.append('\n');
+ }
+
+ rpcServer.cancelClient(clientId,
+ "Child process exited before connecting back with error log " + errStr.toString());
+ LOG.warn("Child process exited with code {}", exitCode);
}
} catch (InterruptedException ie) {
LOG.warn("Waiting thread interrupted, killing child process.");
@@ -475,8 +484,8 @@ class SparkClientImpl implements SparkClient {
return thread;
}
- private void redirect(String name, InputStream in) {
- Thread thread = new Thread(new Redirector(in));
+ private void redirect(String name, Redirector redirector) {
+ Thread thread = new Thread(redirector);
thread.setName(name);
thread.setDaemon(true);
thread.start();
@@ -587,17 +596,29 @@ class SparkClientImpl implements SparkClient {
private class Redirector implements Runnable {
private final BufferedReader in;
+ private List<String> errLogs;
+ private int numErrLogLines = 0;
Redirector(InputStream in) {
this.in = new BufferedReader(new InputStreamReader(in));
}
+ Redirector(InputStream in, List<String> errLogs) {
+ this.in = new BufferedReader(new InputStreamReader(in));
+ this.errLogs = errLogs;
+ }
+
@Override
public void run() {
try {
String line = null;
while ((line = in.readLine()) != null) {
LOG.info(line);
+ if (errLogs != null) {
+ if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) {
+ errLogs.add(line);
+ }
+ }
}
} catch (Exception e) {
LOG.warn("Error in redirector thread.", e);