Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2015/09/19 07:46:56 UTC

[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/8833

    [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Python UDF should only compute the upstream once

    This PR buffers the rows from upstream in a queue, then zips them with the results from the Python UDF, to avoid computing the upstream twice.
    
    Thanks to @rxin for the idea, which simplifies the buffer greatly!
    
    cc @marmbrus 
    
    Closes #8662  
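
The buffer-then-zip idea in the description can be sketched in a few lines of Python. This is only an illustration of the technique, not the code in the patch: `evaluate_udf_once` is a hypothetical name, and the lazy `map` stands in for the round trip to the Python worker.

```python
from collections import deque


def evaluate_udf_once(upstream, udf):
    """Yield (row, udf(row)) pairs while iterating `upstream` only once."""
    buffer = deque()

    def feed():
        # Remember each row before handing it to the UDF.
        for row in upstream:
            buffer.append(row)
            yield row

    # Assumption: results come back in the same order the rows went in.
    for result in map(udf, feed()):
        # The oldest buffered row is the one this result belongs to.
        yield buffer.popleft(), result
```

Because every row is buffered on its way to the UDF, the original row can be paired with its result without asking the upstream for it a second time.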

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark pyudf

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/8833.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #8833
    
----
commit b98cf2a45984d24bdb181e4d43d8f83ca1849aff
Author: Davies Liu <da...@databricks.com>
Date:   2015-09-19T05:42:26Z

    compute the upstream once

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8833#issuecomment-141700988
  
      [Test build #42714 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42714/console) for   PR 8833 at commit [`b98cf2a`](https://github.com/apache/spark/commit/b98cf2a45984d24bdb181e4d43d8f83ca1849aff).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by justinuang <gi...@git.apache.org>.
Github user justinuang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8833#discussion_r40048503
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -1414,7 +1414,7 @@ def __init__(self, func, returnType, name=None):
         def _create_judf(self, name):
             f, returnType = self.func, self.returnType  # put them in closure `func`
             func = lambda _, it: map(lambda x: returnType.toInternal(f(*x)), it)
    -        ser = AutoBatchedSerializer(PickleSerializer())
    +        ser = BatchedSerializer(PickleSerializer(), 100)
    --- End diff --
    
    Good point, I was still thinking about my first attempt which involved a blocking queue.




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/8833#issuecomment-142423117
  
    Based on some extended offline discussion / debate and code-review, we've decided to merge #8835 instead of this fix. The basic approaches in both patches are the same, except #8835 makes the scope + lifecycle of the queue a little clearer and will be easier to understand / maintain in the long term. I've reviewed that other patch pretty carefully and don't believe that it represents significantly more backporting risk than this patch.




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by justinuang <gi...@git.apache.org>.
Github user justinuang commented on the pull request:

    https://github.com/apache/spark/pull/8833#issuecomment-142161491
  
    lgtm! So this avoids deadlock by getting rid of the blocking queue (duh!) and then assumes the OS buffer will rate limit how much gets buffered on the writer side?
    
    Looking forward to getting this fix in.
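
The thread does not spell out how the blocking queue deadlocked. One plausible scenario, assumed here purely for illustration: the feeder must buffer a whole batch of rows before any result comes back, so a bounded queue smaller than the batch fills up first. The sketch uses Python's `queue.Queue` with non-blocking puts so that the point where a blocking `put()` would hang shows up as a `queue.Full` exception instead. The capacity of 10 and the helper name `feed_one_batch` are hypothetical.

```python
import queue


def feed_one_batch(buffer, rows, batch_size):
    """Buffer rows until one full batch has been sent; return the count.

    Raises queue.Full at the point where a blocking put() would hang.
    """
    sent = 0
    for row in rows:
        buffer.put(row, block=False)
        sent += 1
        if sent == batch_size:
            break
    return sent


unbounded = queue.Queue()          # maxsize=0 means no limit
bounded = queue.Queue(maxsize=10)  # hypothetical capacity below the batch size

sent_unbounded = feed_one_batch(unbounded, range(1000), batch_size=100)

try:
    feed_one_batch(bounded, range(1000), batch_size=100)
    would_block = False
except queue.Full:
    would_block = True
```

With the unbounded queue the batch completes. With the bounded one the feeder stalls after ten rows, and nothing downstream would ever drain the queue because no result has been produced yet.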




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by justinuang <gi...@git.apache.org>.
Github user justinuang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8833#discussion_r39933648
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -1414,7 +1414,7 @@ def __init__(self, func, returnType, name=None):
         def _create_judf(self, name):
             f, returnType = self.func, self.returnType  # put them in closure `func`
             func = lambda _, it: map(lambda x: returnType.toInternal(f(*x)), it)
    -        ser = AutoBatchedSerializer(PickleSerializer())
    +        ser = BatchedSerializer(PickleSerializer(), 100)
    --- End diff --
    
    Can we pull this out into a constant? And also the same value in the Python, and put a comment on each saying that they have to be equal? It's very dangerous if these values go out of sync.




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8833#discussion_r39921623
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala ---
    @@ -338,7 +338,11 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
       def children: Seq[SparkPlan] = child :: Nil
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    val childResults = child.execute().map(_.copy())
    +    val buffer = new java.util.concurrent.ConcurrentLinkedQueue[InternalRow]()
    --- End diff --
    
    Yes




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8833#issuecomment-141691813
  
     Merged build triggered.




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8833#issuecomment-141701003
  
    Merged build finished. Test FAILed.




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by davies <gi...@git.apache.org>.
Github user davies closed the pull request at:

    https://github.com/apache/spark/pull/8833




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8833#discussion_r39917194
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala ---
    @@ -338,7 +338,11 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child:
       def children: Seq[SparkPlan] = child :: Nil
     
       protected override def doExecute(): RDD[InternalRow] = {
    -    val childResults = child.execute().map(_.copy())
    +    val buffer = new java.util.concurrent.ConcurrentLinkedQueue[InternalRow]()
    --- End diff --
    
    If I understand this correctly, we are assuming the following in order for this to work:
    
    1. Each task gets its own copy of the deserialized closure, and thus its own copy of the queue.
    2. All closures are serialized together in one shot, rather than in multiple places (e.g. they are all done in the serializer, not in the ctor of the RDD).
    3. The Java serializer does not serialize the same object twice within one stream, since it tracks already-written objects to detect cycles. When they are deserialized, the references still point to the same copy.
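
The three assumptions above can be illustrated with Python's `pickle`, which also memoizes objects within a single stream. This is an analogy only: it does not exercise Java serialization or Spark's closure serializer, and the `producer` / `consumer` dicts are hypothetical stand-ins for the two closures that share the queue.

```python
import pickle
from collections import deque

shared = deque()
producer = {"name": "producer", "queue": shared}
consumer = {"name": "consumer", "queue": shared}

# Serialized together in one stream: the shared queue is written once,
# so both deserialized objects point at the same copy.
payload = pickle.dumps((producer, consumer))
p1, c1 = pickle.loads(payload)
same_stream_shared = p1["queue"] is c1["queue"]

# Serialized in two separate streams: each side gets its own queue.
p2 = pickle.loads(pickle.dumps(producer))
c2 = pickle.loads(pickle.dumps(consumer))
separate_streams_shared = p2["queue"] is c2["queue"]

# Deserializing the same payload again (as a second task would) yields
# a fresh, independent queue.
p3, c3 = pickle.loads(payload)
tasks_are_isolated = p1["queue"] is not p3["queue"]
```

Here `same_stream_shared` and `tasks_are_isolated` are true while `separate_streams_shared` is false, which is the behavior the approach relies on in the Java case.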
    
    
    
    





[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8833#discussion_r39935553
  
    --- Diff: python/pyspark/sql/functions.py ---
    @@ -1414,7 +1414,7 @@ def __init__(self, func, returnType, name=None):
         def _create_judf(self, name):
             f, returnType = self.func, self.returnType  # put them in closure `func`
             func = lambda _, it: map(lambda x: returnType.toInternal(f(*x)), it)
    -        ser = AutoBatchedSerializer(PickleSerializer())
    +        ser = BatchedSerializer(PickleSerializer(), 100)
    --- End diff --
    
    These two values don't need to be the same.
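
The thread does not say why the two values may differ, nor does it show the second value. One reading, assumed here: if results are flattened back into a single ordered stream before being paired with the buffered rows, batch size only affects how items are grouped in transit. The sketch below is plain Python, not PySpark's `BatchedSerializer`; `batched`, `unbatched`, and `run` are hypothetical helpers.

```python
from itertools import islice


def batched(items, size):
    """Group an iterable into lists of at most `size` items."""
    it = iter(items)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


def unbatched(batches):
    """Flatten batches back into a single ordered stream."""
    for batch in batches:
        yield from batch


def run(rows, send_size, reply_size, udf):
    # Rows travel to the worker in batches of `send_size` ...
    sent = unbatched(batched(rows, send_size))
    # ... and results come back in batches of `reply_size`.
    replies = batched(map(udf, sent), reply_size)
    # Pairing is by position in the flattened stream, not by batch.
    return list(zip(rows, unbatched(replies)))


rows = list(range(250))
matched = run(rows, 100, 100, lambda x: x * x)
mismatched = run(rows, 100, 7, lambda x: x * x)
```

Under this assumption `matched` and `mismatched` are identical, so the pairing stays correct even when the two sides batch differently.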




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8833#issuecomment-141693095
  
      [Test build #42714 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42714/consoleFull) for   PR 8833 at commit [`b98cf2a`](https://github.com/apache/spark/commit/b98cf2a45984d24bdb181e4d43d8f83ca1849aff).




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8833#issuecomment-141712761
  
      [Test build #1775 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1775/console) for   PR 8833 at commit [`b98cf2a`](https://github.com/apache/spark/commit/b98cf2a45984d24bdb181e4d43d8f83ca1849aff).
     * This patch **passes all tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8833#issuecomment-141691826
  
    Merged build started.




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/8833#issuecomment-141708146
  
      [Test build #1775 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/1775/consoleFull) for   PR 8833 at commit [`b98cf2a`](https://github.com/apache/spark/commit/b98cf2a45984d24bdb181e4d43d8f83ca1849aff).




[GitHub] spark pull request: [SPARK-10685] [SPARK-8632] [SQL] [PYSPARK] Pyt...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/8833#issuecomment-141701004
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/42714/
    Test FAILed.

