Posted to reviews@spark.apache.org by icexelloss <gi...@git.apache.org> on 2018/06/27 22:40:20 UTC

[GitHub] spark pull request #21650: [SPARK-24624] Support mixture of Python UDF and S...

GitHub user icexelloss opened a pull request:

    https://github.com/apache/spark/pull/21650

    [SPARK-24624] Support mixture of Python UDF and Scalar Pandas UDF

    ## What changes were proposed in this pull request?
    
    This PR adds support for mixing Python UDFs and Scalar Pandas UDFs in the following two cases:
    
    (1)
    ```
    from pyspark.sql.functions import udf, pandas_udf

    f1 = udf(lambda x: x + 1, 'int')
    f2 = pandas_udf(lambda x: x + 2, 'int')
    
    df = ...
    df = df.withColumn('foo', f1(df['v']))
    df = df.withColumn('bar', f2(df['v']))
    ```
    
    (2)
    ```
    from pyspark.sql.functions import udf, pandas_udf

    f1 = udf(lambda x: x + 1, 'int')
    f2 = pandas_udf(lambda x: x + 2, 'int')
    
    df = ...
    df = df.withColumn('foo', f2(f1(df['v'])))
    ```
    ## How was this patch tested?
    
    New tests are added to BatchEvalPythonExecSuite and ScalarPandasUDFTests


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/icexelloss/spark SPARK-24624-mix-udf

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21650.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21650
    
----
commit 48ae822bcdf6df40b181f86379d275d602c580c9
Author: Li Jin <ic...@...>
Date:   2018-06-22T18:35:34Z

    wip

commit 68e665ec981c1a7cae46398bc2ea8a4880e95331
Author: Li Jin <ic...@...>
Date:   2018-06-27T22:31:25Z

    Test passes

commit 6b47b69305257e9ee9f5135968913a4f92731ef5
Author: Li Jin <ic...@...>
Date:   2018-06-27T22:34:28Z

    Remove white spaces

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    @BryanCutler I've addressed most of your comments and explained the ones that I didn't change. Do you mind taking another look? Thanks!


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test FAILed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202863084
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -167,7 +191,8 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
                 case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
                   BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child)
                 case _ =>
    -              throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
    +              throw new AnalysisException(
    +                "Mixed Python and Scalar Pandas UDFs are not expected here")
    --- End diff --
    
    Change this to "Expected either Scalar Pandas UDFs or Batched UDFs but got both"


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202863732
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
         }
         assert(qualifiedPlanNodes.size == 1)
       }
    +
    +  private def collectPythonExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
    +    case b: BatchEvalPythonExec => b
    +  }
    +
    +  private def collectPandasExec(plan: SparkPlan): Seq[ArrowEvalPythonExec] = plan.collect {
    --- End diff --
    
    rename to `collectArrowExec`


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202863867
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
         }
         assert(qualifiedPlanNodes.size == 1)
       }
    +
    +  private def collectPythonExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
    +    case b: BatchEvalPythonExec => b
    +  }
    +
    +  private def collectPandasExec(plan: SparkPlan): Seq[ArrowEvalPythonExec] = plan.collect {
    +    case b: ArrowEvalPythonExec => b
    +  }
    +
    +  test("Chained Python UDFs should be combined to a single physical node") {
    --- End diff --
    
    change to "Chained Python Batched UDFs.."


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205132748
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    --- End diff --
    
    @BryanCutler I rewrote this function using mutable state based on your suggestion. It's not quite the same as your code, so please take a look and let me know if this looks better now. Thanks!
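
The quoted diff shows only the `LazyEvalType` holder and the doc comment of `canEvaluateInPython`, not the method body. The following is a minimal Python sketch of how such a set-once holder could drive the check. The `Udf` class, the `_chain_matches` helper, the numeric eval-type values, and the body of `can_evaluate_in_python` are all assumptions made for illustration, not the code from the PR.

```python
from dataclasses import dataclass
from typing import Tuple, Union

# Illustrative stand-ins for Spark's PythonEvalType constants (values assumed).
SQL_BATCHED_UDF = 100
SQL_SCALAR_PANDAS_UDF = 200


class LazyEvalType:
    """Set-once holder: unset (-1) until the first evaluable UDF fixes it."""

    def __init__(self) -> None:
        self._eval_type = -1

    @property
    def is_set(self) -> bool:
        return self._eval_type >= 0

    def set(self, eval_type: int) -> None:
        if self.is_set:
            raise RuntimeError("Eval type has already been set")
        self._eval_type = eval_type

    def get(self) -> int:
        if not self.is_set:
            raise RuntimeError("Eval type is not set")
        return self._eval_type


@dataclass(frozen=True)
class Udf:
    """Hypothetical model of a PythonUDF expression node."""
    name: str
    eval_type: int
    children: Tuple[Union["Udf", str], ...] = ()  # a str is a plain column


def _chain_matches(udf: Udf, eval_type: int) -> bool:
    """True if udf and any chained single-UDF child all share eval_type."""
    if udf.eval_type != eval_type:
        return False
    if len(udf.children) == 1 and isinstance(udf.children[0], Udf):
        return _chain_matches(udf.children[0], eval_type)
    # In this model non-UDF children are leaf columns, so it is enough
    # to check that no direct argument is itself a UDF.
    return not any(isinstance(c, Udf) for c in udf.children)


def can_evaluate_in_python(udf: Udf, lazy: LazyEvalType) -> bool:
    # If no type has been chosen yet, accept either type; otherwise
    # only accept UDFs of the type that was chosen first.
    wanted = lazy.get() if lazy.is_set else udf.eval_type
    ok = _chain_matches(udf, wanted)
    if ok and not lazy.is_set:
        lazy.set(udf.eval_type)  # the first evaluable UDF decides the type
    return ok
```

In this sketch, checking a batched `f1(v)` first fixes the holder to the batched type, so a later pandas `f2(v)` checked against the same holder is rejected and left for a subsequent pass.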


---



[GitHub] spark pull request #21650: [SPARK-24624] Support mixture of Python UDF and S...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r198664314
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    +      false
    +    } else {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python
    +        case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType)
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => !children.exists(hasScalarPythonUDF)
    +      }
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
    +    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) =>
    +      Seq(udf)
    +    case e => e.children.flatMap(collectEvaluableUDF(_, evalType))
    +  }
    +
    +  /**
    +   * Collect evaluable UDFs from the current node.
    +   *
    +   * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
    +   * and returns a list of UDFs of the same eval type.
    +   *
    +   * If expressions contain both UDFs eval types, this function will only return Python UDFs.
    +   *
    +   * The caller should call this function multiple times until all evaluable UDFs are collected.
    +   */
    +  private def collectEvaluableUDFs(plan: SparkPlan): Seq[PythonUDF] = {
    +    val pythonUDFs =
    +      plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_BATCHED_UDF))
    +
    +    if (pythonUDFs.isEmpty) {
    +      plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_SCALAR_PANDAS_UDF))
    +    } else {
    +      pythonUDFs
    +    }
       }
     
       def apply(plan: SparkPlan): SparkPlan = plan transformUp {
    -    // AggregateInPandasExec and FlatMapGroupsInPandas can be evaluated directly in python worker
    -    // Therefore we don't need to extract the UDFs
    -    case plan: FlatMapGroupsInPandasExec => plan
    --- End diff --
    
    This is no longer needed because this rule will only extract Python UDFs and Scalar Pandas UDFs and will ignore other types of UDFs.
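
The collection strategy in the quoted diff (collect batched UDFs first, fall back to Scalar Pandas UDFs, and repeat until nothing is left) can be modelled outside Spark. The sketch below is a rough Python transliteration under stated assumptions: the `Udf` class, the `extract_all` driver loop, the `pythonUDF…` placeholder names, and the numeric eval-type values are hypothetical and only illustrate the idea; they are not Spark APIs.

```python
from dataclasses import dataclass
from typing import List, Tuple, Union

# Illustrative stand-ins for Spark's PythonEvalType constants (values assumed).
SQL_BATCHED_UDF = 100
SQL_SCALAR_PANDAS_UDF = 200
SQL_GROUPED_MAP_PANDAS_UDF = 201
SCALAR_TYPES = (SQL_BATCHED_UDF, SQL_SCALAR_PANDAS_UDF)


@dataclass(frozen=True)
class Udf:
    """Hypothetical model of a PythonUDF expression node."""
    name: str
    eval_type: int
    children: Tuple[Union["Udf", str], ...] = ()  # a str is a plain column


def is_scalar(e) -> bool:
    return isinstance(e, Udf) and e.eval_type in SCALAR_TYPES


def has_scalar_udf(e) -> bool:
    if is_scalar(e):
        return True
    return isinstance(e, Udf) and any(has_scalar_udf(c) for c in e.children)


def can_evaluate(udf: Udf, eval_type: int) -> bool:
    if udf.eval_type != eval_type:
        return False
    if len(udf.children) == 1 and isinstance(udf.children[0], Udf):
        return can_evaluate(udf.children[0], eval_type)  # chained UDFs
    return not any(has_scalar_udf(c) for c in udf.children)


def collect(expr, eval_type: int) -> List[Udf]:
    if is_scalar(expr) and can_evaluate(expr, eval_type):
        return [expr]
    if isinstance(expr, Udf):
        return [u for c in expr.children for u in collect(c, eval_type)]
    return []


def collect_evaluable_udfs(exprs) -> List[Udf]:
    # Prefer batched UDFs; only look for pandas UDFs when none are found.
    batched = [u for e in exprs for u in collect(e, SQL_BATCHED_UDF)]
    if batched:
        return batched
    return [u for e in exprs for u in collect(e, SQL_SCALAR_PANDAS_UDF)]


def replace(expr, mapping):
    if isinstance(expr, Udf):
        if expr in mapping:
            return mapping[expr]
        return Udf(expr.name, expr.eval_type,
                   tuple(replace(c, mapping) for c in expr.children))
    return expr


def extract_all(exprs):
    """Collect repeatedly; return (eval_type, udf names) per pass and the rest."""
    steps = []
    while True:
        udfs = collect_evaluable_udfs(exprs)
        if not udfs:
            return steps, exprs
        # Replace each extracted UDF with a placeholder result column.
        mapping = {u: f"pythonUDF{len(steps)}_{i}" for i, u in enumerate(udfs)}
        steps.append((udfs[0].eval_type, [u.name for u in udfs]))
        exprs = [replace(e, mapping) for e in exprs]
```

For an expression modelling `f2(f1(v))`, with `f1` batched and `f2` a Scalar Pandas UDF, the loop produces one batched pass for `f1` followed by one pandas pass for `f2`. A UDF with a non-scalar eval type is never collected, which mirrors the point that other UDF types are ignored by this rule.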


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    @BryanCutler I think your suggestion would change the behavior. ArrowEvalPythonExec and BatchEvalPythonExec still differ in corner cases, for example type coercion (ArrowEvalPythonExec supports type coercion but BatchEvalPythonExec doesn't) and the timestamp type (a regular UDF expects a Python datetime for timestamps, while a pandas UDF expects pd.Timestamp).
    
    I think this is probably a good future improvement, but it is not a good fit for this Jira because of the behavior change. WDYT?


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    @HyukjinKwon I think Bryan's implementation looks promising. Please let me take a look.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #92482 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92482/testReport)** for PR 21650 at commit [`ce5e7f5`](https://github.com/apache/spark/commit/ce5e7f53cff3c5657fe2e99f2f2a57176d009cce).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199203674
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    +      false
    +    } else {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python
    +        case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType)
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => !children.exists(hasScalarPythonUDF)
    +      }
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
    +    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) =>
    +      Seq(udf)
    +    case e => e.children.flatMap(collectEvaluableUDF(_, evalType))
    +  }
    +
    +  /**
    +   * Collect evaluable UDFs from the current node.
    +   *
    +   * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
    +   * and returns a list of UDFs of the same eval type.
    --- End diff --
    
    I tried this on master and got the same exception:
    
    ```
    >>> foo = pandas_udf(lambda x: x, 'v int', PandasUDFType.GROUPED_MAP)
    >>> df.select(foo(df['v'])).show()
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/Users/icexelloss/workspace/upstream/spark/python/pyspark/sql/dataframe.py", line 353, in show
        print(self._jdf.showString(n, 20, vertical))
      File "/Users/icexelloss/workspace/upstream/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
      File "/Users/icexelloss/workspace/upstream/spark/python/pyspark/sql/utils.py", line 63, in deco
        return f(*a, **kw)
      File "/Users/icexelloss/workspace/upstream/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    py4j.protocol.Py4JJavaError: An error occurred while calling o257.showString.
    : java.lang.UnsupportedOperationException: Cannot evaluate expression: <lambda>(input[0, bigint, false])
    	at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:261)
    	at org.apache.spark.sql.catalyst.expressions.PythonUDF.doGenCode(PythonUDF.scala:50)
    	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:108)
    	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
    	at scala.Option.getOrElse(Option.scala:121)
            ...
    ```
    Therefore, this PR doesn't change that behavior. Neither master nor this PR extracts non-scalar UDFs from the expression.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    I had an idea for a slightly different approach. Would it be possible to "promote" the regular `udf` to a `pandas_udf`? By this I mean wrapping the function with `apply()` so that it takes pd.Series as inputs and returns another pd.Series. Then we could send the entire mix of `udf`s and `pandas_udf`s to the worker in one shot, instead of running separate evaluations. Since the user is already using `pandas_udf`s, we know that the worker supports them, and I think the performance would be much better. Are there any downsides or issues with doing it this way?
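
The "promotion" idea can be illustrated without Spark. The sketch below is an assumption-laden model: plain Python lists stand in for pd.Series, and the hypothetical `promote` helper plays the role that `Series.apply` would play in pandas. It only shows the wrapping step; it does not reproduce Spark's serialization or any type handling.

```python
from typing import Callable, List, Sequence


def promote(row_func: Callable) -> Callable[..., List]:
    """Wrap a row-at-a-time function so it takes and returns whole batches.

    Hypothetical helper: lists stand in for pd.Series here.
    """
    def batch_func(*columns: Sequence) -> List:
        # Apply the original function to each row of the zipped columns.
        return [row_func(*row) for row in zip(*columns)]
    return batch_func


# A row-at-a-time function, promoted to operate on a batch.
plus_one = promote(lambda x: x + 1)


# A function that is already written against whole batches.
def plus_two(column: Sequence[int]) -> List[int]:
    return [x + 2 for x in column]


def evaluate_mixed(column: Sequence[int]) -> List[int]:
    # Both functions now share the batch interface, so the chained
    # expression can be evaluated in a single pass over the batch.
    return plus_two(plus_one(column))
```

Once both kinds of function accept batches, a chained expression such as `plus_two(plus_one(v))` no longer needs two separate evaluation steps. A wrapper like this does not by itself guarantee that the promoted function sees the same value types as before.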


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #92443 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92443/testReport)** for PR 21650 at commit [`674e361`](https://github.com/apache/spark/commit/674e36136911839df00635eff8abb3c405e537d4).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205030035
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    --- End diff --
    
    Ah, I see why it looks confusing. Can we add an assert here too (check if it's a column)?


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93451/
    Test FAILed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    @BryanCutler @HyukjinKwon I updated the PR based on Bryan's suggestion. Please take a look and let me know if you have further comments.
    
    Thanks!


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205133506
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    --- End diff --
    
    Added


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92443/


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205819781
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,61 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class EvalTypeHolder(private var evalType: Int = -1) {
    --- End diff --
    
    How about this:
    
    ```scala
      private type EvalType = Int
      private type EvalTypeChecker = Option[EvalType => Boolean]
    
      private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = {
        // Eval type checker is set in the middle of checking because once it's found,
        // the same eval type should be checked .. blah blah
        var evalChecker: EvalTypeChecker = None
    
        def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match {
          case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
            && evalChecker.isEmpty =>
            evalChecker = Some((otherEvalType: EvalType) => otherEvalType == udf.evalType)
            collectEvaluableUDFs(expr)
          case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
            && evalChecker.get(udf.evalType) =>
            Seq(udf)
          case e => e.children.flatMap(collectEvaluableUDFs)
        }
    
        expressions.flatMap(collectEvaluableUDFs)
      }
    
      def apply(plan: SparkPlan): SparkPlan = plan transformUp {
        case plan: SparkPlan => extract(plan)
      }
    
      /**
       * Extract all the PythonUDFs from the current operator and evaluate them before the operator.
       */
      private def extract(plan: SparkPlan): SparkPlan = {
        val udfs = collectEvaluableUDFsFromExpressions(plan.expressions)
    ```
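
For readers less familiar with Scala, the single-pass idea in the suggestion above can be modelled roughly in Python. This is only a sketch under stated assumptions: `Expr`, `BATCHED`, `SCALAR_PANDAS`, `has_udf`, `can_evaluate_in_python` and `collect_evaluable_udfs` are hypothetical stand-ins, not Spark APIs, and the chaining rule assumes the variant that requires parent and child to share an eval type. The two guarded `case` branches are folded into one `if`, so the first evaluable UDF fixes the eval type and is returned directly.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple

# Hypothetical stand-ins for the eval type constants; the values are arbitrary.
BATCHED, SCALAR_PANDAS = 100, 200


@dataclass(frozen=True)
class Expr:
    name: str
    children: Tuple["Expr", ...] = ()
    eval_type: Optional[int] = None  # None means a plain (non-UDF) expression

    @property
    def is_udf(self) -> bool:
        return self.eval_type is not None


def has_udf(e: Expr) -> bool:
    return e.is_udf or any(has_udf(c) for c in e.children)


def can_evaluate_in_python(e: Expr) -> bool:
    # A single UDF child of the same eval type can be chained in one Python call;
    # otherwise no descendant may be a UDF.
    if len(e.children) == 1 and e.children[0].is_udf:
        child = e.children[0]
        return e.eval_type == child.eval_type and can_evaluate_in_python(child)
    return not any(has_udf(c) for c in e.children)


def collect_evaluable_udfs(expressions: List[Expr]) -> List[Expr]:
    first_eval_type: Optional[int] = None  # set once, by the first evaluable UDF

    def collect(e: Expr) -> List[Expr]:
        nonlocal first_eval_type
        if e.is_udf and can_evaluate_in_python(e):
            if first_eval_type is None:
                first_eval_type = e.eval_type
            if e.eval_type == first_eval_type:
                return [e]
        # Not collectable in this round: look for UDFs further down the tree.
        return [u for c in e.children for u in collect(c)]

    return [u for e in expressions for u in collect(e)]
```

With `v = Expr("v")`, a batched `f1(v)` and a pandas `f2(v)` in the same projection, only `f1(v)` would be collected in this round; the pandas UDF is presumably picked up when the rule is applied again to the rewritten plan.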


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205872386
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,52 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private type EvalType = Int
    +  private type EvalTypeChecker = EvalType => Boolean
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
       private def canEvaluateInPython(e: PythonUDF): Boolean = {
         e.children match {
           // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    +      case Seq(u: PythonUDF) => e.evalType == u.evalType && canEvaluateInPython(u)
           // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +      case children => !children.exists(hasScalarPythonUDF)
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDFsFromExpressions(expressions: Seq[Expression]): Seq[PythonUDF] = {
    +    // Eval type checker is set once when we find the first evaluable UDF and its value
    +    // shouldn't change later.
    +    // Used to check if subsequent UDFs are of the same type as the first UDF. (since we can only
    +    // extract UDFs of the same eval type)
    +    var evalTypeChecker: Option[EvalTypeChecker] = None
    +
    +    def collectEvaluableUDFs(expr: Expression): Seq[PythonUDF] = expr match {
    +      case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf)
    +        && evalTypeChecker.isEmpty =>
    +        evalTypeChecker = Some((otherEvalType: EvalType) => otherEvalType == udf.evalType)
    +        Seq(udf)
    --- End diff --
    
    @HyukjinKwon In your code, this line is `collectEvaluableUDFs(expr)`. I think we should just return `Seq(udf)` to avoid checking the expression twice.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1421/


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    @icexelloss Can you also show the query plan of the examples in the PR description? Thanks.


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by maropu <gi...@git.apache.org>.
Github user maropu commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    nit: Also, can you put `[SQL][PYTHON]` in the title?


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202861461
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    +      false
    +    } else {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python
    +        case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType)
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => !children.exists(hasScalarPythonUDF)
    +      }
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
    --- End diff --
    
    It's a little confusing to have this function named so similarly to the one below. Maybe you can combine them if you're just doing a single loop (see other comment).


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93667 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93667/testReport)** for PR 21650 at commit [`6b22fea`](https://github.com/apache/spark/commit/6b22fea5b42b40d2eb92d931e76d183518533717).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205144604
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v'])) \
    +            .withColumn('f2', f2(df['v'])) \
    +            .withColumn('f3', f3(df['v'])) \
    +            .withColumn('f1_f2', f1(f2(df['v']))) \
    +            .withColumn('f1_f3', f1(f3(df['v']))) \
    +            .withColumn('f2_f1', f2(f1(df['v']))) \
    +            .withColumn('f2_f3', f2(f3(df['v']))) \
    +            .withColumn('f3_f1', f3(f1(df['v']))) \
    --- End diff --
    
    It was discussed here https://github.com/apache/spark/pull/21845


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205448677
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    --- End diff --
    
    I applied your new code, but the test I mentioned above still fails.
    
    I think the issue could be that when visiting `f2(f1(col('v')))`, firstEvalType is set to Scalar Pandas first and isn't set to Batched SQL later, so f1 is not extracted. It's possible that my code is still somehow different from yours.
    
    But similar to https://github.com/apache/spark/pull/21650#issuecomment-407951457, I think the state machine of the eval type holder object here is fairly complicated with your suggested implementation (i.e., what is the expected state of the eval type holder, and what is the invariant of the algorithm), and I found myself thinking pretty hard to prove that the state machine is correct in all cases. If we want to go with this implementation, we need to think about it carefully and explain it in the code...
    
    The lazyEvalType implementation is better IMHO because the state machine is simpler: lazyEvalType is empty until we find the first evaluable UDF, and the value doesn't change after that.
    
    The first implementation (two passes, immutable state) is probably the simplest in terms of the mental complexity of the algorithm, but it is less efficient.
    
    I am OK with either the immutable state or the lazy state. I think @HyukjinKwon prefers the immutable state one. @BryanCutler WDYT?
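
To make the "two passes, immutable state" option concrete, here is a rough Python model of it. It is a sketch, not the PR's code: `Node`, `evaluable`, `first_eval_type`, `collect` and `extract_round` are hypothetical names, and the eval type constants are arbitrary. The first pass only reads the tree to decide the eval type; the second pass collects UDFs against that fixed value, so no state changes during the traversal.

```python
from typing import List, NamedTuple, Optional, Tuple

BATCHED, SCALAR_PANDAS = 100, 200  # hypothetical eval type constants


class Node(NamedTuple):
    name: str
    eval_type: Optional[int]  # None marks a non-UDF expression such as a column
    children: Tuple["Node", ...] = ()


def contains_udf(n: Node) -> bool:
    return n.eval_type is not None or any(contains_udf(c) for c in n.children)


def evaluable(n: Node, eval_type: int) -> bool:
    # Evaluable means: a UDF of the requested type whose input is either a chain
    # of same-typed UDFs or contains no UDF at all.
    if n.eval_type != eval_type:
        return False
    if len(n.children) == 1 and n.children[0].eval_type is not None:
        return evaluable(n.children[0], eval_type)
    return not any(contains_udf(c) for c in n.children)


def first_eval_type(n: Node) -> Optional[int]:
    # Pass 1: read-only search for the eval type of the first evaluable UDF.
    if n.eval_type is not None and evaluable(n, n.eval_type):
        return n.eval_type
    for c in n.children:
        found = first_eval_type(c)
        if found is not None:
            return found
    return None


def collect(n: Node, eval_type: int) -> List[Node]:
    # Pass 2: collect every evaluable UDF of the already-fixed eval type.
    if evaluable(n, eval_type):
        return [n]
    return [u for c in n.children for u in collect(c, eval_type)]


def extract_round(exprs: List[Node]) -> List[Node]:
    eval_type = next((t for t in map(first_eval_type, exprs) if t is not None), None)
    if eval_type is None:
        return []
    return [u for e in exprs for u in collect(e, eval_type)]
```

For the nested case `f2(f1(v))`, with a pandas `f2` and a batched `f1`, the first pass in this model settles on the batched type because the outer UDF is not evaluable on its own, and the second pass collects `f1`. The cost is that each expression tree is walked twice.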


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199204000
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,138 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df1['v']))
    +        df1 = df1.withColumn('f3', f3(df1['v']))
    +        df1 = df1.withColumn('f4', f4(df1['v']))
    +        df1 = df1.withColumn('f2_f1', f2(df1['f1']))
    +        df1 = df1.withColumn('f3_f1', f3(df1['f1']))
    +        df1 = df1.withColumn('f4_f1', f4(df1['f1']))
    +        df1 = df1.withColumn('f3_f2', f3(df1['f2']))
    +        df1 = df1.withColumn('f4_f2', f4(df1['f2']))
    +        df1 = df1.withColumn('f4_f3', f4(df1['f3']))
    +        df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1']))
    +        df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2']))
    +        df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1']))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df.withColumn('f1', f1(df['v']))
    +        df2 = df2.withColumn('f2', f2(df['v']))
    +        df2 = df2.withColumn('f3', f3(df['v']))
    +        df2 = df2.withColumn('f4', f4(df['v']))
    +        df2 = df2.withColumn('f2_f1', f2(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f1', f3(f1(df['v'])))
    +        df2 = df2.withColumn('f4_f1', f4(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f2', f3(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f2', f4(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f3', f4(f3(df['v'])))
    +        df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
    +
    +        df3 = df.withColumn('f1', df['v'] + 1)
    +        df3 = df3.withColumn('f2', df['v'] + 10)
    +        df3 = df3.withColumn('f3', df['v'] + 100)
    +        df3 = df3.withColumn('f4', df['v'] + 1000)
    +        df3 = df3.withColumn('f2_f1', df['v'] + 11)
    +        df3 = df3.withColumn('f3_f1', df['v'] + 101)
    +        df3 = df3.withColumn('f4_f1', df['v'] + 1001)
    +        df3 = df3.withColumn('f3_f2', df['v'] + 110)
    +        df3 = df3.withColumn('f4_f2', df['v'] + 1010)
    +        df3 = df3.withColumn('f4_f3', df['v'] + 1100)
    +        df3 = df3.withColumn('f3_f2_f1', df['v'] + 111)
    +        df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
    +        df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
    +        df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
    +        df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    --- End diff --
    
    Added comments in test


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199020532
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,138 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df1['v']))
    +        df1 = df1.withColumn('f3', f3(df1['v']))
    +        df1 = df1.withColumn('f4', f4(df1['v']))
    +        df1 = df1.withColumn('f2_f1', f2(df1['f1']))
    +        df1 = df1.withColumn('f3_f1', f3(df1['f1']))
    +        df1 = df1.withColumn('f4_f1', f4(df1['f1']))
    +        df1 = df1.withColumn('f3_f2', f3(df1['f2']))
    +        df1 = df1.withColumn('f4_f2', f4(df1['f2']))
    +        df1 = df1.withColumn('f4_f3', f4(df1['f3']))
    +        df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1']))
    +        df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2']))
    +        df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1']))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df.withColumn('f1', f1(df['v']))
    +        df2 = df2.withColumn('f2', f2(df['v']))
    +        df2 = df2.withColumn('f3', f3(df['v']))
    +        df2 = df2.withColumn('f4', f4(df['v']))
    +        df2 = df2.withColumn('f2_f1', f2(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f1', f3(f1(df['v'])))
    +        df2 = df2.withColumn('f4_f1', f4(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f2', f3(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f2', f4(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f3', f4(f3(df['v'])))
    +        df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
    +
    +        df3 = df.withColumn('f1', df['v'] + 1)
    +        df3 = df3.withColumn('f2', df['v'] + 10)
    +        df3 = df3.withColumn('f3', df['v'] + 100)
    +        df3 = df3.withColumn('f4', df['v'] + 1000)
    +        df3 = df3.withColumn('f2_f1', df['v'] + 11)
    +        df3 = df3.withColumn('f3_f1', df['v'] + 101)
    +        df3 = df3.withColumn('f4_f1', df['v'] + 1001)
    +        df3 = df3.withColumn('f3_f2', df['v'] + 110)
    +        df3 = df3.withColumn('f4_f2', df['v'] + 1010)
    +        df3 = df3.withColumn('f4_f3', df['v'] + 1100)
    +        df3 = df3.withColumn('f3_f2_f1', df['v'] + 111)
    +        df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
    +        df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
    +        df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
    +        df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
    --- End diff --
    
    So df3 holds the expected values?
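
The constants in df3 follow from the fact that each UDF in the test adds a fixed offset, so a composed column adds the sum of the offsets of the UDFs in its name. A small sketch of that arithmetic, where `OFFSETS` and `expected_offset` are hypothetical helpers and not part of the test:

```python
# Offsets added by each UDF in the test above.
OFFSETS = {"f1": 1, "f2": 10, "f3": 100, "f4": 1000}


def expected_offset(column: str) -> int:
    # A column such as 'f4_f3_f1' means f4(f3(f1(v))), so the offsets just add up.
    return sum(OFFSETS[name] for name in column.split("_"))
```

For example, `expected_offset('f4_f3_f2_f1')` gives 1111, which matches the `df['v'] + 1111` line in df3.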


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    ehh .. @BryanCutler, WDYT about just doing the previous one for now? The approach you suggested sounds efficient of course, but this isn't a hot path, so I think the previous way is fine too, since it's a bit cleaner (though a bit less efficient), and partly because the code freeze is close.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93450 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93450/testReport)** for PR 21650 at commit [`78f2ebf`](https://github.com/apache/spark/commit/78f2ebf3b11fe8849fe0d41300f74319ca174d42).


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1402/
    Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1422/
    Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r204482591
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,144 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df1['v']))
    +        df1 = df1.withColumn('f3', f3(df1['v']))
    +        df1 = df1.withColumn('f4', f4(df1['v']))
    +        df1 = df1.withColumn('f2_f1', f2(df1['f1']))
    +        df1 = df1.withColumn('f3_f1', f3(df1['f1']))
    +        df1 = df1.withColumn('f4_f1', f4(df1['f1']))
    +        df1 = df1.withColumn('f3_f2', f3(df1['f2']))
    +        df1 = df1.withColumn('f4_f2', f4(df1['f2']))
    +        df1 = df1.withColumn('f4_f3', f4(df1['f3']))
    +        df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1']))
    +        df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2']))
    +        df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1']))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df.withColumn('f1', f1(df['v']))
    +        df2 = df2.withColumn('f2', f2(df['v']))
    +        df2 = df2.withColumn('f3', f3(df['v']))
    +        df2 = df2.withColumn('f4', f4(df['v']))
    +        df2 = df2.withColumn('f2_f1', f2(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f1', f3(f1(df['v'])))
    +        df2 = df2.withColumn('f4_f1', f4(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f2', f3(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f2', f4(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f3', f4(f3(df['v'])))
    +        df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
    +
    +        # expected result
    +        df3 = df.withColumn('f1', df['v'] + 1)
    +        df3 = df3.withColumn('f2', df['v'] + 10)
    +        df3 = df3.withColumn('f3', df['v'] + 100)
    +        df3 = df3.withColumn('f4', df['v'] + 1000)
    +        df3 = df3.withColumn('f2_f1', df['v'] + 11)
    +        df3 = df3.withColumn('f3_f1', df['v'] + 101)
    +        df3 = df3.withColumn('f4_f1', df['v'] + 1001)
    +        df3 = df3.withColumn('f3_f2', df['v'] + 110)
    +        df3 = df3.withColumn('f4_f2', df['v'] + 1010)
    +        df3 = df3.withColumn('f4_f3', df['v'] + 1100)
    +        df3 = df3.withColumn('f3_f2_f1', df['v'] + 111)
    +        df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
    +        df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
    +        df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
    +        df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df['v']))
    +        df1 = df1.withColumn('f3', f3(df['v']))
    +        df1 = df1.withColumn('f1_f2', f1(f2(df['v'])))
    +        df1 = df1.withColumn('f1_f3', f1(f3(df['v'])))
    +        df1 = df1.withColumn('f2_f1', f2(f1(df['v'])))
    +        df1 = df1.withColumn('f2_f3', f2(f3(df['v'])))
    +        df1 = df1.withColumn('f3_f1', f3(f1(df['v'])))
    +        df1 = df1.withColumn('f3_f2', f3(f2(df['v'])))
    +        df1 = df1.withColumn('f1_f2_f3', f1(f2(f3(df['v']))))
    +        df1 = df1.withColumn('f1_f3_f2', f1(f3(f2(df['v']))))
    +        df1 = df1.withColumn('f2_f1_f3', f2(f1(f3(df['v']))))
    +        df1 = df1.withColumn('f2_f3_f1', f2(f3(f1(df['v']))))
    +        df1 = df1.withColumn('f3_f1_f2', f3(f1(f2(df['v']))))
    +        df1 = df1.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +
    +        # expected result
    +        df2 = df.withColumn('f1', df['v'] + 1)
    +        df2 = df2.withColumn('f2', df['v'] + 10)
    +        df2 = df2.withColumn('f3', df['v'] + 100)
    +        df2 = df2.withColumn('f1_f2', df['v'] + 11)
    +        df2 = df2.withColumn('f1_f3', df['v'] + 101)
    +        df2 = df2.withColumn('f2_f1', df['v'] + 11)
    +        df2 = df2.withColumn('f2_f3', df['v'] + 110)
    +        df2 = df2.withColumn('f3_f1', df['v'] + 101)
    +        df2 = df2.withColumn('f3_f2', df['v'] + 110)
    +        df2 = df2.withColumn('f1_f2_f3', df['v'] + 111)
    +        df2 = df2.withColumn('f1_f3_f2', df['v'] + 111)
    +        df2 = df2.withColumn('f2_f1_f3', df['v'] + 111)
    +        df2 = df2.withColumn('f2_f3_f1', df['v'] + 111)
    +        df2 = df2.withColumn('f3_f1_f2', df['v'] + 111)
    +        df2 = df2.withColumn('f3_f2_f1', df['v'] + 111)
    +
    +        self.assertEquals(df2.collect(), df1.collect())
    --- End diff --
    
    I chained `withColumn` together instead of reassigning DataFrames. How does it look now?


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Hm, then how about giving it a try in a follow-up, @BryanCutler, if you see some value in it?


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93688 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93688/testReport)** for PR 21650 at commit [`f3a45a5`](https://github.com/apache/spark/commit/f3a45a576b6a186f3694e6bd0f22a8198a9d19a2).


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    I took a shot at extracting the UDFs in one traversal, using the eval type of the first pandas or batch UDF encountered. I think it's much clearer:
    
    ```scala
    object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
    
      private class FirstEvalType() {
        var evalType = -1
        def isEvalTypeSet(): Boolean = evalType >= 0
      }
    
      private def canEvaluateInPython(e: PythonUDF, firstEvalType: FirstEvalType): Boolean = {
        if (firstEvalType.isEvalTypeSet() && e.evalType != firstEvalType.evalType) {
          false
        } else {
          firstEvalType.evalType = e.evalType
          e.children match {
            // single PythonUDF child could be chained and evaluated in Python
            case Seq(u: PythonUDF) => canEvaluateInPython(u, firstEvalType)
            // Python UDF can't be evaluated directly in JVM
            case children => !children.exists(hasScalarPythonUDF)
          }
        }
      }
    
      private def collectEvaluableUDFs(expr: Expression, firstEvalType: FirstEvalType): Seq[PythonUDF] = expr match {
        case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, firstEvalType) =>
          Seq(udf)
        case e => e.children.flatMap(collectEvaluableUDFs(_, firstEvalType))
      }
    
      private def extract(plan: SparkPlan): SparkPlan = {
        val udfs = plan.expressions.flatMap(collectEvaluableUDFs(_, new FirstEvalType))
        ...
    ```
    
    This does pass around a mutable object. I guess you could do about the same by returning an Option instead, but that might not look as nice.
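
To make the single-traversal idea concrete, here is a minimal Python sketch over a toy expression tree. Everything in it (`Expr`, `BATCHED`, `PANDAS`, `collect_evaluable`) is a hypothetical stand-in for the Catalyst classes, not Spark code. It is also an assumption of this sketch that one eval-type holder is shared across all expressions and that the type is recorded only once a UDF is confirmed evaluable, so a rejected chain does not pin the type.

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Hypothetical tags standing in for SQL_BATCHED_UDF / SQL_SCALAR_PANDAS_UDF.
BATCHED, PANDAS = "batched", "pandas"


@dataclass
class Expr:
    name: str
    children: List["Expr"] = field(default_factory=list)
    eval_type: Optional[str] = None  # None means "not a Python UDF"


def has_udf(e: Expr) -> bool:
    return e.eval_type is not None or any(has_udf(c) for c in e.children)


def can_evaluate(e: Expr) -> bool:
    # A single UDF child can be chained only if it has the same eval type.
    if len(e.children) == 1 and e.children[0].eval_type is not None:
        child = e.children[0]
        return child.eval_type == e.eval_type and can_evaluate(child)
    # Otherwise no UDF may appear anywhere below this node.
    return not any(has_udf(c) for c in e.children)


def collect_evaluable(exprs: List[Expr]) -> List[str]:
    first_type: Optional[str] = None  # set by the first evaluable UDF found
    found: List[str] = []

    def visit(e: Expr) -> None:
        nonlocal first_type
        is_udf = e.eval_type is not None
        if is_udf and can_evaluate(e) and first_type in (None, e.eval_type):
            first_type = e.eval_type
            found.append(e.name)
        else:
            for c in e.children:
                visit(c)

    for e in exprs:
        visit(e)
    return found


v = Expr("v")
f1 = Expr("f1", [v], BATCHED)
f2_of_f1 = Expr("f2", [f1], PANDAS)  # pandas UDF wrapping a batched UDF
f4 = Expr("f4", [v], PANDAS)
first_pass = collect_evaluable([f2_of_f1, f4])
```

In this example the first pass picks up only `f1`: the outer `f2` is not evaluable until `f1` has been extracted, and `f4` is skipped because the batched type was seen first. A caller would rerun the collection on the rewritten plan to pick up the remaining UDFs.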


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205024958
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    --- End diff --
    
    This looks like it's testing udf + udf.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205146857
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    --- End diff --
    
    Hmmmmm, this looks messier than I thought... the previous one looks a bit better to me. WDYT, @BryanCutler?


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202861241
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    +      false
    +    } else {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python
    +        case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType)
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => !children.exists(hasScalarPythonUDF)
    +      }
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
    +    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) =>
    +      Seq(udf)
    +    case e => e.children.flatMap(collectEvaluableUDF(_, evalType))
    +  }
    +
    +  /**
    +   * Collect evaluable UDFs from the current node.
    +   *
    +   * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
    +   * and returns a list of UDFs of the same eval type.
    +   *
    +   * If expressions contain both UDFs eval types, this function will only return Python UDFs.
    +   *
    +   * The caller should call this function multiple times until all evaluable UDFs are collected.
    +   */
    +  private def collectEvaluableUDFs(plan: SparkPlan): Seq[PythonUDF] = {
    +    val pythonUDFs =
    +      plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_BATCHED_UDF))
    +
    +    if (pythonUDFs.isEmpty) {
    +      plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_SCALAR_PANDAS_UDF))
    +    } else {
    +      pythonUDFs
    --- End diff --
    
    I think it would be better to loop through the expressions, find the first scalar Python UDF (either `SQL_BATCHED_UDF` or `SQL_SCALAR_PANDAS_UDF`), and then collect the rest of that type. This is really what is happening here, so I think it would be more straightforward to do it in a single loop instead of two `flatMap`s.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r204429892
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    +      false
    +    } else {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python
    +        case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType)
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => !children.exists(hasScalarPythonUDF)
    +      }
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
    +    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) =>
    +      Seq(udf)
    +    case e => e.children.flatMap(collectEvaluableUDF(_, evalType))
    +  }
    +
    +  /**
    +   * Collect evaluable UDFs from the current node.
    +   *
    +   * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
    +   * and returns a list of UDFs of the same eval type.
    +   *
    +   * If expressions contain both UDFs eval types, this function will only return Python UDFs.
    +   *
    +   * The caller should call this function multiple times until all evaluable UDFs are collected.
    +   */
    +  private def collectEvaluableUDFs(plan: SparkPlan): Seq[PythonUDF] = {
    +    val pythonUDFs =
    +      plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_BATCHED_UDF))
    +
    +    if (pythonUDFs.isEmpty) {
    +      plan.expressions.flatMap(collectEvaluableUDF(_, PythonEvalType.SQL_SCALAR_PANDAS_UDF))
    +    } else {
    +      pythonUDFs
    --- End diff --
    
    What you said makes sense, and that was actually my first attempt, but it ended up being pretty complicated. The issue is that it is hard to find the UDFs in a single traversal of the expression tree: we need to pass the evalType to every subtree, and the result of one subtree can affect the result of another (i.e., if we find one type of UDF in one subtree, we need to pass that type to all the other subtrees, because they must agree on the evalType). This makes the code more complicated.
    
    Another way is to do two traversals: the first looks for the eval type, and the second looks for UDFs of that eval type. But this isn't much different from what I have now in terms of efficiency. I actually tried both approaches and found the current way to be the easiest to implement and the least likely to have bugs.
    
    WDYT?
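
For comparison, the two-pass logic in the diff above can be sketched in Python as follows. This is an illustration under stated assumptions: expressions are modelled as hypothetical `(name, eval_type, children)` tuples, and `BATCHED` / `PANDAS` are made-up tags standing in for the real eval-type constants.

```python
# Hypothetical tags standing in for SQL_BATCHED_UDF / SQL_SCALAR_PANDAS_UDF.
BATCHED, PANDAS = "batched", "pandas"


def has_udf(e):
    _, eval_type, children = e
    return eval_type is not None or any(has_udf(c) for c in children)


def can_evaluate(e, eval_type):
    _, et, children = e
    if et != eval_type:
        return False
    # A single UDF child may be chained if it passes the same check.
    if len(children) == 1 and children[0][1] is not None:
        return can_evaluate(children[0], eval_type)
    # Otherwise no UDF may appear anywhere below this node.
    return not any(has_udf(c) for c in children)


def collect(e, eval_type):
    name, et, children = e
    if et is not None and can_evaluate(e, eval_type):
        return [name]
    return [n for c in children for n in collect(c, eval_type)]


def collect_evaluable_udfs(exprs):
    # First pass: batched UDFs. Second pass (pandas) only if none were found.
    batched = [n for e in exprs for n in collect(e, BATCHED)]
    if batched:
        return batched
    return [n for e in exprs for n in collect(e, PANDAS)]


v = ("v", None, [])
f1 = ("f1", BATCHED, [v])
f2 = ("f2", PANDAS, [v])
f2_of_f1 = ("f2_of_f1", PANDAS, [f1])

mixed = collect_evaluable_udfs([f1, f2])
pandas_only = collect_evaluable_udfs([f2])
nested = collect_evaluable_udfs([f2_of_f1])
```

Each call returns UDFs of a single eval type, so the caller is expected to invoke it repeatedly until nothing is left, as the doc comment in the diff states.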



---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199051146
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -166,8 +190,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
                   ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child)
                 case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
                   BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child)
    -            case _ =>
    -              throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
    +            case (vectorizedUdfs, plainUdfs) =>
    --- End diff --
    
    `case _ =>` should work?


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #92401 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92401/testReport)** for PR 21650 at commit [`be3b99c`](https://github.com/apache/spark/commit/be3b99c951c3df77eace0a6a124f8f9a94ac804c).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93686 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93686/testReport)** for PR 21650 at commit [`8e995e8`](https://github.com/apache/spark/commit/8e995e81542852ff4af43883db79cdfbe9aca1ad).


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205127465
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v'])) \
    +            .withColumn('f2', f2(df['v'])) \
    +            .withColumn('f3', f3(df['v'])) \
    +            .withColumn('f1_f2', f1(f2(df['v']))) \
    +            .withColumn('f1_f3', f1(f3(df['v']))) \
    +            .withColumn('f2_f1', f2(f1(df['v']))) \
    +            .withColumn('f2_f3', f2(f3(df['v']))) \
    +            .withColumn('f3_f1', f3(f1(df['v']))) \
    --- End diff --
    
    Yeah, the test is written to cover many combinations, so there are some duplicate cases. Would you prefer that I remove them?
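
As an aside, the combinations in `test_mixed_udf_and_sql` follow a regular pattern, so the column names and expected offsets can be enumerated rather than listed by hand. The sketch below is only an illustration in plain Python; `OFFSETS` and `columns` are hypothetical names and this is not how the PR's test is written.

```python
from itertools import permutations

# Offsets added by f1 (UDF), f2 (SQL expression) and f3 (pandas UDF).
# The mapping itself is an illustrative assumption, not code from the PR.
OFFSETS = {"f1": 1, "f2": 10, "f3": 100}

# Every ordered chain of 1, 2 or 3 distinct functions, with its expected
# offset relative to the input column 'v'.
columns = [
    ("_".join(chain), sum(OFFSETS[f] for f in chain))
    for r in range(1, len(OFFSETS) + 1)
    for chain in permutations(OFFSETS, r)
]
```

This yields the same 15 column names, in the same order, as the hand-written chain in the test above.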


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199021023
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    +      false
    +    } else {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python
    +        case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType)
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => !children.exists(hasScalarPythonUDF)
    +      }
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
    +    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) =>
    +      Seq(udf)
    +    case e => e.children.flatMap(collectEvaluableUDF(_, evalType))
    +  }
    +
    +  /**
    +   * Collect evaluable UDFs from the current node.
    +   *
    +   * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
    +   * and returns a list of UDFs of the same eval type.
    --- End diff --
    
    What happens if the user tries to mix a non-scalar UDF?


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199051303
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
         }
         assert(qualifiedPlanNodes.size == 1)
       }
    +
    +  private def collectPythonExec(spark: SparkPlan): Seq[BatchEvalPythonExec] = spark.collect {
    +    case b: BatchEvalPythonExec => b
    +  }
    +
    +  private def collectPandasExec(spark: SparkPlan): Seq[ArrowEvalPythonExec] = spark.collect {
    --- End diff --
    
    ditto.


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202863906
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
         }
         assert(qualifiedPlanNodes.size == 1)
       }
    +
    +  private def collectPythonExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
    +    case b: BatchEvalPythonExec => b
    +  }
    +
    +  private def collectPandasExec(plan: SparkPlan): Seq[ArrowEvalPythonExec] = plan.collect {
    +    case b: ArrowEvalPythonExec => b
    +  }
    +
    +  test("Chained Python UDFs should be combined to a single physical node") {
    +    val df = Seq(("Hello", 4)).toDF("a", "b")
    +    val df2 = df.withColumn("c", pythonUDF(col("a"))).withColumn("d", pythonUDF(col("c")))
    +    val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan)
    +    assert(pythonEvalNodes.size == 1)
    +  }
    +
    +  test("Chained Pandas UDFs should be combined to a single physical node") {
    +    val df = Seq(("Hello", 4)).toDF("a", "b")
    +    val df2 = df.withColumn("c", pandasUDF(col("a"))).withColumn("d", pandasUDF(col("c")))
    +    val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan)
    +    assert(arrowEvalNodes.size == 1)
    +  }
    +
    +  test("Mixed Python UDFs and Pandas UDF should be separate physical node") {
    --- End diff --
    
    "Mixed Python Batched UDFs..."


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205061160
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    --- End diff --
    
    Can we rename this function or add a comment? Both scalar vectorized UDFs and normal UDFs can each be evaluated in Python, yet this function returns `false` when the eval types differ.
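
The naming concern above can be made concrete with a toy model of the check in the quoted diff. This is only a sketch in plain Python, not Spark code: `Expr`, `has_scalar_udf`, `can_evaluate_with_eval_type`, and the two constant values are hypothetical stand-ins. It shows that the function answers "can this UDF be evaluated together with others of the given eval type?" rather than "can this UDF be evaluated in Python?".

```python
from dataclasses import dataclass, field
from typing import List, Optional

# Placeholder eval type values, chosen only for this sketch.
SQL_BATCHED_UDF = 100
SQL_SCALAR_PANDAS_UDF = 200


@dataclass
class Expr:
    """Toy expression node; eval_type is None for non-UDF expressions."""
    name: str
    eval_type: Optional[int] = None
    children: List["Expr"] = field(default_factory=list)


def has_scalar_udf(e: Expr) -> bool:
    # True if this node or any descendant is a (scalar) UDF.
    return e.eval_type is not None or any(has_scalar_udf(c) for c in e.children)


def can_evaluate_with_eval_type(e: Expr, eval_type: int) -> bool:
    # A UDF of a different eval type is rejected up front.
    if e.eval_type != eval_type:
        return False
    # A single UDF child can be chained if it passes the same check.
    if len(e.children) == 1 and e.children[0].eval_type is not None:
        return can_evaluate_with_eval_type(e.children[0], eval_type)
    # Otherwise no child may contain a UDF.
    return not any(has_scalar_udf(c) for c in e.children)


v = Expr("v")
batched = Expr("f1", SQL_BATCHED_UDF, [v])
pandas = Expr("f2", SQL_SCALAR_PANDAS_UDF, [v])
nested = Expr("f2", SQL_SCALAR_PANDAS_UDF, [batched])  # models f2(f1(v))
```

In this model, `can_evaluate_with_eval_type(pandas, SQL_BATCHED_UDF)` is `False` even though the Pandas UDF is perfectly evaluable in Python, which is the mismatch between name and behavior that the comment points out.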


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202865674
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5471,6 +5598,22 @@ def foo(_):
                     self.assertEqual(r.a, 'hi')
                     self.assertEqual(r.b, 1)
     
    +    def test_mixed_udf(self):
    +        # Test Pandas UDF and scalar Python UDF followed by groupby apply
    +        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
    +        import pandas as pd
    +
    +        df = self.spark.range(0, 10).toDF('v1')
    +        df = df.withColumn('v2', udf(lambda x: x + 1, 'int')(df['v1']))
    +        df = df.withColumn('v3', pandas_udf(lambda x: x + 2, 'int')(df['v1']))
    --- End diff --
    
    Could you just chain the `withColumn` calls here? I think it's clearer than reassigning `df` each time.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205242020
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    --- End diff --
    
    Yeah, the idea of `LazyEvalType` is a container object whose value can be set only once. Maybe the name `LazyEvalType` is confusing. I don't think `CurrentEvalType` is accurate either, because the original idea is that we don't change the value once it's set. Maybe call it `EvalTypeHolder` and add docs to explain?
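
The set-once idea described above could look roughly like the following. This is a hedged sketch in Python, not the Scala class from the PR: the method names and the choice to raise `ValueError` on a conflicting second `set` are assumptions made for illustration; only the `-1` "unset" default comes from the quoted diff.

```python
class EvalTypeHolder:
    """Holds an eval type that is set once and never changed afterwards."""

    _UNSET = -1  # mirrors the default of -1 in the quoted diff

    def __init__(self) -> None:
        self._eval_type = self._UNSET

    @property
    def is_set(self) -> bool:
        return self._eval_type != self._UNSET

    def set(self, eval_type: int) -> None:
        # Assumption for this sketch: setting the same value again is a
        # no-op, while a different value is treated as an error.
        if self.is_set and self._eval_type != eval_type:
            raise ValueError(
                f"eval type already set to {self._eval_type}, got {eval_type}"
            )
        self._eval_type = eval_type

    def get(self) -> int:
        if not self.is_set:
            raise ValueError("eval type has not been set yet")
        return self._eval_type
```

A caller would create one holder per plan node, let the first UDF encountered fix the eval type, and then compare every later UDF against `get()`.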


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93686 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93686/testReport)** for PR 21650 at commit [`8e995e8`](https://github.com/apache/spark/commit/8e995e81542852ff4af43883db79cdfbe9aca1ad).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93686/
    Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    LGTM.
    
    Merged to master.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1239/
    Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199021120
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -166,8 +190,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
                   ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child)
                 case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
                   BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child)
    -            case _ =>
    -              throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
    +            case (vectorizedUdfs, plainUdfs) =>
    +              throw new AnalysisException(
    --- End diff --
    
    Why change the exception type?  Can you make a test that causes this?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199022212
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    +      false
    +    } else {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python
    +        case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType)
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => !children.exists(hasScalarPythonUDF)
    +      }
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
    +    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) =>
    +      Seq(udf)
    +    case e => e.children.flatMap(collectEvaluableUDF(_, evalType))
    +  }
    +
    +  /**
    +   * Collect evaluable UDFs from the current node.
    +   *
    +   * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
    +   * and returns a list of UDFs of the same eval type.
    +   *
    +   * If expressions contain both UDFs eval types, this function will only return Python UDFs.
    +   *
    +   * The caller should call this function multiple times until all evaluable UDFs are collected.
    --- End diff --
    
    So this will pipeline UDFs of the same eval type so that they can be processed together in the same call to the Python worker?
    
    For example, if we have `pandas_udf, pandas_udf, udf, udf`, then both `pandas_udf`s will be sent to the worker together, then both `udf`s together, so the Python runner gets executed twice.
    
    On the other hand, if we have `pandas_udf, udf, pandas_udf, udf`, then each one will have to be executed one at a time, and the Python runner gets executed 4 times. Is that right?
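
The counting in the two examples above can be sketched as follows, assuming the reading proposed in this comment: each run of consecutive UDFs with the same eval type in a dependency chain costs one Python runner execution. `count_python_runs` is a hypothetical helper for illustration and does not model Spark's actual planner.

```python
from itertools import groupby
from typing import Sequence


def count_python_runs(chain: Sequence[str]) -> int:
    """Count runs of consecutive equal eval types in a chain of UDFs."""
    # groupby without a key groups adjacent equal items, so the number of
    # groups is the number of runs.
    return sum(1 for _ in groupby(chain))


grouped = ["pandas_udf", "pandas_udf", "udf", "udf"]
interleaved = ["pandas_udf", "udf", "pandas_udf", "udf"]

print(count_python_runs(grouped))      # 2
print(count_python_runs(interleaved))  # 4
```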


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1312/
    Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199938894
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    +      false
    +    } else {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python
    +        case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType)
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => !children.exists(hasScalarPythonUDF)
    +      }
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
    +    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) =>
    +      Seq(udf)
    +    case e => e.children.flatMap(collectEvaluableUDF(_, evalType))
    +  }
    +
    +  /**
    +   * Collect evaluable UDFs from the current node.
    +   *
    +   * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
    +   * and returns a list of UDFs of the same eval type.
    --- End diff --
    
    Yeah, that's not a very informative exception, but we can fix that later. I filed https://issues.apache.org/jira/browse/SPARK-24735 to track it.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205186820
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    --- End diff --
    
    I'm not too fond of the name `LazyEvalType`; it sounds like something else. Maybe `CurrentEvalType`?


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93451 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93451/testReport)** for PR 21650 at commit [`4c9c007`](https://github.com/apache/spark/commit/4c9c007858aef65c2c190b35673404dd61279369).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199026600
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    +      false
    +    } else {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python
    +        case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType)
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => !children.exists(hasScalarPythonUDF)
    +      }
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
    +    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) =>
    +      Seq(udf)
    +    case e => e.children.flatMap(collectEvaluableUDF(_, evalType))
    +  }
    +
    +  /**
    +   * Collect evaluable UDFs from the current node.
    +   *
    +   * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
    +   * and returns a list of UDFs of the same eval type.
    --- End diff --
    
    Hmm, it currently throws an exception in the codegen stage, because a non-scalar UDF is not extracted by this rule.
    
    We should probably throw a better exception, but I need to think a bit about how to do it.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    @viirya I have added the query plan output. @maropu I updated the PR title.
    
    Thanks!


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205127129
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    --- End diff --
    
    Yeah, the test is written to cover many combinations, so some of them do not actually mix UDF types. Would you prefer that I remove those cases?
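
For reference, the "many combinations" in the quoted test follow a simple pattern: `f1` to `f4` add 1, 10, 100, and 1000, so any composition adds the sum of its parts. The sketch below enumerates the column names and expected offsets in plain Python. `expected_offsets` is a hypothetical helper, not part of the test suite, and it assumes the naming scheme seen in the diff, where the outermost function comes first (for example `f2_f1` for `f2(f1(v))`).

```python
from itertools import combinations
from typing import Dict

# Offsets taken from the UDF bodies in the quoted test.
OFFSETS = {"f1": 1, "f2": 10, "f3": 100, "f4": 1000}


def expected_offsets(offsets: Dict[str, int]) -> Dict[str, int]:
    """Map each column name to the amount its composition adds to v."""
    names = sorted(offsets)
    result = {}
    for size in range(1, len(names) + 1):
        for combo in combinations(names, size):
            # Outermost function first, e.g. ('f1', 'f2') -> 'f2_f1'.
            column = "_".join(reversed(combo))
            result[column] = sum(offsets[name] for name in combo)
    return result


expected = expected_offsets(OFFSETS)
print(expected["f4_f3_f2_f1"])  # 1111
```

Because the digits of each offset do not overlap, a wrong or missing UDF application in any combination shows up directly in the resulting value.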


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202864194
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
         }
         assert(qualifiedPlanNodes.size == 1)
       }
    +
    +  private def collectPythonExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
    +    case b: BatchEvalPythonExec => b
    +  }
    +
    +  private def collectPandasExec(plan: SparkPlan): Seq[ArrowEvalPythonExec] = plan.collect {
    +    case b: ArrowEvalPythonExec => b
    +  }
    +
    +  test("Chained Python UDFs should be combined to a single physical node") {
    +    val df = Seq(("Hello", 4)).toDF("a", "b")
    +    val df2 = df.withColumn("c", pythonUDF(col("a"))).withColumn("d", pythonUDF(col("c")))
    +    val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan)
    +    assert(pythonEvalNodes.size == 1)
    +  }
    +
    +  test("Chained Pandas UDFs should be combined to a single physical node") {
    +    val df = Seq(("Hello", 4)).toDF("a", "b")
    +    val df2 = df.withColumn("c", pandasUDF(col("a"))).withColumn("d", pandasUDF(col("c")))
    +    val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan)
    +    assert(arrowEvalNodes.size == 1)
    +  }
    +
    +  test("Mixed Python UDFs and Pandas UDF should be separate physical node") {
    +    val df = Seq(("Hello", 4)).toDF("a", "b")
    +    val df2 = df.withColumn("c", pythonUDF(col("a"))).withColumn("d", pandasUDF(col("b")))
    +
    +    val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan)
    +    val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan)
    +    assert(pythonEvalNodes.size == 1)
    +    assert(arrowEvalNodes.size == 1)
    +  }
    +
    +  test("Independent Python UDFs and Pandas UDFs should be combined separately") {
    +    val df = Seq(("Hello", 4)).toDF("a", "b")
    +    val df2 = df.withColumn("c1", pythonUDF(col("a")))
    +      .withColumn("c2", pythonUDF(col("c1")))
    +      .withColumn("d1", pandasUDF(col("a")))
    +      .withColumn("d2", pandasUDF(col("d1")))
    +
    +    val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan)
    +    val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan)
    +    assert(pythonEvalNodes.size == 1)
    +    assert(arrowEvalNodes.size == 1)
    +  }
    +
    +  test("Dependent Python UDFs and Pandas UDFs should not be combined") {
    --- End diff --
    
    "Dependent Python Batched..."


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93546 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93546/testReport)** for PR 21650 at commit [`2bc906d`](https://github.com/apache/spark/commit/2bc906de5a12dcc452e6855aa30d27021c446e17).


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205268767
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    --- End diff --
    
    Yes it's in the most recent commit.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199026769
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -166,8 +190,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
                   ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child)
                 case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
                   BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child)
    -            case _ =>
    -              throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
    +            case (vectorizedUdfs, plainUdfs) =>
    +              throw new AnalysisException(
    --- End diff --
    
    This is because we shouldn't reach here (otherwise it's a bug). I don't know what the best exception type is here, though.
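
A minimal Python sketch of the branch under discussion, assuming extraction hands this step a list of UDFs that should all share one eval type. The `Udf` tuple, `choose_exec_node`, the string node names, and the choice of `AssertionError` are hypothetical and only illustrate failing loudly on a state that indicates a bug. This is not the Scala code in the PR, and the thread leaves the exception type open.

```python
from collections import namedtuple

# Hypothetical stand-in for a PythonUDF expression.
Udf = namedtuple("Udf", ["name", "vectorized"])


def choose_exec_node(udfs):
    """Pick the physical node for one batch of extracted UDFs."""
    vectorized = [u for u in udfs if u.vectorized]
    plain = [u for u in udfs if not u.vectorized]
    if not plain:
        return ("ArrowEvalPythonExec", vectorized)
    if not vectorized:
        return ("BatchEvalPythonExec", plain)
    # Extraction is expected to hand over one eval type at a time, so reaching
    # this point signals a bug in the rule rather than bad user input.
    raise AssertionError(
        "Unexpected mix of vectorized and non-vectorized UDFs: "
        + ", ".join(u.name for u in udfs)
    )
```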


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205206127
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    --- End diff --
    
    In your code:
    
    ```
      private def canEvaluateInPython(e: PythonUDF, firstEvalType: FirstEvalType): Boolean = {
        if (firstEvalType.isEvalTypeSet() && e.evalType != firstEvalType.evalType) {
          false
        } else {
          firstEvalType.evalType = e.evalType
          e.children match {
            // single PythonUDF child could be chained and evaluated in Python
            case Seq(u: PythonUDF) => canEvaluateInPython(u, firstEvalType)
            // Python UDF can't be evaluated directly in JVM
            case children => !children.exists(hasScalarPythonUDF)
          }
        }
      }
    ```
    
    I think the confusing part here is that the value of `firstEvalType.evalType` keeps changing while we traverse the tree, and we could be carrying the value across independent subtrees. That is, after we finish traversing one subtree, `firstEvalType` can be set to Scalar Pandas even though we didn't find an evaluable UDF, and because we never reset it, we could get wrong results when we visit another subtree. The fact that the eval type keeps changing as we traverse the tree seems very error-prone to me.
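
The concern can be reproduced in a small Python model of the quoted snippet. Everything here is a hypothetical stand-in: `Udf`, the numeric eval type codes, and `can_evaluate` mirror the shape of the Scala code but are not Spark APIs, and leaf inputs such as column references are omitted for brevity.

```python
from dataclasses import dataclass, field
from typing import List

BATCHED, PANDAS = 100, 200  # hypothetical eval type codes


@dataclass
class Udf:
    name: str
    eval_type: int
    children: List["Udf"] = field(default_factory=list)


class FirstEvalType:
    """Mutable holder that is overwritten on every visit."""

    def __init__(self) -> None:
        self.eval_type = -1

    def is_set(self) -> bool:
        return self.eval_type >= 0


def can_evaluate(e: Udf, first: FirstEvalType) -> bool:
    if first.is_set() and e.eval_type != first.eval_type:
        return False
    # The holder is written before the children have been checked.
    first.eval_type = e.eval_type
    if len(e.children) == 1:
        return can_evaluate(e.children[0], first)
    return not e.children
```

With one holder, checking a Pandas UDF wrapped around a batched UDF returns `False` but leaves the holder set to `PANDAS`. A later, unrelated batched UDF is then rejected as well, although the same UDF is accepted with a fresh holder.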


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93450/


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93667 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93667/testReport)** for PR 21650 at commit [`6b22fea`](https://github.com/apache/spark/commit/6b22fea5b42b40d2eb92d931e76d183518533717).


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/557/


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205275406
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    --- End diff --
    
    OK, I think I see the problem. Because there was a map over `plan.expressions`, a new `FirstEvalType` object was being created for each expression. Changing this to the following fixed the failure:
    ```
    val setEvalType = new FirstEvalType
    val udfs = plan.expressions.flatMap(collectEvaluableUDFs(_, setEvalType))
    ```
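
To make the difference concrete, here is a hedged Python sketch that contrasts a fresh holder per expression with one holder shared across all expressions. `Udf`, `FirstEvalType`, `collect_evaluable`, and the eval type codes are hypothetical simplifications for illustration; the real rule also walks each expression tree.

```python
from dataclasses import dataclass
from typing import List, Optional

BATCHED, PANDAS = 100, 200  # hypothetical eval type codes


@dataclass
class Udf:
    name: str
    eval_type: int


class FirstEvalType:
    def __init__(self) -> None:
        self.eval_type: Optional[int] = None


def collect_evaluable(expr: Udf, holder: FirstEvalType) -> List[Udf]:
    """Keep expr only if it matches the first eval type the holder has seen."""
    if holder.eval_type is None:
        holder.eval_type = expr.eval_type
    return [expr] if expr.eval_type == holder.eval_type else []


expressions = [Udf("f1", BATCHED), Udf("f2", PANDAS), Udf("f3", BATCHED)]

# A fresh holder per expression: every expression is "first", so types get mixed.
per_expression = [
    u for e in expressions for u in collect_evaluable(e, FirstEvalType())
]

# One holder shared across the plan: only UDFs of the first eval type are kept.
shared = FirstEvalType()
shared_holder = [u for e in expressions for u in collect_evaluable(e, shared)]
```

In this model `per_expression` contains all three UDFs, while `shared_holder` contains only the two batched ones.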


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Build finished. Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202864128
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
         }
         assert(qualifiedPlanNodes.size == 1)
       }
    +
    +  private def collectPythonExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
    +    case b: BatchEvalPythonExec => b
    +  }
    +
    +  private def collectPandasExec(plan: SparkPlan): Seq[ArrowEvalPythonExec] = plan.collect {
    +    case b: ArrowEvalPythonExec => b
    +  }
    +
    +  test("Chained Python UDFs should be combined to a single physical node") {
    +    val df = Seq(("Hello", 4)).toDF("a", "b")
    +    val df2 = df.withColumn("c", pythonUDF(col("a"))).withColumn("d", pythonUDF(col("c")))
    +    val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan)
    +    assert(pythonEvalNodes.size == 1)
    +  }
    +
    +  test("Chained Pandas UDFs should be combined to a single physical node") {
    +    val df = Seq(("Hello", 4)).toDF("a", "b")
    +    val df2 = df.withColumn("c", pandasUDF(col("a"))).withColumn("d", pandasUDF(col("c")))
    +    val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan)
    +    assert(arrowEvalNodes.size == 1)
    +  }
    +
    +  test("Mixed Python UDFs and Pandas UDF should be separate physical node") {
    +    val df = Seq(("Hello", 4)).toDF("a", "b")
    +    val df2 = df.withColumn("c", pythonUDF(col("a"))).withColumn("d", pandasUDF(col("b")))
    +
    +    val pythonEvalNodes = collectPythonExec(df2.queryExecution.executedPlan)
    +    val arrowEvalNodes = collectPandasExec(df2.queryExecution.executedPlan)
    +    assert(pythonEvalNodes.size == 1)
    +    assert(arrowEvalNodes.size == 1)
    +  }
    +
    +  test("Independent Python UDFs and Pandas UDFs should be combined separately") {
    --- End diff --
    
    "Independent Python Batched UDFs.."


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    I think the previous behavior was to not allow mixing pandas and regular udfs, but you're probably right that there are some cases where data could be handled differently. I'll try to look at this more in depth today.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93688 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93688/testReport)** for PR 21650 at commit [`f3a45a5`](https://github.com/apache/spark/commit/f3a45a576b6a186f3694e6bd0f22a8198a9d19a2).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93546 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93546/testReport)** for PR 21650 at commit [`2bc906d`](https://github.com/apache/spark/commit/2bc906de5a12dcc452e6855aa30d27021c446e17).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    >ehh .. @BryanCutler, WDYT about just doing the previous one for now? The approach you suggested sounds efficient of course but.. here's not a hot path so I think the previous way is fine too .. since that's a bit cleaner (but a bit less efficient), and partly the code freeze is close
    
    I didn't make the suggestion for performance. It took me a while looking at the previous code to realize that the intent was to find the first evaluable UDF and then all others matching that eval type. I think the previous code masked that and made it harder to follow.
    
    I wasn't really sure how the expression tree was evaluated, so my suggestion didn't handle chained expressions. The problem was that the eval type was being set while checking the child nodes; instead, it should only be set after all children are determined to be the same type. I'll update the above code again, which passes all tests as far as I can tell. I still prefer this approach, but I'm not a SQL expert ;)
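
One way to read "only set it after all children are determined to be the same type" is sketched below in Python. The names `Udf`, `chain_is_uniform`, and `EvalTypeHolder.accept`, and the eval type codes, are assumptions made for this illustration and are not taken from the PR. The chain check has no side effects, and the holder is written only once that check has passed.

```python
from dataclasses import dataclass, field
from typing import List, Optional

BATCHED, PANDAS = 100, 200  # hypothetical eval type codes


@dataclass
class Udf:
    name: str
    eval_type: int
    children: List["Udf"] = field(default_factory=list)


def chain_is_uniform(e: Udf) -> bool:
    """True if e and its chained UDF children all share one eval type."""
    if len(e.children) == 1:
        child = e.children[0]
        return child.eval_type == e.eval_type and chain_is_uniform(child)
    return not e.children


class EvalTypeHolder:
    def __init__(self) -> None:
        self.eval_type: Optional[int] = None

    def accept(self, e: Udf) -> bool:
        if self.eval_type is not None and e.eval_type != self.eval_type:
            return False
        if not chain_is_uniform(e):
            return False
        # Committed only after the whole chain has been checked.
        self.eval_type = e.eval_type
        return True
```

A rejected mixed chain leaves the holder untouched, so a later UDF of either type can still become the first evaluable one.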


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1238/


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205025755
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5487,6 +5617,22 @@ def dummy_pandas_udf(df):
                                                      F.col('temp0.key') == F.col('temp1.key'))
             self.assertEquals(res.count(), 5)
     
    +    def test_mixed_scalar_udfs_followed_by_grouby_apply(self):
    +        # Test Pandas UDF and scalar Python UDF followed by groupby apply
    +        from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
    +        import pandas as pd
    --- End diff --
    
    Not a big deal at all, really, but I would swap the import order (third-party first, then pyspark).


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93668 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93668/testReport)** for PR 21650 at commit [`b25936d`](https://github.com/apache/spark/commit/b25936d4c5216904f0ca3cf33df4b5c7130aa8f8).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93668 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93668/testReport)** for PR 21650 at commit [`b25936d`](https://github.com/apache/spark/commit/b25936d4c5216904f0ca3cf33df4b5c7130aa8f8).


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #92400 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92400/testReport)** for PR 21650 at commit [`6b47b69`](https://github.com/apache/spark/commit/6b47b69305257e9ee9f5135968913a4f92731ef5).
     * This patch passes all tests.
     * This patch **does not merge cleanly**.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205865029
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,61 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class EvalTypeHolder(private var evalType: Int = -1) {
    --- End diff --
    
    Yup. I usually avoid nested functions, but I found this is a place where one is needed. If it's clear when it's set and unset within a function, I think the shorter one is fine.
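
As an illustration of keeping the state inside a nested function, here is a Python sketch under stated assumptions: `Udf`, `same_type_chain`, `collect_evaluable_udfs`, and the eval type codes are hypothetical, and this is not the Scala code under review. The first eval type lives in a local variable of the outer function, so it cannot leak from one call to the next.

```python
from dataclasses import dataclass, field
from typing import List, Optional

BATCHED, PANDAS = 100, 200  # hypothetical eval type codes


@dataclass
class Udf:
    name: str
    eval_type: int
    children: List["Udf"] = field(default_factory=list)


def same_type_chain(e: Udf) -> bool:
    """True if e and its chained UDF children all share one eval type."""
    if len(e.children) == 1:
        child = e.children[0]
        return child.eval_type == e.eval_type and same_type_chain(child)
    return not e.children


def collect_evaluable_udfs(expressions: List[Udf]) -> List[Udf]:
    first_eval_type: Optional[int] = None  # exists only for this call

    def visit(e: Udf) -> List[Udf]:
        nonlocal first_eval_type
        matches = first_eval_type is None or e.eval_type == first_eval_type
        if matches and same_type_chain(e):
            first_eval_type = e.eval_type  # set once, by the first evaluable UDF
            return [e]
        # Not evaluable as a whole, so look for evaluable UDFs further down.
        return [u for child in e.children for u in visit(child)]

    return [u for e in expressions for u in visit(e)]
```

Each call starts with the type unset, which makes it easy to see where the value is set and that it is never reset midway.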


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202867741
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,144 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df1['v']))
    +        df1 = df1.withColumn('f3', f3(df1['v']))
    +        df1 = df1.withColumn('f4', f4(df1['v']))
    +        df1 = df1.withColumn('f2_f1', f2(df1['f1']))
    +        df1 = df1.withColumn('f3_f1', f3(df1['f1']))
    +        df1 = df1.withColumn('f4_f1', f4(df1['f1']))
    +        df1 = df1.withColumn('f3_f2', f3(df1['f2']))
    +        df1 = df1.withColumn('f4_f2', f4(df1['f2']))
    +        df1 = df1.withColumn('f4_f3', f4(df1['f3']))
    +        df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1']))
    +        df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2']))
    +        df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1']))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df.withColumn('f1', f1(df['v']))
    +        df2 = df2.withColumn('f2', f2(df['v']))
    +        df2 = df2.withColumn('f3', f3(df['v']))
    +        df2 = df2.withColumn('f4', f4(df['v']))
    +        df2 = df2.withColumn('f2_f1', f2(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f1', f3(f1(df['v'])))
    +        df2 = df2.withColumn('f4_f1', f4(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f2', f3(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f2', f4(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f3', f4(f3(df['v'])))
    +        df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
    +
    +        # expected result
    +        df3 = df.withColumn('f1', df['v'] + 1)
    +        df3 = df3.withColumn('f2', df['v'] + 10)
    +        df3 = df3.withColumn('f3', df['v'] + 100)
    +        df3 = df3.withColumn('f4', df['v'] + 1000)
    +        df3 = df3.withColumn('f2_f1', df['v'] + 11)
    +        df3 = df3.withColumn('f3_f1', df['v'] + 101)
    +        df3 = df3.withColumn('f4_f1', df['v'] + 1001)
    +        df3 = df3.withColumn('f3_f2', df['v'] + 110)
    +        df3 = df3.withColumn('f4_f2', df['v'] + 1010)
    +        df3 = df3.withColumn('f4_f3', df['v'] + 1100)
    +        df3 = df3.withColumn('f3_f2_f1', df['v'] + 111)
    +        df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
    +        df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
    +        df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
    +        df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df['v']))
    +        df1 = df1.withColumn('f3', f3(df['v']))
    +        df1 = df1.withColumn('f1_f2', f1(f2(df['v'])))
    +        df1 = df1.withColumn('f1_f3', f1(f3(df['v'])))
    +        df1 = df1.withColumn('f2_f1', f2(f1(df['v'])))
    +        df1 = df1.withColumn('f2_f3', f2(f3(df['v'])))
    +        df1 = df1.withColumn('f3_f1', f3(f1(df['v'])))
    +        df1 = df1.withColumn('f3_f2', f3(f2(df['v'])))
    +        df1 = df1.withColumn('f1_f2_f3', f1(f2(f3(df['v']))))
    +        df1 = df1.withColumn('f1_f3_f2', f1(f3(f2(df['v']))))
    +        df1 = df1.withColumn('f2_f1_f3', f2(f1(f3(df['v']))))
    +        df1 = df1.withColumn('f2_f3_f1', f2(f3(f1(df['v']))))
    +        df1 = df1.withColumn('f3_f1_f2', f3(f1(f2(df['v']))))
    +        df1 = df1.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +
    +        # expected result
    +        df2 = df.withColumn('f1', df['v'] + 1)
    +        df2 = df2.withColumn('f2', df['v'] + 10)
    +        df2 = df2.withColumn('f3', df['v'] + 100)
    +        df2 = df2.withColumn('f1_f2', df['v'] + 11)
    +        df2 = df2.withColumn('f1_f3', df['v'] + 101)
    +        df2 = df2.withColumn('f2_f1', df['v'] + 11)
    +        df2 = df2.withColumn('f2_f3', df['v'] + 110)
    +        df2 = df2.withColumn('f3_f1', df['v'] + 101)
    +        df2 = df2.withColumn('f3_f2', df['v'] + 110)
    +        df2 = df2.withColumn('f1_f2_f3', df['v'] + 111)
    +        df2 = df2.withColumn('f1_f3_f2', df['v'] + 111)
    +        df2 = df2.withColumn('f2_f1_f3', df['v'] + 111)
    +        df2 = df2.withColumn('f2_f3_f1', df['v'] + 111)
    +        df2 = df2.withColumn('f3_f1_f2', df['v'] + 111)
    +        df2 = df2.withColumn('f3_f2_f1', df['v'] + 111)
    +
    +        self.assertEquals(df2.collect(), df1.collect())
    --- End diff --
    
    I think it would be better to combine this test with the one above and construct it as a list of cases that you could loop over instead of so many blocks of `withColumn`s.  Something like
    
    ```
    class TestCase():
        def __init__(self, col_name, col_expected, col_projection, col_udf_expression, col_sql_expression):
            ...
    
    cases = [
        TestCase('f4_f3_f2_f1', df['v'] + 1111, f4(df1['f3_f2_f1']), f4(f3(f2(f1(df['v'])))), f4(f3(f1(df['v']) + 10))),
        ...]
    
    expected_df = df
    
    for case in cases:
        expected_df = expected_df.withColumn(case.col_name, case.col_expected)
        ....
    
    self.assertEquals(expected_df.collect(), projection_df.collect())
    ```
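
The table-driven layout suggested above can be sketched without Spark. This is only an illustration under stated assumptions: `f1` to `f4` are plain Python functions standing in for the `@udf` and `@pandas_udf` functions in the test, and `Case` and `run_cases` are hypothetical names that do not appear in the PR.

```python
from collections import namedtuple

# Plain-Python stand-ins for the UDFs in the test. Assumption: only the
# arithmetic is kept; the real f1/f3 are @udf and f2/f4 are @pandas_udf.
def f1(x): return x + 1
def f2(x): return x + 10
def f3(x): return x + 100
def f4(x): return x + 1000

# One entry per column under test: its name, the expected offset from v,
# and the nested expression that should produce it.
Case = namedtuple('Case', ['col_name', 'expected_offset', 'expression'])

cases = [
    Case('f1', 1, lambda v: f1(v)),
    Case('f2_f1', 11, lambda v: f2(f1(v))),
    Case('f4_f3_f1', 1101, lambda v: f4(f3(f1(v)))),
    Case('f4_f3_f2_f1', 1111, lambda v: f4(f3(f2(f1(v))))),
]

def run_cases(v, cases):
    """Build the expected and actual results by looping over the cases."""
    expected = {c.col_name: v + c.expected_offset for c in cases}
    actual = {c.col_name: c.expression(v) for c in cases}
    return expected, actual

expected, actual = run_cases(0, cases)
assert expected == actual
```

In the real test each loop iteration would call `withColumn` on the expected and the projected DataFrame instead of filling a dict, so adding a case means adding one row to `cases`.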


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r204447950
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,144 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df1['v']))
    +        df1 = df1.withColumn('f3', f3(df1['v']))
    +        df1 = df1.withColumn('f4', f4(df1['v']))
    +        df1 = df1.withColumn('f2_f1', f2(df1['f1']))
    +        df1 = df1.withColumn('f3_f1', f3(df1['f1']))
    +        df1 = df1.withColumn('f4_f1', f4(df1['f1']))
    +        df1 = df1.withColumn('f3_f2', f3(df1['f2']))
    +        df1 = df1.withColumn('f4_f2', f4(df1['f2']))
    +        df1 = df1.withColumn('f4_f3', f4(df1['f3']))
    +        df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1']))
    +        df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2']))
    +        df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1']))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df.withColumn('f1', f1(df['v']))
    +        df2 = df2.withColumn('f2', f2(df['v']))
    +        df2 = df2.withColumn('f3', f3(df['v']))
    +        df2 = df2.withColumn('f4', f4(df['v']))
    +        df2 = df2.withColumn('f2_f1', f2(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f1', f3(f1(df['v'])))
    +        df2 = df2.withColumn('f4_f1', f4(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f2', f3(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f2', f4(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f3', f4(f3(df['v'])))
    +        df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
    +
    +        # expected result
    +        df3 = df.withColumn('f1', df['v'] + 1)
    +        df3 = df3.withColumn('f2', df['v'] + 10)
    +        df3 = df3.withColumn('f3', df['v'] + 100)
    +        df3 = df3.withColumn('f4', df['v'] + 1000)
    +        df3 = df3.withColumn('f2_f1', df['v'] + 11)
    +        df3 = df3.withColumn('f3_f1', df['v'] + 101)
    +        df3 = df3.withColumn('f4_f1', df['v'] + 1001)
    +        df3 = df3.withColumn('f3_f2', df['v'] + 110)
    +        df3 = df3.withColumn('f4_f2', df['v'] + 1010)
    +        df3 = df3.withColumn('f4_f3', df['v'] + 1100)
    +        df3 = df3.withColumn('f3_f2_f1', df['v'] + 111)
    +        df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
    +        df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
    +        df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
    +        df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df['v']))
    +        df1 = df1.withColumn('f3', f3(df['v']))
    +        df1 = df1.withColumn('f1_f2', f1(f2(df['v'])))
    +        df1 = df1.withColumn('f1_f3', f1(f3(df['v'])))
    +        df1 = df1.withColumn('f2_f1', f2(f1(df['v'])))
    +        df1 = df1.withColumn('f2_f3', f2(f3(df['v'])))
    +        df1 = df1.withColumn('f3_f1', f3(f1(df['v'])))
    +        df1 = df1.withColumn('f3_f2', f3(f2(df['v'])))
    +        df1 = df1.withColumn('f1_f2_f3', f1(f2(f3(df['v']))))
    +        df1 = df1.withColumn('f1_f3_f2', f1(f3(f2(df['v']))))
    +        df1 = df1.withColumn('f2_f1_f3', f2(f1(f3(df['v']))))
    +        df1 = df1.withColumn('f2_f3_f1', f2(f3(f1(df['v']))))
    +        df1 = df1.withColumn('f3_f1_f2', f3(f1(f2(df['v']))))
    +        df1 = df1.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +
    +        # expected result
    +        df2 = df.withColumn('f1', df['v'] + 1)
    +        df2 = df2.withColumn('f2', df['v'] + 10)
    +        df2 = df2.withColumn('f3', df['v'] + 100)
    +        df2 = df2.withColumn('f1_f2', df['v'] + 11)
    +        df2 = df2.withColumn('f1_f3', df['v'] + 101)
    +        df2 = df2.withColumn('f2_f1', df['v'] + 11)
    +        df2 = df2.withColumn('f2_f3', df['v'] + 110)
    +        df2 = df2.withColumn('f3_f1', df['v'] + 101)
    +        df2 = df2.withColumn('f3_f2', df['v'] + 110)
    +        df2 = df2.withColumn('f1_f2_f3', df['v'] + 111)
    +        df2 = df2.withColumn('f1_f3_f2', df['v'] + 111)
    +        df2 = df2.withColumn('f2_f1_f3', df['v'] + 111)
    +        df2 = df2.withColumn('f2_f3_f1', df['v'] + 111)
    +        df2 = df2.withColumn('f3_f1_f2', df['v'] + 111)
    +        df2 = df2.withColumn('f3_f2_f1', df['v'] + 111)
    +
    +        self.assertEquals(df2.collect(), df1.collect())
    --- End diff --
    
    Sorry, could you please elaborate a bit? e.g.
    
    ```
    TestCase('f4_f3_f2_f1', df['v'] + 1111, f4(df1['f3_f2_f1']), f4(f3(f2(f1(df['v'])))), f4(f3(f1(df['v']) + 10)))
    ```
    How is `df1['f3_f2_f1']` defined in this test case?


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205859891
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,61 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class EvalTypeHolder(private var evalType: Int = -1) {
    --- End diff --
    
    I see... You use a var and a nested function definition to remove the need for a holder object. 
    
    IMHO I usually find nested function definitions, and functions that refer to variables outside their definition scope, hard to read, but that could be my personal preference. 
    
    Another thing I like about the current impl is that the `EvalTypeHolder` class ensures its value is never changed once it's set, so I think that's more robust.
    
    That being said, I am ok with your suggestions too if you insist or @BryanCutler also prefers it.
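
The set-once guarantee discussed above can be illustrated in a few lines. This is a Python sketch of the idea only, not the PR's Scala code: the class name `SetOnceEvalType` is hypothetical, and `RuntimeError` stands in for the `IllegalStateException` a Scala holder would throw.

```python
class SetOnceEvalType:
    """Holds an eval type that can be assigned exactly once (hypothetical name)."""

    _UNSET = -1

    def __init__(self):
        self._eval_type = self._UNSET

    @property
    def is_set(self):
        return self._eval_type >= 0

    def set(self, eval_type):
        # Reject negative values so that "unset" stays unambiguous.
        if eval_type < 0:
            raise ValueError("Eval type must be non-negative")
        # A second assignment is an error rather than a silent overwrite.
        if self.is_set:
            raise RuntimeError("Eval type has already been set")
        self._eval_type = eval_type

    def get(self):
        if not self.is_set:
            raise RuntimeError("Eval type is not set")
        return self._eval_type


holder = SetOnceEvalType()
holder.set(100)
print(holder.get())
```

A bare `var` captured by a nested function gives the same behaviour on the happy path, but nothing stops a later code path from overwriting it; the holder turns that mistake into an exception.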
    



---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205243011
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v'])) \
    +            .withColumn('f2', f2(df['v'])) \
    +            .withColumn('f3', f3(df['v'])) \
    +            .withColumn('f1_f2', f1(f2(df['v']))) \
    +            .withColumn('f1_f3', f1(f3(df['v']))) \
    +            .withColumn('f2_f1', f2(f1(df['v']))) \
    +            .withColumn('f2_f3', f2(f3(df['v']))) \
    +            .withColumn('f3_f1', f3(f1(df['v']))) \
    --- End diff --
    
    I see. I don't think removing them is necessary: we are only likely to remove a few cases and, like you said, the test time is virtually the same. Keeping them also helps the readability of the tests (so it doesn't look like some test cases are missed). 
    
    But if that's the preferred practice I can remove duplicate cases in the next commit.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199167215
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,138 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df1['v']))
    +        df1 = df1.withColumn('f3', f3(df1['v']))
    +        df1 = df1.withColumn('f4', f4(df1['v']))
    +        df1 = df1.withColumn('f2_f1', f2(df1['f1']))
    +        df1 = df1.withColumn('f3_f1', f3(df1['f1']))
    +        df1 = df1.withColumn('f4_f1', f4(df1['f1']))
    +        df1 = df1.withColumn('f3_f2', f3(df1['f2']))
    +        df1 = df1.withColumn('f4_f2', f4(df1['f2']))
    +        df1 = df1.withColumn('f4_f3', f4(df1['f3']))
    +        df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1']))
    +        df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2']))
    +        df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1']))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df.withColumn('f1', f1(df['v']))
    +        df2 = df2.withColumn('f2', f2(df['v']))
    +        df2 = df2.withColumn('f3', f3(df['v']))
    +        df2 = df2.withColumn('f4', f4(df['v']))
    +        df2 = df2.withColumn('f2_f1', f2(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f1', f3(f1(df['v'])))
    +        df2 = df2.withColumn('f4_f1', f4(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f2', f3(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f2', f4(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f3', f4(f3(df['v'])))
    +        df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
    +
    +        df3 = df.withColumn('f1', df['v'] + 1)
    +        df3 = df3.withColumn('f2', df['v'] + 10)
    +        df3 = df3.withColumn('f3', df['v'] + 100)
    +        df3 = df3.withColumn('f4', df['v'] + 1000)
    +        df3 = df3.withColumn('f2_f1', df['v'] + 11)
    +        df3 = df3.withColumn('f3_f1', df['v'] + 101)
    +        df3 = df3.withColumn('f4_f1', df['v'] + 1001)
    +        df3 = df3.withColumn('f3_f2', df['v'] + 110)
    +        df3 = df3.withColumn('f4_f2', df['v'] + 1010)
    +        df3 = df3.withColumn('f4_f3', df['v'] + 1100)
    +        df3 = df3.withColumn('f3_f2_f1', df['v'] + 111)
    +        df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
    +        df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
    +        df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
    +        df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    --- End diff --
    
    Yes, the purpose is to test mixing udf, pandas_udf and sql expression. I will add comments to make it clearer.
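
Why an undecorated `f2` counts as a SQL expression can be shown with a toy model. The `Expr` class below is a hypothetical stand-in for a Spark `Column` and is not part of PySpark; it only mimics the way `+` on a column records an operation instead of computing a value.

```python
class Expr:
    """Toy stand-in for a Spark Column: records operations instead of running them."""

    def __init__(self, text):
        self.text = text

    def __add__(self, other):
        # Build a new expression node rather than computing a number.
        return Expr("({} + {})".format(self.text, other))

    def __repr__(self):
        return "Expr<{}>".format(self.text)


def f2(x):
    # Same body as f2 in the test: no decorator, so it runs once on the
    # driver and returns whatever `x + 10` evaluates to.
    return x + 10


plan = f2(Expr("v"))   # builds an expression; nothing is evaluated per row
value = f2(1)          # the same function on a plain int just adds
print(plan, value)
```

Under this reading, `f1(f2(df['v']))` in the test feeds a JVM-evaluated expression into a Python UDF, which is the mixed case the test is meant to cover.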


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92482/
    Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/526/
    Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205311130
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v'])) \
    +            .withColumn('f2', f2(df['v'])) \
    +            .withColumn('f3', f3(df['v'])) \
    +            .withColumn('f1_f2', f1(f2(df['v']))) \
    +            .withColumn('f1_f3', f1(f3(df['v']))) \
    +            .withColumn('f2_f1', f2(f1(df['v']))) \
    +            .withColumn('f2_f3', f2(f3(df['v']))) \
    +            .withColumn('f3_f1', f3(f1(df['v']))) \
    --- End diff --
    
    I am okay with leaving it here too, since it's clear they are virtually the same, but let's remove duplicated or orthogonal tests next time.


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #92401 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92401/testReport)** for PR 21650 at commit [`be3b99c`](https://github.com/apache/spark/commit/be3b99c951c3df77eace0a6a124f8f9a94ac804c).


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1403/
    Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205267754
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    --- End diff --
    
    Is the above test part of sql/tests.py?
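
The `LazyEvalType` holder in the diff above behaves as a set-once cell: it starts unset (`-1`), may be assigned exactly one eval type, and raises if it is read before being set or assigned twice. A minimal Python sketch of that behaviour follows. It is an illustrative model only; this Python class is hypothetical and is not part of the patch, and `RuntimeError` stands in for Scala's `IllegalStateException`.

```python
class LazyEvalType:
    """Set-once holder for a UDF eval type (Python model of the Scala case class)."""

    def __init__(self):
        # -1 means "not set yet", mirroring the default in the Scala diff.
        self._eval_type = -1

    @property
    def is_set(self):
        return self._eval_type >= 0

    def set(self, eval_type):
        # A second assignment is treated as a programming error.
        if self.is_set:
            raise RuntimeError("Eval type has already been set")
        self._eval_type = eval_type

    def get(self):
        # Reading before the first assignment is also an error.
        if not self.is_set:
            raise RuntimeError("Eval type is not set")
        return self._eval_type
```

Because the holder is mutated as a side effect of a predicate, the check and the assignment are coupled, which may be part of why the reviewers asked for a simpler formulation.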


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93451 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93451/testReport)** for PR 21650 at commit [`4c9c007`](https://github.com/apache/spark/commit/4c9c007858aef65c2c190b35673404dd61279369).


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93546/
    Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205141733
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v'])) \
    +            .withColumn('f2', f2(df['v'])) \
    +            .withColumn('f3', f3(df['v'])) \
    +            .withColumn('f1_f2', f1(f2(df['v']))) \
    +            .withColumn('f1_f3', f1(f3(df['v']))) \
    +            .withColumn('f2_f1', f2(f1(df['v']))) \
    +            .withColumn('f2_f3', f2(f3(df['v']))) \
    +            .withColumn('f3_f1', f3(f1(df['v']))) \
    --- End diff --
    
    Yeah, I know it's still minor since the elapsed time will be virtually the same, but build/test time has recently been an issue, and I wonder if there's a better way than avoiding duplicated tests for now.
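
The expected values in `df3` of `test_mixed_udf` follow mechanically from the four offsets (`f1` adds 1, `f2` adds 10, `f3` adds 100, `f4` adds 1000): each column name lists the applied UDFs outermost first, and its value is `v` plus the sum of their offsets. The sketch below derives the same 15 column names and offsets in plain Python. The helper name `expected_columns` and the `OFFSETS` table are hypothetical and are not part of the patch.

```python
from itertools import combinations

# Offset added by each UDF in the quoted test.
OFFSETS = {"f1": 1, "f2": 10, "f3": 100, "f4": 1000}


def expected_columns():
    """Map each column name used in the test to the offset it adds to `v`."""
    names = sorted(OFFSETS)  # ['f1', 'f2', 'f3', 'f4']
    result = {}
    for size in range(1, len(names) + 1):
        for combo in combinations(names, size):
            # Column names list the outermost UDF first, e.g. 'f3_f2_f1'.
            col_name = "_".join(reversed(combo))
            result[col_name] = sum(OFFSETS[n] for n in combo)
    return result


columns = expected_columns()
```

Generating the cases this way is one possible route to keeping the coverage while shrinking the hand-written test body; whether it is worth the indirection is a judgment call.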


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/21650


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199048769
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,138 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df1['v']))
    +        df1 = df1.withColumn('f3', f3(df1['v']))
    +        df1 = df1.withColumn('f4', f4(df1['v']))
    +        df1 = df1.withColumn('f2_f1', f2(df1['f1']))
    +        df1 = df1.withColumn('f3_f1', f3(df1['f1']))
    +        df1 = df1.withColumn('f4_f1', f4(df1['f1']))
    +        df1 = df1.withColumn('f3_f2', f3(df1['f2']))
    +        df1 = df1.withColumn('f4_f2', f4(df1['f2']))
    +        df1 = df1.withColumn('f4_f3', f4(df1['f3']))
    +        df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1']))
    +        df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2']))
    +        df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1']))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df.withColumn('f1', f1(df['v']))
    +        df2 = df2.withColumn('f2', f2(df['v']))
    +        df2 = df2.withColumn('f3', f3(df['v']))
    +        df2 = df2.withColumn('f4', f4(df['v']))
    +        df2 = df2.withColumn('f2_f1', f2(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f1', f3(f1(df['v'])))
    +        df2 = df2.withColumn('f4_f1', f4(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f2', f3(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f2', f4(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f3', f4(f3(df['v'])))
    +        df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
    +
    +        df3 = df.withColumn('f1', df['v'] + 1)
    +        df3 = df3.withColumn('f2', df['v'] + 10)
    +        df3 = df3.withColumn('f3', df['v'] + 100)
    +        df3 = df3.withColumn('f4', df['v'] + 1000)
    +        df3 = df3.withColumn('f2_f1', df['v'] + 11)
    +        df3 = df3.withColumn('f3_f1', df['v'] + 101)
    +        df3 = df3.withColumn('f4_f1', df['v'] + 1001)
    +        df3 = df3.withColumn('f3_f2', df['v'] + 110)
    +        df3 = df3.withColumn('f4_f2', df['v'] + 1010)
    +        df3 = df3.withColumn('f4_f3', df['v'] + 1100)
    +        df3 = df3.withColumn('f3_f2_f1', df['v'] + 111)
    +        df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
    +        df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
    +        df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
    +        df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    --- End diff --
    
    This seems to be neither `@udf` nor `@pandas_udf`. Is that on purpose? If so, could you add a comment to explain why?
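
For context on the question above: a plain Python function applied to a column object never runs per row. It runs once, while the query is being built, and `x + 10` produces a new column expression. The sketch below illustrates that with a tiny stand-in class. `Col` is hypothetical and only mimics the operator overloading of `pyspark.sql.Column`; the reading that `f2` is meant as a SQL expression is an assumption based on the test name `test_mixed_udf_and_sql`.

```python
class Col:
    """Hypothetical stand-in for a column object: records an expression string."""

    def __init__(self, expr):
        self.expr = expr

    def __add__(self, other):
        # Adding to a column builds a new expression instead of computing a value.
        return Col("({} + {})".format(self.expr, other))


def f2(x):
    # No @udf / @pandas_udf decorator: this only builds an expression.
    return x + 10


result = f2(Col("v"))
# result.expr is the text "(v + 10)"; no row data was touched.
```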


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    retest please


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205187569
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    --- End diff --
    
    The one method seems overly complicated, so I prefer the code from my suggestion.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199167359
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -166,8 +190,9 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
                   ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child)
                 case (vectorizedUdfs, plainUdfs) if vectorizedUdfs.isEmpty =>
                   BatchEvalPythonExec(plainUdfs, child.output ++ resultAttrs, child)
    -            case _ =>
    -              throw new IllegalArgumentException("Can not mix vectorized and non-vectorized UDFs")
    +            case (vectorizedUdfs, plainUdfs) =>
    --- End diff --
    
    Oh yes, let me revert.
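
The match being reverted to can be read as a three-way dispatch on a partition of the collected UDFs: all vectorized, all plain, or a mix, which is rejected. A minimal Python model is below. The function name `choose_exec_node` and the `(name, is_vectorized)` tuples are hypothetical; the guard on the first branch is not visible in the quoted hunk and is assumed to be symmetric to the second.

```python
def choose_exec_node(udfs):
    """Pick the exec node name for a list of (name, is_vectorized) pairs."""
    vectorized = [name for name, is_vec in udfs if is_vec]
    plain = [name for name, is_vec in udfs if not is_vec]
    if not plain:
        # Only Pandas (vectorized) UDFs: evaluate through Arrow.
        return "ArrowEvalPythonExec"
    if not vectorized:
        # Only row-at-a-time UDFs: evaluate in batches.
        return "BatchEvalPythonExec"
    # Both kinds in one node is treated as an error.
    raise ValueError("Can not mix vectorized and non-vectorized UDFs")
```

With the error branch kept, the extraction step is responsible for never handing a mixed list to this dispatch.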


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93667/
    Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    I'm okay with the approach in https://github.com/apache/spark/pull/21650#issuecomment-407506043 too, but it should really be simplified. Either way, LGTM.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Thanks @HyukjinKwon @BryanCutler for the review!


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92401/
    Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199026077
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,138 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df.withColumn('f1', f1(df['v']))
    +        df1 = df1.withColumn('f2', f2(df1['v']))
    +        df1 = df1.withColumn('f3', f3(df1['v']))
    +        df1 = df1.withColumn('f4', f4(df1['v']))
    +        df1 = df1.withColumn('f2_f1', f2(df1['f1']))
    +        df1 = df1.withColumn('f3_f1', f3(df1['f1']))
    +        df1 = df1.withColumn('f4_f1', f4(df1['f1']))
    +        df1 = df1.withColumn('f3_f2', f3(df1['f2']))
    +        df1 = df1.withColumn('f4_f2', f4(df1['f2']))
    +        df1 = df1.withColumn('f4_f3', f4(df1['f3']))
    +        df1 = df1.withColumn('f3_f2_f1', f3(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f2_f1', f4(df1['f2_f1']))
    +        df1 = df1.withColumn('f4_f3_f1', f4(df1['f3_f1']))
    +        df1 = df1.withColumn('f4_f3_f2', f4(df1['f3_f2']))
    +        df1 = df1.withColumn('f4_f3_f2_f1', f4(df1['f3_f2_f1']))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df.withColumn('f1', f1(df['v']))
    +        df2 = df2.withColumn('f2', f2(df['v']))
    +        df2 = df2.withColumn('f3', f3(df['v']))
    +        df2 = df2.withColumn('f4', f4(df['v']))
    +        df2 = df2.withColumn('f2_f1', f2(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f1', f3(f1(df['v'])))
    +        df2 = df2.withColumn('f4_f1', f4(f1(df['v'])))
    +        df2 = df2.withColumn('f3_f2', f3(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f2', f4(f2(df['v'])))
    +        df2 = df2.withColumn('f4_f3', f4(f3(df['v'])))
    +        df2 = df2.withColumn('f3_f2_f1', f3(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f2_f1', f4(f2(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f1', f4(f3(f1(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2', f4(f3(f2(df['v']))))
    +        df2 = df2.withColumn('f4_f3_f2_f1', f4(f3(f2(f1(df['v'])))))
    +
    +        df3 = df.withColumn('f1', df['v'] + 1)
    +        df3 = df3.withColumn('f2', df['v'] + 10)
    +        df3 = df3.withColumn('f3', df['v'] + 100)
    +        df3 = df3.withColumn('f4', df['v'] + 1000)
    +        df3 = df3.withColumn('f2_f1', df['v'] + 11)
    +        df3 = df3.withColumn('f3_f1', df['v'] + 101)
    +        df3 = df3.withColumn('f4_f1', df['v'] + 1001)
    +        df3 = df3.withColumn('f3_f2', df['v'] + 110)
    +        df3 = df3.withColumn('f4_f2', df['v'] + 1010)
    +        df3 = df3.withColumn('f4_f3', df['v'] + 1100)
    +        df3 = df3.withColumn('f3_f2_f1', df['v'] + 111)
    +        df3 = df3.withColumn('f4_f2_f1', df['v'] + 1011)
    +        df3 = df3.withColumn('f4_f3_f1', df['v'] + 1101)
    +        df3 = df3.withColumn('f4_f3_f2', df['v'] + 1110)
    +        df3 = df3.withColumn('f4_f3_f2_f1', df['v'] + 1111)
    --- End diff --
    
    That's right. I can add a comment to make it clearer.


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/92400/
    Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202865865
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5471,6 +5598,22 @@ def foo(_):
                     self.assertEqual(r.a, 'hi')
                     self.assertEqual(r.b, 1)
     
    +    def test_mixed_udf(self):
    --- End diff --
    
    `test_mixed_udf` -> `test_mixed_scalar_udfs_followed_by_grouby_apply`


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by ueshin <gi...@git.apache.org>.
Github user ueshin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199051291
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
         }
         assert(qualifiedPlanNodes.size == 1)
       }
    +
    +  private def collectPythonExec(spark: SparkPlan): Seq[BatchEvalPythonExec] = spark.collect {
    --- End diff --
    
    `plan` would be better than `spark`?


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199026841
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,59 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  private def canEvaluateInPython(e: PythonUDF, evalType: Int): Boolean = {
    +    if (e.evalType != evalType) {
    +      false
    +    } else {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python
    +        case Seq(u: PythonUDF) => canEvaluateInPython(u, evalType)
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => !children.exists(hasScalarPythonUDF)
    +      }
         }
       }
     
    -  private def collectEvaluatableUDF(expr: Expression): Seq[PythonUDF] = expr match {
    -    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf) => Seq(udf)
    -    case e => e.children.flatMap(collectEvaluatableUDF)
    +  private def collectEvaluableUDF(expr: Expression, evalType: Int): Seq[PythonUDF] = expr match {
    +    case udf: PythonUDF if PythonUDF.isScalarPythonUDF(udf) && canEvaluateInPython(udf, evalType) =>
    +      Seq(udf)
    +    case e => e.children.flatMap(collectEvaluableUDF(_, evalType))
    +  }
    +
    +  /**
    +   * Collect evaluable UDFs from the current node.
    +   *
    +   * This function collects Python UDFs or Scalar Python UDFs from expressions of the input node,
    +   * and returns a list of UDFs of the same eval type.
    +   *
    +   * If expressions contain both UDFs eval types, this function will only return Python UDFs.
    +   *
    +   * The caller should call this function multiple times until all evaluable UDFs are collected.
    --- End diff --
    
    That's correct. 
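
The quoted revision of `canEvaluateInPython` and `collectEvaluableUDF` can be modelled on a toy expression tree to show why the caller has to call it repeatedly. The sketch below is a plain-Python approximation, not the Scala implementation: `Expr`, the string eval types `"batched"` and `"pandas"`, and `collect_same_type_udfs` are hypothetical, and the "try batched first" order is an assumption drawn from the doc comment that only Python UDFs are returned when both types are present.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class Expr:
    name: str
    children: List["Expr"] = field(default_factory=list)
    eval_type: Optional[str] = None  # None marks a non-UDF expression


def has_udf(e):
    # True if e or any descendant is a UDF.
    return e.eval_type is not None or any(has_udf(c) for c in e.children)


def can_evaluate_in_python(e, eval_type):
    if e.eval_type != eval_type:
        return False
    # A single UDF child can be chained, but only if it has the same eval type.
    if len(e.children) == 1 and e.children[0].eval_type is not None:
        return can_evaluate_in_python(e.children[0], eval_type)
    # Otherwise no argument may itself contain a UDF.
    return not any(has_udf(c) for c in e.children)


def collect_evaluable_udfs(e, eval_type):
    if e.eval_type is not None and can_evaluate_in_python(e, eval_type):
        return [e]
    return [u for c in e.children for u in collect_evaluable_udfs(c, eval_type)]


def collect_same_type_udfs(e):
    # Assumed order: batched Python UDFs first, then Pandas UDFs.
    for eval_type in ("batched", "pandas"):
        udfs = collect_evaluable_udfs(e, eval_type)
        if udfs:
            return udfs
    return []


# f2(f1(v)) with f1 a batched UDF and f2 a Pandas UDF.
f1 = Expr("f1", [Expr("v")], "batched")
first_pass = collect_same_type_udfs(Expr("f2", [f1], "pandas"))
# Once f1's result is an ordinary attribute, f2 becomes evaluable.
second_pass = collect_same_type_udfs(Expr("f2", [Expr("f1_result")], "pandas"))
```

In this model the first pass yields only `f1` and the second only `f2`, so a mixed chain is split across two passes, each containing UDFs of a single eval type.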


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #92482 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92482/testReport)** for PR 21650 at commit [`ce5e7f5`](https://github.com/apache/spark/commit/ce5e7f53cff3c5657fe2e99f2f2a57176d009cce).


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93668/


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by gatorsmile <gi...@git.apache.org>.
Github user gatorsmile commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    ping @BryanCutler Any update about this PR?


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202863696
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
         }
         assert(qualifiedPlanNodes.size == 1)
       }
    +
    +  private def collectPythonExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
    --- End diff --
    
    rename to `collectBatchExec`


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205262719
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    --- End diff --
    
    Bryan, I tried applying your implementation, and this simple test also fails:
    
    ```
    @udf('int')
    def f1(x):
        assert type(x) == int
        return x + 1
    
    @pandas_udf('int')
    def f2(x):
        assert type(x) == pd.Series
        return x + 10
    
    df_chained_1 = df.withColumn('f2_f1', f2(f1(df['v'])))
    expected_chained_1 = df.withColumn('f2_f1', df['v'] + 11)
    self.assertEquals(expected_chained_1.collect(), df_chained_1.collect())
    ```
    
    Would you mind trying this too? Hopefully I didn't do something silly here.
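
The implementation being tried is not quoted in this message, so the following is only a hypothetical illustration of how the chained case `f2(f1(v))` can end up with nothing extracted. It is a Python model, not Spark code: `Udf`, `Holder`, `can_evaluate_eager` and `collect_first` are invented names. The sketch assumes the shared eval type is recorded before the child UDF has been checked.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Udf:
    name: str
    eval_type: str                 # "batched" or "pandas"; labels invented here
    child: Optional["Udf"] = None  # None: the argument is a plain column


class Holder:
    """Mutable record of the eval type chosen for one extraction pass."""

    def __init__(self):
        self.eval_type = None


def can_evaluate_eager(udf, holder):
    if holder.eval_type is not None and holder.eval_type != udf.eval_type:
        return False
    # Assumed ordering: the type is recorded before the child is checked.
    holder.eval_type = udf.eval_type
    return udf.child is None or can_evaluate_eager(udf.child, holder)


def collect_first(udf, holder):
    # Walk from the outermost UDF inwards and return the first evaluable one.
    node = udf
    while node is not None:
        if can_evaluate_eager(node, holder):
            return node.name
        node = node.child
    return None


f1 = Udf("f1", "batched")
f2 = Udf("f2", "pandas", child=f1)
```

In this model, `collect_first(f2, Holder())` returns `None`: the outer Pandas UDF records its eval type and is then rejected because its child is a batched UDF, after which the holder rejects the child as well.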


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    This PR took me a while to get to because I am not very familiar with Catalyst rules. I think the change is relatively simple in the end, but I would appreciate a more careful review from people who are familiar with Catalyst.
    
    cc @BryanCutler @gatorsmile @HyukjinKwon @ueshin 


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205025311
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v'])) \
    +            .withColumn('f2', f2(df['v'])) \
    +            .withColumn('f3', f3(df['v'])) \
    +            .withColumn('f1_f2', f1(f2(df['v']))) \
    +            .withColumn('f1_f3', f1(f3(df['v']))) \
    +            .withColumn('f2_f1', f2(f1(df['v']))) \
    +            .withColumn('f2_f3', f2(f3(df['v']))) \
    +            .withColumn('f3_f1', f3(f1(df['v']))) \
    --- End diff --
    
    It looks like the combinations of f1 and f3 duplicate a few tests in `test_mixed_udf`, for instance `f4_f3`.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205255482
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    --- End diff --
    
    I'm not sure I follow how this could get wrong results. `firstEvalType.evalType = e.evalType` is called only if the eval type is not set, or if it is set and equals the current eval type. In the latter case it assigns the same value again, but that's fine. If there is some case where this fails, can you add it as a test?


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202865230
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -23,21 +23,27 @@ import scala.collection.mutable.ArrayBuffer
     import org.apache.spark.api.python.{PythonEvalType, PythonFunction}
     import org.apache.spark.sql.catalyst.FunctionIdentifier
     import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In}
    -import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec}
    +import org.apache.spark.sql.execution._
    +import org.apache.spark.sql.functions.col
     import org.apache.spark.sql.test.SharedSQLContext
     import org.apache.spark.sql.types.BooleanType
     
     class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
    --- End diff --
    
    I don't think your tests should be in this suite since it is just for `BatchEvalPythonExec`.  How about `ExtractPythonUDFsSuite`?


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #92400 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92400/testReport)** for PR 21650 at commit [`6b47b69`](https://github.com/apache/spark/commit/6b47b69305257e9ee9f5135968913a4f92731ef5).


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205866645
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,61 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class EvalTypeHolder(private var evalType: Int = -1) {
    --- End diff --
    
    Ok, I will update the code then.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205445392
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -5060,6 +5049,147 @@ def test_type_annotation(self):
             df = self.spark.range(1).select(pandas_udf(f=_locals['noop'], returnType='bigint')('id'))
             self.assertEqual(df.first()[0], 0)
     
    +    def test_mixed_udf(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import col, udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of multiple UDFs and Pandas UDFs
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        @pandas_udf('int')
    +        def f2(x):
    +            assert type(x) == pd.Series
    +            return x + 10
    +
    +        @udf('int')
    +        def f3(x):
    +            assert type(x) == int
    +            return x + 100
    +
    +        @pandas_udf('int')
    +        def f4(x):
    +            assert type(x) == pd.Series
    +            return x + 1000
    +
    +        # Test mixed udfs in a single projection
    +        df1 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(col('f1'))) \
    +            .withColumn('f3_f1', f3(col('f1'))) \
    +            .withColumn('f4_f1', f4(col('f1'))) \
    +            .withColumn('f3_f2', f3(col('f2'))) \
    +            .withColumn('f4_f2', f4(col('f2'))) \
    +            .withColumn('f4_f3', f4(col('f3'))) \
    +            .withColumn('f3_f2_f1', f3(col('f2_f1'))) \
    +            .withColumn('f4_f2_f1', f4(col('f2_f1'))) \
    +            .withColumn('f4_f3_f1', f4(col('f3_f1'))) \
    +            .withColumn('f4_f3_f2', f4(col('f3_f2'))) \
    +            .withColumn('f4_f3_f2_f1', f4(col('f3_f2_f1')))
    +
    +        # Test mixed udfs in a single expression
    +        df2 = df \
    +            .withColumn('f1', f1(col('v'))) \
    +            .withColumn('f2', f2(col('v'))) \
    +            .withColumn('f3', f3(col('v'))) \
    +            .withColumn('f4', f4(col('v'))) \
    +            .withColumn('f2_f1', f2(f1(col('v')))) \
    +            .withColumn('f3_f1', f3(f1(col('v')))) \
    +            .withColumn('f4_f1', f4(f1(col('v')))) \
    +            .withColumn('f3_f2', f3(f2(col('v')))) \
    +            .withColumn('f4_f2', f4(f2(col('v')))) \
    +            .withColumn('f4_f3', f4(f3(col('v')))) \
    +            .withColumn('f3_f2_f1', f3(f2(f1(col('v'))))) \
    +            .withColumn('f4_f2_f1', f4(f2(f1(col('v'))))) \
    +            .withColumn('f4_f3_f1', f4(f3(f1(col('v'))))) \
    +            .withColumn('f4_f3_f2', f4(f3(f2(col('v'))))) \
    +            .withColumn('f4_f3_f2_f1', f4(f3(f2(f1(col('v'))))))
    +
    +        # expected result
    +        df3 = df \
    +            .withColumn('f1', df['v'] + 1) \
    +            .withColumn('f2', df['v'] + 10) \
    +            .withColumn('f3', df['v'] + 100) \
    +            .withColumn('f4', df['v'] + 1000) \
    +            .withColumn('f2_f1', df['v'] + 11) \
    +            .withColumn('f3_f1', df['v'] + 101) \
    +            .withColumn('f4_f1', df['v'] + 1001) \
    +            .withColumn('f3_f2', df['v'] + 110) \
    +            .withColumn('f4_f2', df['v'] + 1010) \
    +            .withColumn('f4_f3', df['v'] + 1100) \
    +            .withColumn('f3_f2_f1', df['v'] + 111) \
    +            .withColumn('f4_f2_f1', df['v'] + 1011) \
    +            .withColumn('f4_f3_f1', df['v'] + 1101) \
    +            .withColumn('f4_f3_f2', df['v'] + 1110) \
    +            .withColumn('f4_f3_f2_f1', df['v'] + 1111)
    +
    +        self.assertEquals(df3.collect(), df1.collect())
    +        self.assertEquals(df3.collect(), df2.collect())
    +
    +    def test_mixed_udf_and_sql(self):
    +        import pandas as pd
    +        from pyspark.sql.functions import udf, pandas_udf
    +
    +        df = self.spark.range(0, 1).toDF('v')
    +
    +        # Test mixture of UDFs, Pandas UDFs and SQL expression.
    +
    +        @udf('int')
    +        def f1(x):
    +            assert type(x) == int
    +            return x + 1
    +
    +        def f2(x):
    +            return x + 10
    +
    +        @pandas_udf('int')
    +        def f3(x):
    +            assert type(x) == pd.Series
    +            return x + 100
    +
    +        df1 = df.withColumn('f1', f1(df['v'])) \
    +            .withColumn('f2', f2(df['v'])) \
    +            .withColumn('f3', f3(df['v'])) \
    +            .withColumn('f1_f2', f1(f2(df['v']))) \
    +            .withColumn('f1_f3', f1(f3(df['v']))) \
    +            .withColumn('f2_f1', f2(f1(df['v']))) \
    +            .withColumn('f2_f3', f2(f3(df['v']))) \
    +            .withColumn('f3_f1', f3(f1(df['v']))) \
    --- End diff --
    
    Gotcha. I will keep that in mind next time.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    @BryanCutler Thanks for taking a look at this! Yeah I think this works too. Let me update the code and try it. Thanks again!


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #92443 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/92443/testReport)** for PR 21650 at commit [`674e361`](https://github.com/apache/spark/commit/674e36136911839df00635eff8abb3c405e537d4).


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r205185872
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala ---
    @@ -94,36 +95,94 @@ object ExtractPythonUDFFromAggregate extends Rule[LogicalPlan] {
      */
     object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
     
    -  private def hasPythonUDF(e: Expression): Boolean = {
    +  private case class LazyEvalType(var evalType: Int = -1) {
    +
    +    def isSet: Boolean = evalType >= 0
    +
    +    def set(evalType: Int): Unit = {
    +      if (isSet) {
    +        throw new IllegalStateException("Eval type has already been set")
    +      } else {
    +        this.evalType = evalType
    +      }
    +    }
    +
    +    def get(): Int = {
    +      if (!isSet) {
    +        throw new IllegalStateException("Eval type is not set")
    +      } else {
    +        evalType
    +      }
    +    }
    +  }
    +
    +  private def hasScalarPythonUDF(e: Expression): Boolean = {
         e.find(PythonUDF.isScalarPythonUDF).isDefined
       }
     
    -  private def canEvaluateInPython(e: PythonUDF): Boolean = {
    -    e.children match {
    -      // single PythonUDF child could be chained and evaluated in Python
    -      case Seq(u: PythonUDF) => canEvaluateInPython(u)
    -      // Python UDF can't be evaluated directly in JVM
    -      case children => !children.exists(hasPythonUDF)
    +  /**
    +   * Check whether a PythonUDF expression can be evaluated in Python.
    +   *
    +   * If the lazy eval type is not set, this method checks for either Batched Python UDF and Scalar
    +   * Pandas UDF. If the lazy eval type is set, this method checks for the expression of the
    +   * specified eval type.
    +   *
    +   * This method will also set the lazy eval type to be the type of the first evaluable expression,
    +   * i.e., if lazy eval type is not set and we find a evaluable Python UDF expression, lazy eval
    +   * type will be set to the eval type of the expression.
    +   *
    +   */
    +  private def canEvaluateInPython(e: PythonUDF, lazyEvalType: LazyEvalType): Boolean = {
    +    if (!lazyEvalType.isSet) {
    +      e.children match {
    +        // single PythonUDF child could be chained and evaluated in Python if eval type is the same
    +        case Seq(u: PythonUDF) =>
    +          // Need to recheck the eval type because lazy eval type will be set if child Python UDF is
    +          // evaluable
    +          canEvaluateInPython(u, lazyEvalType) && lazyEvalType.get == e.evalType
    +        // Python UDF can't be evaluated directly in JVM
    +        case children => if (!children.exists(hasScalarPythonUDF)) {
    +          // We found the first evaluable expression, set lazy eval type to its eval type.
    +          lazyEvalType.set(e.evalType)
    +          true
    +        } else {
    +          false
    +        }
    +      }
    +    } else {
    +      if (e.evalType != lazyEvalType.get) {
    +        false
    +      } else {
    +        e.children match {
    +          case Seq(u: PythonUDF) => canEvaluateInPython(u, lazyEvalType)
    --- End diff --
    
    There are 2 paths for recursion here, which is probably not a good idea.  This method is much more complicated now and a little difficult to follow.
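
One way to get down to a single recursive path is to pass the required eval type down as an argument instead of mutating shared state. The sketch below is a Python model under simplifying assumptions (each UDF has at most one UDF child), and `Udf`, `can_evaluate` and `collect_first` are hypothetical names rather than the PR's Scala code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Udf:
    name: str
    eval_type: str                 # "batched" or "pandas"; labels invented here
    child: Optional["Udf"] = None  # None: the argument is a plain column


def can_evaluate(udf, wanted=None):
    """Single recursive path; `wanted` is the eval type fixed by the parent, if any."""
    if wanted is not None and udf.eval_type != wanted:
        return False
    if udf.child is None:
        return True
    # A chained child must share this UDF's eval type.
    return can_evaluate(udf.child, udf.eval_type)


def collect_first(udf):
    # Walk from the outermost UDF inwards and return the first evaluable one.
    node = udf
    while node is not None:
        if can_evaluate(node):
            return node
        node = node.child
    return None
```

With `f1 = Udf("f1", "batched")` and `f2 = Udf("f2", "pandas", child=f1)`, `collect_first(f2)` picks `f1`, leaving `f2` for a later pass; a chain of two batched UDFs is accepted as a whole.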


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    **[Test build #93450 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/93450/testReport)** for PR 21650 at commit [`78f2ebf`](https://github.com/apache/spark/commit/78f2ebf3b11fe8849fe0d41300f74319ca174d42).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by icexelloss <gi...@git.apache.org>.
Github user icexelloss commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r199167791
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -97,6 +103,64 @@ class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
         }
         assert(qualifiedPlanNodes.size == 1)
       }
    +
    +  private def collectPythonExec(spark: SparkPlan): Seq[BatchEvalPythonExec] = spark.collect {
    --- End diff --
    
    Yes! I meant to call it `plan` but apparently made a mistake :(


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/93688/


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark pull request #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Pyt...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21650#discussion_r202864609
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/python/BatchEvalPythonExecSuite.scala ---
    @@ -23,21 +23,27 @@ import scala.collection.mutable.ArrayBuffer
     import org.apache.spark.api.python.{PythonEvalType, PythonFunction}
     import org.apache.spark.sql.catalyst.FunctionIdentifier
     import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, GreaterThan, In}
    -import org.apache.spark.sql.execution.{FilterExec, InputAdapter, SparkPlanTest, WholeStageCodegenExec}
    +import org.apache.spark.sql.execution._
    +import org.apache.spark.sql.functions.col
     import org.apache.spark.sql.test.SharedSQLContext
     import org.apache.spark.sql.types.BooleanType
     
     class BatchEvalPythonExecSuite extends SparkPlanTest with SharedSQLContext {
       import testImplicits.newProductEncoder
       import testImplicits.localSeqToDatasetHolder
     
    +  val pythonUDF = new MyDummyPythonUDF
    --- End diff --
    
    `pythonUDF` -> `pythonBatchedUDF`


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test PASSed.


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/589/


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Merged build finished. Test FAILed.


---



[GitHub] spark issue #21650: [SPARK-24624] Support mixture of Python UDF and Scalar P...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/527/


---



[GitHub] spark issue #21650: [SPARK-24624][SQL][PYTHON] Support mixture of Python UDF...

Posted by BryanCutler <gi...@git.apache.org>.
Github user BryanCutler commented on the issue:

    https://github.com/apache/spark/pull/21650
  
    Would you mind changing case (1) in your description?  It threw me off a little, as the two statements looked independent at first glance.  Maybe something like:
    ```
    df = spark.range(0, 1).toDF('v') \
        .withColumn('foo', f1(df['v'])) \
        .withColumn('bar', f2(df['v']))
    ```


---
