You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/05/14 13:58:32 UTC
[spark] branch branch-3.0 updated: [SPARK-30973][SQL] ScriptTransformationExec should wait for the termination …
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new f5cf11c [SPARK-30973][SQL] ScriptTransformationExec should wait for the termination …
f5cf11c is described below
commit f5cf11c4d39190f7f5f8a20c8c634c0dc2d6c212
Author: sunke.03 <su...@bytedance.com>
AuthorDate: Thu May 14 13:55:24 2020 +0000
[SPARK-30973][SQL] ScriptTransformationExec should wait for the termination …
### What changes were proposed in this pull request?
This PR fixes a bug in `org.apache.spark.sql.hive.execution.ScriptTransformationExec` that appeared in our production cluster. `ScriptTransformationExec` should throw an exception when a user runs a Python script that contains a parse error, but the current implementation may miss this kind of failure.
### Why are the changes needed?
When a user runs a Python script that contains a parse error, the script produces no output. Consequently, the condition `scriptOutputReader.next(scriptOutputWritable) <= 0` is true, and we call `checkFailureAndPropagate()` to check the `proc`. However, at that point the `proc` may still be alive and `writerThread.exception` may not be set, so `checkFailureAndPropagate` cannot detect this failure. As a result, the Spark SQL job completes successfully and returns no results, whereas it should fail and show the except [...]
For example, consider the erroneous Python script below.
``` python
# encoding: utf8
import unknow_module
import sys
for line in sys.stdin:
print line
```
The bug can be reproduced by running the following code in our cluster.
```
spark.range(100*100).toDF("index").createOrReplaceTempView("test")
spark.sql("select TRANSFORM(index) USING 'python error_python.py' as new_index from test").collect.foreach(println)
```
### Does this PR introduce any user-facing change?
No
### How was this patch tested?
Existing UT
Closes #27724 from slamke/transformation.
Authored-by: sunke.03 <su...@bytedance.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit ddbce4edee6d4de30e6900bc0f03728a989aef0a)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../org/apache/spark/sql/internal/SQLConf.scala | 9 ++++++
.../hive/execution/ScriptTransformationExec.scala | 12 +++++++-
.../hive/execution/ScriptTransformationSuite.scala | 36 ++++++++++++++++++++++
3 files changed, 56 insertions(+), 1 deletion(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6c18280..31038a0e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2561,6 +2561,15 @@ object SQLConf {
.booleanConf
.createWithDefault(false)
+ val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT =
+ buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds")
+ .internal()
+ .doc("Timeout for executor to wait for the termination of transformation script when EOF.")
+ .version("3.0.0")
+ .timeConf(TimeUnit.SECONDS)
+ .checkValue(_ > 0, "The timeout value must be positive")
+ .createWithDefault(10L)
+
/**
* Holds information about keys that have been deprecated.
*
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index 40f7b4e..c7183fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
import java.io._
import java.nio.charset.StandardCharsets
import java.util.Properties
+import java.util.concurrent.TimeUnit
import javax.annotation.Nullable
import scala.collection.JavaConverters._
@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution._
import org.apache.spark.sql.hive.HiveInspectors
import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataType
import org.apache.spark.util.{CircularBuffer, RedirectThread, SerializableConfiguration, Utils}
@@ -136,6 +138,15 @@ case class ScriptTransformationExec(
throw writerThread.exception.get
}
+ // There can be a lag between reader read EOF and the process termination.
+ // If the script fails to startup, this kind of error may be missed.
+ // So explicitly waiting for the process termination.
+ val timeout = conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_EXIT_TIMEOUT)
+ val exitRes = proc.waitFor(timeout, TimeUnit.SECONDS)
+ if (!exitRes) {
+ log.warn(s"Transformation script process exits timeout in $timeout seconds")
+ }
+
if (!proc.isAlive) {
val exitCode = proc.exitValue()
if (exitCode != 0) {
@@ -173,7 +184,6 @@ case class ScriptTransformationExec(
// Ideally the proc should *not* be alive at this point but
// there can be a lag between EOF being written out and the process
// being terminated. So explicitly waiting for the process to be done.
- proc.waitFor()
checkFailureAndPropagate()
return false
}
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 7153d3f..b97eb86 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -227,6 +227,42 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes
'e.cast("string")).collect())
}
}
+
+ test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
+ val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+ val e = intercept[SparkException] {
+ val plan =
+ new ScriptTransformationExec(
+ input = Seq(rowsDf.col("a").expr),
+ script = "some_non_existent_command",
+ output = Seq(AttributeReference("a", StringType)()),
+ child = rowsDf.queryExecution.sparkPlan,
+ ioschema = noSerdeIOSchema)
+ SparkPlanTest.executePlan(plan, hiveContext)
+ }
+ assert(e.getMessage.contains("Subprocess exited with status"))
+ assert(uncaughtExceptionHandler.exception.isEmpty)
+ }
+
+ test("SPARK-30973: TRANSFORM should wait for the termination of the script (with serde)") {
+ assume(TestUtils.testCommandAvailable("/bin/bash"))
+
+ val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+ val e = intercept[SparkException] {
+ val plan =
+ new ScriptTransformationExec(
+ input = Seq(rowsDf.col("a").expr),
+ script = "some_non_existent_command",
+ output = Seq(AttributeReference("a", StringType)()),
+ child = rowsDf.queryExecution.sparkPlan,
+ ioschema = serdeIOSchema)
+ SparkPlanTest.executePlan(plan, hiveContext)
+ }
+ assert(e.getMessage.contains("Subprocess exited with status"))
+ assert(uncaughtExceptionHandler.exception.isEmpty)
+ }
}
private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org