Posted to commits@spark.apache.org by jo...@apache.org on 2015/04/17 01:21:15 UTC

[4/6] spark git commit: [SPARK-4897] [PySpark] Python 3 support

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/fpm.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py
index 3aa6d79..628ccc0 100644
--- a/python/pyspark/mllib/fpm.py
+++ b/python/pyspark/mllib/fpm.py
@@ -16,12 +16,14 @@
 #
 
 from pyspark import SparkContext
+from pyspark.rdd import ignore_unicode_prefix
 from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc
 
 __all__ = ['FPGrowth', 'FPGrowthModel']
 
 
 @inherit_doc
+@ignore_unicode_prefix
 class FPGrowthModel(JavaModelWrapper):
 
     """

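[Editor's note] The @ignore_unicode_prefix decorator applied here is defined later in this patch, in python/pyspark/rdd.py: on Python 3 it rewrites the function's docstring so doctest output written with u'...' literals still matches, since Python 3 repr() drops the u prefix. A minimal, self-contained sketch of the same helper (mirroring the code added to pyspark.rdd, not a separate implementation):

    import re
    import sys

    def ignore_unicode_prefix(f):
        # On Python 3, strip the u'' prefix from string literals in the
        # docstring so the same doctest expectations pass on 2 and 3.
        if sys.version >= '3':
            literal_re = re.compile(r"(\W|^)[uU](['])", re.UNICODE)
            f.__doc__ = literal_re.sub(r'\1\2', f.__doc__)
        return f
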
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/linalg.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index a80320c..38b3aa3 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -25,7 +25,13 @@ SciPy is available in their environment.
 
 import sys
 import array
-import copy_reg
+
+if sys.version >= '3':
+    basestring = str
+    xrange = range
+    import copyreg as copy_reg
+else:
+    import copy_reg
 
 import numpy as np
 
@@ -57,7 +63,7 @@ except:
 def _convert_to_vector(l):
     if isinstance(l, Vector):
         return l
-    elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
+    elif type(l) in (array.array, np.array, np.ndarray, list, tuple, xrange):
         return DenseVector(l)
     elif _have_scipy and scipy.sparse.issparse(l):
         assert l.shape[1] == 1, "Expected column vector"
@@ -88,7 +94,7 @@ def _vector_size(v):
     """
     if isinstance(v, Vector):
         return len(v)
-    elif type(v) in (array.array, list, tuple):
+    elif type(v) in (array.array, list, tuple, xrange):
         return len(v)
     elif type(v) == np.ndarray:
         if v.ndim == 1 or (v.ndim == 2 and v.shape[1] == 1):
@@ -193,7 +199,7 @@ class DenseVector(Vector):
     DenseVector([1.0, 0.0])
     """
     def __init__(self, ar):
-        if isinstance(ar, basestring):
+        if isinstance(ar, bytes):
             ar = np.frombuffer(ar, dtype=np.float64)
         elif not isinstance(ar, np.ndarray):
             ar = np.array(ar, dtype=np.float64)
@@ -321,11 +327,13 @@ class DenseVector(Vector):
     __sub__ = _delegate("__sub__")
     __mul__ = _delegate("__mul__")
     __div__ = _delegate("__div__")
+    __truediv__ = _delegate("__truediv__")
     __mod__ = _delegate("__mod__")
     __radd__ = _delegate("__radd__")
     __rsub__ = _delegate("__rsub__")
     __rmul__ = _delegate("__rmul__")
     __rdiv__ = _delegate("__rdiv__")
+    __rtruediv__ = _delegate("__rtruediv__")
     __rmod__ = _delegate("__rmod__")
 
 
@@ -344,12 +352,12 @@ class SparseVector(Vector):
         :param args: Non-zero entries, as a dictionary, list of tupes,
                or two sorted lists containing indices and values.
 
-        >>> print SparseVector(4, {1: 1.0, 3: 5.5})
-        (4,[1,3],[1.0,5.5])
-        >>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
-        (4,[1,3],[1.0,5.5])
-        >>> print SparseVector(4, [1, 3], [1.0, 5.5])
-        (4,[1,3],[1.0,5.5])
+        >>> SparseVector(4, {1: 1.0, 3: 5.5})
+        SparseVector(4, {1: 1.0, 3: 5.5})
+        >>> SparseVector(4, [(1, 1.0), (3, 5.5)])
+        SparseVector(4, {1: 1.0, 3: 5.5})
+        >>> SparseVector(4, [1, 3], [1.0, 5.5])
+        SparseVector(4, {1: 1.0, 3: 5.5})
         """
         self.size = int(size)
         assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
@@ -361,8 +369,8 @@ class SparseVector(Vector):
             self.indices = np.array([p[0] for p in pairs], dtype=np.int32)
             self.values = np.array([p[1] for p in pairs], dtype=np.float64)
         else:
-            if isinstance(args[0], basestring):
-                assert isinstance(args[1], str), "values should be string too"
+            if isinstance(args[0], bytes):
+                assert isinstance(args[1], bytes), "values should be string too"
                 if args[0]:
                     self.indices = np.frombuffer(args[0], np.int32)
                     self.values = np.frombuffer(args[1], np.float64)
@@ -591,12 +599,12 @@ class Vectors(object):
         :param args: Non-zero entries, as a dictionary, list of tupes,
                      or two sorted lists containing indices and values.
 
-        >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
-        (4,[1,3],[1.0,5.5])
-        >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
-        (4,[1,3],[1.0,5.5])
-        >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
-        (4,[1,3],[1.0,5.5])
+        >>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
+        SparseVector(4, {1: 1.0, 3: 5.5})
+        >>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
+        SparseVector(4, {1: 1.0, 3: 5.5})
+        >>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
+        SparseVector(4, {1: 1.0, 3: 5.5})
         """
         return SparseVector(size, *args)
 
@@ -645,7 +653,7 @@ class Matrix(object):
         """
         Convert Matrix attributes which are array-like or buffer to array.
         """
-        if isinstance(array_like, basestring):
+        if isinstance(array_like, bytes):
             return np.frombuffer(array_like, dtype=dtype)
         return np.asarray(array_like, dtype=dtype)
 
@@ -677,7 +685,7 @@ class DenseMatrix(Matrix):
     def toSparse(self):
         """Convert to SparseMatrix"""
         indices = np.nonzero(self.values)[0]
-        colCounts = np.bincount(indices / self.numRows)
+        colCounts = np.bincount(indices // self.numRows)
         colPtrs = np.cumsum(np.hstack(
             (0, colCounts, np.zeros(self.numCols - colCounts.size))))
         values = self.values[indices]

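[Editor's note] Two Python 3 behaviour changes drive most of the linalg.py edits: "/" between integers now returns a float (hence the __truediv__/__rtruediv__ delegates next to __div__, and floor division in toSparse()), and basestring is gone (hence raw buffers are detected with bytes). A small, Spark-independent illustration of the division change:

    # In Python 3, "/" between ints yields a float, so column indices computed
    # from flat positions must use floor division, as in the toSparse() change.
    indices = [0, 2, 4, 7]
    numRows = 3
    cols = [i // numRows for i in indices]   # [0, 0, 1, 2] -- ints, not floats
    assert all(isinstance(c, int) for c in cols)
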
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/rand.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/rand.py b/python/pyspark/mllib/rand.py
index 20ee9d7..06fbc0e 100644
--- a/python/pyspark/mllib/rand.py
+++ b/python/pyspark/mllib/rand.py
@@ -88,10 +88,10 @@ class RandomRDDs(object):
         :param seed: Random seed (default: a random long integer).
         :return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).
 
-        >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
+        >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1)
         >>> stats = x.stats()
         >>> stats.count()
-        1000L
+        1000
         >>> abs(stats.mean() - 0.0) < 0.1
         True
         >>> abs(stats.stdev() - 1.0) < 0.1
@@ -118,10 +118,10 @@ class RandomRDDs(object):
         >>> std = 1.0
         >>> expMean = exp(mean + 0.5 * std * std)
         >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
-        >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2L)
+        >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2)
         >>> stats = x.stats()
         >>> stats.count()
-        1000L
+        1000
         >>> abs(stats.mean() - expMean) < 0.5
         True
         >>> from math import sqrt
@@ -145,10 +145,10 @@ class RandomRDDs(object):
         :return: RDD of float comprised of i.i.d. samples ~ Pois(mean).
 
         >>> mean = 100.0
-        >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L)
+        >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2)
         >>> stats = x.stats()
         >>> stats.count()
-        1000L
+        1000
         >>> abs(stats.mean() - mean) < 0.5
         True
         >>> from math import sqrt
@@ -171,10 +171,10 @@ class RandomRDDs(object):
         :return: RDD of float comprised of i.i.d. samples ~ Exp(mean).
 
         >>> mean = 2.0
-        >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2L)
+        >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2)
         >>> stats = x.stats()
         >>> stats.count()
-        1000L
+        1000
         >>> abs(stats.mean() - mean) < 0.5
         True
         >>> from math import sqrt
@@ -202,10 +202,10 @@ class RandomRDDs(object):
         >>> scale = 2.0
         >>> expMean = shape * scale
         >>> expStd = sqrt(shape * scale * scale)
-        >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2L)
+        >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2)
         >>> stats = x.stats()
         >>> stats.count()
-        1000L
+        1000
         >>> abs(stats.mean() - expMean) < 0.5
         True
         >>> abs(stats.stdev() - expStd) < 0.5
@@ -254,7 +254,7 @@ class RandomRDDs(object):
         :return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
 
         >>> import numpy as np
-        >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
+        >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1).collect())
         >>> mat.shape
         (100, 100)
         >>> abs(mat.mean() - 0.0) < 0.1
@@ -286,8 +286,8 @@ class RandomRDDs(object):
         >>> std = 1.0
         >>> expMean = exp(mean + 0.5 * std * std)
         >>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
-        >>> mat = np.matrix(RandomRDDs.logNormalVectorRDD(sc, mean, std, \
-                               100, 100, seed=1L).collect())
+        >>> m = RandomRDDs.logNormalVectorRDD(sc, mean, std, 100, 100, seed=1).collect()
+        >>> mat = np.matrix(m)
         >>> mat.shape
         (100, 100)
         >>> abs(mat.mean() - expMean) < 0.1
@@ -315,7 +315,7 @@ class RandomRDDs(object):
 
         >>> import numpy as np
         >>> mean = 100.0
-        >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
+        >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1)
         >>> mat = np.mat(rdd.collect())
         >>> mat.shape
         (100, 100)
@@ -345,7 +345,7 @@ class RandomRDDs(object):
 
         >>> import numpy as np
         >>> mean = 0.5
-        >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1L)
+        >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1)
         >>> mat = np.mat(rdd.collect())
         >>> mat.shape
         (100, 100)
@@ -380,8 +380,7 @@ class RandomRDDs(object):
         >>> scale = 2.0
         >>> expMean = shape * scale
         >>> expStd = sqrt(shape * scale * scale)
-        >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, \
-                       100, 100, seed=1L).collect())
+        >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, 100, 100, seed=1).collect())
         >>> mat.shape
         (100, 100)
         >>> abs(mat.mean() - expMean) < 0.1

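[Editor's note] The rand.py changes are doctest-only: Python 3 has a single integer type, so the 1L/1000L long literals are a SyntaxError there and seeds and counts are written as plain ints. For illustration (Python 3 behaviour):

    seed = 1                          # the 1L spelling does not even parse on Python 3
    assert isinstance(2 ** 64, int)   # arbitrarily large values are still plain ints
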
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index c5c4c13..80e0a35 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 #
 
+import array
 from collections import namedtuple
 
 from pyspark import SparkContext
@@ -104,14 +105,14 @@ class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader):
         assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
         first = user_product.first()
         assert len(first) == 2, "user_product should be RDD of (user, product)"
-        user_product = user_product.map(lambda (u, p): (int(u), int(p)))
+        user_product = user_product.map(lambda u_p: (int(u_p[0]), int(u_p[1])))
         return self.call("predict", user_product)
 
     def userFeatures(self):
-        return self.call("getUserFeatures")
+        return self.call("getUserFeatures").mapValues(lambda v: array.array('d', v))
 
     def productFeatures(self):
-        return self.call("getProductFeatures")
+        return self.call("getProductFeatures").mapValues(lambda v: array.array('d', v))
 
     @classmethod
     def load(cls, sc, path):

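[Editor's note] lambda (u, p): ... relies on tuple parameter unpacking, which Python 3 removed (PEP 3113); the replacement takes a single argument and indexes into it. The other change wraps the returned feature vectors in array.array('d', ...) so callers get a concrete float sequence on both versions. A minimal before/after of the lambda rewrite, on plain data rather than an RDD:

    pairs = [(1, 10), (2, 20)]
    # Python 2 only: f = lambda (u, p): (int(u), int(p))
    f = lambda u_p: (int(u_p[0]), int(u_p[1]))   # works on both Python 2 and 3
    assert list(map(f, pairs)) == [(1, 10), (2, 20)]
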
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/stat/_statistics.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/stat/_statistics.py b/python/pyspark/mllib/stat/_statistics.py
index 1d83e9d..b475be4 100644
--- a/python/pyspark/mllib/stat/_statistics.py
+++ b/python/pyspark/mllib/stat/_statistics.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from pyspark import RDD
+from pyspark.rdd import RDD, ignore_unicode_prefix
 from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
 from pyspark.mllib.linalg import Matrix, _convert_to_vector
 from pyspark.mllib.regression import LabeledPoint
@@ -38,7 +38,7 @@ class MultivariateStatisticalSummary(JavaModelWrapper):
         return self.call("variance").toArray()
 
     def count(self):
-        return self.call("count")
+        return int(self.call("count"))
 
     def numNonzeros(self):
         return self.call("numNonzeros").toArray()
@@ -78,7 +78,7 @@ class Statistics(object):
         >>> cStats.variance()
         array([  4.,  13.,   0.,  25.])
         >>> cStats.count()
-        3L
+        3
         >>> cStats.numNonzeros()
         array([ 3.,  2.,  0.,  3.])
         >>> cStats.max()
@@ -124,20 +124,20 @@ class Statistics(object):
         >>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
         ...                       Vectors.dense([6, 7, 0,  8]), Vectors.dense([9, 0, 0, 1])])
         >>> pearsonCorr = Statistics.corr(rdd)
-        >>> print str(pearsonCorr).replace('nan', 'NaN')
+        >>> print(str(pearsonCorr).replace('nan', 'NaN'))
         [[ 1.          0.05564149         NaN  0.40047142]
          [ 0.05564149  1.                 NaN  0.91359586]
          [        NaN         NaN  1.                 NaN]
          [ 0.40047142  0.91359586         NaN  1.        ]]
         >>> spearmanCorr = Statistics.corr(rdd, method="spearman")
-        >>> print str(spearmanCorr).replace('nan', 'NaN')
+        >>> print(str(spearmanCorr).replace('nan', 'NaN'))
         [[ 1.          0.10540926         NaN  0.4       ]
          [ 0.10540926  1.                 NaN  0.9486833 ]
          [        NaN         NaN  1.                 NaN]
          [ 0.4         0.9486833          NaN  1.        ]]
         >>> try:
         ...     Statistics.corr(rdd, "spearman")
-        ...     print "Method name as second argument without 'method=' shouldn't be allowed."
+        ...     print("Method name as second argument without 'method=' shouldn't be allowed.")
         ... except TypeError:
         ...     pass
         """
@@ -153,6 +153,7 @@ class Statistics(object):
             return callMLlibFunc("corr", x.map(float), y.map(float), method)
 
     @staticmethod
+    @ignore_unicode_prefix
     def chiSqTest(observed, expected=None):
         """
         .. note:: Experimental
@@ -188,11 +189,11 @@ class Statistics(object):
         >>> from pyspark.mllib.linalg import Vectors, Matrices
         >>> observed = Vectors.dense([4, 6, 5])
         >>> pearson = Statistics.chiSqTest(observed)
-        >>> print pearson.statistic
+        >>> print(pearson.statistic)
         0.4
         >>> pearson.degreesOfFreedom
         2
-        >>> print round(pearson.pValue, 4)
+        >>> print(round(pearson.pValue, 4))
         0.8187
         >>> pearson.method
         u'pearson'
@@ -202,12 +203,12 @@ class Statistics(object):
         >>> observed = Vectors.dense([21, 38, 43, 80])
         >>> expected = Vectors.dense([3, 5, 7, 20])
         >>> pearson = Statistics.chiSqTest(observed, expected)
-        >>> print round(pearson.pValue, 4)
+        >>> print(round(pearson.pValue, 4))
         0.0027
 
         >>> data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
         >>> chi = Statistics.chiSqTest(Matrices.dense(3, 4, data))
-        >>> print round(chi.statistic, 4)
+        >>> print(round(chi.statistic, 4))
         21.9958
 
         >>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
@@ -218,9 +219,9 @@ class Statistics(object):
         ...         LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),]
         >>> rdd = sc.parallelize(data, 4)
         >>> chi = Statistics.chiSqTest(rdd)
-        >>> print chi[0].statistic
+        >>> print(chi[0].statistic)
         0.75
-        >>> print chi[1].statistic
+        >>> print(chi[1].statistic)
         1.5
         """
         if isinstance(observed, RDD):

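[Editor's note] Besides switching the doctests to the print() function form, count() is wrapped in int(), presumably so the result renders as 3 on both interpreters rather than 3L under Python 2. The patch itself simply uses print() everywhere; the usual bridge while a module still targets both versions is the __future__ import shown below (not required by this code, included only as context):

    from __future__ import print_function   # makes print() the only accepted form on Python 2 too
    print("same syntax on Python 2.6+ and Python 3")
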
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 8eaddcf..c6ed5ac 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -72,11 +72,11 @@ class VectorTests(PySparkTestCase):
     def _test_serialize(self, v):
         self.assertEqual(v, ser.loads(ser.dumps(v)))
         jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))
-        nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec)))
+        nv = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jvec)))
         self.assertEqual(v, nv)
         vs = [v] * 100
         jvecs = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(vs)))
-        nvs = ser.loads(str(self.sc._jvm.SerDe.dumps(jvecs)))
+        nvs = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jvecs)))
         self.assertEqual(vs, nvs)
 
     def test_serialize(self):
@@ -412,11 +412,11 @@ class StatTests(PySparkTestCase):
         self.assertEqual(10, len(summary.normL1()))
         self.assertEqual(10, len(summary.normL2()))
 
-        data2 = self.sc.parallelize(xrange(10)).map(lambda x: Vectors.dense(x))
+        data2 = self.sc.parallelize(range(10)).map(lambda x: Vectors.dense(x))
         summary2 = Statistics.colStats(data2)
         self.assertEqual(array([45.0]), summary2.normL1())
         import math
-        expectedNormL2 = math.sqrt(sum(map(lambda x: x*x, xrange(10))))
+        expectedNormL2 = math.sqrt(sum(map(lambda x: x*x, range(10))))
         self.assertTrue(math.fabs(summary2.normL2()[0] - expectedNormL2) < 1e-14)
 
 
@@ -438,11 +438,11 @@ class VectorUDTTests(PySparkTestCase):
     def test_infer_schema(self):
         sqlCtx = SQLContext(self.sc)
         rdd = self.sc.parallelize([LabeledPoint(1.0, self.dv1), LabeledPoint(0.0, self.sv1)])
-        srdd = sqlCtx.inferSchema(rdd)
-        schema = srdd.schema
+        df = rdd.toDF()
+        schema = df.schema
         field = [f for f in schema.fields if f.name == "features"][0]
         self.assertEqual(field.dataType, self.udt)
-        vectors = srdd.map(lambda p: p.features).collect()
+        vectors = df.map(lambda p: p.features).collect()
         self.assertEqual(len(vectors), 2)
         for v in vectors:
             if isinstance(v, SparseVector):
@@ -695,7 +695,7 @@ class ChiSqTestTests(PySparkTestCase):
 
 class SerDeTest(PySparkTestCase):
     def test_to_java_object_rdd(self):  # SPARK-6660
-        data = RandomRDDs.uniformRDD(self.sc, 10, 5, seed=0L)
+        data = RandomRDDs.uniformRDD(self.sc, 10, 5, seed=0)
         self.assertEqual(_to_java_object_rdd(data).count(), 10)
 
 
@@ -771,7 +771,7 @@ class StandardScalerTests(PySparkTestCase):
 
 if __name__ == "__main__":
     if not _have_scipy:
-        print "NOTE: Skipping SciPy tests as it does not seem to be installed"
+        print("NOTE: Skipping SciPy tests as it does not seem to be installed")
     unittest.main()
     if not _have_scipy:
-        print "NOTE: SciPy tests were skipped as it does not seem to be installed"
+        print("NOTE: SciPy tests were skipped as it does not seem to be installed")

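[Editor's note] The SerDe tests wrap the Java-side payload (a bytearray from py4j) in bytes() instead of str(): on Python 3, str() of binary data produces a textual repr rather than the raw bytes, which would no longer round-trip through the pickler. A quick demonstration of the difference, independent of Spark:

    payload = bytearray(b"\x00\x01")
    assert bytes(payload) == b"\x00\x01"   # raw bytes, suitable for loads()
    # str(payload) on Python 3 is "bytearray(b'\\x00\\x01')" -- a repr, not the data
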
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/tree.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index a7a4d2a..0fe6e4f 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -163,14 +163,16 @@ class DecisionTree(object):
         ...     LabeledPoint(1.0, [3.0])
         ... ]
         >>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
-        >>> print model,  # it already has newline
+        >>> print(model)
         DecisionTreeModel classifier of depth 1 with 3 nodes
-        >>> print model.toDebugString(),  # it already has newline
+
+        >>> print(model.toDebugString())
         DecisionTreeModel classifier of depth 1 with 3 nodes
           If (feature 0 <= 0.0)
            Predict: 0.0
           Else (feature 0 > 0.0)
            Predict: 1.0
+        <BLANKLINE>
         >>> model.predict(array([1.0]))
         1.0
         >>> model.predict(array([0.0]))
@@ -318,9 +320,10 @@ class RandomForest(object):
         3
         >>> model.totalNumNodes()
         7
-        >>> print model,
+        >>> print(model)
         TreeEnsembleModel classifier with 3 trees
-        >>> print model.toDebugString(),
+        <BLANKLINE>
+        >>> print(model.toDebugString())
         TreeEnsembleModel classifier with 3 trees
         <BLANKLINE>
           Tree 0:
@@ -335,6 +338,7 @@ class RandomForest(object):
              Predict: 0.0
             Else (feature 0 > 1.0)
              Predict: 1.0
+        <BLANKLINE>
         >>> model.predict([2.0])
         1.0
         >>> model.predict([0.0])
@@ -483,8 +487,9 @@ class GradientBoostedTrees(object):
         100
         >>> model.totalNumNodes()
         300
-        >>> print model,  # it already has newline
+        >>> print(model)  # it already has newline
         TreeEnsembleModel classifier with 100 trees
+        <BLANKLINE>
         >>> model.predict([2.0])
         1.0
         >>> model.predict([0.0])

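[Editor's note] The trailing comma in "print model," suppressed print's own newline under Python 2; with print() as a function that trick is gone, so the doctests instead accept the extra blank line explicitly with <BLANKLINE>. In doctest notation:

    >>> print("TreeEnsembleModel classifier with 3 trees\n")
    TreeEnsembleModel classifier with 3 trees
    <BLANKLINE>
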
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index c5c3468..16a90db 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -15,10 +15,14 @@
 # limitations under the License.
 #
 
+import sys
 import numpy as np
 import warnings
 
-from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper, inherit_doc
+if sys.version > '3':
+    xrange = range
+
+from pyspark.mllib.common import callMLlibFunc, inherit_doc
 from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
 
 
@@ -94,22 +98,16 @@ class MLUtils(object):
         >>> from pyspark.mllib.util import MLUtils
         >>> from pyspark.mllib.regression import LabeledPoint
         >>> tempFile = NamedTemporaryFile(delete=True)
-        >>> tempFile.write("+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0")
+        >>> _ = tempFile.write(b"+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0")
         >>> tempFile.flush()
         >>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
         >>> tempFile.close()
-        >>> type(examples[0]) == LabeledPoint
-        True
-        >>> print examples[0]
-        (1.0,(6,[0,2,4],[1.0,2.0,3.0]))
-        >>> type(examples[1]) == LabeledPoint
-        True
-        >>> print examples[1]
-        (-1.0,(6,[],[]))
-        >>> type(examples[2]) == LabeledPoint
-        True
-        >>> print examples[2]
-        (-1.0,(6,[1,3,5],[4.0,5.0,6.0]))
+        >>> examples[0]
+        LabeledPoint(1.0, (6,[0,2,4],[1.0,2.0,3.0]))
+        >>> examples[1]
+        LabeledPoint(-1.0, (6,[],[]))
+        >>> examples[2]
+        LabeledPoint(-1.0, (6,[1,3,5],[4.0,5.0,6.0]))
         """
         from pyspark.mllib.regression import LabeledPoint
         if multiclass is not None:

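[Editor's note] Two details in the loadLibSVMFile doctest: the temp file is written with a bytes literal because NamedTemporaryFile is opened in binary mode, and the result of write() is bound to _ because file.write() returns the byte count on Python 3, which would otherwise leak into the expected doctest output. The same pattern outside a doctest:

    import tempfile
    with tempfile.NamedTemporaryFile() as f:
        n = f.write(b"+1 1:1.0 3:2.0 5:3.0\n")   # Python 3: returns 21, the bytes written
        f.flush()
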
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/profiler.py
----------------------------------------------------------------------
diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py
index 4408996..d18daaa 100644
--- a/python/pyspark/profiler.py
+++ b/python/pyspark/profiler.py
@@ -84,11 +84,11 @@ class Profiler(object):
     >>> from pyspark import BasicProfiler
     >>> class MyCustomProfiler(BasicProfiler):
     ...     def show(self, id):
-    ...         print "My custom profiles for RDD:%s" % id
+    ...         print("My custom profiles for RDD:%s" % id)
     ...
     >>> conf = SparkConf().set("spark.python.profile", "true")
     >>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
-    >>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)
+    >>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
     [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
     >>> sc.show_profiles()
     My custom profiles for RDD:1
@@ -111,9 +111,9 @@ class Profiler(object):
         """ Print the profile stats to stdout, id is the RDD id """
         stats = self.stats()
         if stats:
-            print "=" * 60
-            print "Profile of RDD<id=%d>" % id
-            print "=" * 60
+            print("=" * 60)
+            print("Profile of RDD<id=%d>" % id)
+            print("=" * 60)
             stats.sort_stats("time", "cumulative").print_stats()
 
     def dump(self, id, path):

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 93e658e..d9cdbb6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -16,21 +16,29 @@
 #
 
 import copy
-from collections import defaultdict
-from itertools import chain, ifilter, imap
-import operator
 import sys
+import os
+import re
+import operator
 import shlex
-from subprocess import Popen, PIPE
-from tempfile import NamedTemporaryFile
-from threading import Thread
 import warnings
 import heapq
 import bisect
 import random
 import socket
+from subprocess import Popen, PIPE
+from tempfile import NamedTemporaryFile
+from threading import Thread
+from collections import defaultdict
+from itertools import chain
+from functools import reduce
 from math import sqrt, log, isinf, isnan, pow, ceil
 
+if sys.version > '3':
+    basestring = unicode = str
+else:
+    from itertools import imap as map, ifilter as filter
+
 from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
     BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
     PickleSerializer, pack_long, AutoBatchedSerializer
@@ -50,20 +58,21 @@ from py4j.java_collections import ListConverter, MapConverter
 __all__ = ["RDD"]
 
 
-# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized
-# hash for string
 def portable_hash(x):
     """
-    This function returns consistant hash code for builtin types, especially
+    This function returns consistent hash code for builtin types, especially
     for None and tuple with None.
 
-    The algrithm is similar to that one used by CPython 2.7
+    The algorithm is similar to that one used by CPython 2.7
 
     >>> portable_hash(None)
     0
     >>> portable_hash((None, 1)) & 0xffffffff
     219750521
     """
+    if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
+        raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
+
     if x is None:
         return 0
     if isinstance(x, tuple):
@@ -71,7 +80,7 @@ def portable_hash(x):
         for i in x:
             h ^= portable_hash(i)
             h *= 1000003
-            h &= sys.maxint
+            h &= sys.maxsize
         h ^= len(x)
         if h == -1:
             h = -2
@@ -123,6 +132,19 @@ def _load_from_socket(port, serializer):
         sock.close()
 
 
+def ignore_unicode_prefix(f):
+    """
+    Ignore the 'u' prefix of string in doc tests, to make it works
+    in both python 2 and 3
+    """
+    if sys.version >= '3':
+        # the representation of unicode string in Python 3 does not have prefix 'u',
+        # so remove the prefix 'u' for doc tests
+        literal_re = re.compile(r"(\W|^)[uU](['])", re.UNICODE)
+        f.__doc__ = literal_re.sub(r'\1\2', f.__doc__)
+    return f
+
+
 class Partitioner(object):
     def __init__(self, numPartitions, partitionFunc):
         self.numPartitions = numPartitions
@@ -251,7 +273,7 @@ class RDD(object):
         [('a', 1), ('b', 1), ('c', 1)]
         """
         def func(_, iterator):
-            return imap(f, iterator)
+            return map(f, iterator)
         return self.mapPartitionsWithIndex(func, preservesPartitioning)
 
     def flatMap(self, f, preservesPartitioning=False):
@@ -266,7 +288,7 @@ class RDD(object):
         [(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
         """
         def func(s, iterator):
-            return chain.from_iterable(imap(f, iterator))
+            return chain.from_iterable(map(f, iterator))
         return self.mapPartitionsWithIndex(func, preservesPartitioning)
 
     def mapPartitions(self, f, preservesPartitioning=False):
@@ -329,7 +351,7 @@ class RDD(object):
         [2, 4]
         """
         def func(iterator):
-            return ifilter(f, iterator)
+            return filter(f, iterator)
         return self.mapPartitions(func, True)
 
     def distinct(self, numPartitions=None):
@@ -341,7 +363,7 @@ class RDD(object):
         """
         return self.map(lambda x: (x, None)) \
                    .reduceByKey(lambda x, _: x, numPartitions) \
-                   .map(lambda (x, _): x)
+                   .map(lambda x: x[0])
 
     def sample(self, withReplacement, fraction, seed=None):
         """
@@ -354,8 +376,8 @@ class RDD(object):
         :param seed: seed for the random number generator
 
         >>> rdd = sc.parallelize(range(100), 4)
-        >>> rdd.sample(False, 0.1, 81).count()
-        10
+        >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
+        True
         """
         assert fraction >= 0.0, "Negative fraction value: %s" % fraction
         return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
@@ -368,12 +390,14 @@ class RDD(object):
         :param seed: random seed
         :return: split RDDs in a list
 
-        >>> rdd = sc.parallelize(range(5), 1)
+        >>> rdd = sc.parallelize(range(500), 1)
         >>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
-        >>> rdd1.collect()
-        [1, 3]
-        >>> rdd2.collect()
-        [0, 2, 4]
+        >>> len(rdd1.collect() + rdd2.collect())
+        500
+        >>> 150 < rdd1.count() < 250
+        True
+        >>> 250 < rdd2.count() < 350
+        True
         """
         s = float(sum(weights))
         cweights = [0.0]
@@ -416,7 +440,7 @@ class RDD(object):
             rand.shuffle(samples)
             return samples
 
-        maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint))
+        maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize))
         if num > maxSampleSize:
             raise ValueError(
                 "Sample size cannot be greater than %d." % maxSampleSize)
@@ -430,7 +454,7 @@ class RDD(object):
         # See: scala/spark/RDD.scala
         while len(samples) < num:
             # TODO: add log warning for when more than one iteration was run
-            seed = rand.randint(0, sys.maxint)
+            seed = rand.randint(0, sys.maxsize)
             samples = self.sample(withReplacement, fraction, seed).collect()
 
         rand.shuffle(samples)
@@ -507,7 +531,7 @@ class RDD(object):
         """
         return self.map(lambda v: (v, None)) \
             .cogroup(other.map(lambda v: (v, None))) \
-            .filter(lambda (k, vs): all(vs)) \
+            .filter(lambda k_vs: all(k_vs[1])) \
             .keys()
 
     def _reserialize(self, serializer=None):
@@ -549,7 +573,7 @@ class RDD(object):
 
         def sortPartition(iterator):
             sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
-            return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
+            return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))
 
         return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
 
@@ -579,7 +603,7 @@ class RDD(object):
 
         def sortPartition(iterator):
             sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
-            return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
+            return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))
 
         if numPartitions == 1:
             if self.getNumPartitions() > 1:
@@ -594,12 +618,12 @@ class RDD(object):
             return self  # empty RDD
         maxSampleSize = numPartitions * 20.0  # constant from Spark's RangePartitioner
         fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
-        samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
+        samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
         samples = sorted(samples, key=keyfunc)
 
         # we have numPartitions many parts but one of the them has
         # an implicit boundary
-        bounds = [samples[len(samples) * (i + 1) / numPartitions]
+        bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
                   for i in range(0, numPartitions - 1)]
 
         def rangePartitioner(k):
@@ -662,12 +686,13 @@ class RDD(object):
         """
         return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
 
+    @ignore_unicode_prefix
     def pipe(self, command, env={}):
         """
         Return an RDD created by piping elements to a forked external process.
 
         >>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
-        ['1', '2', '', '3']
+        [u'1', u'2', u'', u'3']
         """
         def func(iterator):
             pipe = Popen(
@@ -675,17 +700,18 @@ class RDD(object):
 
             def pipe_objs(out):
                 for obj in iterator:
-                    out.write(str(obj).rstrip('\n') + '\n')
+                    s = str(obj).rstrip('\n') + '\n'
+                    out.write(s.encode('utf-8'))
                 out.close()
             Thread(target=pipe_objs, args=[pipe.stdin]).start()
-            return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
+            return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
         return self.mapPartitions(func)
 
     def foreach(self, f):
         """
         Applies a function to all elements of this RDD.
 
-        >>> def f(x): print x
+        >>> def f(x): print(x)
         >>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
         """
         def processPartition(iterator):
@@ -700,7 +726,7 @@ class RDD(object):
 
         >>> def f(iterator):
         ...      for x in iterator:
-        ...           print x
+        ...           print(x)
         >>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
         """
         def func(it):
@@ -874,7 +900,7 @@ class RDD(object):
         # aggregation.
         while numPartitions > scale + numPartitions / scale:
             numPartitions /= scale
-            curNumPartitions = numPartitions
+            curNumPartitions = int(numPartitions)
 
             def mapPartition(i, iterator):
                 for obj in iterator:
@@ -984,7 +1010,7 @@ class RDD(object):
         (('a', 'b', 'c'), [2, 2])
         """
 
-        if isinstance(buckets, (int, long)):
+        if isinstance(buckets, int):
             if buckets < 1:
                 raise ValueError("number of buckets must be >= 1")
 
@@ -1020,6 +1046,7 @@ class RDD(object):
                 raise ValueError("Can not generate buckets with infinite value")
 
             # keep them as integer if possible
+            inc = int(inc)
             if inc * buckets != maxv - minv:
                 inc = (maxv - minv) * 1.0 / buckets
 
@@ -1137,7 +1164,7 @@ class RDD(object):
             yield counts
 
         def mergeMaps(m1, m2):
-            for k, v in m2.iteritems():
+            for k, v in m2.items():
                 m1[k] += v
             return m1
         return self.mapPartitions(countPartition).reduce(mergeMaps)
@@ -1378,8 +1405,8 @@ class RDD(object):
         >>> tmpFile = NamedTemporaryFile(delete=True)
         >>> tmpFile.close()
         >>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
-        >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
-        [1, 2, 'rdd', 'spark']
+        >>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect())
+        ['1', '2', 'rdd', 'spark']
         """
         if batchSize == 0:
             ser = AutoBatchedSerializer(PickleSerializer())
@@ -1387,6 +1414,7 @@ class RDD(object):
             ser = BatchedSerializer(PickleSerializer(), batchSize)
         self._reserialize(ser)._jrdd.saveAsObjectFile(path)
 
+    @ignore_unicode_prefix
     def saveAsTextFile(self, path, compressionCodecClass=None):
         """
         Save this RDD as a text file, using string representations of elements.
@@ -1418,12 +1446,13 @@ class RDD(object):
         >>> codec = "org.apache.hadoop.io.compress.GzipCodec"
         >>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
         >>> from fileinput import input, hook_compressed
-        >>> ''.join(sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed)))
-        'bar\\nfoo\\n'
+        >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
+        >>> b''.join(result).decode('utf-8')
+        u'bar\\nfoo\\n'
         """
         def func(split, iterator):
             for x in iterator:
-                if not isinstance(x, basestring):
+                if not isinstance(x, (unicode, bytes)):
                     x = unicode(x)
                 if isinstance(x, unicode):
                     x = x.encode("utf-8")
@@ -1458,7 +1487,7 @@ class RDD(object):
         >>> m.collect()
         [1, 3]
         """
-        return self.map(lambda (k, v): k)
+        return self.map(lambda x: x[0])
 
     def values(self):
         """
@@ -1468,7 +1497,7 @@ class RDD(object):
         >>> m.collect()
         [2, 4]
         """
-        return self.map(lambda (k, v): v)
+        return self.map(lambda x: x[1])
 
     def reduceByKey(self, func, numPartitions=None):
         """
@@ -1507,7 +1536,7 @@ class RDD(object):
             yield m
 
         def mergeMaps(m1, m2):
-            for k, v in m2.iteritems():
+            for k, v in m2.items():
                 m1[k] = func(m1[k], v) if k in m1 else v
             return m1
         return self.mapPartitions(reducePartition).reduce(mergeMaps)
@@ -1604,8 +1633,8 @@ class RDD(object):
 
         >>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
         >>> sets = pairs.partitionBy(2).glom().collect()
-        >>> set(sets[0]).intersection(set(sets[1]))
-        set([])
+        >>> len(set(sets[0]).intersection(set(sets[1])))
+        0
         """
         if numPartitions is None:
             numPartitions = self._defaultReducePartitions()
@@ -1637,22 +1666,22 @@ class RDD(object):
                 if (c % 1000 == 0 and get_used_memory() > limit
                         or c > batch):
                     n, size = len(buckets), 0
-                    for split in buckets.keys():
+                    for split in list(buckets.keys()):
                         yield pack_long(split)
                         d = outputSerializer.dumps(buckets[split])
                         del buckets[split]
                         yield d
                         size += len(d)
 
-                    avg = (size / n) >> 20
+                    avg = int(size / n) >> 20
                     # let 1M < avg < 10M
                     if avg < 1:
                         batch *= 1.5
                     elif avg > 10:
-                        batch = max(batch / 1.5, 1)
+                        batch = max(int(batch / 1.5), 1)
                     c = 0
 
-            for split, items in buckets.iteritems():
+            for split, items in buckets.items():
                 yield pack_long(split)
                 yield outputSerializer.dumps(items)
 
@@ -1707,7 +1736,7 @@ class RDD(object):
             merger = ExternalMerger(agg, memory * 0.9, serializer) \
                 if spill else InMemoryMerger(agg)
             merger.mergeValues(iterator)
-            return merger.iteritems()
+            return merger.items()
 
         locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
         shuffled = locally_combined.partitionBy(numPartitions)
@@ -1716,7 +1745,7 @@ class RDD(object):
             merger = ExternalMerger(agg, memory, serializer) \
                 if spill else InMemoryMerger(agg)
             merger.mergeCombiners(iterator)
-            return merger.iteritems()
+            return merger.items()
 
         return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)
 
@@ -1745,7 +1774,7 @@ class RDD(object):
 
         >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
         >>> from operator import add
-        >>> rdd.foldByKey(0, add).collect()
+        >>> sorted(rdd.foldByKey(0, add).collect())
         [('a', 2), ('b', 1)]
         """
         def createZero():
@@ -1769,10 +1798,10 @@ class RDD(object):
         sum or average) over each key, using reduceByKey or aggregateByKey will
         provide much better performance.
 
-        >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
-        >>> sorted(x.groupByKey().mapValues(len).collect())
+        >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
+        >>> sorted(rdd.groupByKey().mapValues(len).collect())
         [('a', 2), ('b', 1)]
-        >>> sorted(x.groupByKey().mapValues(list).collect())
+        >>> sorted(rdd.groupByKey().mapValues(list).collect())
         [('a', [1, 1]), ('b', [1])]
         """
         def createCombiner(x):
@@ -1795,7 +1824,7 @@ class RDD(object):
             merger = ExternalMerger(agg, memory * 0.9, serializer) \
                 if spill else InMemoryMerger(agg)
             merger.mergeValues(iterator)
-            return merger.iteritems()
+            return merger.items()
 
         locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
         shuffled = locally_combined.partitionBy(numPartitions)
@@ -1804,7 +1833,7 @@ class RDD(object):
             merger = ExternalGroupBy(agg, memory, serializer)\
                 if spill else InMemoryMerger(agg)
             merger.mergeCombiners(it)
-            return merger.iteritems()
+            return merger.items()
 
         return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)
 
@@ -1819,7 +1848,7 @@ class RDD(object):
         >>> x.flatMapValues(f).collect()
         [('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
         """
-        flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
+        flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
         return self.flatMap(flat_map_fn, preservesPartitioning=True)
 
     def mapValues(self, f):
@@ -1833,7 +1862,7 @@ class RDD(object):
         >>> x.mapValues(f).collect()
         [('a', 3), ('b', 1)]
         """
-        map_values_fn = lambda (k, v): (k, f(v))
+        map_values_fn = lambda kv: (kv[0], f(kv[1]))
         return self.map(map_values_fn, preservesPartitioning=True)
 
     def groupWith(self, other, *others):
@@ -1844,8 +1873,7 @@ class RDD(object):
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
         >>> y = sc.parallelize([("a", 2)])
         >>> z = sc.parallelize([("b", 42)])
-        >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \
-                sorted(list(w.groupWith(x, y, z).collect())))
+        >>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
         [('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
 
         """
@@ -1860,7 +1888,7 @@ class RDD(object):
 
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
         >>> y = sc.parallelize([("a", 2)])
-        >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
+        >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
         [('a', ([1], [2])), ('b', ([4], []))]
         """
         return python_cogroup((self, other), numPartitions)
@@ -1896,8 +1924,9 @@ class RDD(object):
         >>> sorted(x.subtractByKey(y).collect())
         [('b', 4), ('b', 5)]
         """
-        def filter_func((key, vals)):
-            return vals[0] and not vals[1]
+        def filter_func(pair):
+            key, (val1, val2) = pair
+            return val1 and not val2
         return self.cogroup(other, numPartitions).filter(filter_func).flatMapValues(lambda x: x[0])
 
     def subtract(self, other, numPartitions=None):
@@ -1919,8 +1948,8 @@ class RDD(object):
 
         >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
         >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
-        >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
-        [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
+        >>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
+        [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])]
         """
         return self.map(lambda x: (f(x), x))
 
@@ -2049,17 +2078,18 @@ class RDD(object):
         """
         Return the name of this RDD.
         """
-        name_ = self._jrdd.name()
-        if name_:
-            return name_.encode('utf-8')
+        n = self._jrdd.name()
+        if n:
+            return n
 
+    @ignore_unicode_prefix
     def setName(self, name):
         """
         Assign a name to this RDD.
 
-        >>> rdd1 = sc.parallelize([1,2])
+        >>> rdd1 = sc.parallelize([1, 2])
         >>> rdd1.setName('RDD1').name()
-        'RDD1'
+        u'RDD1'
         """
         self._jrdd.setName(name)
         return self
@@ -2121,7 +2151,7 @@ class RDD(object):
         >>> sorted.lookup(1024)
         []
         """
-        values = self.filter(lambda (k, v): k == key).values()
+        values = self.filter(lambda kv: kv[0] == key).values()
 
         if self.partitioner is not None:
             return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)], False)
@@ -2159,7 +2189,7 @@ class RDD(object):
         or meet the confidence.
 
         >>> rdd = sc.parallelize(range(1000), 10)
-        >>> r = sum(xrange(1000))
+        >>> r = sum(range(1000))
         >>> (rdd.sumApprox(1000) - r) / r < 0.05
         True
         """
@@ -2176,7 +2206,7 @@ class RDD(object):
         or meet the confidence.
 
         >>> rdd = sc.parallelize(range(1000), 10)
-        >>> r = sum(xrange(1000)) / 1000.0
+        >>> r = sum(range(1000)) / 1000.0
         >>> (rdd.meanApprox(1000) - r) / r < 0.05
         True
         """
@@ -2201,10 +2231,10 @@ class RDD(object):
                            It must be greater than 0.000017.
 
         >>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
-        >>> 950 < n < 1050
+        >>> 900 < n < 1100
         True
         >>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
-        >>> 18 < n < 22
+        >>> 16 < n < 24
         True
         """
         if relativeSD < 0.000017:
@@ -2223,8 +2253,7 @@ class RDD(object):
         >>> [x for x in rdd.toLocalIterator()]
         [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
         """
-        partitions = xrange(self.getNumPartitions())
-        for partition in partitions:
+        for partition in range(self.getNumPartitions()):
             rows = self.context.runJob(self, lambda x: x, [partition])
             for row in rows:
                 yield row

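[Editor's note] The most consequential change in rdd.py is the PYTHONHASHSEED check in portable_hash(): Python 3.3+ randomizes string hashing per interpreter, so two worker processes would otherwise send the same key to different partitions. Pinning the seed (for example PYTHONHASHSEED=0 in the workers' environment) restores a stable hash. A small standard-library-only check that the pin works as described:

    import os
    import subprocess
    import sys

    # With the seed pinned, separate interpreters agree on hash('spark');
    # without it, Python 3.3+ gives a different value on almost every run.
    env = dict(os.environ, PYTHONHASHSEED="0")
    run = lambda: subprocess.check_output(
        [sys.executable, "-c", "print(hash('spark'))"], env=env)
    assert run() == run()
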
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/rddsampler.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 459e142..fe8f873 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -23,7 +23,7 @@ import math
 class RDDSamplerBase(object):
 
     def __init__(self, withReplacement, seed=None):
-        self._seed = seed if seed is not None else random.randint(0, sys.maxint)
+        self._seed = seed if seed is not None else random.randint(0, sys.maxsize)
         self._withReplacement = withReplacement
         self._random = None
 
@@ -31,7 +31,7 @@ class RDDSamplerBase(object):
         self._random = random.Random(self._seed ^ split)
 
         # mixing because the initial seeds are close to each other
-        for _ in xrange(10):
+        for _ in range(10):
             self._random.randint(0, 1)
 
     def getUniformSample(self):

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 4afa82f..d8cdcda 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -49,16 +49,24 @@ which contains two batches of two objects:
 >>> sc.stop()
 """
 
-import cPickle
-from itertools import chain, izip, product
+import sys
+from itertools import chain, product
 import marshal
 import struct
-import sys
 import types
 import collections
 import zlib
 import itertools
 
+if sys.version < '3':
+    import cPickle as pickle
+    protocol = 2
+    from itertools import izip as zip
+else:
+    import pickle
+    protocol = 3
+    xrange = range
+
 from pyspark import cloudpickle
 
 
@@ -97,7 +105,7 @@ class Serializer(object):
     # subclasses should override __eq__ as appropriate.
 
     def __eq__(self, other):
-        return isinstance(other, self.__class__)
+        return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
 
     def __ne__(self, other):
         return not self.__eq__(other)
@@ -212,10 +220,6 @@ class BatchedSerializer(Serializer):
     def _load_stream_without_unbatching(self, stream):
         return self.serializer.load_stream(stream)
 
-    def __eq__(self, other):
-        return (isinstance(other, BatchedSerializer) and
-                other.serializer == self.serializer and other.batchSize == self.batchSize)
-
     def __repr__(self):
         return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
 
@@ -233,14 +237,14 @@ class FlattenedValuesSerializer(BatchedSerializer):
     def _batched(self, iterator):
         n = self.batchSize
         for key, values in iterator:
-            for i in xrange(0, len(values), n):
+            for i in range(0, len(values), n):
                 yield key, values[i:i + n]
 
     def load_stream(self, stream):
         return self.serializer.load_stream(stream)
 
     def __repr__(self):
-        return "FlattenedValuesSerializer(%d)" % self.batchSize
+        return "FlattenedValuesSerializer(%s, %d)" % (self.serializer, self.batchSize)
 
 
 class AutoBatchedSerializer(BatchedSerializer):
@@ -270,12 +274,8 @@ class AutoBatchedSerializer(BatchedSerializer):
             elif size > best * 10 and batch > 1:
                 batch /= 2
 
-    def __eq__(self, other):
-        return (isinstance(other, AutoBatchedSerializer) and
-                other.serializer == self.serializer and other.bestSize == self.bestSize)
-
     def __repr__(self):
-        return "AutoBatchedSerializer(%s)" % str(self.serializer)
+        return "AutoBatchedSerializer(%s)" % self.serializer
 
 
 class CartesianDeserializer(FramedSerializer):
@@ -285,6 +285,7 @@ class CartesianDeserializer(FramedSerializer):
     """
 
     def __init__(self, key_ser, val_ser):
+        FramedSerializer.__init__(self)
         self.key_ser = key_ser
         self.val_ser = val_ser
 
@@ -293,7 +294,7 @@ class CartesianDeserializer(FramedSerializer):
         val_stream = self.val_ser._load_stream_without_unbatching(stream)
         key_is_batched = isinstance(self.key_ser, BatchedSerializer)
         val_is_batched = isinstance(self.val_ser, BatchedSerializer)
-        for (keys, vals) in izip(key_stream, val_stream):
+        for (keys, vals) in zip(key_stream, val_stream):
             keys = keys if key_is_batched else [keys]
             vals = vals if val_is_batched else [vals]
             yield (keys, vals)
@@ -303,10 +304,6 @@ class CartesianDeserializer(FramedSerializer):
             for pair in product(keys, vals):
                 yield pair
 
-    def __eq__(self, other):
-        return (isinstance(other, CartesianDeserializer) and
-                self.key_ser == other.key_ser and self.val_ser == other.val_ser)
-
     def __repr__(self):
         return "CartesianDeserializer(%s, %s)" % \
                (str(self.key_ser), str(self.val_ser))
@@ -318,22 +315,14 @@ class PairDeserializer(CartesianDeserializer):
     Deserializes the JavaRDD zip() of two PythonRDDs.
     """
 
-    def __init__(self, key_ser, val_ser):
-        self.key_ser = key_ser
-        self.val_ser = val_ser
-
     def load_stream(self, stream):
         for (keys, vals) in self.prepare_keys_values(stream):
             if len(keys) != len(vals):
                 raise ValueError("Can not deserialize RDD with different number of items"
                                  " in pair: (%d, %d)" % (len(keys), len(vals)))
-            for pair in izip(keys, vals):
+            for pair in zip(keys, vals):
                 yield pair
 
-    def __eq__(self, other):
-        return (isinstance(other, PairDeserializer) and
-                self.key_ser == other.key_ser and self.val_ser == other.val_ser)
-
     def __repr__(self):
         return "PairDeserializer(%s, %s)" % (str(self.key_ser), str(self.val_ser))
 
@@ -382,8 +371,8 @@ def _hijack_namedtuple():
     global _old_namedtuple  # or it will put in closure
 
     def _copy_func(f):
-        return types.FunctionType(f.func_code, f.func_globals, f.func_name,
-                                  f.func_defaults, f.func_closure)
+        return types.FunctionType(f.__code__, f.__globals__, f.__name__,
+                                  f.__defaults__, f.__closure__)
 
     _old_namedtuple = _copy_func(collections.namedtuple)
 
@@ -392,15 +381,15 @@ def _hijack_namedtuple():
         return _hack_namedtuple(cls)
 
     # replace namedtuple with new one
-    collections.namedtuple.func_globals["_old_namedtuple"] = _old_namedtuple
-    collections.namedtuple.func_globals["_hack_namedtuple"] = _hack_namedtuple
-    collections.namedtuple.func_code = namedtuple.func_code
+    collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
+    collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
+    collections.namedtuple.__code__ = namedtuple.__code__
     collections.namedtuple.__hijack = 1
 
     # hack the cls already generated by namedtuple
     # those created in other module can be pickled as normal,
     # so only hack those in __main__ module
-    for n, o in sys.modules["__main__"].__dict__.iteritems():
+    for n, o in sys.modules["__main__"].__dict__.items():
         if (type(o) is type and o.__base__ is tuple
                 and hasattr(o, "_fields")
                 and "__reduce__" not in o.__dict__):
@@ -413,7 +402,7 @@ _hijack_namedtuple()
 class PickleSerializer(FramedSerializer):
 
     """
-    Serializes objects using Python's cPickle serializer:
+    Serializes objects using Python's pickle serializer:
 
         http://docs.python.org/2/library/pickle.html
 
@@ -422,10 +411,14 @@ class PickleSerializer(FramedSerializer):
     """
 
     def dumps(self, obj):
-        return cPickle.dumps(obj, 2)
+        return pickle.dumps(obj, protocol)
 
-    def loads(self, obj):
-        return cPickle.loads(obj)
+    if sys.version >= '3':
+        def loads(self, obj, encoding="bytes"):
+            return pickle.loads(obj, encoding=encoding)
+    else:
+        def loads(self, obj, encoding=None):
+            return pickle.loads(obj)
 
 
 class CloudPickleSerializer(PickleSerializer):
@@ -454,7 +447,7 @@ class MarshalSerializer(FramedSerializer):
 class AutoSerializer(FramedSerializer):
 
     """
-    Choose marshal or cPickle as serialization protocol automatically
+    Choose marshal or pickle as serialization protocol automatically
     """
 
     def __init__(self):
@@ -463,19 +456,19 @@ class AutoSerializer(FramedSerializer):
 
     def dumps(self, obj):
         if self._type is not None:
-            return 'P' + cPickle.dumps(obj, -1)
+            return b'P' + pickle.dumps(obj, -1)
         try:
-            return 'M' + marshal.dumps(obj)
+            return b'M' + marshal.dumps(obj)
         except Exception:
-            self._type = 'P'
-            return 'P' + cPickle.dumps(obj, -1)
+            self._type = b'P'
+            return b'P' + pickle.dumps(obj, -1)
 
     def loads(self, obj):
-        _type = obj[0]
+        _type = obj[0:1]  # slice, not index: obj[0] is an int on Python 3
-        if _type == 'M':
+        if _type == b'M':
             return marshal.loads(obj[1:])
-        elif _type == 'P':
-            return cPickle.loads(obj[1:])
+        elif _type == b'P':
+            return pickle.loads(obj[1:])
         else:
             raise ValueError("invalid serialization type: %s" % _type)
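
The one-byte 'M'/'P' tag is easy to mishandle on Python 3 because indexing a
bytes object yields an int, which is why the tag is read back with a slice
above. A quick standalone illustration:

    data = b"Mhello"

    # Python 2: data[0] is the 1-character str 'M'; Python 3: data[0] is 77.
    # Slicing returns bytes on both versions, so it is the safe way to peel
    # off a tag byte.
    tag = data[0:1]
    assert tag == b"M"
    print(tag, data[1:])
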
 
@@ -495,8 +488,8 @@ class CompressedSerializer(FramedSerializer):
     def loads(self, obj):
         return self.serializer.loads(zlib.decompress(obj))
 
-    def __eq__(self, other):
-        return isinstance(other, CompressedSerializer) and self.serializer == other.serializer
+    def __repr__(self):
+        return "CompressedSerializer(%s)" % self.serializer
 
 
 class UTF8Deserializer(Serializer):
@@ -505,7 +498,7 @@ class UTF8Deserializer(Serializer):
     Deserializes streams written by String.getBytes.
     """
 
-    def __init__(self, use_unicode=False):
+    def __init__(self, use_unicode=True):
         self.use_unicode = use_unicode
 
     def loads(self, stream):
@@ -526,13 +519,13 @@ class UTF8Deserializer(Serializer):
         except EOFError:
             return
 
-    def __eq__(self, other):
-        return isinstance(other, UTF8Deserializer) and self.use_unicode == other.use_unicode
+    def __repr__(self):
+        return "UTF8Deserializer(%s)" % self.use_unicode
 
 
 def read_long(stream):
     length = stream.read(8)
-    if length == "":
+    if not length:
         raise EOFError
     return struct.unpack("!q", length)[0]
 
@@ -547,7 +540,7 @@ def pack_long(value):
 
 def read_int(stream):
     length = stream.read(4)
-    if length == "":
+    if not length:
         raise EOFError
     return struct.unpack("!i", length)[0]
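
The "if not length:" checks above matter because a binary stream returns b''
at end of file on Python 3 and '' on Python 2; comparing against "" only
covers the latter. A minimal sketch of the same length-prefixed framing,
assuming nothing beyond the standard library:

    import io
    import struct

    def write_int(stream, value):
        # big-endian 4-byte signed int, the same "!i" layout used above
        stream.write(struct.pack("!i", value))

    def read_int(stream):
        length = stream.read(4)
        if not length:           # b"" on Python 3, "" on Python 2
            raise EOFError
        return struct.unpack("!i", length)[0]

    buf = io.BytesIO()
    write_int(buf, 42)
    buf.seek(0)
    print(read_int(buf))         # 42
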
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/shell.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 81aa970..144cdf0 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -21,13 +21,6 @@ An interactive shell.
 This file is designed to be launched as a PYTHONSTARTUP script.
 """
 
-import sys
-if sys.version_info[0] != 2:
-    print("Error: Default Python used is Python%s" % sys.version_info.major)
-    print("\tSet env variable PYSPARK_PYTHON to Python2 binary and re-run it.")
-    sys.exit(1)
-
-
 import atexit
 import os
 import platform
@@ -53,9 +46,14 @@ atexit.register(lambda: sc.stop())
 try:
     # Try to access HiveConf, it will raise exception if Hive is not added
     sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
-    sqlCtx = sqlContext = HiveContext(sc)
+    sqlContext = HiveContext(sc)
 except py4j.protocol.Py4JError:
-    sqlCtx = sqlContext = SQLContext(sc)
+    sqlContext = SQLContext(sc)
+except TypeError:
+    sqlContext = SQLContext(sc)
+
+# for compatibility
+sqlCtx = sqlContext
 
 print("""Welcome to
       ____              __
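
The extra except TypeError above covers the case where Hive classes are not
on the JVM classpath and the probe call fails with a TypeError (the missing
class typically resolves to a non-callable Py4J package object) rather than
a Py4JError; sqlCtx is kept only as a backwards-compatible alias. A rough
sketch of the fallback shape, with hypothetical factory arguments standing
in for the real context classes:

    def build_sql_context(sc, make_hive_context, make_sql_context):
        # make_hive_context / make_sql_context are hypothetical callables
        # standing in for HiveContext and SQLContext.
        try:
            # constructing the Hive-backed context fails when Hive is absent
            return make_hive_context(sc)
        except (RuntimeError, TypeError):
            # shell.py narrows this to py4j.protocol.Py4JError and TypeError
            return make_sql_context(sc)
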

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/shuffle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 8a6fc62..b54baa5 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -78,8 +78,8 @@ def _get_local_dirs(sub):
 
 
 # global stats
-MemoryBytesSpilled = 0L
-DiskBytesSpilled = 0L
+MemoryBytesSpilled = 0
+DiskBytesSpilled = 0
 
 
 class Aggregator(object):
@@ -126,7 +126,7 @@ class Merger(object):
         """ Merge the combined items by mergeCombiner """
         raise NotImplementedError
 
-    def iteritems(self):
+    def items(self):
         """ Return the merged items as iterator """
         raise NotImplementedError
 
@@ -156,9 +156,9 @@ class InMemoryMerger(Merger):
         for k, v in iterator:
             d[k] = comb(d[k], v) if k in d else v
 
-    def iteritems(self):
-        """ Return the merged items as iterator """
-        return self.data.iteritems()
+    def items(self):
+        """ Return the merged items as iterator """
+        return iter(self.data.items())
 
 
 def _compressed_serializer(self, serializer=None):
@@ -208,15 +208,15 @@ class ExternalMerger(Merger):
     >>> agg = SimpleAggregator(lambda x, y: x + y)
     >>> merger = ExternalMerger(agg, 10)
     >>> N = 10000
-    >>> merger.mergeValues(zip(xrange(N), xrange(N)))
+    >>> merger.mergeValues(zip(range(N), range(N)))
     >>> assert merger.spills > 0
-    >>> sum(v for k,v in merger.iteritems())
+    >>> sum(v for k,v in merger.items())
     49995000
 
     >>> merger = ExternalMerger(agg, 10)
-    >>> merger.mergeCombiners(zip(xrange(N), xrange(N)))
+    >>> merger.mergeCombiners(zip(range(N), range(N)))
     >>> assert merger.spills > 0
-    >>> sum(v for k,v in merger.iteritems())
+    >>> sum(v for k,v in merger.items())
     49995000
     """
 
@@ -335,10 +335,10 @@ class ExternalMerger(Merger):
             # above limit at the first time.
 
             # open all the files for writing
-            streams = [open(os.path.join(path, str(i)), 'w')
+            streams = [open(os.path.join(path, str(i)), 'wb')
                        for i in range(self.partitions)]
 
-            for k, v in self.data.iteritems():
+            for k, v in self.data.items():
                 h = self._partition(k)
                 # put one item in batch, make it compatible with load_stream
                 # it will increase the memory if dump them in batch
@@ -354,9 +354,9 @@ class ExternalMerger(Merger):
         else:
             for i in range(self.partitions):
                 p = os.path.join(path, str(i))
-                with open(p, "w") as f:
+                with open(p, "wb") as f:
                     # dump items in batch
-                    self.serializer.dump_stream(self.pdata[i].iteritems(), f)
+                    self.serializer.dump_stream(iter(self.pdata[i].items()), f)
                 self.pdata[i].clear()
                 DiskBytesSpilled += os.path.getsize(p)
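
The switch from 'w' to 'wb' (and later 'r' to 'rb') throughout this file
follows from the serializers now emitting bytes: on Python 3, writing bytes
to a text-mode file raises TypeError. A self-contained sketch of the same
spill-to-binary-file pattern, using plain pickle in place of pyspark's
serializers:

    import os
    import pickle
    import tempfile

    items = {"a": 1, "b": 2}
    path = os.path.join(tempfile.mkdtemp(), "0")

    # pickle produces bytes, so the spill file must be opened in binary mode
    with open(path, "wb") as f:
        for kv in items.items():
            pickle.dump(kv, f, 2)

    # read the spilled records back the same way
    with open(path, "rb") as f:
        try:
            while True:
                print(pickle.load(f))
        except EOFError:
            pass
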
 
@@ -364,10 +364,10 @@ class ExternalMerger(Merger):
         gc.collect()  # release the memory as much as possible
         MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
 
-    def iteritems(self):
+    def items(self):
         """ Return all merged items as iterator """
         if not self.pdata and not self.spills:
-            return self.data.iteritems()
+            return iter(self.data.items())
         return self._external_items()
 
     def _external_items(self):
@@ -398,7 +398,8 @@ class ExternalMerger(Merger):
             path = self._get_spill_dir(j)
             p = os.path.join(path, str(index))
             # do not check memory during merging
-            self.mergeCombiners(self.serializer.load_stream(open(p)), 0)
+            with open(p, "rb") as f:
+                self.mergeCombiners(self.serializer.load_stream(f), 0)
 
             # limit the total partitions
             if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS
@@ -408,7 +409,7 @@ class ExternalMerger(Merger):
                 gc.collect()  # release the memory as much as possible
                 return self._recursive_merged_items(index)
 
-        return self.data.iteritems()
+        return self.data.items()
 
     def _recursive_merged_items(self, index):
         """
@@ -426,7 +427,8 @@ class ExternalMerger(Merger):
         for j in range(self.spills):
             path = self._get_spill_dir(j)
             p = os.path.join(path, str(index))
-            m.mergeCombiners(self.serializer.load_stream(open(p)), 0)
+            with open(p, 'rb') as f:
+                m.mergeCombiners(self.serializer.load_stream(f), 0)
 
             if get_used_memory() > limit:
                 m._spill()
@@ -451,7 +453,7 @@ class ExternalSorter(object):
 
     >>> sorter = ExternalSorter(1)  # 1M
     >>> import random
-    >>> l = range(1024)
+    >>> l = list(range(1024))
     >>> random.shuffle(l)
     >>> sorted(l) == list(sorter.sorted(l))
     True
@@ -499,9 +501,16 @@ class ExternalSorter(object):
                 # sort them inplace will save memory
                 current_chunk.sort(key=key, reverse=reverse)
                 path = self._get_path(len(chunks))
-                with open(path, 'w') as f:
+                with open(path, 'wb') as f:
                     self.serializer.dump_stream(current_chunk, f)
-                chunks.append(self.serializer.load_stream(open(path)))
+
+                def load(f):
+                    for v in self.serializer.load_stream(f):
+                        yield v
+                    # close the file explicitly once we consume all the items
+                    # to avoid a ResourceWarning in Python 3
+                    f.close()
+                chunks.append(load(open(path, 'rb')))
                 current_chunk = []
                 gc.collect()
                 limit = self._next_limit()
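
Wrapping the spill file in a generator, as the new load() helper does, keeps
the descriptor open only while the merge is still draining it and closes it
deterministically instead of leaving it to the garbage collector (which is
what produces ResourceWarning on Python 3). The same pattern in isolation,
with deserialize standing in for serializer.load_stream:

    def iterate_then_close(path, deserialize):
        # deserialize takes a file object and yields items until the
        # stream is exhausted (a stand-in for load_stream)
        f = open(path, "rb")
        try:
            for item in deserialize(f):
                yield item
        finally:
            # runs once the stream is exhausted or the generator is closed,
            # so the file never waits on garbage collection
            f.close()
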
@@ -527,7 +536,7 @@ class ExternalList(object):
     ExternalList can have many items which cannot be hold in memory in
     the same time.
 
-    >>> l = ExternalList(range(100))
+    >>> l = ExternalList(list(range(100)))
     >>> len(l)
     100
     >>> l.append(10)
@@ -555,11 +564,11 @@ class ExternalList(object):
     def __getstate__(self):
         if self._file is not None:
             self._file.flush()
-            f = os.fdopen(os.dup(self._file.fileno()))
-            f.seek(0)
-            serialized = f.read()
+            with os.fdopen(os.dup(self._file.fileno()), "rb") as f:
+                f.seek(0)
+                serialized = f.read()
         else:
-            serialized = ''
+            serialized = b''
         return self.values, self.count, serialized
 
     def __setstate__(self, item):
@@ -575,7 +584,7 @@ class ExternalList(object):
         if self._file is not None:
             self._file.flush()
             # read all items from disks first
-            with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+            with os.fdopen(os.dup(self._file.fileno()), 'rb') as f:
                 f.seek(0)
                 for v in self._ser.load_stream(f):
                     yield v
@@ -598,11 +607,16 @@ class ExternalList(object):
         d = dirs[id(self) % len(dirs)]
         if not os.path.exists(d):
             os.makedirs(d)
-        p = os.path.join(d, str(id))
-        self._file = open(p, "w+", 65536)
+        p = os.path.join(d, str(id(self)))
+        self._file = open(p, "wb+", 65536)
         self._ser = BatchedSerializer(CompressedSerializer(PickleSerializer()), 1024)
         os.unlink(p)
 
+    def __del__(self):
+        if self._file:
+            self._file.close()
+            self._file = None
+
     def _spill(self):
         """ dump the values into disk """
         global MemoryBytesSpilled, DiskBytesSpilled
@@ -651,33 +665,28 @@ class GroupByKey(object):
     """
     Group a sorted iterator as [(k1, it1), (k2, it2), ...]
 
-    >>> k = [i/3 for i in range(6)]
+    >>> k = [i // 3 for i in range(6)]
     >>> v = [[i] for i in range(6)]
-    >>> g = GroupByKey(iter(zip(k, v)))
+    >>> g = GroupByKey(zip(k, v))
     >>> [(k, list(it)) for k, it in g]
     [(0, [0, 1, 2]), (1, [3, 4, 5])]
     """
 
     def __init__(self, iterator):
-        self.iterator = iter(iterator)
-        self.next_item = None
+        self.iterator = iterator
 
     def __iter__(self):
-        return self
-
-    def next(self):
-        key, value = self.next_item if self.next_item else next(self.iterator)
-        values = ExternalListOfList([value])
-        try:
-            while True:
-                k, v = next(self.iterator)
-                if k != key:
-                    self.next_item = (k, v)
-                    break
+        key, values = None, None
+        for k, v in self.iterator:
+            if values is not None and k == key:
                 values.append(v)
-        except StopIteration:
-            self.next_item = None
-        return key, values
+            else:
+                if values is not None:
+                    yield (key, values)
+                key = k
+                values = ExternalListOfList([v])
+        if values is not None:
+            yield (key, values)
 
 
 class ExternalGroupBy(ExternalMerger):
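
The rewritten GroupByKey is a streaming group-by over a key-sorted iterator,
the same contract as itertools.groupby: only adjacent equal keys are merged,
so the input must already be sorted by key. A compact illustration of that
contract, with plain lists standing in for ExternalListOfList:

    from itertools import groupby
    from operator import itemgetter

    pairs = [(0, [0]), (0, [1]), (0, [2]), (1, [3]), (1, [4]), (1, [5])]

    # groupby only merges runs of equal keys, exactly the assumption
    # GroupByKey makes about its iterator
    grouped = [(k, [v for _, v in it])
               for k, it in groupby(pairs, key=itemgetter(0))]
    print(grouped)   # [(0, [[0], [1], [2]]), (1, [[3], [4], [5]])]
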
@@ -744,7 +753,7 @@ class ExternalGroupBy(ExternalMerger):
             # above limit at the first time.
 
             # open all the files for writing
-            streams = [open(os.path.join(path, str(i)), 'w')
+            streams = [open(os.path.join(path, str(i)), 'wb')
                        for i in range(self.partitions)]
 
             # If the number of keys is small, then the overhead of sort is small
@@ -756,7 +765,7 @@ class ExternalGroupBy(ExternalMerger):
                     h = self._partition(k)
                     self.serializer.dump_stream([(k, self.data[k])], streams[h])
             else:
-                for k, v in self.data.iteritems():
+                for k, v in self.data.items():
                     h = self._partition(k)
                     self.serializer.dump_stream([(k, v)], streams[h])
 
@@ -771,14 +780,14 @@ class ExternalGroupBy(ExternalMerger):
         else:
             for i in range(self.partitions):
                 p = os.path.join(path, str(i))
-                with open(p, "w") as f:
+                with open(p, "wb") as f:
                     # dump items in batch
                     if self._sorted:
                         # sort by key only (stable)
-                        sorted_items = sorted(self.pdata[i].iteritems(), key=operator.itemgetter(0))
+                        sorted_items = sorted(self.pdata[i].items(), key=operator.itemgetter(0))
                         self.serializer.dump_stream(sorted_items, f)
                     else:
-                        self.serializer.dump_stream(self.pdata[i].iteritems(), f)
+                        self.serializer.dump_stream(self.pdata[i].items(), f)
                 self.pdata[i].clear()
                 DiskBytesSpilled += os.path.getsize(p)
 
@@ -792,7 +801,7 @@ class ExternalGroupBy(ExternalMerger):
         # if the memory can not hold all the partition,
         # then use sort based merge. Because of compression,
         # the data on disks will be much smaller than needed memory
-        if (size >> 20) >= self.memory_limit / 10:
+        if size >= self.memory_limit << 17:  # memory_limit is in MB: << 17 == * 1MB / 8
             return self._merge_sorted_items(index)
 
         self.data = {}
@@ -800,15 +809,18 @@ class ExternalGroupBy(ExternalMerger):
             path = self._get_spill_dir(j)
             p = os.path.join(path, str(index))
             # do not check memory during merging
-            self.mergeCombiners(self.serializer.load_stream(open(p)), 0)
-        return self.data.iteritems()
+            with open(p, "rb") as f:
+                self.mergeCombiners(self.serializer.load_stream(f), 0)
+        return self.data.items()
 
     def _merge_sorted_items(self, index):
         """ load a partition from disk, then sort and group by key """
         def load_partition(j):
             path = self._get_spill_dir(j)
             p = os.path.join(path, str(index))
-            return self.serializer.load_stream(open(p, 'r', 65536))
+            with open(p, 'rb', 65536) as f:
+                for v in self.serializer.load_stream(f):
+                    yield v
 
         disk_items = [load_partition(j) for j in range(self.spills)]
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/sql/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 65abb24..6d54b9e 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -37,9 +37,22 @@ Important classes of Spark SQL and DataFrames:
     - L{types}
       List of data types available.
 """
+from __future__ import absolute_import
+
+# fix the module name conflict for Python 3+
+import sys
+from . import _types as types
+modname = __name__ + '.types'
+types.__name__ = modname
+# update the __module__ for all objects, make them picklable
+for v in types.__dict__.values():
+    if hasattr(v, "__module__") and v.__module__.endswith('._types'):
+        v.__module__ = modname
+sys.modules[modname] = types
+del modname, sys
 
-from pyspark.sql.context import SQLContext, HiveContext
 from pyspark.sql.types import Row
+from pyspark.sql.context import SQLContext, HiveContext
 from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD, DataFrameNaFunctions
 
 __all__ = [
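
The aliasing block above exists because, per its own comment, the module
name would conflict under Python 3; the implementation lives in _types.py
and is re-registered in sys.modules under the public pyspark.sql.types
name, with __module__ rewritten on its classes so pickled objects still
resolve to the public module. A toy version of the same trick, with
made-up module and class names:

    import sys
    from types import ModuleType

    class Row(tuple):
        """Toy stand-in for a class defined in the real _types module."""

    # build a module object by hand and publish it under the public name
    impl = ModuleType("mytypes")
    impl.Row = Row
    Row.__module__ = "mytypes"     # pickle looks classes up via __module__
    sys.modules["mytypes"] = impl

    import mytypes                 # resolved straight from sys.modules
    print(mytypes.Row((1, 2)))     # (1, 2)
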

