You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2014/12/24 01:08:55 UTC
spark git commit: [SPARK-4606] Send EOF to child JVM when there's no more data to read.
Repository: spark
Updated Branches:
refs/heads/master 3f5f4cc4e -> 7e2deb71c
[SPARK-4606] Send EOF to child JVM when there's no more data to read.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #3460 from vanzin/SPARK-4606 and squashes the following commits:
031207d [Marcelo Vanzin] [SPARK-4606] Send EOF to child JVM when there's no more data to read.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7e2deb71
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7e2deb71
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7e2deb71
Branch: refs/heads/master
Commit: 7e2deb71c4239564631b19c748e95c3d1aa1c77d
Parents: 3f5f4cc
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Dec 23 16:02:59 2014 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Tue Dec 23 16:07:59 2014 -0800
----------------------------------------------------------------------
.../deploy/SparkSubmitDriverBootstrapper.scala | 3 ++-
.../scala/org/apache/spark/util/Utils.scala | 24 ++++++++++++++------
2 files changed, 19 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/7e2deb71/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
index d2687fa..2eab998 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala
@@ -151,7 +151,8 @@ private[spark] object SparkSubmitDriverBootstrapper {
val isWindows = Utils.isWindows
val isSubprocess = sys.env.contains("IS_SUBPROCESS")
if (!isWindows) {
- val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin")
+ val stdinThread = new RedirectThread(System.in, process.getOutputStream, "redirect stdin",
+ propagateEof = true)
stdinThread.start()
// Spark submit (JVM) may run as a subprocess, and so this JVM should terminate on
// broken pipe, signaling that the parent process has exited. This is the case if the
http://git-wip-us.apache.org/repos/asf/spark/blob/7e2deb71/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 8c00f2c..0d771ba 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1847,19 +1847,29 @@ private[spark] object Utils extends Logging {
/**
* A utility class to redirect the child process's stdout or stderr.
*/
-private[spark] class RedirectThread(in: InputStream, out: OutputStream, name: String)
+private[spark] class RedirectThread(
+ in: InputStream,
+ out: OutputStream,
+ name: String,
+ propagateEof: Boolean = false)
extends Thread(name) {
setDaemon(true)
override def run() {
scala.util.control.Exception.ignoring(classOf[IOException]) {
// FIXME: We copy the stream on the level of bytes to avoid encoding problems.
- val buf = new Array[Byte](1024)
- var len = in.read(buf)
- while (len != -1) {
- out.write(buf, 0, len)
- out.flush()
- len = in.read(buf)
+ try {
+ val buf = new Array[Byte](1024)
+ var len = in.read(buf)
+ while (len != -1) {
+ out.write(buf, 0, len)
+ out.flush()
+ len = in.read(buf)
+ }
+ } finally {
+ if (propagateEof) {
+ out.close()
+ }
}
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org