Posted to commits@spark.apache.org by we...@apache.org on 2020/05/14 13:58:32 UTC

[spark] branch branch-3.0 updated: [SPARK-30973][SQL] ScriptTransformationExec should wait for the termination …

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new f5cf11c  [SPARK-30973][SQL] ScriptTransformationExec should wait for the termination …
f5cf11c is described below

commit f5cf11c4d39190f7f5f8a20c8c634c0dc2d6c212
Author: sunke.03 <su...@bytedance.com>
AuthorDate: Thu May 14 13:55:24 2020 +0000

    [SPARK-30973][SQL] ScriptTransformationExec should wait for the termination …
    
    ### What changes were proposed in this pull request?
    
    This PR tries to fix a bug in `org.apache.spark.sql.hive.execution.ScriptTransformationExec` that appeared in our online cluster. `ScriptTransformationExec` should throw an exception when the user's Python script contains a parse error, but the current implementation may miss this case of failure.
    
    ### Why are the changes needed?
    
    When the user's Python script contains a parse error, the script produces no output, so `scriptOutputReader.next(scriptOutputWritable) <= 0` matches and we call `checkFailureAndPropagate()` to check the `proc`. But the `proc` may still be alive at that point and `writerThread.exception` is not defined, so `checkFailureAndPropagate()` cannot detect this case of failure. In the end, the Spark SQL job runs successfully and returns no result, when in fact it should fail and show the exception to the user. A sketch of the waiting pattern this PR adds follows the reproduction below.
    
    For example, a failing Python script is shown below.
    ``` python
    # encoding: utf8
    import unknow_module  # this module does not exist, so the script fails at startup
    import sys
    
    for line in sys.stdin:
        print line
    ```
    The bug can be reproduced by running the following code in our cluster.
    ``` scala
    spark.range(100*100).toDF("index").createOrReplaceTempView("test")
    spark.sql("select TRANSFORM(index) USING 'python error_python.py' as new_index from test").collect.foreach(println)
    ```
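
    With this patch applied, the query above fails with `Subprocess exited with status ...` instead of silently returning no result (see the new tests in the diff). Below is a minimal sketch of the waiting pattern the patch adds; `checkExitAfterEof` and the `IllegalStateException` are hypothetical stand-ins, while the real code lives inside `ScriptTransformationExec` and reports failures through `checkFailureAndPropagate()`.
    ``` scala
    import java.util.concurrent.TimeUnit

    // After the reader sees EOF, do not trust proc.isAlive immediately:
    // a script that failed at startup may not have fully exited yet.
    def checkExitAfterEof(proc: Process, timeoutSeconds: Long): Unit = {
      // Bounded wait for the child process to actually terminate.
      val exited = proc.waitFor(timeoutSeconds, TimeUnit.SECONDS)
      if (!exited) {
        // Still running after the timeout: warn only, since a long-running
        // script is not necessarily a failure.
        System.err.println(
          s"Transformation script did not exit within $timeoutSeconds seconds")
      }
      if (!proc.isAlive) {
        val exitCode = proc.exitValue()
        if (exitCode != 0) {
          // Startup failures (e.g. a Python parse error) now surface here.
          throw new IllegalStateException(s"Subprocess exited with status $exitCode")
        }
      }
    }
    ```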
    
    ### Does this PR introduce any user-facing change?
    
    No
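
    The conf this patch adds, `spark.sql.scriptTransformation.exitTimeoutInSeconds` (default 10 seconds, must be positive), is marked `internal()`, so nothing user-facing is documented. As a sketch of how the bound could still be raised for scripts that are slow to exit after EOF (assuming, as for other internal SQL confs, that `SET` accepts it):
    ``` scala
    // Assumption: internal SQL confs remain settable per session via SET.
    spark.sql("SET spark.sql.scriptTransformation.exitTimeoutInSeconds=30")
    ```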
    
    ### How was this patch tested?
    
    Existing unit tests, plus two new tests added to `ScriptTransformationSuite`.
    
    Closes #27724 from slamke/transformation.
    
    Authored-by: sunke.03 <su...@bytedance.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit ddbce4edee6d4de30e6900bc0f03728a989aef0a)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/internal/SQLConf.scala    |  9 ++++++
 .../hive/execution/ScriptTransformationExec.scala  | 12 +++++++-
 .../hive/execution/ScriptTransformationSuite.scala | 36 ++++++++++++++++++++++
 3 files changed, 56 insertions(+), 1 deletion(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 6c18280..31038a0e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2561,6 +2561,15 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val SCRIPT_TRANSFORMATION_EXIT_TIMEOUT =
+    buildConf("spark.sql.scriptTransformation.exitTimeoutInSeconds")
+      .internal()
+      .doc("Timeout for executor to wait for the termination of transformation script when EOF.")
+      .version("3.0.0")
+      .timeConf(TimeUnit.SECONDS)
+      .checkValue(_ > 0, "The timeout value must be positive")
+      .createWithDefault(10L)
+
   /**
    * Holds information about keys that have been deprecated.
    *
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
index 40f7b4e..c7183fd 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/ScriptTransformationExec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive.execution
 import java.io._
 import java.nio.charset.StandardCharsets
 import java.util.Properties
+import java.util.concurrent.TimeUnit
 import javax.annotation.Nullable
 
 import scala.collection.JavaConverters._
@@ -42,6 +43,7 @@ import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution._
 import org.apache.spark.sql.hive.HiveInspectors
 import org.apache.spark.sql.hive.HiveShim._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.DataType
 import org.apache.spark.util.{CircularBuffer, RedirectThread, SerializableConfiguration, Utils}
 
@@ -136,6 +138,15 @@ case class ScriptTransformationExec(
             throw writerThread.exception.get
           }
 
+          // There can be a lag between the reader seeing EOF and the process terminating.
+          // If the script fails at startup, that kind of error may be missed.
+          // So wait explicitly for the process to terminate.
+          val timeout = conf.getConf(SQLConf.SCRIPT_TRANSFORMATION_EXIT_TIMEOUT)
+          val exitRes = proc.waitFor(timeout, TimeUnit.SECONDS)
+          if (!exitRes) {
+            log.warn(s"Transformation script process did not exit within $timeout seconds")
+          }
+
           if (!proc.isAlive) {
             val exitCode = proc.exitValue()
             if (exitCode != 0) {
@@ -173,7 +184,6 @@ case class ScriptTransformationExec(
                     // Ideally the proc should *not* be alive at this point but
                     // there can be a lag between EOF being written out and the process
                     // being terminated. So explicitly waiting for the process to be done.
-                    proc.waitFor()
                     checkFailureAndPropagate()
                     return false
                 }
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
index 7153d3f..b97eb86 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/ScriptTransformationSuite.scala
@@ -227,6 +227,42 @@ class ScriptTransformationSuite extends SparkPlanTest with SQLTestUtils with Tes
         'e.cast("string")).collect())
     }
   }
+
+  test("SPARK-30973: TRANSFORM should wait for the termination of the script (no serde)") {
+    assume(TestUtils.testCommandAvailable("/bin/bash"))
+
+    val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+    val e = intercept[SparkException] {
+      val plan =
+        new ScriptTransformationExec(
+          input = Seq(rowsDf.col("a").expr),
+          script = "some_non_existent_command",
+          output = Seq(AttributeReference("a", StringType)()),
+          child = rowsDf.queryExecution.sparkPlan,
+          ioschema = noSerdeIOSchema)
+      SparkPlanTest.executePlan(plan, hiveContext)
+    }
+    assert(e.getMessage.contains("Subprocess exited with status"))
+    assert(uncaughtExceptionHandler.exception.isEmpty)
+  }
+
+  test("SPARK-30973: TRANSFORM should wait for the termination of the script (with serde)") {
+    assume(TestUtils.testCommandAvailable("/bin/bash"))
+
+    val rowsDf = Seq("a", "b", "c").map(Tuple1.apply).toDF("a")
+    val e = intercept[SparkException] {
+      val plan =
+        new ScriptTransformationExec(
+          input = Seq(rowsDf.col("a").expr),
+          script = "some_non_existent_command",
+          output = Seq(AttributeReference("a", StringType)()),
+          child = rowsDf.queryExecution.sparkPlan,
+          ioschema = serdeIOSchema)
+      SparkPlanTest.executePlan(plan, hiveContext)
+    }
+    assert(e.getMessage.contains("Subprocess exited with status"))
+    assert(uncaughtExceptionHandler.exception.isEmpty)
+  }
 }
 
 private case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org