Posted to commits@spark.apache.org by ma...@apache.org on 2015/06/29 21:46:44 UTC

spark git commit: [SPARK-7862] [SQL] Disable the error message redirect to stderr

Repository: spark
Updated Branches:
  refs/heads/master 637b4eeda -> c6ba2ea34


[SPARK-7862] [SQL] Disable the error message redirect to stderr

This is a follow-up to #6404: ScriptTransformation prints the script's error messages directly to stderr, which can be a disaster for the application log. A sketch of the resulting pattern follows below.
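In short: instead of splicing the child process's stderr into the parent's stderr with Redirect.INHERIT, the patch drains it into a small in-memory ring buffer and surfaces it only when the script fails. A minimal before/after sketch of that idea (illustrative only; CircularBuffer and RedirectThread are private[spark] utilities, so this assumes code compiled into the org.apache.spark namespace, and the thread name is made up):

    import org.apache.spark.util.{CircularBuffer, RedirectThread}

    val builder = new ProcessBuilder("/bin/bash", "-c", "echo boom 1>&2; exit 1")
    // Before: builder.redirectError(ProcessBuilder.Redirect.INHERIT)
    //         -- every stderr byte lands in the application log.
    val proc = builder.start()
    // After: capture stderr in a bounded buffer, report it only on failure.
    val stderrBuffer = new CircularBuffer(2048)
    new RedirectThread(proc.getErrorStream, stderrBuffer, "stderr-reader").start()
    proc.getOutputStream.close()
    if (proc.waitFor() != 0) Console.err.println(stderrBuffer.toString)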

Author: Cheng Hao <ha...@intel.com>

Closes #6882 from chenghao-intel/verbose and squashes the following commits:

bfedd77 [Cheng Hao] revert the write
76ff46b [Cheng Hao] update the CircularBuffer
692b19e [Cheng Hao] check the process exitValue for ScriptTransform
47e0970 [Cheng Hao] Use the RedirectThread instead
1de771d [Cheng Hao] naming the threads in ScriptTransformation
8536e81 [Cheng Hao] disable the error message redirection for stderr


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6ba2ea3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6ba2ea3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6ba2ea3

Branch: refs/heads/master
Commit: c6ba2ea341ad23de265d870669b25e6a41f461e5
Parents: 637b4ee
Author: Cheng Hao <ha...@intel.com>
Authored: Mon Jun 29 12:46:33 2015 -0700
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Mon Jun 29 12:46:33 2015 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Utils.scala     | 33 +++++++++++++
 .../org/apache/spark/util/UtilsSuite.scala      |  8 +++
 .../spark/sql/hive/client/ClientWrapper.scala   | 29 ++---------
 .../hive/execution/ScriptTransformation.scala   | 51 ++++++++++++--------
 .../sql/hive/execution/SQLQuerySuite.scala      |  2 +-
 5 files changed, 77 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/c6ba2ea3/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 19157af..a7fc749 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -2333,3 +2333,36 @@ private[spark] class RedirectThread(
     }
   }
 }
+
+/**
+ * An [[OutputStream]] that will store the last 10 kilobytes (by default) written to it
+ * in a circular buffer. The current contents of the buffer can be accessed using
+ * the toString method.
+ */
+private[spark] class CircularBuffer(sizeInBytes: Int = 10240) extends java.io.OutputStream {
+  var pos: Int = 0
+  var buffer = new Array[Int](sizeInBytes)
+
+  def write(i: Int): Unit = {
+    buffer(pos) = i
+    pos = (pos + 1) % buffer.length
+  }
+
+  override def toString: String = {
+    val (end, start) = buffer.splitAt(pos)
+    val input = new java.io.InputStream {
+      val iterator = (start ++ end).iterator
+
+      def read(): Int = if (iterator.hasNext) iterator.next() else -1
+    }
+    val reader = new BufferedReader(new InputStreamReader(input))
+    val stringBuilder = new StringBuilder
+    var line = reader.readLine()
+    while (line != null) {
+      stringBuilder.append(line)
+      stringBuilder.append("\n")
+      line = reader.readLine()
+    }
+    stringBuilder.toString()
+  }
+}
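For reference, a small usage sketch of the new class (illustrative only; since CircularBuffer is private[spark], it is reachable only from code compiled into the org.apache.spark namespace):

    import org.apache.spark.util.CircularBuffer

    val buf = new CircularBuffer(8)                        // keep only the last 8 bytes
    val out = new java.io.PrintStream(buf, true, "UTF-8")  // autoflush, like the test below
    out.println("abcdefghij")                              // 11 bytes: 'a', 'b', 'c' are overwritten
    assert(buf.toString == "defghij\n")                    // only the most recent 8 bytes survive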

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ba2ea3/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index a61ea39..baa4c66 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -673,4 +673,12 @@ class UtilsSuite extends SparkFunSuite with ResetSystemProperties with Logging {
     assert(!Utils.isInDirectory(nullFile, parentDir))
     assert(!Utils.isInDirectory(nullFile, childFile3))
   }
+
+  test("circular buffer") {
+    val buffer = new CircularBuffer(25)
+    val stream = new java.io.PrintStream(buffer, true, "UTF-8")
+
+    stream.println("test circular test circular test circular test circular test circular")
+    assert(buffer.toString === "t circular test circular\n")
+  }
 }
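Why the expected value holds (a worked check, not part of the patch): println writes 69 characters plus a trailing newline, 70 bytes in total, into a 25-byte buffer, so only the final 25 bytes survive the wraparound:

    val written = "test circular test circular test circular test circular test circular" + "\n"
    assert(written.length == 70)
    assert(written.takeRight(25) == "t circular test circular\n")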

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ba2ea3/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
index 4c708ce..cbd2bf6 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/ClientWrapper.scala
@@ -22,6 +22,8 @@ import java.net.URI
 import java.util.{ArrayList => JArrayList, Map => JMap, List => JList, Set => JSet}
 import javax.annotation.concurrent.GuardedBy
 
+import org.apache.spark.util.CircularBuffer
+
 import scala.collection.JavaConversions._
 import scala.language.reflectiveCalls
 
@@ -66,32 +68,7 @@ private[hive] class ClientWrapper(
   with Logging {
 
   // Circular buffer to hold what hive prints to STDOUT and ERR.  Only printed when failures occur.
-  private val outputBuffer = new java.io.OutputStream {
-    var pos: Int = 0
-    var buffer = new Array[Int](10240)
-    def write(i: Int): Unit = {
-      buffer(pos) = i
-      pos = (pos + 1) % buffer.size
-    }
-
-    override def toString: String = {
-      val (end, start) = buffer.splitAt(pos)
-      val input = new java.io.InputStream {
-        val iterator = (start ++ end).iterator
-
-        def read(): Int = if (iterator.hasNext) iterator.next() else -1
-      }
-      val reader = new BufferedReader(new InputStreamReader(input))
-      val stringBuilder = new StringBuilder
-      var line = reader.readLine()
-      while(line != null) {
-        stringBuilder.append(line)
-        stringBuilder.append("\n")
-        line = reader.readLine()
-      }
-      stringBuilder.toString()
-    }
-  }
+  private val outputBuffer = new CircularBuffer()
 
   private val shim = version match {
     case hive.v12 => new Shim_v0_12()

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ba2ea3/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index 6118880..b967e19 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.HiveShim._
 import org.apache.spark.sql.hive.{HiveContext, HiveInspectors}
 import org.apache.spark.sql.types.DataType
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}
 
 /**
  * Transforms the input by forking and running the specified script.
@@ -59,15 +59,13 @@ case class ScriptTransformation(
     child.execute().mapPartitions { iter =>
       val cmd = List("/bin/bash", "-c", script)
       val builder = new ProcessBuilder(cmd)
-      // redirectError(Redirect.INHERIT) would consume the error output from buffer and
-      // then print it to stderr (inherit the target from the current Scala process).
-      // If without this there would be 2 issues:
+      // We need to start threads connected to the process pipeline:
       // 1) The error msg generated by the script process would be hidden.
       // 2) If the error msg is too big and chokes the buffer, the input logic will hang
-      builder.redirectError(Redirect.INHERIT)
       val proc = builder.start()
       val inputStream = proc.getInputStream
       val outputStream = proc.getOutputStream
+      val errorStream = proc.getErrorStream
       val reader = new BufferedReader(new InputStreamReader(inputStream))
 
       val (outputSerde, outputSoi) = ioschema.initOutputSerDe(output)
@@ -152,29 +150,43 @@ case class ScriptTransformation(
       val dataOutputStream = new DataOutputStream(outputStream)
       val outputProjection = new InterpretedProjection(input, child.output)
 
+      // TODO make the 2048 configurable?
+      val stderrBuffer = new CircularBuffer(2048)
+      // Consume the error stream from the pipeline, otherwise it will be blocked if
+      // the pipeline is full.
+      new RedirectThread(errorStream, // input stream from the pipeline
+        stderrBuffer,                 // output to a circular buffer
+        "Thread-ScriptTransformation-STDERR-Consumer").start()
+
       // Put the write (output to the pipeline) into a single thread
       // and keep the collector in the main thread;
       // otherwise it will cause a deadlock if the data size is greater than
       // the pipeline / buffer capacity.
       new Thread(new Runnable() {
         override def run(): Unit = {
-          iter
-            .map(outputProjection)
-            .foreach { row =>
-            if (inputSerde == null) {
-              val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
-                ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-
-              outputStream.write(data)
-            } else {
-              val writable = inputSerde.serialize(
-                row.asInstanceOf[GenericInternalRow].values, inputSoi)
-              prepareWritable(writable).write(dataOutputStream)
+          Utils.tryWithSafeFinally {
+            iter
+              .map(outputProjection)
+              .foreach { row =>
+              if (inputSerde == null) {
+                val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+                  ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+                outputStream.write(data)
+              } else {
+                val writable = inputSerde.serialize(
+                  row.asInstanceOf[GenericInternalRow].values, inputSoi)
+                prepareWritable(writable).write(dataOutputStream)
+              }
+            }
+            outputStream.close()
+          } {
+            if (proc.waitFor() != 0) {
+              logError(stderrBuffer.toString) // log the stderr circular buffer
             }
           }
-          outputStream.close()
         }
-      }).start()
+      }, "Thread-ScriptTransformation-Feed").start()
 
       iterator
     }
@@ -278,3 +290,4 @@ case class HiveScriptIOSchema (
     }
   }
 }
+
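The feeder/consumer split above distills to the following self-contained sketch (illustrative names, again assuming the private[spark] utilities are reachable from the org.apache.spark namespace; "wc -l" stands in for the user script):

    import org.apache.spark.util.{CircularBuffer, RedirectThread, Utils}

    val proc = new ProcessBuilder("/bin/bash", "-c", "wc -l").start()
    val stderrBuffer = new CircularBuffer(2048)
    // Drain stderr continuously so a full pipe can never stall the script.
    new RedirectThread(proc.getErrorStream, stderrBuffer, "stderr-reader").start()

    // Feed stdin on its own thread; if the stdin pipe fills, only this thread blocks.
    new Thread(new Runnable {
      override def run(): Unit = {
        Utils.tryWithSafeFinally {
          val out = proc.getOutputStream
          (1 to 100000).foreach(i => out.write(s"row $i\n".getBytes("utf-8")))
          out.close()  // EOF lets the script terminate
        } {
          if (proc.waitFor() != 0) System.err.println(stderrBuffer.toString)
        }
      }
    }, "Thread-ScriptTransformation-Feed").start()

    // The main thread stays free to consume the script's stdout.
    scala.io.Source.fromInputStream(proc.getInputStream).getLines().foreach(println)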

http://git-wip-us.apache.org/repos/asf/spark/blob/c6ba2ea3/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index f0aad8d..9f7e58f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -653,7 +653,7 @@ class SQLQuerySuite extends QueryTest {
         .queryExecution.toRdd.count())
   }
 
-  ignore("test script transform for stderr") {
+  test("test script transform for stderr") {
     val data = (1 to 100000).map { i => (i, i, i) }
     data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
     assert(0 ===

