Posted to commits@spark.apache.org by rx...@apache.org on 2014/07/14 09:43:06 UTC

git commit: Made rdd.py pep8 compliant by using Autopep8 and a little manual editing.

Repository: spark
Updated Branches:
  refs/heads/master 635888cbe -> aab534966


Made rdd.py pep8 compliant by using Autopep8 and a little manual editing.

Author: Prashant Sharma <pr...@imaginea.com>

Closes #1354 from ScrapCodes/pep8-comp-1 and squashes the following commits:

9858ea8 [Prashant Sharma] Code Review
d8851b7 [Prashant Sharma] Found that # noqa works even inside comment blocks. Not sure if it works with all versions of Python.
10c0cef [Prashant Sharma] Made rdd.py pep8 compliant by using Autopep8 and a little manual tweaking.
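
For reference, a minimal sketch of the two techniques the commits above rely on: running autopep8 over Python source and marking a line with # noqa so pep8-style checkers skip it. This is illustrative only; the sample source, the error codes named in the comments, and the checker behaviour described are assumptions, not a record of how this change was actually produced.

    # Hypothetical example of the kind of cleanup autopep8 automates.
    # autopep8.fix_code() returns a PEP 8-formatted copy of the given source string.
    import autopep8

    messy = (
        "import sys,os\n"       # E401: multiple imports on one line
        "def f( a,b ):\n"       # E201/E202/E231: bad whitespace in the signature
        "    return a+b\n"
    )
    print(autopep8.fix_code(messy))

    # A trailing "# noqa" marks a line for pep8/flake8-family checkers to skip.
    # The commit above adds such a marker inside sortByKey's docstring, where the
    # author found it also silences warnings for over-long doctest output lines,
    # though (as the commit notes) that behaviour may vary between checker versions.
    sample = "[('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4)]"  # noqa

Whatever autopep8 cannot fix automatically is presumably what the "little manual editing" in the commit message covers.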


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aab53496
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aab53496
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aab53496

Branch: refs/heads/master
Commit: aab5349660109481ee944721d611771da5a93109
Parents: 635888c
Author: Prashant Sharma <pr...@imaginea.com>
Authored: Mon Jul 14 00:42:59 2014 -0700
Committer: Reynold Xin <rx...@apache.org>
Committed: Mon Jul 14 00:42:59 2014 -0700

----------------------------------------------------------------------
 python/pyspark/rdd.py | 150 +++++++++++++++++++++++++++------------------
 1 file changed, 92 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/aab53496/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index f64f48e..0c35c66 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -69,16 +69,19 @@ def _extract_concise_traceback():
         file, line, fun, what = tb[0]
         return callsite(function=fun, file=file, linenum=line)
     sfile, sline, sfun, swhat = tb[first_spark_frame]
-    ufile, uline, ufun, uwhat = tb[first_spark_frame-1]
+    ufile, uline, ufun, uwhat = tb[first_spark_frame - 1]
     return callsite(function=sfun, file=ufile, linenum=uline)
 
 _spark_stack_depth = 0
 
+
 class _JavaStackTrace(object):
+
     def __init__(self, sc):
         tb = _extract_concise_traceback()
         if tb is not None:
-            self._traceback = "%s at %s:%s" % (tb.function, tb.file, tb.linenum)
+            self._traceback = "%s at %s:%s" % (
+                tb.function, tb.file, tb.linenum)
         else:
             self._traceback = "Error! Could not extract traceback info"
         self._context = sc
@@ -95,7 +98,9 @@ class _JavaStackTrace(object):
         if _spark_stack_depth == 0:
             self._context._jsc.setCallSite(None)
 
+
 class MaxHeapQ(object):
+
     """
     An implementation of MaxHeap.
     >>> import pyspark.rdd
@@ -117,14 +122,14 @@ class MaxHeapQ(object):
     """
 
     def __init__(self, maxsize):
-        # we start from q[1], this makes calculating children as trivial as 2 * k
+        # We start from q[1], so its children are always  2 * k
         self.q = [0]
         self.maxsize = maxsize
 
     def _swim(self, k):
-        while (k > 1) and (self.q[k/2] < self.q[k]):
-            self._swap(k, k/2)
-            k = k/2
+        while (k > 1) and (self.q[k / 2] < self.q[k]):
+            self._swap(k, k / 2)
+            k = k / 2
 
     def _swap(self, i, j):
         t = self.q[i]
@@ -162,7 +167,9 @@ class MaxHeapQ(object):
             self.q[1] = value
             self._sink(1)
 
+
 class RDD(object):
+
     """
     A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
     Represents an immutable, partitioned collection of elements that can be
@@ -257,7 +264,8 @@ class RDD(object):
         >>> sorted(rdd.map(lambda x: (x, 1)).collect())
         [('a', 1), ('b', 1), ('c', 1)]
         """
-        def func(split, iterator): return imap(f, iterator)
+        def func(split, iterator):
+            return imap(f, iterator)
         return PipelinedRDD(self, func, preservesPartitioning)
 
     def flatMap(self, f, preservesPartitioning=False):
@@ -271,7 +279,8 @@ class RDD(object):
         >>> sorted(rdd.flatMap(lambda x: [(x, x), (x, x)]).collect())
         [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
         """
-        def func(s, iterator): return chain.from_iterable(imap(f, iterator))
+        def func(s, iterator):
+            return chain.from_iterable(imap(f, iterator))
         return self.mapPartitionsWithIndex(func, preservesPartitioning)
 
     def mapPartitions(self, f, preservesPartitioning=False):
@@ -283,7 +292,8 @@ class RDD(object):
         >>> rdd.mapPartitions(f).collect()
         [3, 7]
         """
-        def func(s, iterator): return f(iterator)
+        def func(s, iterator):
+            return f(iterator)
         return self.mapPartitionsWithIndex(func)
 
     def mapPartitionsWithIndex(self, f, preservesPartitioning=False):
@@ -311,17 +321,17 @@ class RDD(object):
         6
         """
         warnings.warn("mapPartitionsWithSplit is deprecated; "
-            "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
+                      "use mapPartitionsWithIndex instead", DeprecationWarning, stacklevel=2)
         return self.mapPartitionsWithIndex(f, preservesPartitioning)
 
     def getNumPartitions(self):
-      """
-      Returns the number of partitions in RDD
-      >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
-      >>> rdd.getNumPartitions()
-      2
-      """
-      return self._jrdd.partitions().size()
+        """
+        Returns the number of partitions in RDD
+        >>> rdd = sc.parallelize([1, 2, 3, 4], 2)
+        >>> rdd.getNumPartitions()
+        2
+        """
+        return self._jrdd.partitions().size()
 
     def filter(self, f):
         """
@@ -331,7 +341,8 @@ class RDD(object):
         >>> rdd.filter(lambda x: x % 2 == 0).collect()
         [2, 4]
         """
-        def func(iterator): return ifilter(f, iterator)
+        def func(iterator):
+            return ifilter(f, iterator)
         return self.mapPartitions(func)
 
     def distinct(self):
@@ -391,9 +402,11 @@ class RDD(object):
 
         maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint))
         if num > maxSampleSize:
-            raise ValueError("Sample size cannot be greater than %d." % maxSampleSize)
+            raise ValueError(
+                "Sample size cannot be greater than %d." % maxSampleSize)
 
-        fraction = RDD._computeFractionForSampleSize(num, initialCount, withReplacement)
+        fraction = RDD._computeFractionForSampleSize(
+            num, initialCount, withReplacement)
         samples = self.sample(withReplacement, fraction, seed).collect()
 
         # If the first sample didn't turn out large enough, keep trying to take samples;
@@ -499,17 +512,17 @@ class RDD(object):
             raise TypeError
         return self.union(other)
 
-    def sortByKey(self, ascending=True, numPartitions=None, keyfunc = lambda x: x):
+    def sortByKey(self, ascending=True, numPartitions=None, keyfunc=lambda x: x):
         """
         Sorts this RDD, which is assumed to consist of (key, value) pairs.
-
+        # noqa
         >>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
         >>> sc.parallelize(tmp).sortByKey(True, 2).collect()
         [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
         >>> tmp2 = [('Mary', 1), ('had', 2), ('a', 3), ('little', 4), ('lamb', 5)]
         >>> tmp2.extend([('whose', 6), ('fleece', 7), ('was', 8), ('white', 9)])
         >>> sc.parallelize(tmp2).sortByKey(True, 3, keyfunc=lambda k: k.lower()).collect()
-        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5), ('little', 4), ('Mary', 1), ('was', 8), ('white', 9), ('whose', 6)]
+        [('a', 3), ('fleece', 7), ('had', 2), ('lamb', 5),...('white', 9), ('whose', 6)]
         """
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
@@ -521,10 +534,12 @@ class RDD(object):
         # number of (key, value) pairs falling into them
         if numPartitions > 1:
             rddSize = self.count()
-            maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
+            # constant from Spark's RangePartitioner
+            maxSampleSize = numPartitions * 20.0
             fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
 
-            samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
+            samples = self.sample(False, fraction, 1).map(
+                lambda (k, v): k).collect()
             samples = sorted(samples, reverse=(not ascending), key=keyfunc)
 
             # we have numPartitions many parts but one of the them has
@@ -540,13 +555,13 @@ class RDD(object):
             if ascending:
                 return p
             else:
-                return numPartitions-1-p
+                return numPartitions - 1 - p
 
         def mapFunc(iterator):
             yield sorted(iterator, reverse=(not ascending), key=lambda (k, v): keyfunc(k))
 
         return (self.partitionBy(numPartitions, partitionFunc=rangePartitionFunc)
-                    .mapPartitions(mapFunc,preservesPartitioning=True)
+                    .mapPartitions(mapFunc, preservesPartitioning=True)
                     .flatMap(lambda x: x, preservesPartitioning=True))
 
     def sortBy(self, keyfunc, ascending=True, numPartitions=None):
@@ -570,7 +585,8 @@ class RDD(object):
         >>> sorted(rdd.glom().collect())
         [[1, 2], [3, 4]]
         """
-        def func(iterator): yield list(iterator)
+        def func(iterator):
+            yield list(iterator)
         return self.mapPartitions(func)
 
     def cartesian(self, other):
@@ -607,7 +623,9 @@ class RDD(object):
         ['1', '2', '', '3']
         """
         def func(iterator):
-            pipe = Popen(shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
+            pipe = Popen(
+                shlex.split(command), env=env, stdin=PIPE, stdout=PIPE)
+
             def pipe_objs(out):
                 for obj in iterator:
                     out.write(str(obj).rstrip('\n') + '\n')
@@ -646,7 +664,7 @@ class RDD(object):
         Return a list that contains all of the elements in this RDD.
         """
         with _JavaStackTrace(self.context) as st:
-          bytesInJava = self._jrdd.collect().iterator()
+            bytesInJava = self._jrdd.collect().iterator()
         return list(self._collect_iterator_through_file(bytesInJava))
 
     def _collect_iterator_through_file(self, iterator):
@@ -736,7 +754,6 @@ class RDD(object):
 
         return self.mapPartitions(func).fold(zeroValue, combOp)
 
-
     def max(self):
         """
         Find the maximum item in this RDD.
@@ -844,6 +861,7 @@ class RDD(object):
             for obj in iterator:
                 counts[obj] += 1
             yield counts
+
         def mergeMaps(m1, m2):
             for (k, v) in m2.iteritems():
                 m1[k] += v
@@ -888,22 +906,22 @@ class RDD(object):
         def topNKeyedElems(iterator, key_=None):
             q = MaxHeapQ(num)
             for k in iterator:
-                if key_ != None:
+                if key_ is not None:
                     k = (key_(k), k)
                 q.insert(k)
             yield q.getElements()
 
         def unKey(x, key_=None):
-            if key_ != None:
+            if key_ is not None:
                 x = [i[1] for i in x]
             return x
 
         def merge(a, b):
             return next(topNKeyedElems(a + b))
-        result = self.mapPartitions(lambda i: topNKeyedElems(i, key)).reduce(merge)
+        result = self.mapPartitions(
+            lambda i: topNKeyedElems(i, key)).reduce(merge)
         return sorted(unKey(result, key), key=key)
 
-
     def take(self, num):
         """
         Take the first num elements of the RDD.
@@ -947,7 +965,8 @@ class RDD(object):
                     yield next(iterator)
                     taken += 1
 
-            p = range(partsScanned, min(partsScanned + numPartsToTry, totalParts))
+            p = range(
+                partsScanned, min(partsScanned + numPartsToTry, totalParts))
             res = self.context.runJob(self, takeUpToNumLeft, p, True)
 
             items += res
@@ -977,7 +996,7 @@ class RDD(object):
         [1, 2, 'rdd', 'spark']
         """
         self._reserialize(BatchedSerializer(PickleSerializer(),
-                                batchSize))._jrdd.saveAsObjectFile(path)
+                                            batchSize))._jrdd.saveAsObjectFile(path)
 
     def saveAsTextFile(self, path):
         """
@@ -1075,6 +1094,7 @@ class RDD(object):
             for (k, v) in iterator:
                 m[k] = v if k not in m else func(m[k], v)
             yield m
+
         def mergeMaps(m1, m2):
             for (k, v) in m2.iteritems():
                 m1[k] = v if k not in m1 else func(m1[k], v)
@@ -1162,6 +1182,7 @@ class RDD(object):
         # form the hash buckets in Python, transferring O(numPartitions) objects
         # to Java.  Each object is a (splitNumber, [objects]) pair.
         outputSerializer = self.ctx._unbatched_serializer
+
         def add_shuffle_key(split, iterator):
 
             buckets = defaultdict(list)
@@ -1174,7 +1195,8 @@ class RDD(object):
         keyed = PipelinedRDD(self, add_shuffle_key)
         keyed._bypass_serializer = True
         with _JavaStackTrace(self.context) as st:
-            pairRDD = self.ctx._jvm.PairwiseRDD(keyed._jrdd.rdd()).asJavaPairRDD()
+            pairRDD = self.ctx._jvm.PairwiseRDD(
+                keyed._jrdd.rdd()).asJavaPairRDD()
             partitioner = self.ctx._jvm.PythonPartitioner(numPartitions,
                                                           id(partitionFunc))
         jrdd = pairRDD.partitionBy(partitioner).values()
@@ -1213,6 +1235,7 @@ class RDD(object):
         """
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
+
         def combineLocally(iterator):
             combiners = {}
             for x in iterator:
@@ -1224,10 +1247,11 @@ class RDD(object):
             return combiners.iteritems()
         locally_combined = self.mapPartitions(combineLocally)
         shuffled = locally_combined.partitionBy(numPartitions)
+
         def _mergeCombiners(iterator):
             combiners = {}
             for (k, v) in iterator:
-                if not k in combiners:
+                if k not in combiners:
                     combiners[k] = v
                 else:
                     combiners[k] = mergeCombiners(combiners[k], v)
@@ -1236,17 +1260,19 @@ class RDD(object):
 
     def aggregateByKey(self, zeroValue, seqFunc, combFunc, numPartitions=None):
         """
-        Aggregate the values of each key, using given combine functions and a neutral "zero value".
-        This function can return a different result type, U, than the type of the values in this RDD,
-        V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
-        The former operation is used for merging values within a partition, and the latter is used
-        for merging values between partitions. To avoid memory allocation, both of these functions are
+        Aggregate the values of each key, using given combine functions and a neutral
+        "zero value". This function can return a different result type, U, than the type
+        of the values in this RDD, V. Thus, we need one operation for merging a V into
+        a U and one operation for merging two U's, The former operation is used for merging
+        values within a partition, and the latter is used for merging values between
+        partitions. To avoid memory allocation, both of these functions are
         allowed to modify and return their first argument instead of creating a new U.
         """
         def createZero():
-          return copy.deepcopy(zeroValue)
+            return copy.deepcopy(zeroValue)
 
-        return self.combineByKey(lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
+        return self.combineByKey(
+            lambda v: seqFunc(createZero(), v), seqFunc, combFunc, numPartitions)
 
     def foldByKey(self, zeroValue, func, numPartitions=None):
         """
@@ -1261,11 +1287,10 @@ class RDD(object):
         [('a', 2), ('b', 1)]
         """
         def createZero():
-          return copy.deepcopy(zeroValue)
+            return copy.deepcopy(zeroValue)
 
         return self.combineByKey(lambda v: func(createZero(), v), func, func, numPartitions)
 
-
     # TODO: support variant with custom partitioner
     def groupByKey(self, numPartitions=None):
         """
@@ -1292,7 +1317,7 @@ class RDD(object):
             return a + b
 
         return self.combineByKey(createCombiner, mergeValue, mergeCombiners,
-                numPartitions).mapValues(lambda x: ResultIterable(x))
+                                 numPartitions).mapValues(lambda x: ResultIterable(x))
 
     # TODO: add tests
     def flatMapValues(self, f):
@@ -1362,7 +1387,8 @@ class RDD(object):
         >>> sorted(x.subtractByKey(y).collect())
         [('b', 4), ('b', 5)]
         """
-        filter_func = lambda (key, vals): len(vals[0]) > 0 and len(vals[1]) == 0
+        def filter_func((key, vals)):
+            return len(vals[0]) > 0 and len(vals[1]) == 0
         map_func = lambda (key, vals): [(key, val) for val in vals[0]]
         return self.cogroup(other, numPartitions).filter(filter_func).flatMap(map_func)
 
@@ -1375,8 +1401,9 @@ class RDD(object):
         >>> sorted(x.subtract(y).collect())
         [('a', 1), ('b', 4), ('b', 5)]
         """
-        rdd = other.map(lambda x: (x, True)) # note: here 'True' is just a placeholder
-        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0]) # note: here 'True' is just a placeholder
+        # note: here 'True' is just a placeholder
+        rdd = other.map(lambda x: (x, True))
+        return self.map(lambda x: (x, True)).subtractByKey(rdd).map(lambda tpl: tpl[0])
 
     def keyBy(self, f):
         """
@@ -1434,7 +1461,7 @@ class RDD(object):
         """
         pairRDD = self._jrdd.zip(other._jrdd)
         deserializer = PairDeserializer(self._jrdd_deserializer,
-                                             other._jrdd_deserializer)
+                                        other._jrdd_deserializer)
         return RDD(pairRDD, self.ctx, deserializer)
 
     def name(self):
@@ -1503,7 +1530,9 @@ class RDD(object):
     # keys in the pairs.  This could be an expensive operation, since those
     # hashes aren't retained.
 
+
 class PipelinedRDD(RDD):
+
     """
     Pipelined maps:
     >>> rdd = sc.parallelize([1, 2, 3, 4])
@@ -1519,6 +1548,7 @@ class PipelinedRDD(RDD):
     >>> rdd.flatMap(lambda x: [x, x]).reduce(add)
     20
     """
+
     def __init__(self, prev, func, preservesPartitioning=False):
         if not isinstance(prev, PipelinedRDD) or not prev._is_pipelinable():
             # This transformation is the first in its stage:
@@ -1528,6 +1558,7 @@ class PipelinedRDD(RDD):
             self._prev_jrdd_deserializer = prev._jrdd_deserializer
         else:
             prev_func = prev.func
+
             def pipeline_func(split, iterator):
                 return func(split, prev_func(split, iterator))
             self.func = pipeline_func
@@ -1560,11 +1591,13 @@ class PipelinedRDD(RDD):
         env = MapConverter().convert(self.ctx.environment,
                                      self.ctx._gateway._gateway_client)
         includes = ListConverter().convert(self.ctx._python_includes,
-                                     self.ctx._gateway._gateway_client)
+                                           self.ctx._gateway._gateway_client)
         python_rdd = self.ctx._jvm.PythonRDD(self._prev_jrdd.rdd(),
-            bytearray(pickled_command), env, includes, self.preservesPartitioning,
-            self.ctx.pythonExec, broadcast_vars, self.ctx._javaAccumulator,
-            class_tag)
+                                             bytearray(pickled_command),
+                                             env, includes, self.preservesPartitioning,
+                                             self.ctx.pythonExec,
+                                             broadcast_vars, self.ctx._javaAccumulator,
+                                             class_tag)
         self._jrdd_val = python_rdd.asJavaRDD()
         return self._jrdd_val
 
@@ -1579,7 +1612,8 @@ def _test():
     # The small batch size here ensures that we see multiple batches,
     # even in these small test examples:
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-    (failure_count, test_count) = doctest.testmod(globs=globs,optionflags=doctest.ELLIPSIS)
+    (failure_count, test_count) = doctest.testmod(
+        globs=globs, optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     if failure_count:
         exit(-1)