Posted to reviews@spark.apache.org by rdblue <gi...@git.apache.org> on 2018/08/23 17:41:39 UTC

[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

GitHub user rdblue opened a pull request:

    https://github.com/apache/spark/pull/22206

    SPARK-25213: Add project to v2 scans before python filters.

    ## What changes were proposed in this pull request?
    
    The v2 API always adds a projection when converting to a physical plan, to ensure that all rows are `UnsafeRow`. This projection is added after any filters run by Spark, on the assumption that the filter and projection can handle `InternalRow`, but that fails when those nodes contain Python UDFs. This PR detects the Python UDFs and adds a projection directly beneath any filter or projection that runs them, converting rows to `UnsafeRow` before any data is passed to Python.
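A toy Python model of the rewrite described above may help. Everything in it is hypothetical: `Attr`, `Call`, `PythonUDF`, `Scan`, `Project` and `Filter` are simplified stand-ins for Catalyst expressions and the physical `ProjectExec`/`FilterExec` nodes, and `has_python_udf` only plays the role of `hasScalarPythonUDF`. In Spark the inserted `ProjectExec` is what produces `UnsafeRow`; here it is just an identity projection, so the sketch shows only where the conversion lands in the plan.

```python
from dataclasses import dataclass
from typing import List, Optional, Tuple, Union


@dataclass(frozen=True)
class Attr:
    """A column reference (stand-in for a Catalyst AttributeReference)."""
    name: str


@dataclass(frozen=True)
class Call:
    """Any expression Spark evaluates itself, e.g. And or GreaterThan."""
    op: str
    children: Tuple["Expr", ...]


@dataclass(frozen=True)
class PythonUDF:
    """A scalar Python UDF applied to its argument expressions."""
    children: Tuple["Expr", ...]


Expr = Union[Attr, Call, PythonUDF]


@dataclass
class Scan:
    output: List[str]  # in Spark, a v2 reader may emit plain InternalRow


@dataclass
class Project:
    exprs: List[Expr]
    child: object


@dataclass
class Filter:
    condition: Expr
    child: object


def has_python_udf(e: Expr) -> bool:
    """True if a Python UDF appears anywhere in the expression tree."""
    if isinstance(e, PythonUDF):
        return True
    if isinstance(e, Attr):
        return False
    return any(has_python_udf(c) for c in e.children)


def plan_scan(scan: Scan, condition: Optional[Expr], project: List[Expr]) -> Project:
    """Build Project(Filter(scan)), converting rows before a Python UDF sees them."""

    def to_unsafe(child):
        # Identity projection over all scan columns; in Spark this is the
        # ProjectExec that turns InternalRow into UnsafeRow.
        return Project([Attr(n) for n in scan.output], child)

    if condition is None:
        with_filter = scan
    elif has_python_udf(condition):
        with_filter = Filter(condition, to_unsafe(scan))
    else:
        with_filter = Filter(condition, scan)

    if any(has_python_udf(e) for e in project):
        return Project(project, to_unsafe(with_filter))
    # The final projection is always added, as in the original strategy.
    return Project(project, with_filter)
```

For example, with `Scan(["i", "j"])` and a condition containing `PythonUDF((Attr("i"),))`, `plan_scan` returns `Project -> Filter -> Project -> Scan`, with the identity projection directly above the scan. Unlike the patch, which projects only the columns each expression references, this sketch projects every scan column, so columns needed only by the top projection cannot be dropped beneath the filter.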
    
    ## How was this patch tested?
    
    This adds a test for the case reported in SPARK-25213 to PySpark's SQL tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rdblue/spark SPARK-25213-v2-add-project-before-python-filter

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22206.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22206
    
----
commit ada6e924597cbe247c8be47fd80df38b79cb34e5
Author: Ryan Blue <bl...@...>
Date:   2018-08-23T17:37:19Z

    SPARK-25213: Add project to v2 scans before python filters.

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212795284
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy {
             config)
     
           val filterCondition = postScanFilters.reduceLeftOption(And)
    -      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +
    +      val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) {
    +        // add a projection before FilterExec to ensure that the rows are converted to unsafe
    +        val filterExpr = filterCondition.get
    +        FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan))
    +      } else {
    +        filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +      }
     
           // always add the projection, which will produce unsafe rows required by some operators
    -      ProjectExec(project, withFilter) :: Nil
    +      if (project.exists(hasScalarPythonUDF)) {
    +        val references = project.map(_.references).reduce(_ ++ _).toSeq
    +        ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
    --- End diff --
    
    +1 for leaving as is.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    **[Test build #95170 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95170/testReport)** for PR 22206 at commit [`ada6e92`](https://github.com/apache/spark/commit/ada6e924597cbe247c8be47fd80df38b79cb34e5).


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95170/


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212488758
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy {
             config)
     
           val filterCondition = postScanFilters.reduceLeftOption(And)
    -      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +
    +      val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) {
    +        // add a projection before FilterExec to ensure that the rows are converted to unsafe
    +        val filterExpr = filterCondition.get
    +        FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan))
    +      } else {
    +        filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +      }
     
           // always add the projection, which will produce unsafe rows required by some operators
    -      ProjectExec(project, withFilter) :: Nil
    +      if (project.exists(hasScalarPythonUDF)) {
    +        val references = project.map(_.references).reduce(_ ++ _).toSeq
    +        ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
    --- End diff --
    
    nit: If we already add Project on top of Filter, we don't need to add another Project here, right?


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2512/


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Not a big deal but PR title: `[SPARK-25213][PYTHON] ... ` per the guide.
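For reference, a rough check of the title shape requested here, assuming the `[SPARK-NNNNN][COMPONENT] Summary` form from the comment. The pattern and the function name are illustrative only and are not taken from Spark's own tooling.

```python
import re

# Illustrative pattern: one JIRA tag, one or more upper-case component
# tags such as [PYTHON] or [SQL], then a space and a summary.
TITLE_RE = re.compile(r"^\[SPARK-\d+\](\[[A-Z0-9 _-]+\])+ \S.*$")


def follows_title_convention(title: str) -> bool:
    """Return True if the PR title starts with [SPARK-NNNNN][COMPONENT]."""
    return TITLE_RE.match(title) is not None
```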


[GitHub] spark pull request #22206: [SPARK-25213][PYTHON] Add project to v2 scans bef...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue closed the pull request at:

    https://github.com/apache/spark/pull/22206


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    **[Test build #95187 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95187/testReport)** for PR 22206 at commit [`550368e`](https://github.com/apache/spark/commit/550368eaeebdc87f2c89bad7214f2624784eeb04).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212496291
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy {
             config)
     
           val filterCondition = postScanFilters.reduceLeftOption(And)
    -      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +
    +      val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) {
    +        // add a projection before FilterExec to ensure that the rows are converted to unsafe
    +        val filterExpr = filterCondition.get
    +        FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan))
    +      } else {
    +        filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +      }
     
           // always add the projection, which will produce unsafe rows required by some operators
    -      ProjectExec(project, withFilter) :: Nil
    +      if (project.exists(hasScalarPythonUDF)) {
    +        val references = project.map(_.references).reduce(_ ++ _).toSeq
    +        ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
    --- End diff --
    
    Ok. Let's leave as it is now.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2500/


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    **[Test build #95172 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95172/testReport)** for PR 22206 at commit [`c2e1bc7`](https://github.com/apache/spark/commit/c2e1bc7e21a82605e884a40c7c5a553a1b711b51).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95187/


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212676265
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -6394,6 +6394,17 @@ def test_invalid_args(self):
                     df.withColumn('mean_v', mean_udf(df['v']).over(ow))
     
     
    +class DataSourceV2Tests(ReusedSQLTestCase):
    +    def test_pyspark_udf_SPARK_25213(self):
    +        from pyspark.sql.functions import udf
    +
    +        df = self.spark.read.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()
    +        result = df.withColumn('x', udf(lambda x: x, 'int')(df['i']))
    --- End diff --
    
    Agreed. I was just verifying that the fix worked before spending more time on it.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    **[Test build #95173 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95173/testReport)** for PR 22206 at commit [`c49157e`](https://github.com/apache/spark/commit/c49157e94634ea964f78ce671d48c52dc9b43e6b).
     * This patch **fails PySpark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212497182
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -6394,6 +6394,17 @@ def test_invalid_args(self):
                     df.withColumn('mean_v', mean_udf(df['v']).over(ow))
     
     
    +class DataSourceV2Tests(ReusedSQLTestCase):
    +    def test_pyspark_udf_SPARK_25213(self):
    +        from pyspark.sql.functions import udf
    +
    +        df = self.spark.read.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()
    +        result = df.withColumn('x', udf(lambda x: x, 'int')(df['i']))
    --- End diff --
    
    This only tests Project with a scalar PythonUDF? It might be better to also test the Filter case.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    **[Test build #95187 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95187/testReport)** for PR 22206 at commit [`550368e`](https://github.com/apache/spark/commit/550368eaeebdc87f2c89bad7214f2624784eeb04).


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    **[Test build #95173 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95173/testReport)** for PR 22206 at commit [`c49157e`](https://github.com/apache/spark/commit/c49157e94634ea964f78ce671d48c52dc9b43e6b).


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Merged build finished. Test FAILed.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Merged build finished. Test FAILed.


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212795255
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -6394,6 +6394,17 @@ def test_invalid_args(self):
                     df.withColumn('mean_v', mean_udf(df['v']).over(ow))
     
     
    +class DataSourceV2Tests(ReusedSQLTestCase):
    +    def test_pyspark_udf_SPARK_25213(self):
    --- End diff --
    
    Not a big deal, but I would avoid the `SPARK_25213` suffix at the end, just for consistency.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95172/


[GitHub] spark pull request #22206: [SPARK-25213][PYTHON] Add project to v2 scans bef...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r213098202
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -6394,6 +6394,17 @@ def test_invalid_args(self):
                     df.withColumn('mean_v', mean_udf(df['v']).over(ow))
     
     
    +class DataSourceV2Tests(ReusedSQLTestCase):
    +    def test_pyspark_udf_SPARK_25213(self):
    --- End diff --
    
    I like that the tests in Scala include this information somewhere. Is there a better place for it in PySpark? I'm not aware of another way to pass extra metadata, but I'm open to it if there's a better way.
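One possible answer, offered as a suggestion rather than the project's convention: keep the JIRA ID in the test method's docstring. `unittest` exposes the first docstring line through `TestCase.shortDescription()`, and the verbose text runner prints it next to the test name, so the ID stays visible without a name suffix. In this sketch `unittest.TestCase` stands in for `ReusedSQLTestCase`, and the method body is a placeholder.

```python
import unittest


class DataSourceV2Tests(unittest.TestCase):  # stand-in for ReusedSQLTestCase
    def test_pyspark_udf(self):
        """SPARK-25213: Python UDFs over a v2 scan receive UnsafeRow input."""
        # Placeholder body; the real test would read SimpleDataSourceV2
        # and apply a Python UDF, as in the diff above.
        pass
```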


[GitHub] spark issue #22206: [SPARK-25213][PYTHON] Add project to v2 scans before pyt...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    @HyukjinKwon and @viirya, thank you for looking at this commit, but I like @cloud-fan's approach to fixing this in #22244 better than this work-around. I'm going to close this in favor of that approach, although if we need a quick fix I can pick this back up.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Merged build finished. Test PASSed.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Merged build finished. Test FAILed.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    **[Test build #95172 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95172/testReport)** for PR 22206 at commit [`c2e1bc7`](https://github.com/apache/spark/commit/c2e1bc7e21a82605e884a40c7c5a553a1b711b51).


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212484229
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy {
             config)
     
           val filterCondition = postScanFilters.reduceLeftOption(And)
    -      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +
    +      val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) {
    +        // add a projection before FilterExec to ensure that the rows are converted to unsafe
    +        val filterExpr = filterCondition.get
    +        FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan))
    +      } else {
    +        filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +      }
     
           // always add the projection, which will produce unsafe rows required by some operators
    -      ProjectExec(project, withFilter) :: Nil
    +      if (project.exists(hasScalarPythonUDF)) {
    +        val references = project.map(_.references).reduce(_ ++ _).toSeq
    +        ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
    --- End diff --
    
    Why do we need to add extra Project on top of Filter here?


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212488266
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy {
             config)
     
           val filterCondition = postScanFilters.reduceLeftOption(And)
    -      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +
    +      val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) {
    +        // add a projection before FilterExec to ensure that the rows are converted to unsafe
    +        val filterExpr = filterCondition.get
    +        FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan))
    +      } else {
    +        filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +      }
     
           // always add the projection, which will produce unsafe rows required by some operators
    -      ProjectExec(project, withFilter) :: Nil
    +      if (project.exists(hasScalarPythonUDF)) {
    +        val references = project.map(_.references).reduce(_ ++ _).toSeq
    +        ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
    --- End diff --
    
    The v2 data sources return `InternalRow`, not `UnsafeRow`. Python UDFs can't handle `InternalRow`, so this adds a projection that converts rows to `UnsafeRow` before the projection that contains a Python UDF.


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by rdblue <gi...@git.apache.org>.
Github user rdblue commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212489210
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy {
             config)
     
           val filterCondition = postScanFilters.reduceLeftOption(And)
    -      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +
    +      val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) {
    +        // add a projection before FilterExec to ensure that the rows are converted to unsafe
    +        val filterExpr = filterCondition.get
    +        FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan))
    +      } else {
    +        filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +      }
     
           // always add the projection, which will produce unsafe rows required by some operators
    -      ProjectExec(project, withFilter) :: Nil
    +      if (project.exists(hasScalarPythonUDF)) {
    +        val references = project.map(_.references).reduce(_ ++ _).toSeq
    +        ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
    --- End diff --
    
    That one was only added if there was a filter and that filter ran a UDF. This will add an unnecessary projection if both the filter and the projection have Python UDFs, but I thought that was probably okay. I can add a boolean to signal whether the filter already caused one to be added, if you think it's worth it.
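The boolean offered here could look roughly like this. It is a sketch over a hypothetical string-based model: `Node`, `uses_python_udf` and the `pyudf(...)` spelling are invented for illustration, and a `to_unsafe` node stands for the converting `ProjectExec`. The flag records whether the filter branch already inserted a conversion, so the projection branch skips a second one.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Node:
    """Hypothetical physical node: kind is 'scan', 'filter', 'project' or 'to_unsafe'."""
    kind: str
    arg: Optional[str] = None
    child: Optional["Node"] = None


def uses_python_udf(expr: str) -> bool:
    # Assumption for this sketch: Python UDF calls are spelled "pyudf(...)".
    return "pyudf(" in expr


def plan_scan(condition: Optional[str], project: List[str]) -> Node:
    node = Node("scan")
    converted = False  # has an unsafe-row conversion already been inserted?

    if condition is not None:
        if uses_python_udf(condition):
            node = Node("to_unsafe", child=node)
            converted = True
        node = Node("filter", condition, node)

    # Only convert again if the filter branch did not already do it.
    if any(uses_python_udf(e) for e in project) and not converted:
        node = Node("to_unsafe", child=node)

    # The final projection is always added, as in the original strategy.
    return Node("project", ", ".join(project), node)
```

This relies on the assumption, implicit in the comment, that a filter passes its input rows through unchanged, so rows above a filter whose input was converted are still unsafe.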


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by HyukjinKwon <gi...@git.apache.org>.
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212795238
  
    --- Diff: python/pyspark/sql/tests.py ---
    @@ -6394,6 +6394,17 @@ def test_invalid_args(self):
                     df.withColumn('mean_v', mean_udf(df['v']).over(ow))
     
     
    +class DataSourceV2Tests(ReusedSQLTestCase):
    +    def test_pyspark_udf_SPARK_25213(self):
    +        from pyspark.sql.functions import udf
    +
    +        df = self.spark.read.format("org.apache.spark.sql.sources.v2.SimpleDataSourceV2").load()
    --- End diff --
    
    I think this test will fail if the test classes are not compiled. Can we check whether the test classes are compiled and skip the test if they are not?
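One way to implement the check is to probe the JVM for the test data source class before running the test. The helper name `jvm_class_exists` is hypothetical; it assumes a py4j JVM view such as PySpark's `SparkSession._jvm`, where `jvm.java.lang.Class.forName(name)` raises when the class cannot be loaded, and that the class would be visible to the class loader py4j uses.

```python
def jvm_class_exists(jvm, class_name: str) -> bool:
    """Return True if `class_name` can be loaded through a py4j JVM view.

    Assumes `jvm` behaves like PySpark's `SparkSession._jvm`, where
    `jvm.java.lang.Class.forName(name)` raises (py4j wraps the Java
    ClassNotFoundException in a Py4JJavaError) if the class is missing.
    """
    try:
        jvm.java.lang.Class.forName(class_name)
        return True
    except Exception:
        # Py4JJavaError subclasses Exception; catching broadly keeps this
        # sketch free of a py4j import.
        return False
```

In the test above, `setUp` could call `self.skipTest(...)` when `jvm_class_exists(self.spark._jvm, "org.apache.spark.sql.sources.v2.SimpleDataSourceV2")` returns `False`.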


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2498/


[GitHub] spark pull request #22206: SPARK-25213: Add project to v2 scans before pytho...

Posted by viirya <gi...@git.apache.org>.
Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22206#discussion_r212488439
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala ---
    @@ -130,10 +133,22 @@ object DataSourceV2Strategy extends Strategy {
             config)
     
           val filterCondition = postScanFilters.reduceLeftOption(And)
    -      val withFilter = filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +
    +      val withFilter = if (filterCondition.exists(hasScalarPythonUDF)) {
    +        // add a projection before FilterExec to ensure that the rows are converted to unsafe
    +        val filterExpr = filterCondition.get
    +        FilterExec(filterExpr, ProjectExec(filterExpr.references.toSeq, scan))
    +      } else {
    +        filterCondition.map(FilterExec(_, scan)).getOrElse(scan)
    +      }
     
           // always add the projection, which will produce unsafe rows required by some operators
    -      ProjectExec(project, withFilter) :: Nil
    +      if (project.exists(hasScalarPythonUDF)) {
    +        val references = project.map(_.references).reduce(_ ++ _).toSeq
    +        ProjectExec(project, ProjectExec(references, withFilter)) :: Nil
    --- End diff --
    
    Oh, I see. It is also used to make sure the PythonUDF in the top Project takes unsafe rows as input.


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95173/


[GitHub] spark issue #22206: SPARK-25213: Add project to v2 scans before python filte...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22206
  
    **[Test build #95170 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95170/testReport)** for PR 22206 at commit [`ada6e92`](https://github.com/apache/spark/commit/ada6e924597cbe247c8be47fd80df38b79cb34e5).
     * This patch **fails Python style tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.

