You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@livy.apache.org by js...@apache.org on 2017/06/27 06:39:05 UTC
[24/50] [abbrv] incubator-livy git commit: LIVY-293: Redirect
spark-submit log to REST response Log field for interactive sessions. (#297)
LIVY-293: Redirect spark-submit log to REST response Log field for interactive sessions. (#297)
Passing driverProcess to SparkApp for interactive sessions so session/log will return spark-submit log.
Project: http://git-wip-us.apache.org/repos/asf/incubator-livy/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-livy/commit/d74d5a9c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-livy/tree/d74d5a9c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-livy/diff/d74d5a9c
Branch: refs/heads/master
Commit: d74d5a9cdaf07fa9e6f7d1fe8ebfa161572fd81a
Parents: 932d397
Author: Praveen Kanamarlapudi <kp...@outlook.com>
Authored: Mon Feb 27 14:39:16 2017 -0800
Committer: Alex Man <al...@users.noreply.github.com>
Committed: Mon Feb 27 14:39:16 2017 -0800
----------------------------------------------------------------------
.../com/cloudera/livy/rsc/ContextLauncher.java | 62 +-------------------
.../cloudera/livy/rsc/DriverProcessInfo.java | 42 +++++++++++++
.../java/com/cloudera/livy/rsc/RSCClient.java | 8 ++-
.../com/cloudera/livy/rsc/RSCClientFactory.java | 7 ++-
.../server/interactive/InteractiveSession.scala | 5 +-
.../livy/utils/LineBufferedStream.scala | 2 +-
.../com/cloudera/livy/utils/SparkYarnApp.scala | 4 +-
.../cloudera/livy/utils/SparkYarnAppSpec.scala | 8 ++-
8 files changed, 70 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java b/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
index eceb419..9a5e447 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/ContextLauncher.java
@@ -69,10 +69,10 @@ class ContextLauncher {
private static final String SPARK_ARCHIVES_KEY = "spark.yarn.dist.archives";
private static final String SPARK_HOME_ENV = "SPARK_HOME";
- static Promise<ContextInfo> create(RSCClientFactory factory, RSCConf conf)
+ static DriverProcessInfo create(RSCClientFactory factory, RSCConf conf)
throws IOException {
ContextLauncher launcher = new ContextLauncher(factory, conf);
- return launcher.promise;
+ return new DriverProcessInfo(launcher.promise, launcher.child.child);
}
private final Promise<ContextInfo> promise;
@@ -304,33 +304,6 @@ class ContextLauncher {
return file;
}
- private static class Redirector implements Runnable {
-
- private final BufferedReader in;
-
- Redirector(InputStream in) {
- this.in = new BufferedReader(new InputStreamReader(in));
- }
-
- @Override
- public void run() {
- try {
- String line = null;
- while ((line = in.readLine()) != null) {
- LOG.info(line);
- }
- } catch (Exception e) {
- LOG.warn("Error in redirector thread.", e);
- }
-
- try {
- in.close();
- } catch (IOException ioe) {
- LOG.warn("Error closing child stream.", ioe);
- }
- }
-
- }
private class RegistrationHandler extends BaseProtocol
implements RpcServer.ClientCallback {
@@ -383,8 +356,6 @@ class ContextLauncher {
private final Promise<?> promise;
private final Process child;
private final Thread monitor;
- private final Thread stdout;
- private final Thread stderr;
private final File confFile;
public ChildProcess(RSCConf conf, Promise<?> promise, Runnable child, File confFile) {
@@ -392,8 +363,6 @@ class ContextLauncher {
this.promise = promise;
this.monitor = monitor(child, CHILD_IDS.incrementAndGet());
this.child = null;
- this.stdout = null;
- this.stderr = null;
this.confFile = confFile;
}
@@ -402,8 +371,6 @@ class ContextLauncher {
this.conf = conf;
this.promise = promise;
this.child = childProc;
- this.stdout = redirect("stdout-redir-" + childId, child.getInputStream());
- this.stderr = redirect("stderr-redir-" + childId, child.getErrorStream());
this.confFile = confFile;
Runnable monitorTask = new Runnable() {
@@ -450,23 +417,6 @@ class ContextLauncher {
}
public void detach() {
- if (stdout != null) {
- stdout.interrupt();
- try {
- stdout.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
- } catch (InterruptedException ie) {
- LOG.info("Interrupted while waiting for child stdout to finish.");
- }
- }
- if (stderr != null) {
- stderr.interrupt();
- try {
- stderr.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
- } catch (InterruptedException ie) {
- LOG.info("Interrupted while waiting for child stderr to finish.");
- }
- }
-
try {
monitor.join(conf.getTimeAsMs(CLIENT_SHUTDOWN_TIMEOUT));
} catch (InterruptedException ie) {
@@ -474,14 +424,6 @@ class ContextLauncher {
}
}
- private Thread redirect(String name, InputStream in) {
- Thread thread = new Thread(new Redirector(in));
- thread.setName(name);
- thread.setDaemon(true);
- thread.start();
- return thread;
- }
-
private Thread monitor(final Runnable task, int childId) {
Runnable wrappedTask = new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java b/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
new file mode 100644
index 0000000..a224fd6
--- /dev/null
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/DriverProcessInfo.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.cloudera.livy.rsc;
+
+import io.netty.util.concurrent.Promise;
+
+/**
+ * Information about the driver process and the {@link ContextInfo}.
+ */
+public class DriverProcessInfo {
+
+ private Promise<ContextInfo> contextInfo;
+ private transient Process driverProcess;
+
+ public DriverProcessInfo(Promise<ContextInfo> contextInfo, Process driverProcess) {
+ this.contextInfo = contextInfo;
+ this.driverProcess = driverProcess;
+ }
+
+ public Promise<ContextInfo> getContextInfo() {
+ return contextInfo;
+ }
+
+ public Process getDriverProcess() {
+ return driverProcess;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
index 11f45b6..11cb0f6 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClient.java
@@ -60,12 +60,14 @@ public class RSCClient implements LivyClient {
private final Promise<URI> serverUriPromise;
private ContextInfo contextInfo;
+ private Process driverProcess;
private volatile boolean isAlive;
private volatile String replState;
- RSCClient(RSCConf conf, Promise<ContextInfo> ctx) throws IOException {
+ RSCClient(RSCConf conf, Promise<ContextInfo> ctx, Process driverProcess) throws IOException {
this.conf = conf;
this.contextInfoPromise = ctx;
+ this.driverProcess = driverProcess;
this.jobs = new ConcurrentHashMap<>();
this.protocol = new ClientProtocol();
this.driverRpc = ImmediateEventExecutor.INSTANCE.newPromise();
@@ -98,6 +100,10 @@ public class RSCClient implements LivyClient {
return isAlive;
}
+ public Process getDriverProcess() {
+ return driverProcess;
+ }
+
private synchronized void connectToContext(final ContextInfo info) throws Exception {
this.contextInfo = info;
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
----------------------------------------------------------------------
diff --git a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
index d50c62e..27540bb 100644
--- a/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
+++ b/rsc/src/main/java/com/cloudera/livy/rsc/RSCClientFactory.java
@@ -57,14 +57,17 @@ public final class RSCClientFactory implements LivyClientFactory {
boolean needsServer = false;
try {
Promise<ContextInfo> info;
+ Process driverProcess = null;
if (uri.getUserInfo() != null && uri.getHost() != null && uri.getPort() > 0) {
info = createContextInfo(uri);
} else {
needsServer = true;
ref(lconf);
- info = ContextLauncher.create(this, lconf);
+ DriverProcessInfo processInfo = ContextLauncher.create(this, lconf);
+ info = processInfo.getContextInfo();
+ driverProcess = processInfo.getDriverProcess();
}
- return new RSCClient(lconf, info);
+ return new RSCClient(lconf, info, driverProcess);
} catch (Exception e) {
if (needsServer) {
unref();
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
index 35fbc79..605edc6 100644
--- a/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
+++ b/server/src/main/scala/com/cloudera/livy/server/interactive/InteractiveSession.scala
@@ -44,6 +44,7 @@ import com.cloudera.livy.server.recovery.SessionStore
import com.cloudera.livy.sessions._
import com.cloudera.livy.sessions.Session._
import com.cloudera.livy.sessions.SessionState.Dead
+import com.cloudera.livy.util.LineBufferedProcess
import com.cloudera.livy.utils.{AppInfo, LivySparkUtils, SparkApp, SparkAppListener}
@JsonIgnoreProperties(ignoreUnknown = true)
@@ -379,9 +380,11 @@ class InteractiveSession(
private val app = mockApp.orElse {
if (livyConf.isRunningOnYarn()) {
+ val driverProcess = client.flatMap { c => Option(c.getDriverProcess) }
+ .map(new LineBufferedProcess(_))
// When Livy is running with YARN, SparkYarnApp can provide better YARN integration.
// (e.g. Reflect YARN application state to session state).
- Option(SparkApp.create(appTag, appId, None, livyConf, Some(this)))
+ Option(SparkApp.create(appTag, appId, driverProcess, livyConf, Some(this)))
} else {
// When Livy is running with other cluster manager, SparkApp doesn't provide any
// additional benefit over controlling RSCDriver using RSCClient. Don't use it.
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
index d54bfb0..a8949af 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/LineBufferedStream.scala
@@ -39,7 +39,6 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
for (line <- lines) {
_lock.lock()
try {
- trace("stdout: ", line)
_lines = _lines :+ line
_condition.signalAll()
} finally {
@@ -47,6 +46,7 @@ class LineBufferedStream(inputStream: InputStream) extends Logging {
}
}
+ _lines.map { line => info("stdout: ", line) }
_lock.lock()
try {
_finished = true
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
----------------------------------------------------------------------
diff --git a/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala b/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
index 7b5cc75..fb47e5e 100644
--- a/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
+++ b/server/src/main/scala/com/cloudera/livy/utils/SparkYarnApp.scala
@@ -128,7 +128,9 @@ class SparkYarnApp private[utils] (
private var yarnDiagnostics: IndexedSeq[String] = IndexedSeq.empty[String]
override def log(): IndexedSeq[String] =
- process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String]) ++ yarnDiagnostics
+ ("stdout: " +: process.map(_.inputLines).getOrElse(ArrayBuffer.empty[String])) ++
+ ("\nstderr: " +: process.map(_.errorLines).getOrElse(ArrayBuffer.empty[String])) ++
+ ("\nYARN Diagnostics: " +: yarnDiagnostics)
override def kill(): Unit = synchronized {
if (isRunning) {
http://git-wip-us.apache.org/repos/asf/incubator-livy/blob/d74d5a9c/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
----------------------------------------------------------------------
diff --git a/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala b/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
index 3d7c76f..1d90367 100644
--- a/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
+++ b/server/src/test/scala/com/cloudera/livy/utils/SparkYarnAppSpec.scala
@@ -136,8 +136,12 @@ class SparkYarnAppSpec extends FunSpec with LivyBaseUnitTestSuite {
Clock.withSleepMethod(mockSleep) {
val mockYarnClient = mock[YarnClient]
val mockSparkSubmit = mock[LineBufferedProcess]
- val sparkSubmitLog = IndexedSeq("SPARK-SUBMIT", "LOG")
- when(mockSparkSubmit.inputLines).thenReturn(sparkSubmitLog)
+ val sparkSubmitInfoLog = IndexedSeq("SPARK-SUBMIT", "LOG")
+ val sparkSubmitErrorLog = IndexedSeq("SPARK-SUBMIT", "error log")
+ val sparkSubmitLog = ("stdout: " +: sparkSubmitInfoLog) ++
+ ("\nstderr: " +: sparkSubmitErrorLog) :+ "\nYARN Diagnostics: "
+ when(mockSparkSubmit.inputLines).thenReturn(sparkSubmitInfoLog)
+ when(mockSparkSubmit.errorLines).thenReturn(sparkSubmitErrorLog)
val waitForCalledLatch = new CountDownLatch(1)
when(mockSparkSubmit.waitFor()).thenAnswer(new Answer[Int]() {
override def answer(invocation: InvocationOnMock): Int = {