Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2014/08/05 19:51:16 UTC
[GitHub] spark pull request: [WIP] [PySpark] Add missing API
GitHub user davies opened a pull request:
https://github.com/apache/spark/pull/1791
[WIP] [PySpark] Add missing API
Try to bring all Java/Scala API to PySpark.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/davies/spark api
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1791.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1791
----
commit ff2cbe3effddae2c92d7d1ddb14c6762b42ae5fa
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-05T06:55:40Z
add missing API in SparkContext
commit e0b3d307bb8a5988425f4eeff17d8cfc5469e6e8
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-05T08:11:41Z
add histogram()
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-53473278
[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19221/consoleFull) for PR 1791 at commit [`28fd368`](https://github.com/apache/spark/commit/28fd3682b434b8a19ea7cb731252ea9c613c968b).
* This patch **fails** unit tests.
* This patch merges cleanly.
* This patch adds the following public classes _(experimental)_:
* `class BoundedFloat(float):`
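The `BoundedFloat` flagged by QA is a float subclass that also carries a confidence interval, so approximate operations can return a value that still behaves like a plain float. A minimal sketch of the idea (the real definition lives in python/pyspark/rdd.py and may differ in detail):

```python
class BoundedFloat(float):
    """A float that also carries a confidence interval.

    Sketch of the idea behind the class this patch adds: the mean is the
    float value itself, so the result of an approximate count or sum can
    be used anywhere a number is expected.
    """
    def __new__(cls, mean, confidence, low, high):
        obj = float.__new__(cls, mean)
        obj.confidence = confidence
        obj.low = low
        obj.high = high
        return obj
```

Because it subclasses `float`, arithmetic on it simply degrades to a plain float while the bounds remain available as attributes.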
[GitHub] spark pull request: [WIP] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51276479
QA tests have started for PR 1791. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17958/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51401265
QA tests have started for PR 1791. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18055/consoleFull
[GitHub] spark pull request: [WIP] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51269914
QA tests have started for PR 1791. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17953/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-52132725
QA results for PR 1791:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class BoundedFloat(float):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18498/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51408173
QA results for PR 1791:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds the following public classes (experimental):
  class BoundedFloat(float):

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18059/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16266736
--- Diff: python/pyspark/rdd.py ---
@@ -858,6 +904,88 @@ def redFunc(left_counter, right_counter):
return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
+ def histogram(self, buckets, even=False):
+ """
+ Compute a histogram using the provided buckets. The buckets
+ are all open to the right except for the last which is closed.
+ e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
+ which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
+ and 50 we would have a histogram of 1,0,1.
+
+ If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
+ this can be switched from an O(log n) insertion to O(1) per
+ element (where n = # buckets) if you set `even` to True.
--- End diff --
Call the even parameter `evenBuckets` to match the Scala API.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by nrchandan <gi...@git.apache.org>.
Github user nrchandan commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15858417
--- Diff: python/pyspark/rdd.py ---
@@ -854,6 +884,97 @@ def redFunc(left_counter, right_counter):
return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
+ def histogram(self, buckets, even=False):
+ """
+ Compute a histogram using the provided buckets. The buckets
+ are all open to the right except for the last which is closed.
+ e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
+ which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
+ and 50 we would have a histogram of 1,0,1.
+
+ If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
+ this can be switched from an O(log n) insertion to O(1) per
+ element (where n = # buckets) if you set `even` to True.
+
+ Buckets must be sorted, must not contain any duplicates, and
+ must have at least two elements.
+
+ If `buckets` is a number, it generates buckets that are evenly
+ spaced between the minimum and maximum of the RDD. For
+ example, if the min value is 0 and the max is 100, given buckets
+ as 2, the resulting buckets will be [0,50) [50,100]. buckets must
+ be at least 1. An exception is raised if the RDD contains
+ infinity or NaN. If the elements in the RDD do not vary
+ (max == min), it always returns a single bucket.
+
+ It will return a tuple of buckets and histogram.
+
+ >>> rdd = sc.parallelize(range(51))
+ >>> rdd.histogram(2)
+ ([0, 25, 50], [25, 26])
+ >>> rdd.histogram([0, 5, 25, 50])
+ ([0, 5, 25, 50], [5, 20, 26])
+ >>> rdd.histogram([0, 15, 30, 45, 60], True)
+ ([0, 15, 30, 45, 60], [15, 15, 15, 6])
+ """
+
+ if isinstance(buckets, (int, long)):
+ if buckets < 1:
+ raise ValueError("buckets must be at least 1")
+
+ # faster than stats()
+ def minmax(it):
+ minv, maxv = float("inf"), float("-inf")
+ for v in it:
+ minv = min(minv, v)
+ maxv = max(maxv, v)
+ return [(minv, maxv)]
+
+ def _merge(a, b):
+ return (min(a[0], b[0]), max(a[1], b[1]))
+
+ minv, maxv = self.mapPartitions(minmax).reduce(_merge)
+
+ if minv == maxv or buckets == 1:
+ return [minv, maxv], [self.count()]
+
+ inc = (maxv - minv) / buckets
+ # keep them as integer if possible
+ if inc * buckets != maxv - minv:
--- End diff --
This was smart!
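The trick being praised: when the range divides evenly, the increment, and therefore every bucket boundary, stays an integer; otherwise it falls back to a float increment. A standalone sketch of that logic (a hypothetical helper, not the PR's exact code):

```python
def even_buckets(minv, maxv, n):
    """Compute n+1 evenly spaced bucket boundaries over [minv, maxv].

    Keeps the boundaries as integers when the range divides evenly,
    falling back to a float increment otherwise.
    """
    inc = (maxv - minv) // n          # try an integer increment first
    if inc * n != maxv - minv:        # range is not evenly divisible
        inc = (maxv - minv) / n       # fall back to a float increment
    return [minv + i * inc for i in range(n)] + [maxv]
```

For example, `even_buckets(0, 50, 2)` yields integer boundaries `[0, 25, 50]`, matching the docstring example, while `even_buckets(0, 10, 3)` produces float boundaries because 10 is not divisible by 3.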
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-52256403
@mateiz thanks for reviewing this; I have addressed all your comments.
@JoshRosen could you take another look at this?
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
GitHub user davies reopened a pull request:
https://github.com/apache/spark/pull/1791
[SPARK-2871] [PySpark] Add missing API
Try to bring all Java/Scala API to PySpark.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/davies/spark api
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1791.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1791
----
commit ff2cbe3effddae2c92d7d1ddb14c6762b42ae5fa
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-05T06:55:40Z
add missing API in SparkContext
commit e0b3d307bb8a5988425f4eeff17d8cfc5469e6e8
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-05T08:11:41Z
add histogram()
commit 5d5be95d1dcede041950bf092e0c6438a3aac536
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-05T17:59:03Z
change histogram API
commit a95eca01ebfd023a5b016015b49d98abbd658287
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-05T22:30:10Z
add zipWithIndex and zipWithUniqueId
commit 4ffae0031e1f00641845fc5e9e3b62f54e7c56ad
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-05T23:50:57Z
collectPartitions()
commit 7a9ea0a4fbb804101d6180bd95a5964822f7a6a0
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-05T23:56:30Z
update docs of histogram
commit 53640be1de45418dbae9406f9717873c62bfa584
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-06T05:38:57Z
histogram() in pure Python, better support for int
commit 9a01ac30a0b03bf265db4bda5fdd3e2a49bddf8d
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-06T05:42:33Z
fix docs
commit 7ba5f882ab124ffb77414024f6c9d3297c2351c9
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-06T17:58:24Z
refactor
commit a25c34e49bf2958b210fc0c0c793f51d00b726fa
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-06T21:33:46Z
fix bug of countApproxDistinct
commit 1218b3b8c997e81cecc23a57f83cd79b5eac9147
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-06T21:34:18Z
add countApprox and countApproxDistinct
meanApprox() and sumApprox()
commit 034124f07adad1aeb8b8a6ba8491ce9cd4f8d217
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-06T21:43:44Z
Merge branch 'master' into api
commit 91324562d5faa268051ead6a7c6de5fe8be8fbef
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-06T21:49:12Z
fix pep8
commit 977e4741ceadf7716676aae120dd846c8c415376
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-07T19:01:29Z
address comments: improve docs
commit ac606ca1ed3e81329d6808f8a99557079e0f463f
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-11T18:48:48Z
comment out not implemented APIs
add TODO for them
commit f0158e4bb3db74e9c96035a2ebd6192ad256fc3e
Author: Davies Liu <da...@gmail.com>
Date: 2014-08-11T18:50:25Z
comment out not implemented API in SparkContext
----
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51425517
@matei @pwendell Do you have any thoughts on placeholders vs. leaving out APIs that aren't implemented in PySpark? Which is better from a usability perspective?
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15916742
--- Diff: python/pyspark/rdd.py ---
@@ -737,6 +754,19 @@ def _collect_iterator_through_file(self, iterator):
yield item
os.unlink(tempFile.name)
+ def collectPartitions(self, partitions):
--- End diff --
In the Scala API, this is marked as a private API used only for tests. Is there a non-test use case for this?
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51826337
closed by accident
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16266909
--- Diff: python/pyspark/rdd.py ---
@@ -858,6 +904,88 @@ def redFunc(left_counter, right_counter):
return self.mapPartitions(lambda i: [StatCounter(i)]).reduce(redFunc)
+ def histogram(self, buckets, even=False):
+ """
+ Compute a histogram using the provided buckets. The buckets
+ are all open to the right except for the last which is closed.
+ e.g. [1,10,20,50] means the buckets are [1,10) [10,20) [20,50],
+ which means 1<=x<10, 10<=x<20, 20<=x<=50. And on the input of 1
+ and 50 we would have a histogram of 1,0,1.
+
+ If your histogram is evenly spaced (e.g. [0, 10, 20, 30]),
+ this can be switched from an O(log n) insertion to O(1) per
+ element (where n = # buckets) if you set `even` to True.
+
+ Buckets must be sorted, must not contain any duplicates, and
+ must have at least two elements.
+
+ If `buckets` is a number, it generates buckets that are evenly
+ spaced between the minimum and maximum of the RDD. For
+ example, if the min value is 0 and the max is 100, given buckets
+ as 2, the resulting buckets will be [0,50) [50,100]. buckets must
+ be at least 1. An exception is raised if the RDD contains
+ infinity or NaN. If the elements in the RDD do not vary
+ (max == min), it always returns a single bucket.
+
+ It will return a tuple of buckets and histogram.
+
+ >>> rdd = sc.parallelize(range(51))
+ >>> rdd.histogram(2)
+ ([0, 25, 50], [25, 26])
+ >>> rdd.histogram([0, 5, 25, 50])
+ ([0, 5, 25, 50], [5, 20, 26])
+ >>> rdd.histogram([0, 15, 30, 45, 60], True)
+ ([0, 15, 30, 45, 60], [15, 15, 15, 6])
--- End diff --
In addition to these tests, port the cases from DoubleRDDSuite.scala into tests.py. Histogram had some tricky edge-cases in Scala (e.g. NaNs, values outside the bucket ranges) so we should make sure we catch them here. Sometimes floating-point math is also different in Python than in Scala.
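To make those edge cases concrete, here is a tiny pure-Python reference of the documented bucket semantics (hypothetical, not the PR's code): buckets are half-open except the last, NaNs never match, and out-of-range values are dropped.

```python
from bisect import bisect_right

def ref_histogram(values, buckets):
    """Count values per bucket: [b0,b1) ... [b_{n-1},b_n] (last closed)."""
    counts = [0] * (len(buckets) - 1)
    for v in values:
        if v != v:                      # NaN compares unequal to itself
            continue
        if v == buckets[-1]:            # last bucket is closed on the right
            counts[-1] += 1
            continue
        i = bisect_right(buckets, v) - 1
        if 0 <= i < len(counts):        # ignore out-of-range values
            counts[i] += 1
    return counts
```

`ref_histogram([1, 50], [1, 10, 20, 50])` reproduces the 1,0,1 example from the docstring, while NaN, 0, and 60 fall into no bucket; the ported tests in tests.py would assert exactly these behaviors.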
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-52121288
QA tests have started for PR 1791. This patch DID NOT merge cleanly!
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18488/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16209415
--- Diff: python/pyspark/rdd.py ---
@@ -1755,6 +1941,114 @@ def _defaultReducePartitions(self):
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+ # def lookup(self, key):
+ # """
+ # Return the list of values in the RDD for key key.
+ # """
+
+ def _is_pickled(self):
+ """ Return whether this RDD is serialized by Pickle or not. """
+ der = self._jrdd_deserializer
+ if isinstance(der, PickleSerializer):
+ return True
+ if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+ return True
+ return False
+
+ def _to_jrdd(self):
+ """ Return a JavaRDD """
+ if not self._is_pickled():
+ self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
+ batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
+ return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+
+ def countApproxDistinct(self, relativeSD=0.05):
+ """
+ :: Experimental ::
+ Return approximate number of distinct elements in the RDD.
+
+ The algorithm used is based on streamlib's implementation of
+ "HyperLogLog in Practice: Algorithmic Engineering of a State
+ of The Art Cardinality Estimation Algorithm", available
+ <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+
+ :param: relativeSD Relative accuracy. Smaller values create
+ counters that require more space.
+ It must be greater than 0.000017.
+
+ >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
+ >>> 950 < n < 1050
+ True
+ """
+ return self._to_jrdd().countApproxDistinct(relativeSD)
--- End diff --
Actually I guess you removed the batching, but still, it's not super clear that countApproxDistinct will work on byte arrays.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51281172
QA results for PR 1791:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.

For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17958/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16210568
--- Diff: python/pyspark/rdd.py ---
@@ -1755,6 +1941,114 @@ def _defaultReducePartitions(self):
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+ # def lookup(self, key):
+ # """
+ # Return the list of values in the RDD for key key.
+ # """
+
+ def _is_pickled(self):
+ """ Return whether this RDD is serialized by Pickle or not. """
+ der = self._jrdd_deserializer
+ if isinstance(der, PickleSerializer):
+ return True
+ if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+ return True
+ return False
+
+ def _to_jrdd(self):
+ """ Return a JavaRDD """
+ if not self._is_pickled():
+ self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
+ batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
+ return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+
+ def countApproxDistinct(self, relativeSD=0.05):
+ """
+ :: Experimental ::
+ Return approximate number of distinct elements in the RDD.
+
+ The algorithm used is based on streamlib's implementation of
+ "HyperLogLog in Practice: Algorithmic Engineering of a State
+ of The Art Cardinality Estimation Algorithm", available
+ <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+
+ :param: relativeSD Relative accuracy. Smaller values create
+ counters that require more space.
+ It must be greater than 0.000017.
+
+ >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
+ >>> 950 < n < 1050
+ True
+ """
+ return self._to_jrdd().countApproxDistinct(relativeSD)
--- End diff --
What countApproxDistinct() needs is the hash code of the items, so this means we create a hash code for Python objects via hash(pickle.dumps(obj)); I think this could work.
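A hedged illustration of that point: equal simple values pickle to the same byte string, so hashing the pickled form yields a stable per-object hash that the JVM-side HyperLogLog can consume. The actual wiring in the PR goes through Pyrolite; `pickled_hash` below is just illustrative.

```python
import pickle

def pickled_hash(obj):
    """Hash a Python object via its pickled bytes.

    Works even for objects that are not hashable themselves (e.g. lists).
    Caveat: equal-but-differently-typed values (1 vs 1.0) may pickle
    differently and therefore hash differently.
    """
    return hash(pickle.dumps(obj, protocol=2))
```

Two structurally equal values produce the same hash, which is all a cardinality estimator needs to treat them as one distinct element.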
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16267396
--- Diff: python/pyspark/rdd.py ---
@@ -1756,6 +1942,114 @@ def _defaultReducePartitions(self):
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+ # def lookup(self, key):
+ # """
+ # Return the list of values in the RDD for key key.
+ # """
--- End diff --
If you'd like you can implement lookup() the same way as in Scala, it's not too bad
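The Scala version filters by key and collects only the matching values; a minimal pure-Python sketch of that shape, over a plain list of pairs standing in for the RDD:

```python
def lookup(pairs, key):
    """Return the list of values in `pairs` whose key equals `key`."""
    return [v for k, v in pairs if k == key]
```

A real implementation would run the filter inside each partition and collect only the matches, as `PairRDDFunctions.lookup` does in Scala, so the driver never materializes the whole RDD.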
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by MLnick <gi...@git.apache.org>.
Github user MLnick commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16071018
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -741,6 +741,23 @@ private[spark] object PythonRDD extends Logging {
}
}
}
+
+ /**
+ * Convert an RDD of serialized Python objects to an RDD of objects that is usable by PySpark.
+ */
+ def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
--- End diff --
Minor point, but we created the `SerDeUtil` object for the SequenceFile/InputFormat stuff; maybe these Java <-> Python conversion methods can all live there instead. It's a little cleaner.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16267475
--- Diff: python/pyspark/rdd.py ---
@@ -1756,6 +1942,114 @@ def _defaultReducePartitions(self):
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+ # def lookup(self, key):
+ # """
+ # Return the list of values in the RDD for key key.
+ # """
+
+ def _is_pickled(self):
+ """ Return this RDD is serialized by Pickle or not. """
+ der = self._jrdd_deserializer
+ if isinstance(der, PickleSerializer):
+ return True
+ if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+ return True
+ return False
+
+ def _to_jrdd(self):
+ """ Return a JavaRDD of Object by unpickling """
--- End diff --
This comment needs to be a bit longer, it needs to say there's one Java object per Python object, using Pyrolite. Does passing batched data work with that? Would be good to add tests for both cases.
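Matei's question about batched data comes down to a flatten step when unpickling. A plain-Python sketch of that unbatching logic (hypothetical stand-in for the Scala `pythonToJava` helpers, using `pickle` in place of Pyrolite's Unpickler):

```python
import pickle

def python_to_java(rows, batched):
    """Yield one object per pickled payload; batched payloads hold a list."""
    for row in rows:
        obj = pickle.loads(row)
        if batched:
            # A batched row deserializes to a list of objects: flatten it.
            for item in obj:
                yield item
        else:
            yield obj

rows = [pickle.dumps([1, 2]), pickle.dumps([3])]
print(list(python_to_java(rows, batched=True)))  # [1, 2, 3]
```

In the real code the per-object conversion goes through Pyrolite on the JVM side; this only illustrates why the `batched` flag changes the iteration shape.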
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51731037
BTW leaving TODOs in the Python code would also be okay, if you want to see this in the code.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by zzl0 <gi...@git.apache.org>.
Github user zzl0 commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15856478
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -730,7 +730,25 @@ private[spark] object PythonRDD extends Logging {
}
/**
- * Convert and RDD of Java objects to and RDD of serialized Python objects, that is usable by
+ * Convert a RDD of serialized Python objects to RDD of Double, that is usable by
+ * PySpark.
+ */
+ def pythonToJavaDouble(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaDoubleRDD = {
+ new JavaDoubleRDD(pyRDD.rdd.mapPartitions { iter =>
+ val unpickle = new Unpickler
+ iter.flatMap { row =>
+ val obj = unpickle.loads(row)
+ if (batched) {
+ obj.asInstanceOf[JArrayList[_]].map(_.asInstanceOf[Double])
+ } else {
+ Seq(obj.asInstanceOf[Double])
+ }
+ }
+ })
+ }
+
+ /**
+ * Convert a RDD of Java objects to and RDD of serialized Python objects, that is usable by
--- End diff --
Convert a RDD of Java objects to and RDD of serialized Python objects
=>
Convert an RDD of Java objects to an RDD of serialized Python objects ?
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-53479397
[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19229/consoleFull) for PR 1791 at commit [`657a09b`](https://github.com/apache/spark/commit/657a09bf14ae70639190f832d8b25dde6ee37c01).
* This patch **fails** unit tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51826195
@JoshRosen @mateiz I have commented out those unimplemented APIs.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16267443
--- Diff: python/pyspark/rdd.py ---
@@ -1755,6 +1941,114 @@ def _defaultReducePartitions(self):
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+ # def lookup(self, key):
+ # """
+ # Return the list of values in the RDD for key key.
+ # """
+
+ def _is_pickled(self):
+ """ Return this RDD is serialized by Pickle or not. """
+ der = self._jrdd_deserializer
+ if isinstance(der, PickleSerializer):
+ return True
+ if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+ return True
+ return False
+
+ def _to_jrdd(self):
+ """ Return an JavaRDD """
+ if not self._is_pickled():
+ self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
+ batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
+ return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+
+ def countApproxDistinct(self, relativeSD=0.05):
+ """
+ :: Experimental ::
+ Return approximate number of distinct elements in the RDD.
+
+ The algorithm used is based on streamlib's implementation of
+ "HyperLogLog in Practice: Algorithmic Engineering of a State
+ of The Art Cardinality Estimation Algorithm", available
+ <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+
+ :param: relativeSD Relative accuracy. Smaller values create
+ counters that require more space.
+ It must be greater than 0.000017.
+
+ >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
+ >>> 950 < n < 1050
+ True
+ """
+ return self._to_jrdd().countApproxDistinct(relativeSD)
--- End diff --
I see, okay. Please add some tests for this in tests.py to make sure we cover all the data types we'll care about (e.g. strings, integers, tuples, etc). We might also want to document which data types it works with above.
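The kind of test Matei asks for can be sketched in plain Python (hypothetical: `count_approx_distinct` here is an exact stand-in for the HyperLogLog-backed implementation, so the accuracy bound always holds; a real test would call the RDD method):

```python
def count_approx_distinct(items, relative_sd=0.05):
    # Stand-in for the HLL estimator: exact distinct count via a set.
    return len(set(items))

def check(items, expected, relative_sd=0.05):
    n = count_approx_distinct(items, relative_sd)
    # An HLL estimate should land within a few standard deviations of truth.
    assert abs(n - expected) <= 3 * relative_sd * expected + 1

check([str(i) for i in range(1000)], 1000)   # strings
check(list(range(1000)), 1000)               # integers
check([(i, i) for i in range(1000)], 1000)   # tuples (hashable)
```

The point is that each data type only needs to be hashable and round-trip through the serializer for the estimate to be meaningful.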
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16209228
--- Diff: python/pyspark/context.py ---
@@ -260,6 +260,20 @@ def defaultMinPartitions(self):
"""
return self._jsc.sc().defaultMinPartitions()
+ @property
+ def isLocal(self):
+ """
+ Whether the context run locally
+ """
+ return self._jsc.isLocal()
+
+ @property
+ def conf(self):
+ """
+ The L{SparkConf} object
+ """
+ return self._conf
--- End diff --
I agree with Josh here, you need to clone the conf before returning it
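A defensive copy in pure Python could look like the sketch below (hypothetical classes for illustration; the real PySpark `SparkConf` wraps a Py4J/JVM object, so a production version would need to clone through the JVM as Josh notes):

```python
import copy

class SparkConf:
    """Minimal stand-in; the real SparkConf wraps a JVM object."""
    def __init__(self, settings=None):
        self._settings = dict(settings or {})

    def set(self, key, value):
        self._settings[key] = value
        return self

    def get(self, key, default=None):
        return self._settings.get(key, default)

class SparkContext:
    def __init__(self, conf):
        self._conf = conf

    @property
    def conf(self):
        # Return a copy so callers cannot mutate the live configuration,
        # which cannot be changed at runtime anyway.
        return copy.deepcopy(self._conf)

ctx = SparkContext(SparkConf({"spark.app.name": "demo"}))
c = ctx.conf
c.set("spark.app.name", "mutated")
print(ctx._conf.get("spark.app.name"))  # demo (unaffected)
```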
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51295996
QA tests have started for PR 1791. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18001/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16209516
--- Diff: python/pyspark/context.py ---
@@ -260,6 +260,20 @@ def defaultMinPartitions(self):
"""
return self._jsc.sc().defaultMinPartitions()
+ @property
+ def isLocal(self):
+ """
+ Whether the context run locally
+ """
+ return self._jsc.isLocal()
+
+ @property
+ def conf(self):
+ """
+ The L{SparkConf} object
+ """
+ return self._conf
--- End diff --
I will return a read-only copy of it.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16209395
--- Diff: python/pyspark/rdd.py ---
@@ -1755,6 +1941,114 @@ def _defaultReducePartitions(self):
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+ # def lookup(self, key):
+ # """
+ # Return the list of values in the RDD for key key.
+ # """
+
+ def _is_pickled(self):
+ """ Return this RDD is serialized by Pickle or not. """
+ der = self._jrdd_deserializer
+ if isinstance(der, PickleSerializer):
+ return True
+ if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+ return True
+ return False
+
+ def _to_jrdd(self):
+ """ Return an JavaRDD """
--- End diff --
Add that this is not batched
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15906969
--- Diff: python/pyspark/context.py ---
@@ -727,6 +738,13 @@ def sparkUser(self):
"""
return self._jsc.sc().sparkUser()
+ @property
+ def startTime(self):
+ """
+ Return the start time of context in millis seconds
--- End diff --
This `startTime` property isn't documented in the Scala API. Do we want to include it here? What's the use-case?
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16266655
--- Diff: python/pyspark/rdd.py ---
@@ -812,23 +842,39 @@ def func(iterator):
return self.mapPartitions(func).fold(zeroValue, combOp)
- def max(self):
+ def max(self, comp=None):
--- End diff --
Maybe explain what "comp" is in the doc comment
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15955661
--- Diff: python/pyspark/rdd.py ---
@@ -737,6 +754,19 @@ def _collect_iterator_through_file(self, iterator):
yield item
os.unlink(tempFile.name)
+ def collectPartitions(self, partitions):
--- End diff --
It helps with debugging: you can collect parts of the RDD to investigate them.
It would also be helpful to have an API like slice(start, [end]) to select a range of partitions. DPark has this kind of API; it helped us a lot to narrow down the data for fast debugging.
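The two debugging aids described above can be sketched in plain Python (hypothetical names; `partitions` is a list of lists standing in for an RDD's partitions):

```python
def collect_partitions(partitions, indices):
    # collectPartitions-style: fetch only the requested partitions.
    return [list(partitions[i]) for i in indices]

def slice_partitions(partitions, start, end=None):
    # DPark-style slice(start, [end]): narrow to a contiguous partition range.
    return partitions[start:end]

parts = [[1, 2], [3], [4, 5]]
print(collect_partitions(parts, [0, 2]))  # [[1, 2], [4, 5]]
print(slice_partitions(parts, 1))         # [[3], [4, 5]]
```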
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-53465719
[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19221/consoleFull) for PR 1791 at commit [`28fd368`](https://github.com/apache/spark/commit/28fd3682b434b8a19ea7cb731252ea9c613c968b).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16267032
--- Diff: python/pyspark/rdd.py ---
@@ -1685,11 +1813,69 @@ def zip(self, other):
>>> x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
"""
+ if self.getNumPartitions() != other.getNumPartitions():
+ raise ValueError("the number of partitions dose not match"
+ " with each other")
+
pairRDD = self._jrdd.zip(other._jrdd)
deserializer = PairDeserializer(self._jrdd_deserializer,
other._jrdd_deserializer)
return RDD(pairRDD, self.ctx, deserializer)
+ # TODO
+ # def zipPartitions(self, other, f, preservesPartitioning=False):
+ # """
+ # Zip this RDD's partitions with one (or more) RDD(s) and return a
+ # new RDD by applying a function to the zipped partitions.
+ # """
+
+ def zipWithIndex(self):
+ """
+ Zips this RDD with its element indices.
+
+ The ordering is first based on the partition index and then the
+ ordering of items within each partition. So the first item in
+ the first partition gets index 0, and the last item in the last
+ partition receives the largest index.
+
+ This method needs to trigger a spark job when this RDD contains
+ more than one partitions.
+
+ >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
+ [(0, 0), (1, 1), (2, 2), (3, 3)]
+ """
+ starts = [0]
+ if self.getNumPartitions() > 1:
+ nums = self.glom().map(lambda it: sum(1 for i in it)).collect()
--- End diff --
glom() actually puts all the elements in a list, so it can get out of memory errors and such. Instead, use mapPartitions to count the values, similar to RDD.count().
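The memory difference Matei describes can be shown in plain Python (no Spark; `partitions` is a hypothetical list of iterables standing in for RDD partitions):

```python
def count_per_partition_glom(partitions):
    # glom-style: materializes every partition as a list before counting,
    # which can exhaust memory on large partitions.
    return [len(list(p)) for p in partitions]

def count_per_partition_streaming(partitions):
    # mapPartitions-style: consume each iterator without buffering elements.
    return [sum(1 for _ in p) for p in partitions]

parts = [iter(range(3)), iter(range(5))]
print(count_per_partition_streaming(parts))  # [3, 5]
```

Both return the same counts; only the streaming version keeps memory constant per partition.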
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15916433
--- Diff: python/pyspark/context.py ---
@@ -727,6 +738,13 @@ def sparkUser(self):
"""
return self._jsc.sc().sparkUser()
+ @property
+ def startTime(self):
+ """
+ Return the start time of context in millis seconds
--- End diff --
Changing it to uptime would not improve anything; should we remove it instead?
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51299595
QA results for PR 1791:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test output:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18001/consoleFull
[GitHub] spark pull request: [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51276760
QA tests have started for PR 1791. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17959/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51823944
QA tests have started for PR 1791. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18319/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16266558
--- Diff: python/pyspark/rdd.py ---
@@ -737,6 +754,19 @@ def _collect_iterator_through_file(self, iterator):
yield item
os.unlink(tempFile.name)
+ def collectPartitions(self, partitions):
--- End diff --
I agree with Josh, let's delete this for now. We can open a separate JIRA about making it public and maybe discuss there.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-53024348
@mateiz @JoshRosen some APIs have been split out as separate PRs: #2091, #2092, #2093, #2094, #2095
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51516873
QA tests have started for PR 1791. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18137/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15916708
--- Diff: python/pyspark/context.py ---
@@ -260,6 +260,17 @@ def defaultMinPartitions(self):
"""
return self._jsc.sc().defaultMinPartitions()
+ @property
+ def isLocal(self):
+ """
+ Whether the context run locally
+ """
+ return self._jsc.isLocal()
+
+ @property
+ def conf(self):
--- End diff --
This needs a docstring. Also, the Scala equivalent of this clones the SparkConf because it cannot be changed at runtime. We might want to do the same thing here (to guard against misuse); I'm not sure how clone() interacts with Py4J objects; do we need to implement a custom clone method for objects with Py4J objects as fields that calls those objects' JVM clone methods?
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-52129587
QA tests have started for PR 1791. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18498/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-53647004
Most of the useful parts have been merged separately, so I'm closing this.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51401363
QA results for PR 1791:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds the following public classes (experimental):<br>class BoundedFloat(float):<br><br>For more information see test output:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18055/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15916764
--- Diff: python/pyspark/rdd.py ---
@@ -811,23 +841,39 @@ def func(iterator):
return self.mapPartitions(func).fold(zeroValue, combOp)
- def max(self):
+ def max(self, comp=None):
"""
Find the maximum item in this RDD.
- >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).max()
+ >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
+ >>> rdd.max()
43.0
+ >>> rdd.max(lambda a, b: cmp(str(a), str(b)))
+ 5.0
"""
- return self.reduce(max)
+ if comp is not None:
+ func = lambda a, b: a if comp(a, b) >= 0 else b
+ else:
+ func = max
+
+ return self.reduce(func)
- def min(self):
+ def min(self, comp=None):
"""
Find the minimum item in this RDD.
- >>> sc.parallelize([1.0, 5.0, 43.0, 10.0]).min()
+ >>> rdd = sc.parallelize([1.0, 5.0, 43.0, 10.0])
+ >>> rdd.min()
+ 1.0
+ >>> rdd.min(lambda a, b: cmp(str(a), str(b)))
--- End diff --
In `max`, using this different comparator returns a different result. We might want to pick an example here so that the two comparator settings yield different results.
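Josh's point can be checked in plain Python (hypothetical data, no Spark): with the values in the doctest, string order and numeric order happen to agree for `min`, so the example proves nothing. An input like `[10, 2]` makes the two orderings disagree:

```python
data = [10, 2]
print(min(data))           # 2  (numeric order)
print(min(data, key=str))  # 10 ('10' < '2' lexicographically)
```

Picking such an input for the doctest would actually exercise the custom comparator path.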
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15916792
--- Diff: python/pyspark/rdd.py ---
@@ -1684,11 +1812,57 @@ def zip(self, other):
>>> x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
"""
+ if self.getNumPartitions() != other.getNumPartitions():
+ raise ValueError("the number of partitions does not match"
+ " with each other")
+
pairRDD = self._jrdd.zip(other._jrdd)
deserializer = PairDeserializer(self._jrdd_deserializer,
other._jrdd_deserializer)
return RDD(pairRDD, self.ctx, deserializer)
+ def zipPartitions(self, other, f, preservesPartitioning=False):
+ """
+ Zip this RDD's partitions with one (or more) RDD(s) and return a
+ new RDD by applying a function to the zipped partitions.
+
+ Not implemented.
+ """
+ raise NotImplementedError
+
+ def zipWithIndex(self):
+ """
+ Zips this RDD with its element indices.
--- End diff --
The Scala documentation is much more descriptive about what this method does:
```scala
/**
* Zips this RDD with its element indices. The ordering is first based on the partition index
* and then the ordering of items within each partition. So the first item in the first
* partition gets index 0, and the last item in the last partition receives the largest index.
* This is similar to Scala's zipWithIndex but it uses Long instead of Int as the index type.
* This method needs to trigger a spark job when this RDD contains more than one partitions.
*/
def zipWithIndex(): RDD[(T, Long)] = new ZippedWithIndexRDD(this)
```
The Python documentation should explain these subtleties, too.
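To make that ordering concrete, here is a plain-Python sketch of the same scheme (partitions modeled as lists of lists; the counting pass over partition sizes is the Spark job the Scala doc mentions):

```python
def zip_with_index(partitions):
    # partitions: list of lists, standing in for an RDD's partitions.
    # First pass: count each partition (this is the extra "spark job"),
    # then compute per-partition starting offsets.
    sizes = [len(p) for p in partitions]
    starts = [0]
    for n in sizes[:-1]:
        starts.append(starts[-1] + n)
    return [(v, starts[k] + i)
            for k, part in enumerate(partitions)
            for i, v in enumerate(part)]

print(zip_with_index([["a", "b"], ["c"], ["d", "e"]]))
# [('a', 0), ('b', 1), ('c', 2), ('d', 3), ('e', 4)]
```

The index depends only on the partition order and the order of items within each partition, which is exactly the subtlety the Python docstring should spell out.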
---
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51405218
The difference is whether those unimplemented APIs should appear in the API docs. I think we should have a complete set of APIs in Java and Python, so users can easily see what is available.
If we just comment them out in the code, users will have to compare the Java and Python APIs to find out what is missing, which is what I had to do.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51297756
histogram() has been implemented in pure Python; it supports integers better, and it also supports RDDs of strings and other comparable objects.
This was inspired by #1783, and much improved.
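As an illustration of what a pure-Python histogram with explicit bucket boundaries can look like (a standalone sketch, not the PR's code; it assumes the last bucket is closed on the right, matching the Scala RDD.histogram convention):

```python
import bisect

def histogram(items, buckets):
    # buckets = [b0, b1, ..., bn] defines n buckets:
    # [b0, b1), [b1, b2), ..., [b(n-1), bn]  (last bucket closed on the right)
    counts = [0] * (len(buckets) - 1)
    for x in items:
        if x < buckets[0] or x > buckets[-1]:
            continue  # out of range
        i = bisect.bisect_right(buckets, x) - 1
        if i == len(counts):  # x == buckets[-1] belongs to the last bucket
            i -= 1
        counts[i] += 1
    return counts

print(histogram([1, 2, 3, 10, 11], [0, 5, 10, 15]))
# [3, 0, 2]
```

Because the bucketing is ordinary Python comparison, the same sketch works for strings or any mutually comparable objects, not just numbers.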
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16266596
--- Diff: python/pyspark/rdd.py ---
@@ -737,6 +754,19 @@ def _collect_iterator_through_file(self, iterator):
yield item
os.unlink(tempFile.name)
+ def collectPartitions(self, partitions):
--- End diff --
BTW I do like a slice-based API in general, that might be what we propose publicly.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51372924
QA tests have started for PR 1791. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18032/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-53474577
[QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19224/consoleFull) for PR 1791 at commit [`1ac98d6`](https://github.com/apache/spark/commit/1ac98d63642c625b80d41305c278044bdd940ccb).
* This patch **fails** unit tests.
* This patch merges cleanly.
* This patch adds no public classes.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15906724
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -1004,7 +1004,7 @@ abstract class RDD[T: ClassTag](
},
(h1: HyperLogLogPlus, h2: HyperLogLogPlus) => {
h1.addAll(h2)
- h2
+ h1
--- End diff --
Nice catch.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51731023
I also actually prefer leaving out the non-implemented ones instead of putting them in with NotImplementedError. Especially when working in an IDE or something similar, the user might try to call one of those, and get confused when it crashes. Also, a lot of them are quite esoteric.
We can develop other ways to track the missing APIs. For example, you can create JIRA issues such as "implement zipPartitions in Python", and we can do the same for new APIs added to Scala / Java (we usually ask people to add them in Python now anyway). Since many of these are rarely used, I don't think people will miss them.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15907204
--- Diff: python/pyspark/context.py ---
@@ -727,6 +738,13 @@ def sparkUser(self):
"""
return self._jsc.sc().sparkUser()
+ @property
+ def startTime(self):
+ """
+ Return the start time of context in milliseconds
--- End diff --
I saw it in the Java API docs, so I added it here.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51402995
QA tests have started for PR 1791. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18059/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16209359
--- Diff: python/pyspark/rdd.py ---
@@ -1755,6 +1941,114 @@ def _defaultReducePartitions(self):
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+ # def lookup(self, key):
+ # """
+ # Return the list of values in the RDD for key key.
+ # """
+
+ def _is_pickled(self):
+ """ Return whether this RDD is serialized by Pickle or not. """
+ der = self._jrdd_deserializer
+ if isinstance(der, PickleSerializer):
+ return True
+ if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+ return True
+ return False
+
+ def _to_jrdd(self):
+ """ Return a JavaRDD """
+ if not self._is_pickled():
+ self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
+ batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
+ return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+
+ def countApproxDistinct(self, relativeSD=0.05):
+ """
+ :: Experimental ::
+ Return approximate number of distinct elements in the RDD.
+
+ The algorithm used is based on streamlib's implementation of
+ "HyperLogLog in Practice: Algorithmic Engineering of a State
+ of The Art Cardinality Estimation Algorithm", available
+ <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+
+ :param: relativeSD Relative accuracy. Smaller values create
+ counters that require more space.
+ It must be greater than 0.000017.
+
+ >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
+ >>> 950 < n < 1050
+ True
+ """
+ return self._to_jrdd().countApproxDistinct(relativeSD)
--- End diff --
Does this really work? The Java version of this RDD will be batched, and it will contain byte arrays that may not be comparable directly with the algorithm in countApproxDistinct. Please add some tests for this.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-52246974
@davies I looked over all of this now and made some comments, but you should have Josh check too. Just to be clear though, I don't think this can make it into 1.1, so we can hold off on it for a while while we fix issues for 1.1. But these are great APIs to have.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-53467694
[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19224/consoleFull) for PR 1791 at commit [`1ac98d6`](https://github.com/apache/spark/commit/1ac98d63642c625b80d41305c278044bdd940ccb).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-52258026
The description had been updated to list all the added APIs.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51404681
If we're going to add placeholders for unimplemented methods, what about just commenting out that code instead of throwing NotImplementedError? That might be less confusing to users.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51830890
QA results for PR 1791:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds the following public classes (experimental):<br>class BoundedFloat(float):<br><br>For more information see test output:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18319/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-52121066
If this PR is too huge to be merged, I can split it, then merge some good parts of it.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15916811
--- Diff: python/pyspark/rdd.py ---
@@ -1684,11 +1812,57 @@ def zip(self, other):
>>> x.zip(y).collect()
[(0, 1000), (1, 1001), (2, 1002), (3, 1003), (4, 1004)]
"""
+ if self.getNumPartitions() != other.getNumPartitions():
+ raise ValueError("the number of partitions does not match"
+ " with each other")
+
pairRDD = self._jrdd.zip(other._jrdd)
deserializer = PairDeserializer(self._jrdd_deserializer,
other._jrdd_deserializer)
return RDD(pairRDD, self.ctx, deserializer)
+ def zipPartitions(self, other, f, preservesPartitioning=False):
+ """
+ Zip this RDD's partitions with one (or more) RDD(s) and return a
+ new RDD by applying a function to the zipped partitions.
+
+ Not implemented.
+ """
+ raise NotImplementedError
+
+ def zipWithIndex(self):
+ """
+ Zips this RDD with its element indices.
+
+ >>> sc.parallelize(range(4), 2).zipWithIndex().collect()
+ [(0, 0), (1, 1), (2, 2), (3, 3)]
+ """
+ nums = self.glom().map(lambda it: sum(1 for i in it)).collect()
+ starts = [0]
+ for i in range(len(nums) - 1):
+ starts.append(starts[-1] + nums[i])
+
+ def func(k, it):
+ for i, v in enumerate(it):
+ yield starts[k] + i, v
+
+ return self.mapPartitionsWithIndex(func)
+
+ def zipWithUniqueId(self):
+ """
+ Zips this RDD with generated unique Long ids.
--- End diff --
Same case here: this should be similarly descriptive to the Scala docs:
```scala
/**
* Zips this RDD with generated unique Long ids. Items in the kth partition will get ids k, n+k,
* 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method
* won't trigger a spark job, which is different from [[org.apache.spark.rdd.RDD#zipWithIndex]].
*/
def zipWithUniqueId(): RDD[(T, Long)] = {
```
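By contrast, a plain-Python sketch of the unique-id scheme (partitions again modeled as lists; purely illustrative) shows why no counting pass, and hence no Spark job, is needed — items in partition k simply get ids k, n+k, 2*n+k, ...:

```python
def zip_with_unique_id(partitions):
    # n = number of partitions; item i of partition k gets id i*n + k.
    # Ids are unique but may have gaps, unlike zipWithIndex.
    n = len(partitions)
    return [(v, i * n + k)
            for k, part in enumerate(partitions)
            for i, v in enumerate(part)]

print(zip_with_unique_id([["a", "b"], ["c"], ["d", "e"]]))
# [('a', 0), ('b', 3), ('c', 1), ('d', 2), ('e', 5)]
```

Here id 4 never appears because partition 1 has only one item — exactly the "gaps" caveat the Scala doc makes and the Python docstring should repeat.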
[GitHub] spark pull request: [WIP] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51274978
QA results for PR 1791:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test output:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17953/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51402751
@JoshRosen @mateiz Could you take a look at this? I hope that this can be in 1.1.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51402438
QA results for PR 1791:<br>- This patch FAILED unit tests.<br>- This patch merges cleanly<br>- This patch adds the following public classes (experimental):<br>class BoundedFloat(float):<br><br>For more information see test output:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18058/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51379740
QA results for PR 1791:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test output:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18032/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51402338
QA tests have started for PR 1791. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18058/consoleFull
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies closed the pull request at:
https://github.com/apache/spark/pull/1791
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-53473137
[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/19229/consoleFull) for PR 1791 at commit [`657a09b`](https://github.com/apache/spark/commit/657a09bf14ae70639190f832d8b25dde6ee37c01).
* This patch merges cleanly.
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-52129398
Jenkins, test this please
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16210900
--- Diff: python/pyspark/rdd.py ---
@@ -1755,6 +1941,114 @@ def _defaultReducePartitions(self):
# on the key; we need to compare the hash of the key to the hash of the
# keys in the pairs. This could be an expensive operation, since those
# hashes aren't retained.
+ # def lookup(self, key):
+ # """
+ # Return the list of values in the RDD for key key.
+ # """
+
+ def _is_pickled(self):
+ """ Return whether this RDD is serialized by Pickle or not. """
+ der = self._jrdd_deserializer
+ if isinstance(der, PickleSerializer):
+ return True
+ if isinstance(der, BatchedSerializer) and isinstance(der.serializer, PickleSerializer):
+ return True
+ return False
+
+ def _to_jrdd(self):
+ """ Return a JavaRDD """
+ if not self._is_pickled():
+ self = self._reserialize(BatchedSerializer(PickleSerializer(), 1024))
+ batched = isinstance(self._jrdd_deserializer, BatchedSerializer)
+ return self.ctx._jvm.PythonRDD.pythonToJava(self._jrdd, batched)
+
+ def countApproxDistinct(self, relativeSD=0.05):
+ """
+ :: Experimental ::
+ Return approximate number of distinct elements in the RDD.
+
+ The algorithm used is based on streamlib's implementation of
+ "HyperLogLog in Practice: Algorithmic Engineering of a State
+ of The Art Cardinality Estimation Algorithm", available
+ <a href="http://dx.doi.org/10.1145/2452376.2452456">here</a>.
+
+ :param: relativeSD Relative accuracy. Smaller values create
+ counters that require more space.
+ It must be greater than 0.000017.
+
+ >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
+ >>> 950 < n < 1050
+ True
+ """
+ return self._to_jrdd().countApproxDistinct(relativeSD)
--- End diff --
No, the JavaRDD is an RDD[Object], produced by unpickling the Python objects, so it supports most of the built-in types (those supported by Pyrolite).
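For intuition about the space/accuracy trade-off that `relativeSD` controls, here is a much simpler cardinality estimator than streamlib's HyperLogLog++ (linear counting over an m-slot bitmap; purely illustrative, not the PR's code):

```python
import hashlib
import math

def approx_distinct(items, m=1024):
    # Linear counting: hash each item into an m-slot bitmap, then estimate
    # the cardinality as -m * ln(fraction of empty slots). As with
    # HyperLogLog, more slots (more space) buys a smaller relative error.
    bitmap = [False] * m
    for x in items:
        slot = int(hashlib.md5(str(x).encode()).hexdigest(), 16) % m
        bitmap[slot] = True
    empty = bitmap.count(False)
    if empty == 0:
        return m  # bitmap saturated; the estimator breaks down here
    return round(-m * math.log(empty / m))

n = approx_distinct(str(i) for i in range(1000))
print(n)  # close to 1000, typically within a few percent
```

The doctest-style check in the diff (`950 < n < 1050`) is the same idea: assert the estimate lands within the expected relative error rather than on an exact value.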
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r16072784
--- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
@@ -741,6 +741,23 @@ private[spark] object PythonRDD extends Logging {
}
}
}
+
+ /**
+ * Convert an RDD of serialized Python objects to an RDD of objects that is usable by Java.
+ */
+ def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
--- End diff --
Good point. Then we can move all the pickle stuff into SerDeUtil in another PR.
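As a rough Python analogue of what `pythonToJava` does on the JVM via Pyrolite (unpickle each payload, flattening when batching is on); the function name and shapes here are illustrative only:

```python
import pickle

def python_to_java(serialized_elements, batched):
    """Sketch: each input element is a pickled byte string from the
    Python worker. With batching, each unpickles to a list that must
    be flattened into individual objects."""
    for payload in serialized_elements:
        obj = pickle.loads(payload)
        if batched:
            for item in obj:
                yield item
        else:
            yield obj
```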
---
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-52125485
QA results for PR 1791:<br>- This patch FAILED unit tests.<br><br>For more information see test output:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18488/consoleFull
---
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1791#issuecomment-51522969
QA results for PR 1791:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds the following public classes (experimental):<br>class BoundedFloat(float):<br><br>For more information see test output:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18137/consoleFull
---
[GitHub] spark pull request: [SPARK-2871] [PySpark] Add missing API
Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on a diff in the pull request:
https://github.com/apache/spark/pull/1791#discussion_r15907322
--- Diff: python/pyspark/context.py ---
@@ -727,6 +738,13 @@ def sparkUser(self):
"""
return self._jsc.sc().sparkUser()
+ @property
+ def startTime(self):
+ """
+ Return the start time of the context in milliseconds
--- End diff --
The primary use of this, outside of SparkContext, seems to be printing the context's uptime. So, why not add an `uptime` method instead?
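A minimal sketch of the suggested alternative, using a stand-in class rather than the real SparkContext; `uptime` is the hypothetical method proposed here, and `startTime` mirrors the JVM convention of epoch milliseconds:

```python
import time

class SparkContextSketch:
    """Stand-in illustrating the proposed `uptime` helper."""

    def __init__(self):
        # Epoch milliseconds at context creation, as on the JVM side
        self._start_time = int(time.time() * 1000)

    @property
    def startTime(self):
        return self._start_time

    @property
    def uptime(self):
        # Milliseconds elapsed since the context was created
        return int(time.time() * 1000) - self._start_time
```

Exposing `uptime` directly would save callers from fetching `startTime` and doing the subtraction (and the epoch-millis conversion) themselves.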
---