Posted to commits@spark.apache.org by me...@apache.org on 2014/11/21 01:40:29 UTC

spark git commit: [SPARK-4477] [PySpark] remove numpy from RDDSampler

Repository: spark
Updated Branches:
  refs/heads/master ad5f1f3ca -> d39f2e9c6


[SPARK-4477] [PySpark] remove numpy from RDDSampler

RDDSampler tries to use numpy to get better performance for poisson(), but the pure Python implementation of poisson() makes only about (1 + fraction) * N calls to random() in total (each draw costs k + 1 calls, and E[k] = fraction), so there is not much performance gain to be had from numpy.
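
For reference, the pure Python poisson() in question is Knuth's algorithm, the same one this patch puts into getPoissonSample(). A standalone sketch (the free function name poisson() is just for illustration):

```
import math
import random

def poisson(mean):
    # Knuth's algorithm: multiply uniform draws until the running
    # product falls below exp(-mean). Returning k costs k + 1 calls
    # to random(), and E[k] = mean.
    if mean < 20.0:
        l = math.exp(-mean)
        p = random.random()
        k = 0
        while p > l:
            k += 1
            p *= random.random()
    else:
        # log domain to avoid underflow of exp(-mean): add up
        # exponential inter-arrival times until their sum passes 1.0
        p = random.expovariate(mean)
        k = 0
        while p < 1.0:
            k += 1
            p += random.expovariate(mean)
    return k
```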

numpy is not a dependency of pyspark, so relying on it can introduce problems, such as numpy being installed on the master but not on the slaves, as reported in SPARK-927.

It also complicates the code a lot, so we should remove numpy from RDDSampler.

I also ran a benchmark to verify this:
```
>>> from pyspark.mllib.random import RandomRDDs
>>> rdd = RandomRDDs.uniformRDD(sc, 1 << 20, 1).cache()
>>> rdd.count()  # cache it
>>> rdd.sample(True, 0.9).count()    # measure this line
```
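
The commit message does not show the timing harness; one minimal way to time the measured line (using time.time(), an assumption here, not necessarily what was actually used):

```
>>> import time
>>> start = time.time()
>>> rdd.sample(True, 0.9).count()
>>> print time.time() - start
```
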
The results:

| withReplacement | random | numpy.random |
| --------------- | ------ | ------------ |
| True            | 1.5 s  | 1.4 s        |
| False           | 0.6 s  | 0.8 s        |

closes #2313

Note: this patch includes some commits that are not yet mirrored to github; it will be OK after the mirror catches up.

Author: Davies Liu <da...@databricks.com>
Author: Xiangrui Meng <me...@databricks.com>

Closes #3351 from davies/numpy and squashes the following commits:

5c438d7 [Davies Liu] fix comment
c5b9252 [Davies Liu] Merge pull request #1 from mengxr/SPARK-4477
98eb31b [Xiangrui Meng] make poisson sampling slightly faster
ee17d78 [Davies Liu] remove = for float
13f7b05 [Davies Liu] Merge branch 'master' of http://git-wip-us.apache.org/repos/asf/spark into numpy
f583023 [Davies Liu] fix tests
51649f5 [Davies Liu] remove numpy in RDDSampler
78bf997 [Davies Liu] fix tests, do not use numpy in randomSplit, no performance gain
f5fdf63 [Davies Liu] fix bug with int in weights
4dfa2cd [Davies Liu] refactor
f866bcf [Davies Liu] remove unneeded change
c7a2007 [Davies Liu] switch to python implementation
95a48ac [Davies Liu] Merge branch 'master' of github.com:apache/spark into randomSplit
0d9b256 [Davies Liu] refactor
1715ee3 [Davies Liu] address comments
41fce54 [Davies Liu] randomSplit()


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d39f2e9c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d39f2e9c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d39f2e9c

Branch: refs/heads/master
Commit: d39f2e9c683a4ab78b29eb3c5668325bf8568e8c
Parents: ad5f1f3
Author: Davies Liu <da...@databricks.com>
Authored: Thu Nov 20 16:40:25 2014 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu Nov 20 16:40:25 2014 -0800

----------------------------------------------------------------------
 python/pyspark/rdd.py        | 10 ++--
 python/pyspark/rddsampler.py | 99 ++++++++++++++-------------------------
 2 files changed, 40 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d39f2e9c/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 50535d2..5775477 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -310,8 +310,11 @@ class RDD(object):
 
     def sample(self, withReplacement, fraction, seed=None):
         """
-        Return a sampled subset of this RDD (relies on numpy and falls back
-        on default random generator if numpy is unavailable).
+        Return a sampled subset of this RDD.
+
+        >>> rdd = sc.parallelize(range(100), 4)
+        >>> rdd.sample(False, 0.1, 81).count()
+        10
         """
         assert fraction >= 0.0, "Negative fraction value: %s" % fraction
         return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
@@ -343,8 +346,7 @@ class RDD(object):
     # this is ported from scala/spark/RDD.scala
     def takeSample(self, withReplacement, num, seed=None):
         """
-        Return a fixed-size sampled subset of this RDD (currently requires
-        numpy).
+        Return a fixed-size sampled subset of this RDD.
 
         >>> rdd = sc.parallelize(range(0, 10))
         >>> len(rdd.takeSample(True, 20, 1))

http://git-wip-us.apache.org/repos/asf/spark/blob/d39f2e9c/python/pyspark/rddsampler.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 558dcfd..459e142 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -17,81 +17,48 @@
 
 import sys
 import random
+import math
 
 
 class RDDSamplerBase(object):
 
     def __init__(self, withReplacement, seed=None):
-        try:
-            import numpy
-            self._use_numpy = True
-        except ImportError:
-            print >> sys.stderr, (
-                "NumPy does not appear to be installed. "
-                "Falling back to default random generator for sampling.")
-            self._use_numpy = False
-
-        self._seed = seed if seed is not None else random.randint(0, 2 ** 32 - 1)
+        self._seed = seed if seed is not None else random.randint(0, sys.maxint)
         self._withReplacement = withReplacement
         self._random = None
-        self._split = None
-        self._rand_initialized = False
 
     def initRandomGenerator(self, split):
-        if self._use_numpy:
-            import numpy
-            self._random = numpy.random.RandomState(self._seed ^ split)
-        else:
-            self._random = random.Random(self._seed ^ split)
+        self._random = random.Random(self._seed ^ split)
 
         # mixing because the initial seeds are close to each other
         for _ in xrange(10):
             self._random.randint(0, 1)
 
-        self._split = split
-        self._rand_initialized = True
-
-    def getUniformSample(self, split):
-        if not self._rand_initialized or split != self._split:
-            self.initRandomGenerator(split)
-
-        if self._use_numpy:
-            return self._random.random_sample()
+    def getUniformSample(self):
+        return self._random.random()
+
+    def getPoissonSample(self, mean):
+        # Using Knuth's algorithm described in
+        # http://en.wikipedia.org/wiki/Poisson_distribution
+        if mean < 20.0:
+            # one exp and k+1 random calls
+            l = math.exp(-mean)
+            p = self._random.random()
+            k = 0
+            while p > l:
+                k += 1
+                p *= self._random.random()
         else:
-            return self._random.uniform(0.0, 1.0)
-
-    def getPoissonSample(self, split, mean):
-        if not self._rand_initialized or split != self._split:
-            self.initRandomGenerator(split)
-
-        if self._use_numpy:
-            return self._random.poisson(mean)
-        else:
-            # here we simulate drawing numbers n_i ~ Poisson(lambda = 1/mean) by
-            # drawing a sequence of numbers delta_j ~ Exp(mean)
-            num_arrivals = 1
-            cur_time = 0.0
-
-            cur_time += self._random.expovariate(mean)
+            # switch to the log domain, k+1 expovariate (random + log) calls
+            p = self._random.expovariate(mean)
+            k = 0
+            while p < 1.0:
+                k += 1
+                p += self._random.expovariate(mean)
+        return k
 
-            if cur_time > 1.0:
-                return 0
-
-            while(cur_time <= 1.0):
-                cur_time += self._random.expovariate(mean)
-                num_arrivals += 1
-
-            return (num_arrivals - 1)
-
-    def shuffle(self, vals):
-        if self._random is None:
-            self.initRandomGenerator(0)  # this should only ever called on the master so
-            # the split does not matter
-
-        if self._use_numpy:
-            self._random.shuffle(vals)
-        else:
-            self._random.shuffle(vals, self._random.random)
+    def func(self, split, iterator):
+        raise NotImplementedError
 
 
 class RDDSampler(RDDSamplerBase):
@@ -101,17 +68,18 @@ class RDDSampler(RDDSamplerBase):
         self._fraction = fraction
 
     def func(self, split, iterator):
+        self.initRandomGenerator(split)
         if self._withReplacement:
             for obj in iterator:
                 # For large datasets, the expected number of occurrences of each element in
                 # a sample with replacement is Poisson(frac). We use that to get a count for
                 # each element.
-                count = self.getPoissonSample(split, mean=self._fraction)
+                count = self.getPoissonSample(self._fraction)
                 for _ in range(0, count):
                     yield obj
         else:
             for obj in iterator:
-                if self.getUniformSample(split) <= self._fraction:
+                if self.getUniformSample() < self._fraction:
                     yield obj
 
 
@@ -119,13 +87,13 @@ class RDDRangeSampler(RDDSamplerBase):
 
     def __init__(self, lowerBound, upperBound, seed=None):
         RDDSamplerBase.__init__(self, False, seed)
-        self._use_numpy = False  # no performance gain from numpy
         self._lowerBound = lowerBound
         self._upperBound = upperBound
 
     def func(self, split, iterator):
+        self.initRandomGenerator(split)
         for obj in iterator:
-            if self._lowerBound <= self.getUniformSample(split) < self._upperBound:
+            if self._lowerBound <= self.getUniformSample() < self._upperBound:
                 yield obj
 
 
@@ -136,15 +104,16 @@ class RDDStratifiedSampler(RDDSamplerBase):
         self._fractions = fractions
 
     def func(self, split, iterator):
+        self.initRandomGenerator(split)
         if self._withReplacement:
             for key, val in iterator:
                 # For large datasets, the expected number of occurrences of each element in
                 # a sample with replacement is Poisson(frac). We use that to get a count for
                 # each element.
-                count = self.getPoissonSample(split, mean=self._fractions[key])
+                count = self.getPoissonSample(self._fractions[key])
                 for _ in range(0, count):
                     yield key, val
         else:
             for key, val in iterator:
-                if self.getUniformSample(split) <= self._fractions[key]:
+                if self.getUniformSample() < self._fractions[key]:
                     yield key, val


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org