You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by sp...@apache.org on 2016/05/20 16:17:37 UTC

hive git commit: HIVE-13657: Spark driver stderr logs should appear in hive client logs (Mohit Sabharwal, reviewed by Sergio Pena)

Repository: hive
Updated Branches:
  refs/heads/master e5ba2690f -> 360dfa0ff


HIVE-13657: Spark driver stderr logs should appear in hive client logs (Mohit Sabharwal, reviewed by Sergio Pena)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/360dfa0f
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/360dfa0f
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/360dfa0f

Branch: refs/heads/master
Commit: 360dfa0ffd0d0500db016861ef24299f1596274d
Parents: e5ba269
Author: Mohit Sabharwal <mo...@cloudera.com>
Authored: Fri May 20 11:16:43 2016 -0500
Committer: Sergio Pena <se...@cloudera.com>
Committed: Fri May 20 11:16:43 2016 -0500

----------------------------------------------------------------------
 .../hive/spark/client/SparkClientImpl.java      | 35 ++++++++++++++++----
 1 file changed, 28 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/360dfa0f/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
----------------------------------------------------------------------
diff --git a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
index ae78bc3..dfe263f 100644
--- a/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
+++ b/spark-client/src/main/java/org/apache/hive/spark/client/SparkClientImpl.java
@@ -42,6 +42,7 @@ import java.io.Serializable;
 import java.io.Writer;
 import java.net.URI;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -68,6 +69,7 @@ class SparkClientImpl implements SparkClient {
   private static final Logger LOG = LoggerFactory.getLogger(SparkClientImpl.class);
 
   private static final long DEFAULT_SHUTDOWN_TIMEOUT = 10000; // In milliseconds
+  private static final long MAX_ERR_LOG_LINES_FOR_RPC = 1000;
 
   private static final String OSX_TEST_OPTS = "SPARK_OSX_TEST_OPTS";
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
@@ -391,7 +393,6 @@ class SparkClientImpl implements SparkClient {
           argv.add(numOfExecutors);
         }
       }
-
       if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS)) {
         try {
           String currentUser = Utils.getUGI().getShortUserName();
@@ -445,8 +446,9 @@ class SparkClientImpl implements SparkClient {
 
       final Process child = pb.start();
       int childId = childIdGenerator.incrementAndGet();
-      redirect("stdout-redir-" + childId, child.getInputStream());
-      redirect("stderr-redir-" + childId, child.getErrorStream());
+      final List<String> childErrorLog = new ArrayList<String>();
+      redirect("stdout-redir-" + childId, new Redirector(child.getInputStream()));
+      redirect("stderr-redir-" + childId, new Redirector(child.getErrorStream(), childErrorLog));
 
       runnable = new Runnable() {
         @Override
@@ -454,8 +456,15 @@ class SparkClientImpl implements SparkClient {
           try {
             int exitCode = child.waitFor();
             if (exitCode != 0) {
-              rpcServer.cancelClient(clientId, "Child process exited before connecting back");
-              LOG.warn("Child process exited with code {}.", exitCode);
+              StringBuilder errStr = new StringBuilder();
+              for (String s : childErrorLog) {
+                errStr.append(s);
+                errStr.append('\n');
+              }
+
+              rpcServer.cancelClient(clientId,
+                  "Child process exited before connecting back with error log " + errStr.toString());
+              LOG.warn("Child process exited with code {}", exitCode);
             }
           } catch (InterruptedException ie) {
             LOG.warn("Waiting thread interrupted, killing child process.");
@@ -475,8 +484,8 @@ class SparkClientImpl implements SparkClient {
     return thread;
   }
 
-  private void redirect(String name, InputStream in) {
-    Thread thread = new Thread(new Redirector(in));
+  private void redirect(String name, Redirector redirector) {
+    Thread thread = new Thread(redirector);
     thread.setName(name);
     thread.setDaemon(true);
     thread.start();
@@ -587,17 +596,29 @@ class SparkClientImpl implements SparkClient {
   private class Redirector implements Runnable {
 
     private final BufferedReader in;
+    private List<String> errLogs;
+    private int numErrLogLines = 0;
 
     Redirector(InputStream in) {
       this.in = new BufferedReader(new InputStreamReader(in));
     }
 
+    Redirector(InputStream in, List<String> errLogs) {
+      this.in = new BufferedReader(new InputStreamReader(in));
+      this.errLogs = errLogs;
+    }
+
     @Override
     public void run() {
       try {
         String line = null;
         while ((line = in.readLine()) != null) {
           LOG.info(line);
+          if (errLogs != null) {
+            if (numErrLogLines++ < MAX_ERR_LOG_LINES_FOR_RPC) {
+              errLogs.add(line);
+            }
+          }
         }
       } catch (Exception e) {
         LOG.warn("Error in redirector thread.", e);