You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2015/04/23 19:35:26 UTC
spark git commit: [SPARK-7044] [SQL] Fix the deadlock in script transformation
Repository: spark
Updated Branches:
refs/heads/master 975f53e4f -> cc48e6387
[SPARK-7044] [SQL] Fix the deadlock in script transformation
Author: Cheng Hao <ha...@intel.com>
Closes #5625 from chenghao-intel/transform and squashes the following commits:
5ec1dd2 [Cheng Hao] fix the deadlock issue in ScriptTransform
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc48e638
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc48e638
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc48e638
Branch: refs/heads/master
Commit: cc48e6387abdd909921cb58e0588cdf226556bcd
Parents: 975f53e
Author: Cheng Hao <ha...@intel.com>
Authored: Thu Apr 23 10:35:22 2015 -0700
Committer: Reynold Xin <rx...@databricks.com>
Committed: Thu Apr 23 10:35:22 2015 -0700
----------------------------------------------------------------------
.../hive/execution/ScriptTransformation.scala | 33 +++++++++++++-------
.../sql/hive/execution/SQLQuerySuite.scala | 8 +++++
2 files changed, 29 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/cc48e638/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
index cab0fdd..3eddda3 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformation.scala
@@ -145,20 +145,29 @@ case class ScriptTransformation(
val dataOutputStream = new DataOutputStream(outputStream)
val outputProjection = new InterpretedProjection(input, child.output)
- iter
- .map(outputProjection)
- .foreach { row =>
- if (inputSerde == null) {
- val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
- ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
-
- outputStream.write(data)
- } else {
- val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
- prepareWritable(writable).write(dataOutputStream)
+ // Write the input rows to the script's stdin on a separate thread and
+ // keep collecting the script's output in the main thread; otherwise,
+ // once the data size exceeds the pipe/buffer capacity, the writer and
+ // the reader block on each other and the task deadlocks.
+ new Thread(new Runnable() {
+ override def run(): Unit = {
+ iter
+ .map(outputProjection)
+ .foreach { row =>
+ if (inputSerde == null) {
+ val data = row.mkString("", ioschema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"),
+ ioschema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")).getBytes("utf-8")
+
+ outputStream.write(data)
+ } else {
+ val writable = inputSerde.serialize(row.asInstanceOf[GenericRow].values, inputSoi)
+ prepareWritable(writable).write(dataOutputStream)
+ }
}
+ outputStream.close()
}
- outputStream.close()
+ }).start()
+
iterator
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/cc48e638/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 47b4cb9..4f8d0ac 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -561,4 +561,12 @@ class SQLQuerySuite extends QueryTest {
sql("select d from dn union all select d * 2 from dn")
.queryExecution.analyzed
}
+
+ test("test script transform") {
+ val data = (1 to 100000).map { i => (i, i, i) }
+ data.toDF("d1", "d2", "d3").registerTempTable("script_trans")
+ assert(100000 ===
+ sql("SELECT TRANSFORM (d1, d2, d3) USING 'cat' AS (a,b,c) FROM script_trans")
+ .queryExecution.toRdd.count())
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org