Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/23 19:35:26 UTC

spark git commit: [SPARK-7044] [SQL] Fix the deadlock in script transformation

Repository: spark
Updated Branches:
  refs/heads/master 975f53e4f -> cc48e6387


[SPARK-7044] [SQL] Fix the deadlock in script transformation

Author: Cheng Hao <ha...@intel.com>

Closes #5625 from chenghao-intel/transform and squashes the following commits:

5ec1dd2 [Cheng Hao] fix the deadlock issue in ScriptTransform


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc48e638
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc48e638
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc48e638

Branch: refs/heads/master
Commit: cc48e6387abdd909921cb58e0588cdf226556bcd
Parents: 975f53e
Author: Cheng Hao <ha...@intel.com>
Authored: Thu Apr 23 10:35:22 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Apr 23 10:35:22 2015 -0700

----------------------------------------------------------------------
 .../hive/execution/ScriptTransformation.scala   | 33 +++++++++++++-------
 .../sql/hive/execution/SQLQuerySuite.scala      |  8 +++++
 2 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc48e638/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index cab0fdd..3eddda3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -145,20 +145,29 @@ case class ScriptTransformation(
       val dataOutputStream = new DataOutputStream(outputStream)
       val outputProjection = new InterpretedProjection(input, child.output)
 
-      iter
-        .map(outputProjection)
-        .foreach { row =>
-          if (inputSerde == null) {
-            val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
-            ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
- 
-            outputStream.write(data)
-          } else {
-            val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
-            prepareWritable(writable).write(dataOutputStream)
+      // Run the writes (output to the pipe) on a separate thread
+      // and keep the collector on the main thread; otherwise the
+      // job deadlocks whenever the data size exceeds the pipe /
+      // buffer capacity.
+      new Thread(new Runnable() {
+        override def run(): Unit = {
+          iter
+            .map(outputProjection)
+            .foreach { row =>
+            if (inputSerde == null) {
+              val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+                ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+              outputStream.write(data)
+            } else {
+              val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+              prepareWritable(writable).write(dataOutputStream)
+            }
           }
+          outputStream.close()
         }
-      outputStream.close()
+      }).start()
+
       iterator
     }
   }
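
The idea behind the fix: the parent process was writing input rows into the script's stdin pipe on the same thread that was supposed to read the script's output, so once the pipe buffer filled up both sides blocked. Moving the writer into a background thread lets the main thread keep draining the output. Below is a minimal standalone sketch of that producer-thread pattern; it is not taken from the commit, uses java.io piped streams with a deliberately small buffer, and all names in it are illustrative only.

// Sketch only (not from the commit): write to a bounded pipe from a dedicated
// thread so the reader on the main thread can drain it concurrently. With a
// single thread, a write larger than the pipe buffer would block forever
// because nothing ever reads the other end -- the deadlock SPARK-7044 fixes.
import java.io.{PipedInputStream, PipedOutputStream}

object PipeWriterSketch {
  def main(args: Array[String]): Unit = {
    val out = new PipedOutputStream()
    val in  = new PipedInputStream(out, 1024)   // small buffer to make the hazard obvious

    val payload = Array.fill[Byte](1024 * 1024)('x'.toByte)  // much larger than the buffer

    // Producer thread: pushes all data into the pipe and closes it when done.
    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        out.write(payload)
        out.close()
      }
    })
    writer.start()

    // The main thread stays free to consume, so the producer never blocks indefinitely.
    val buf = new Array[Byte](4096)
    var total = 0
    var n = in.read(buf)
    while (n != -1) {
      total += n
      n = in.read(buf)
    }
    writer.join()
    println(s"read $total bytes")
  }
}
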

http://git-wip-us.apache.org/repos/asf/spark/blob/cc48e638/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 47b4cb9..4f8d0ac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -561,4 +561,12 @@ class SQLQuerySuite extends QueryTest {
     sql("select d from dn union all select d * 2 from dn")
       .queryExecution.analyzed
   }
+
+  test("test script transform") {
+    val data = (1 to 100000).map { i => (i, i, i) }
+    data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
+    assert(100000 ===
+      sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans")
+      .queryExecution.toRdd.count())
+  }
 }
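
For reference, the new regression test can also be reproduced interactively. The sketch below is not part of the commit; it assumes a Spark 1.x shell with Hive support where `sqlContext` is a HiveContext (script transforms require Hive), and it uses the same 100000-row dataset and 'cat' script as the test.

// Hedged sketch, not from the commit: run the same query shape from a shell.
import sqlContext.implicits._

val data = (1 to 100000).map(i => (i, i, i))
data.toDF("d1", "d2", "d3").registerTempTable("script_trans")

// 'cat' echoes every input row, so the result count should equal the input size.
// Before this fix, 100000 rows were enough to fill the pipe buffer and hang the query.
val transformed = sqlContext.sql(
  "SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a, b, c) FROM script_trans")
println(transformed.count())  // expected: 100000
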

