You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@livy.apache.org by js...@apache.org on 2017/06/27 06:39:05 UTC

[24/50] [abbrv] incubator-livy git commit: LIVY-293: Redirect spark-submit log to REST response Log field for interactive sessions. (#297)

LIVY-293: Redirect spark-submit log to REST response Log field for interactive sessions. (#297)

Pass the driver process to SparkApp for interactive sessions so that session/log returns the spark-submit log.

Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/d74d5a9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/d74d5a9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/d74d5a9c

Branch: refs/heads/master
Commit: d74d5a9cdaf07fa9e6f7d1fe8ebfa161572fd81a
Parents: 932d397
Author: Praveen Kanamarlapudi <kp...@outlook.com>
Authored: Mon Feb 27 14:39:16 2017 -0800
Committer: Alex Man <al...@users.noreply.github.com>
Committed: Mon Feb 27 14:39:16 2017 -0800

----------------------------------------------------------------------
 .../com/cloudera/livy/rsc/ContextLauncher.java  | 62 +-------------------
 .../cloudera/livy/rsc/DriverProcessInfo.java    | 42 +++++++++++++
 .../java/com/cloudera/livy/rsc/RSCClient.java   |  8 ++-
 .../com/cloudera/livy/rsc/RSCClientFactory.java |  7 ++-
 .../server/interactive/InteractiveSession.scala |  5 +-
 .../livy/utils/LineBufferedStream.scala         |  2 +-
 .../com/cloudera/livy/utils/SparkYarnApp.scala  |  4 +-
 .../cloudera/livy/utils/SparkYarnAppSpec.scala  |  8 ++-
 8 files changed, 70 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java b/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
index eceb419..9a5e447 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
@@ -69,10 +69,10 @@ class ContextLauncher {
   private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
   private static final String SPARK_HOME_ENV = "SPARK_HOME";
 
-  static Promise<ContextInfo> create(RSCClientFactory factory, RSCConf conf)
+  static DriverProcessInfo create(RSCClientFactory factory, RSCConf conf)
       throws IOException {
     ContextLauncher launcher = new ContextLauncher(factory, conf);
-    return launcher.promise;
+    return new DriverProcessInfo(launcher.promise, launcher.child.child);
   }
 
   private final Promise<ContextInfo> promise;
@@ -304,33 +304,6 @@ class ContextLauncher {
     return file;
   }
 
-  private static class Redirector implements Runnable {
-
-    private final BufferedReader in;
-
-    Redirector(InputStream in) {
-      this.in = new BufferedReader(new InputStreamReader(in));
-    }
-
-    @Override
-    public void run() {
-      try {
-        String line = null;
-        while ((line = in.readLine()) != null) {
-          LOG.info(line);
-        }
-      } catch (Exception e) {
-        LOG.warn("Error in redirector thread.", e);
-      }
-
-      try {
-        in.close();
-      } catch (IOException ioe) {
-        LOG.warn("Error closing child stream.", ioe);
-      }
-    }
-
-  }
 
   private class RegistrationHandler extends BaseProtocol
     implements RpcServer.ClientCallback {
@@ -383,8 +356,6 @@ class ContextLauncher {
     private final Promise<?> promise;
     private final Process child;
     private final Thread monitor;
-    private final Thread stdout;
-    private final Thread stderr;
     private final File confFile;
 
     public ChildProcess(RSCConf conf, Promise<?> promise, Runnable child, File confFile) {
@@ -392,8 +363,6 @@ class ContextLauncher {
       this.promise = promise;
       this.monitor = monitor(child, CHILD_IDS.incrementAndGet());
       this.child = null;
-      this.stdout = null;
-      this.stderr = null;
       this.confFile = confFile;
     }
 
@@ -402,8 +371,6 @@ class ContextLauncher {
       this.conf = conf;
       this.promise = promise;
       this.child = childProc;
-      this.stdout = redirect("stdout-redir-" + childId, child.getInputStream());
-      this.stderr = redirect("stderr-redir-" + childId, child.getErrorStream());
       this.confFile = confFile;
 
       Runnable monitorTask = new Runnable() {
@@ -450,23 +417,6 @@ class ContextLauncher {
     }
 
     public void detach() {
-      if (stdout != null) {
-        stdout.interrupt();
-        try {
-          stdout.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
-        } catch (InterruptedException ie) {
-          LOG.info("Interrupted while waiting for child stdout to finish.");
-        }
-      }
-      if (stderr != null) {
-        stderr.interrupt();
-        try {
-          stderr.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
-        } catch (InterruptedException ie) {
-          LOG.info("Interrupted while waiting for child stderr to finish.");
-        }
-      }
-
       try {
         monitor.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
       } catch (InterruptedException ie) {
@@ -474,14 +424,6 @@ class ContextLauncher {
       }
     }
 
-    private Thread redirect(String name, InputStream in) {
-      Thread thread = new Thread(new Redirector(in));
-      thread.setName(name);
-      thread.setDaemon(true);
-      thread.start();
-      return thread;
-    }
-
     private Thread monitor(final Runnable task, int childId) {
       Runnable wrappedTask = new Runnable() {
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java b/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
new file mode 100644
index 0000000..a224fd6
--- /dev/null
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.livy.rsc;
+
+import io.netty.util.concurrent.Promise;
+
+/**
+ * Holds the {@link ContextInfo} promise and the driver {@link Process}, which may be null.
+ */
+public class DriverProcessInfo {
+
+  private final Promise<ContextInfo> contextInfo;
+  private final Process driverProcess;
+
+  public DriverProcessInfo(Promise<ContextInfo> contextInfo, Process driverProcess) {
+    this.contextInfo = contextInfo;
+    this.driverProcess = driverProcess;
+  }
+
+  public Promise<ContextInfo> getContextInfo() {
+    return contextInfo;
+  }
+
+  public Process getDriverProcess() {
+    return driverProcess;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
index 11f45b6..11cb0f6 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
@@ -60,12 +60,14 @@ public class RSCClient implements LivyClient {
   private final Promise<URI> serverUriPromise;
 
   private ContextInfo contextInfo;
+  private final Process driverProcess;
   private volatile boolean isAlive;
   private volatile String replState;
 
-  RSCClient(RSCConf conf, Promise<ContextInfo> ctx) throws IOException {
+  RSCClient(RSCConf conf, Promise<ContextInfo> ctx, Process driverProcess) throws IOException {
     this.conf = conf;
     this.contextInfoPromise = ctx;
+    this.driverProcess = driverProcess;
     this.jobs = new ConcurrentHashMap<>();
     this.protocol = new ClientProtocol();
     this.driverRpc = ImmediateEventExecutor.INSTANCE.newPromise();
@@ -98,6 +100,10 @@ public class RSCClient implements LivyClient {
     return isAlive;
   }
 
+  public Process getDriverProcess() {
+    return driverProcess;
+  }
+
   private synchronized void connectToContext(final ContextInfo info) throws Exception {
     this.contextInfo = info;
 

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
index d50c62e..27540bb 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
@@ -57,14 +57,17 @@ public final class RSCClientFactory implements LivyClientFactory {
     boolean needsServer = false;
     try {
       Promise<ContextInfo> info;
+      Process driverProcess = null;
       if (uri.getUserInfo() != null && uri.getHost() != null && uri.getPort() > 0) {
         info = createContextInfo(uri);
       } else {
         needsServer = true;
         ref(lconf);
-        info = ContextLauncher.create(this, lconf);
+        DriverProcessInfo processInfo = ContextLauncher.create(this, lconf);
+        info = processInfo.getContextInfo();
+        driverProcess = processInfo.getDriverProcess();
       }
-      return new RSCClient(lconf, info);
+      return new RSCClient(lconf, info, driverProcess);
     } catch (Exception e) {
       if (needsServer) {
         unref();

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
index 35fbc79..605edc6 100644
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
@@ -44,6 +44,7 @@ import com.cloudera.livy.server.recovery.SessionStore
 import com.cloudera.livy.sessions._
 import com.cloudera.livy.sessions.Session._
 import com.cloudera.livy.sessions.SessionState.Dead
+import com.cloudera.livy.util.LineBufferedProcess
 import com.cloudera.livy.utils.{AppInfo, LivySparkUtils, SparkApp, SparkAppListener}
 
 @JsonIgnoreProperties(ignoreUnknown = true)
@@ -379,9 +380,11 @@ class InteractiveSession(
 
   private val app = mockApp.orElse {
     if (livyConf.isRunningOnYarn()) {
+      val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
+        .map(new LineBufferedProcess(_))
       // When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
       // (e.g. Reflect YARN application state to session state).
-      Option(SparkApp.create(appTag, appId, None, livyConf, Some(this)))
+      Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
     } else {
       // When Livy is running with other cluster manager, SparkApp doesn't provide any
       // additional benefit over controlling RSCDriver using RSCClient. Don't use it.

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
index d54bfb0..a8949af 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
@@ -39,7 +39,6 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
       for (line <- lines) {
         _lock.lock()
         try {
-          trace("stdout: ", line)
           _lines = _lines :+ line
           _condition.signalAll()
         } finally {
@@ -47,6 +46,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
         }
       }
 
+      _lines.map { line => info("stdout: ", line) }
       _lock.lock()
       try {
         _finished = true

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
index 7b5cc75..fb47e5e 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
@@ -128,7 +128,9 @@ class SparkYarnApp private[utils] (
   private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
 
   override def log(): IndexedSeq[String] =
-    process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++ yarnDiagnostics
+    ("stdout: " +: process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String])) ++
+    ("\nstderr: " +: process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String])) ++
+    ("\nYARN Diagnostics: " +: yarnDiagnostics)
 
   override def kill(): Unit = synchronized {
     if (isRunning) {

http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
index 3d7c76f..1d90367 100644
--- a/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
@@ -136,8 +136,12 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
       Clock.withSleepMethod(mockSleep) {
         val mockYarnClient = mock[YarnClient]
         val mockSparkSubmit = mock[LineBufferedProcess]
-        val sparkSubmitLog = IndexedSeq("SPARK-SUBMIT", "LOG")
-        when(mockSparkSubmit.inputLines).thenReturn(sparkSubmitLog)
+        val sparkSubmitInfoLog = IndexedSeq("SPARK-SUBMIT", "LOG")
+        val sparkSubmitErrorLog = IndexedSeq("SPARK-SUBMIT", "error log")
+        val sparkSubmitLog = ("stdout: " +: sparkSubmitInfoLog) ++
+          ("\nstderr: " +: sparkSubmitErrorLog) :+ "\nYARN Diagnostics: "
+        when(mockSparkSubmit.inputLines).thenReturn(sparkSubmitInfoLog)
+        when(mockSparkSubmit.errorLines).thenReturn(sparkSubmitErrorLog)
         val waitForCalledLatch = new CountDownLatch(1)
         when(mockSparkSubmit.waitFor()).thenAnswer(new Answer[Int]() {
           override def answer(invocation: InvocationOnMock): Int = {