Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2019/05/31 10:16:25 UTC

[GitHub] [spark] HyukjinKwon commented on a change in pull request #24752: [WIP][SPARK-27893][SQL][PYTHON] Create an integrated test base for Python, Scalar Pandas, Scala UDF by sql files

URL: https://github.com/apache/spark/pull/24752#discussion_r289334248
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
 ##########
 @@ -442,3 +520,205 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
     }
   }
 }
+
+
+/**
+ * This object integrates various UDF test cases so that Scala UDFs, Python UDFs and
+ * Scalar Pandas UDFs can be tested in both SBT and Maven tests.
+ *
+ * Each available UDF takes a single column as input, casts it to a string, and returns
+ * a string type column as output.
+ *
+ * To register a Scala UDF in SQL:
+ * {{{
+ *   IntegratedUDFTestUtils.registerTestUDF(new TestScalaUDF, spark)
+ * }}}
+ *
+ * To register a Python UDF in SQL:
+ * {{{
+ *   IntegratedUDFTestUtils.registerTestUDF(new TestPythonUDF, spark)
+ * }}}
+ *
+ * To register a Scalar Pandas UDF in SQL:
+ * {{{
+ *   IntegratedUDFTestUtils.registerTestUDF(new TestScalarPandasUDF, spark)
+ * }}}
+ *
+ * To use it in the Scala API and SQL:
+ * {{{
+ *   sql("SELECT udf(1)")
+ *   spark.range(1).select(expr("udf(id)"))
+ * }}}
+ *
+ * All of the UDFs above are currently registered under the name 'udf' in the function registry.
+ */
+object IntegratedUDFTestUtils extends SQLHelper with Logging {
+  import scala.sys.process._
+
+  // Use PYSPARK_PYTHON if set (defaulting to python3.6), and fall back to plain
+  // 'python' when that executable is not available on the machine.
+  lazy val pythonExec: String = {
+    val candidate = sys.env.getOrElse("PYSPARK_PYTHON", "python3.6")
+    if (TestUtils.testCommandAvailable(candidate)) {
+      candidate
+    } else {
+      "python"
+    }
+  }
+
+  private lazy val isPythonAvailable: Boolean = TestUtils.testCommandAvailable(pythonExec)
+
+  private lazy val isPySparkAvailable: Boolean = isPythonAvailable && Try {
+    Process(
+      Seq(pythonExec, "-c", "import pyspark"),
+      None,
+      "PYTHONPATH" -> s"$sourcePythonPath:$pythonPath").!!
+    true
+  }.getOrElse(false)
+
+  private val pythonPath = sys.env.getOrElse("PYTHONPATH", "")
+  private val sparkHome = if (sys.props.contains(Tests.IS_TESTING.key)) {
+    assert(sys.props.contains("spark.test.home"), "spark.test.home is not set.")
+    sys.props("spark.test.home")
+  } else {
+    assert(sys.env.contains("SPARK_HOME"), "SPARK_HOME is not set.")
+    sys.env.get("SPARK_HOME").orNull
+  }
+  // Note that we refer to Python from the source directory, not the built-in zip, since
+  // the tests may be run without a regular build.
+  private val sourcePythonPath = new File(sparkHome, "python").getAbsolutePath
+
+  private lazy val isPandasAvailable: Boolean = isPythonAvailable && isPySparkAvailable && Try {
+    Process(
+      Seq(
+        pythonExec,
+        "-c",
+        "from pyspark.sql.utils import require_minimum_pandas_version;" +
+          "require_minimum_pandas_version()"),
+      None,
+      "PYTHONPATH" -> s"$sourcePythonPath:$pythonPath").!!
+    true
+  }.getOrElse(false)
+
+  private lazy val isPyArrowAvailable: Boolean = isPythonAvailable && isPySparkAvailable && Try {
+    Process(
+      Seq(
+        pythonExec,
+        "-c",
+        "from pyspark.sql.utils import require_minimum_pyarrow_version;" +
+          "require_minimum_pyarrow_version()"),
+      None,
+      "PYTHONPATH" -> s"$sourcePythonPath:$pythonPath").!!
+    true
+  }.getOrElse(false)
+
+  private lazy val pythonVer = if (isPythonAvailable) {
+    Process(
+      Seq(pythonExec, "-c", "import sys; print('%d.%d' % sys.version_info[:2])"),
+      None,
+      "PYTHONPATH" -> s"$sourcePythonPath:$pythonPath").!!.trim()
+  } else {
+    throw new RuntimeException(s"Python executable [$pythonExec] is unavailable.")
+  }
+
+  lazy val shouldTestPythonUDFs: Boolean = isPythonAvailable && isPySparkAvailable
+
+  lazy val shouldTestScalarPandasUDFs: Boolean =
+    isPythonAvailable && isPandasAvailable && isPyArrowAvailable
+
+  // Dynamically pickles a Python function and reads it back on the JVM side in order to
+  // mimic a native Python function within Python UDFs.
+  private lazy val pythonFunc: Array[Byte] = if (isPythonAvailable) {
+    var binaryPythonFunc: Array[Byte] = null
+    withTempPath { path =>
+      Process(
+        Seq(
+          pythonExec,
+          "-c",
+          "from pyspark.sql.types import StringType; " +
+            "from pyspark import cloudpickle; " +
+            s"cloudpickle.dump((lambda x: str(x), StringType()), open('$path', 'wb'))"),
 
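The excerpt is cut off at the `cloudpickle.dump` call, but the comment above it describes the idea: a Python subprocess pickles a `(lambda x: str(x), StringType())` pair to a temp file, and the JVM reads the raw bytes back. A minimal sketch of that round trip, assuming a Unix-style path and PySpark importable via PYTHONPATH; the `pickleStringUdf` helper below is illustrative, not the PR's actual code:

{{{
  import java.io.File
  import java.nio.file.Files
  import scala.sys.process._

  // Illustrative helper (not in the PR): pickle the test function from a Python
  // subprocess into `path`, then read the pickled bytes back on the JVM side.
  def pickleStringUdf(pythonExec: String, pythonPath: String, path: File): Array[Byte] = {
    Process(
      Seq(
        pythonExec,
        "-c",
        "from pyspark.sql.types import StringType; " +
          "from pyspark import cloudpickle; " +
          s"cloudpickle.dump((lambda x: str(x), StringType()), open('$path', 'wb'))"),
      None,
      "PYTHONPATH" -> pythonPath).!!
    // The temp file now holds the pickled (function, return type) pair.
    Files.readAllBytes(path.toPath)
  }
}}}

Presumably the truncated code then wraps these bytes into a Python UDF on the JVM side, per the comment about mimicking a native Python function.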
 Review comment:
   Makes sense. Let me address the comments soon.
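
For context, stitching together the scaladoc's registration and query examples with the `shouldTestPythonUDFs` flag, a caller would presumably use the utility along these lines. This is a sketch assuming a ScalaTest suite with a `spark` session in scope; the import path follows the file's location and is an assumption:

{{{
  import org.apache.spark.sql.IntegratedUDFTestUtils._

  // Skip rather than fail when Python or PySpark is unavailable on this machine.
  assume(shouldTestPythonUDFs)

  // Register the Python test UDF under the name 'udf', then call it from SQL.
  registerTestUDF(new TestPythonUDF, spark)
  spark.sql("SELECT udf(1)").show()
}}}

Using ScalaTest's `assume` (rather than `assert`) lets the test skip cleanly on machines without a usable Python environment instead of failing.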
