Posted to commits@spark.apache.org by jo...@apache.org on 2015/04/17 01:21:15 UTC
[4/6] spark git commit: [SPARK-4897] [PySpark] Python 3 support
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/fpm.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/fpm.py b/python/pyspark/mllib/fpm.py
index 3aa6d79..628ccc0 100644
--- a/python/pyspark/mllib/fpm.py
+++ b/python/pyspark/mllib/fpm.py
@@ -16,12 +16,14 @@
#
from pyspark import SparkContext
+from pyspark.rdd import ignore_unicode_prefix
from pyspark.mllib.common import JavaModelWrapper, callMLlibFunc, inherit_doc
__all__ = ['FPGrowth', 'FPGrowthModel']
@inherit_doc
+@ignore_unicode_prefix
class FPGrowthModel(JavaModelWrapper):
"""
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/linalg.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index a80320c..38b3aa3 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -25,7 +25,13 @@ SciPy is available in their environment.
import sys
import array
-import copy_reg
+
+if sys.version >= '3':
+ basestring = str
+ xrange = range
+ import copyreg as copy_reg
+else:
+ import copy_reg
import numpy as np
@@ -57,7 +63,7 @@ except:
def _convert_to_vector(l):
if isinstance(l, Vector):
return l
- elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
+ elif type(l) in (array.array, np.array, np.ndarray, list, tuple, xrange):
return DenseVector(l)
elif _have_scipy and scipy.sparse.issparse(l):
assert l.shape[1] == 1, "Expected column vector"
@@ -88,7 +94,7 @@ def _vector_size(v):
"""
if isinstance(v, Vector):
return len(v)
- elif type(v) in (array.array, list, tuple):
+ elif type(v) in (array.array, list, tuple, xrange):
return len(v)
elif type(v) == np.ndarray:
if v.ndim == 1 or (v.ndim == 2 and v.shape[1] == 1):
@@ -193,7 +199,7 @@ class DenseVector(Vector):
DenseVector([1.0, 0.0])
"""
def __init__(self, ar):
- if isinstance(ar, basestring):
+ if isinstance(ar, bytes):
ar = np.frombuffer(ar, dtype=np.float64)
elif not isinstance(ar, np.ndarray):
ar = np.array(ar, dtype=np.float64)
@@ -321,11 +327,13 @@ class DenseVector(Vector):
__sub__ = _delegate("__sub__")
__mul__ = _delegate("__mul__")
__div__ = _delegate("__div__")
+ __truediv__ = _delegate("__truediv__")
__mod__ = _delegate("__mod__")
__radd__ = _delegate("__radd__")
__rsub__ = _delegate("__rsub__")
__rmul__ = _delegate("__rmul__")
__rdiv__ = _delegate("__rdiv__")
+ __rtruediv__ = _delegate("__rtruediv__")
__rmod__ = _delegate("__rmod__")
@@ -344,12 +352,12 @@ class SparseVector(Vector):
:param args: Non-zero entries, as a dictionary, list of tupes,
or two sorted lists containing indices and values.
- >>> print SparseVector(4, {1: 1.0, 3: 5.5})
- (4,[1,3],[1.0,5.5])
- >>> print SparseVector(4, [(1, 1.0), (3, 5.5)])
- (4,[1,3],[1.0,5.5])
- >>> print SparseVector(4, [1, 3], [1.0, 5.5])
- (4,[1,3],[1.0,5.5])
+ >>> SparseVector(4, {1: 1.0, 3: 5.5})
+ SparseVector(4, {1: 1.0, 3: 5.5})
+ >>> SparseVector(4, [(1, 1.0), (3, 5.5)])
+ SparseVector(4, {1: 1.0, 3: 5.5})
+ >>> SparseVector(4, [1, 3], [1.0, 5.5])
+ SparseVector(4, {1: 1.0, 3: 5.5})
"""
self.size = int(size)
assert 1 <= len(args) <= 2, "must pass either 2 or 3 arguments"
@@ -361,8 +369,8 @@ class SparseVector(Vector):
self.indices = np.array([p[0] for p in pairs], dtype=np.int32)
self.values = np.array([p[1] for p in pairs], dtype=np.float64)
else:
- if isinstance(args[0], basestring):
- assert isinstance(args[1], str), "values should be string too"
+ if isinstance(args[0], bytes):
+ assert isinstance(args[1], bytes), "values should be string too"
if args[0]:
self.indices = np.frombuffer(args[0], np.int32)
self.values = np.frombuffer(args[1], np.float64)
@@ -591,12 +599,12 @@ class Vectors(object):
:param args: Non-zero entries, as a dictionary, list of tupes,
or two sorted lists containing indices and values.
- >>> print Vectors.sparse(4, {1: 1.0, 3: 5.5})
- (4,[1,3],[1.0,5.5])
- >>> print Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
- (4,[1,3],[1.0,5.5])
- >>> print Vectors.sparse(4, [1, 3], [1.0, 5.5])
- (4,[1,3],[1.0,5.5])
+ >>> Vectors.sparse(4, {1: 1.0, 3: 5.5})
+ SparseVector(4, {1: 1.0, 3: 5.5})
+ >>> Vectors.sparse(4, [(1, 1.0), (3, 5.5)])
+ SparseVector(4, {1: 1.0, 3: 5.5})
+ >>> Vectors.sparse(4, [1, 3], [1.0, 5.5])
+ SparseVector(4, {1: 1.0, 3: 5.5})
"""
return SparseVector(size, *args)
@@ -645,7 +653,7 @@ class Matrix(object):
"""
Convert Matrix attributes which are array-like or buffer to array.
"""
- if isinstance(array_like, basestring):
+ if isinstance(array_like, bytes):
return np.frombuffer(array_like, dtype=dtype)
return np.asarray(array_like, dtype=dtype)
@@ -677,7 +685,7 @@ class DenseMatrix(Matrix):
def toSparse(self):
"""Convert to SparseMatrix"""
indices = np.nonzero(self.values)[0]
- colCounts = np.bincount(indices / self.numRows)
+ colCounts = np.bincount(indices // self.numRows)
colPtrs = np.cumsum(np.hstack(
(0, colCounts, np.zeros(self.numCols - colCounts.size))))
values = self.values[indices]
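
The header added to linalg.py is the compatibility pattern this patch repeats in most modules: on Python 3, define the Python 2 names (basestring, xrange, copy_reg) so the rest of the file keeps working unmodified, and switch raw-buffer constructors from basestring checks to bytes. DenseVector also gains __truediv__/__rtruediv__, the Python 3 names for the division operators, and toSparse switches to floor division because / is true division on Python 3. A minimal sketch of both points (the numbers are illustrative):

    import sys

    if sys.version >= '3':
        basestring = str            # Python 3 folds str/unicode into one type
        xrange = range              # range is already lazy on Python 3
        import copyreg as copy_reg  # the module was renamed in Python 3
    else:
        import copy_reg

    # '/' is true division on Python 3, so integer index math needs '//'
    # (this is why toSparse above switches to indices // self.numRows):
    indices = [0, 3, 7]
    numRows = 4
    print([i // numRows for i in indices])   # [0, 0, 1] on both Python 2 and 3
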
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/rand.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/rand.py b/python/pyspark/mllib/rand.py
index 20ee9d7..06fbc0e 100644
--- a/python/pyspark/mllib/rand.py
+++ b/python/pyspark/mllib/rand.py
@@ -88,10 +88,10 @@ class RandomRDDs(object):
:param seed: Random seed (default: a random long integer).
:return: RDD of float comprised of i.i.d. samples ~ N(0.0, 1.0).
- >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1L)
+ >>> x = RandomRDDs.normalRDD(sc, 1000, seed=1)
>>> stats = x.stats()
>>> stats.count()
- 1000L
+ 1000
>>> abs(stats.mean() - 0.0) < 0.1
True
>>> abs(stats.stdev() - 1.0) < 0.1
@@ -118,10 +118,10 @@ class RandomRDDs(object):
>>> std = 1.0
>>> expMean = exp(mean + 0.5 * std * std)
>>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
- >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2L)
+ >>> x = RandomRDDs.logNormalRDD(sc, mean, std, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
- 1000L
+ 1000
>>> abs(stats.mean() - expMean) < 0.5
True
>>> from math import sqrt
@@ -145,10 +145,10 @@ class RandomRDDs(object):
:return: RDD of float comprised of i.i.d. samples ~ Pois(mean).
>>> mean = 100.0
- >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2L)
+ >>> x = RandomRDDs.poissonRDD(sc, mean, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
- 1000L
+ 1000
>>> abs(stats.mean() - mean) < 0.5
True
>>> from math import sqrt
@@ -171,10 +171,10 @@ class RandomRDDs(object):
:return: RDD of float comprised of i.i.d. samples ~ Exp(mean).
>>> mean = 2.0
- >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2L)
+ >>> x = RandomRDDs.exponentialRDD(sc, mean, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
- 1000L
+ 1000
>>> abs(stats.mean() - mean) < 0.5
True
>>> from math import sqrt
@@ -202,10 +202,10 @@ class RandomRDDs(object):
>>> scale = 2.0
>>> expMean = shape * scale
>>> expStd = sqrt(shape * scale * scale)
- >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2L)
+ >>> x = RandomRDDs.gammaRDD(sc, shape, scale, 1000, seed=2)
>>> stats = x.stats()
>>> stats.count()
- 1000L
+ 1000
>>> abs(stats.mean() - expMean) < 0.5
True
>>> abs(stats.stdev() - expStd) < 0.5
@@ -254,7 +254,7 @@ class RandomRDDs(object):
:return: RDD of Vector with vectors containing i.i.d. samples ~ `N(0.0, 1.0)`.
>>> import numpy as np
- >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1L).collect())
+ >>> mat = np.matrix(RandomRDDs.normalVectorRDD(sc, 100, 100, seed=1).collect())
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - 0.0) < 0.1
@@ -286,8 +286,8 @@ class RandomRDDs(object):
>>> std = 1.0
>>> expMean = exp(mean + 0.5 * std * std)
>>> expStd = sqrt((exp(std * std) - 1.0) * exp(2.0 * mean + std * std))
- >>> mat = np.matrix(RandomRDDs.logNormalVectorRDD(sc, mean, std, \
- 100, 100, seed=1L).collect())
+ >>> m = RandomRDDs.logNormalVectorRDD(sc, mean, std, 100, 100, seed=1).collect()
+ >>> mat = np.matrix(m)
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - expMean) < 0.1
@@ -315,7 +315,7 @@ class RandomRDDs(object):
>>> import numpy as np
>>> mean = 100.0
- >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1L)
+ >>> rdd = RandomRDDs.poissonVectorRDD(sc, mean, 100, 100, seed=1)
>>> mat = np.mat(rdd.collect())
>>> mat.shape
(100, 100)
@@ -345,7 +345,7 @@ class RandomRDDs(object):
>>> import numpy as np
>>> mean = 0.5
- >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1L)
+ >>> rdd = RandomRDDs.exponentialVectorRDD(sc, mean, 100, 100, seed=1)
>>> mat = np.mat(rdd.collect())
>>> mat.shape
(100, 100)
@@ -380,8 +380,7 @@ class RandomRDDs(object):
>>> scale = 2.0
>>> expMean = shape * scale
>>> expStd = sqrt(shape * scale * scale)
- >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, \
- 100, 100, seed=1L).collect())
+ >>> mat = np.matrix(RandomRDDs.gammaVectorRDD(sc, shape, scale, 100, 100, seed=1).collect())
>>> mat.shape
(100, 100)
>>> abs(mat.mean() - expMean) < 0.1
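
Every rand.py change is the same doctest repair: Python 3 has a single integer type, so long literals such as seed=1L are a syntax error and counts no longer repr with a trailing L. Plain int literals behave identically on both interpreters, which is all the new doctests rely on. A short illustration:

    n = 10 ** 19               # larger than 2**63, still a plain int on Python 3
    print(isinstance(n, int))  # True on Python 3; Python 2 would make this a long
    print(n)                   # 10000000000000000000 -- no trailing 'L' on Python 3
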
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index c5c4c13..80e0a35 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -15,6 +15,7 @@
# limitations under the License.
#
+import array
from collections import namedtuple
from pyspark import SparkContext
@@ -104,14 +105,14 @@ class MatrixFactorizationModel(JavaModelWrapper, JavaSaveable, JavaLoader):
assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
first = user_product.first()
assert len(first) == 2, "user_product should be RDD of (user, product)"
- user_product = user_product.map(lambda (u, p): (int(u), int(p)))
+ user_product = user_product.map(lambda u_p: (int(u_p[0]), int(u_p[1])))
return self.call("predict", user_product)
def userFeatures(self):
- return self.call("getUserFeatures")
+ return self.call("getUserFeatures").mapValues(lambda v: array.array('d', v))
def productFeatures(self):
- return self.call("getProductFeatures")
+ return self.call("getProductFeatures").mapValues(lambda v: array.array('d', v))
@classmethod
def load(cls, sc, path):
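
The lambda rewrite in predict() is forced by PEP 3113: Python 3 removed tuple parameter unpacking from function signatures, so lambda (u, p): ... no longer parses and must index into its single argument (or unpack inside a named function). The userFeatures/productFeatures change is separate: each returned feature vector is wrapped in array.array('d', ...) so callers get a plain numeric sequence. A sketch of the lambda change only:

    pairs = [(1.0, 2.0), (3.0, 4.0)]

    # Python 2 only -- a SyntaxError on Python 3:
    #     mapped = map(lambda (u, p): (int(u), int(p)), pairs)

    # Portable replacements:
    mapped = [(int(u_p[0]), int(u_p[1])) for u_p in pairs]

    def to_ints(u_p):
        u, p = u_p                 # or unpack inside the function body
        return int(u), int(p)

    print(mapped == [to_ints(x) for x in pairs])   # True
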
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/stat/_statistics.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/stat/_statistics.py b/python/pyspark/mllib/stat/_statistics.py
index 1d83e9d..b475be4 100644
--- a/python/pyspark/mllib/stat/_statistics.py
+++ b/python/pyspark/mllib/stat/_statistics.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from pyspark import RDD
+from pyspark.rdd import RDD, ignore_unicode_prefix
from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper
from pyspark.mllib.linalg import Matrix, _convert_to_vector
from pyspark.mllib.regression import LabeledPoint
@@ -38,7 +38,7 @@ class MultivariateStatisticalSummary(JavaModelWrapper):
return self.call("variance").toArray()
def count(self):
- return self.call("count")
+ return int(self.call("count"))
def numNonzeros(self):
return self.call("numNonzeros").toArray()
@@ -78,7 +78,7 @@ class Statistics(object):
>>> cStats.variance()
array([ 4., 13., 0., 25.])
>>> cStats.count()
- 3L
+ 3
>>> cStats.numNonzeros()
array([ 3., 2., 0., 3.])
>>> cStats.max()
@@ -124,20 +124,20 @@ class Statistics(object):
>>> rdd = sc.parallelize([Vectors.dense([1, 0, 0, -2]), Vectors.dense([4, 5, 0, 3]),
... Vectors.dense([6, 7, 0, 8]), Vectors.dense([9, 0, 0, 1])])
>>> pearsonCorr = Statistics.corr(rdd)
- >>> print str(pearsonCorr).replace('nan', 'NaN')
+ >>> print(str(pearsonCorr).replace('nan', 'NaN'))
[[ 1. 0.05564149 NaN 0.40047142]
[ 0.05564149 1. NaN 0.91359586]
[ NaN NaN 1. NaN]
[ 0.40047142 0.91359586 NaN 1. ]]
>>> spearmanCorr = Statistics.corr(rdd, method="spearman")
- >>> print str(spearmanCorr).replace('nan', 'NaN')
+ >>> print(str(spearmanCorr).replace('nan', 'NaN'))
[[ 1. 0.10540926 NaN 0.4 ]
[ 0.10540926 1. NaN 0.9486833 ]
[ NaN NaN 1. NaN]
[ 0.4 0.9486833 NaN 1. ]]
>>> try:
... Statistics.corr(rdd, "spearman")
- ... print "Method name as second argument without 'method=' shouldn't be allowed."
+ ... print("Method name as second argument without 'method=' shouldn't be allowed.")
... except TypeError:
... pass
"""
@@ -153,6 +153,7 @@ class Statistics(object):
return callMLlibFunc("corr", x.map(float), y.map(float), method)
@staticmethod
+ @ignore_unicode_prefix
def chiSqTest(observed, expected=None):
"""
.. note:: Experimental
@@ -188,11 +189,11 @@ class Statistics(object):
>>> from pyspark.mllib.linalg import Vectors, Matrices
>>> observed = Vectors.dense([4, 6, 5])
>>> pearson = Statistics.chiSqTest(observed)
- >>> print pearson.statistic
+ >>> print(pearson.statistic)
0.4
>>> pearson.degreesOfFreedom
2
- >>> print round(pearson.pValue, 4)
+ >>> print(round(pearson.pValue, 4))
0.8187
>>> pearson.method
u'pearson'
@@ -202,12 +203,12 @@ class Statistics(object):
>>> observed = Vectors.dense([21, 38, 43, 80])
>>> expected = Vectors.dense([3, 5, 7, 20])
>>> pearson = Statistics.chiSqTest(observed, expected)
- >>> print round(pearson.pValue, 4)
+ >>> print(round(pearson.pValue, 4))
0.0027
>>> data = [40.0, 24.0, 29.0, 56.0, 32.0, 42.0, 31.0, 10.0, 0.0, 30.0, 15.0, 12.0]
>>> chi = Statistics.chiSqTest(Matrices.dense(3, 4, data))
- >>> print round(chi.statistic, 4)
+ >>> print(round(chi.statistic, 4))
21.9958
>>> data = [LabeledPoint(0.0, Vectors.dense([0.5, 10.0])),
@@ -218,9 +219,9 @@ class Statistics(object):
... LabeledPoint(1.0, Vectors.dense([3.5, 40.0])),]
>>> rdd = sc.parallelize(data, 4)
>>> chi = Statistics.chiSqTest(rdd)
- >>> print chi[0].statistic
+ >>> print(chi[0].statistic)
0.75
- >>> print chi[1].statistic
+ >>> print(chi[1].statistic)
1.5
"""
if isinstance(observed, RDD):
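
The _statistics.py edits are doctest conversions: print becomes a function call (with a single parenthesised argument this parses the same way under Python 2's print statement, which is why the examples run on both versions without a __future__ import), count() is wrapped in int() so its repr carries no trailing L on Python 2, and chiSqTest gains @ignore_unicode_prefix for the u'pearson' output. For a script rather than a doctest, the usual portable spelling is:

    from __future__ import print_function  # only needed on Python 2, harmless on 3

    count = 3
    print("count:", count)   # multi-argument print() now behaves the same on 2 and 3
    print(int(count))        # int() keeps the printed value free of a trailing 'L'
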
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 8eaddcf..c6ed5ac 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -72,11 +72,11 @@ class VectorTests(PySparkTestCase):
def _test_serialize(self, v):
self.assertEqual(v, ser.loads(ser.dumps(v)))
jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))
- nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec)))
+ nv = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jvec)))
self.assertEqual(v, nv)
vs = [v] * 100
jvecs = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(vs)))
- nvs = ser.loads(str(self.sc._jvm.SerDe.dumps(jvecs)))
+ nvs = ser.loads(bytes(self.sc._jvm.SerDe.dumps(jvecs)))
self.assertEqual(vs, nvs)
def test_serialize(self):
@@ -412,11 +412,11 @@ class StatTests(PySparkTestCase):
self.assertEqual(10, len(summary.normL1()))
self.assertEqual(10, len(summary.normL2()))
- data2 = self.sc.parallelize(xrange(10)).map(lambda x: Vectors.dense(x))
+ data2 = self.sc.parallelize(range(10)).map(lambda x: Vectors.dense(x))
summary2 = Statistics.colStats(data2)
self.assertEqual(array([45.0]), summary2.normL1())
import math
- expectedNormL2 = math.sqrt(sum(map(lambda x: x*x, xrange(10))))
+ expectedNormL2 = math.sqrt(sum(map(lambda x: x*x, range(10))))
self.assertTrue(math.fabs(summary2.normL2()[0] - expectedNormL2) < 1e-14)
@@ -438,11 +438,11 @@ class VectorUDTTests(PySparkTestCase):
def test_infer_schema(self):
sqlCtx = SQLContext(self.sc)
rdd = self.sc.parallelize([LabeledPoint(1.0, self.dv1), LabeledPoint(0.0, self.sv1)])
- srdd = sqlCtx.inferSchema(rdd)
- schema = srdd.schema
+ df = rdd.toDF()
+ schema = df.schema
field = [f for f in schema.fields if f.name == "features"][0]
self.assertEqual(field.dataType, self.udt)
- vectors = srdd.map(lambda p: p.features).collect()
+ vectors = df.map(lambda p: p.features).collect()
self.assertEqual(len(vectors), 2)
for v in vectors:
if isinstance(v, SparseVector):
@@ -695,7 +695,7 @@ class ChiSqTestTests(PySparkTestCase):
class SerDeTest(PySparkTestCase):
def test_to_java_object_rdd(self): # SPARK-6660
- data = RandomRDDs.uniformRDD(self.sc, 10, 5, seed=0L)
+ data = RandomRDDs.uniformRDD(self.sc, 10, 5, seed=0)
self.assertEqual(_to_java_object_rdd(data).count(), 10)
@@ -771,7 +771,7 @@ class StandardScalerTests(PySparkTestCase):
if __name__ == "__main__":
if not _have_scipy:
- print "NOTE: Skipping SciPy tests as it does not seem to be installed"
+ print("NOTE: Skipping SciPy tests as it does not seem to be installed")
unittest.main()
if not _have_scipy:
- print "NOTE: SciPy tests were skipped as it does not seem to be installed"
+ print("NOTE: SciPy tests were skipped as it does not seem to be installed")
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/tree.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index a7a4d2a..0fe6e4f 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -163,14 +163,16 @@ class DecisionTree(object):
... LabeledPoint(1.0, [3.0])
... ]
>>> model = DecisionTree.trainClassifier(sc.parallelize(data), 2, {})
- >>> print model, # it already has newline
+ >>> print(model)
DecisionTreeModel classifier of depth 1 with 3 nodes
- >>> print model.toDebugString(), # it already has newline
+
+ >>> print(model.toDebugString())
DecisionTreeModel classifier of depth 1 with 3 nodes
If (feature 0 <= 0.0)
Predict: 0.0
Else (feature 0 > 0.0)
Predict: 1.0
+ <BLANKLINE>
>>> model.predict(array([1.0]))
1.0
>>> model.predict(array([0.0]))
@@ -318,9 +320,10 @@ class RandomForest(object):
3
>>> model.totalNumNodes()
7
- >>> print model,
+ >>> print(model)
TreeEnsembleModel classifier with 3 trees
- >>> print model.toDebugString(),
+ <BLANKLINE>
+ >>> print(model.toDebugString())
TreeEnsembleModel classifier with 3 trees
<BLANKLINE>
Tree 0:
@@ -335,6 +338,7 @@ class RandomForest(object):
Predict: 0.0
Else (feature 0 > 1.0)
Predict: 1.0
+ <BLANKLINE>
>>> model.predict([2.0])
1.0
>>> model.predict([0.0])
@@ -483,8 +487,9 @@ class GradientBoostedTrees(object):
100
>>> model.totalNumNodes()
300
- >>> print model, # it already has newline
+ >>> print(model) # it already has newline
TreeEnsembleModel classifier with 100 trees
+ <BLANKLINE>
>>> model.predict([2.0])
1.0
>>> model.predict([0.0])
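
The tree.py doctests drop the Python 2 trick of a trailing comma after print (which suppressed the extra newline) and instead state the blank line explicitly with doctest's <BLANKLINE> marker, since the model strings already end in a newline. A self-contained doctest showing the marker (demo is an illustrative name):

    def demo():
        """
        A value that already ends in a newline yields a blank output line,
        which doctest must see spelled out as <BLANKLINE>:

        >>> print("TreeEnsembleModel classifier with 3 trees\\n")
        TreeEnsembleModel classifier with 3 trees
        <BLANKLINE>
        """

    if __name__ == "__main__":
        import doctest
        doctest.testmod()
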
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/mllib/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index c5c3468..16a90db 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -15,10 +15,14 @@
# limitations under the License.
#
+import sys
import numpy as np
import warnings
-from pyspark.mllib.common import callMLlibFunc, JavaModelWrapper, inherit_doc
+if sys.version > '3':
+ xrange = range
+
+from pyspark.mllib.common import callMLlibFunc, inherit_doc
from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
@@ -94,22 +98,16 @@ class MLUtils(object):
>>> from pyspark.mllib.util import MLUtils
>>> from pyspark.mllib.regression import LabeledPoint
>>> tempFile = NamedTemporaryFile(delete=True)
- >>> tempFile.write("+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0")
+ >>> _ = tempFile.write(b"+1 1:1.0 3:2.0 5:3.0\\n-1\\n-1 2:4.0 4:5.0 6:6.0")
>>> tempFile.flush()
>>> examples = MLUtils.loadLibSVMFile(sc, tempFile.name).collect()
>>> tempFile.close()
- >>> type(examples[0]) == LabeledPoint
- True
- >>> print examples[0]
- (1.0,(6,[0,2,4],[1.0,2.0,3.0]))
- >>> type(examples[1]) == LabeledPoint
- True
- >>> print examples[1]
- (-1.0,(6,[],[]))
- >>> type(examples[2]) == LabeledPoint
- True
- >>> print examples[2]
- (-1.0,(6,[1,3,5],[4.0,5.0,6.0]))
+ >>> examples[0]
+ LabeledPoint(1.0, (6,[0,2,4],[1.0,2.0,3.0]))
+ >>> examples[1]
+ LabeledPoint(-1.0, (6,[],[]))
+ >>> examples[2]
+ LabeledPoint(-1.0, (6,[1,3,5],[4.0,5.0,6.0]))
"""
from pyspark.mllib.regression import LabeledPoint
if multiclass is not None:
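
Two Python 3 details meet in the loadLibSVMFile doctest: NamedTemporaryFile hands back a binary-mode file, so the sample data must be written as bytes, and write() now returns the number of bytes written, which would show up as unexpected doctest output unless assigned away (the `_ =` idiom). The examples also switch from print to the objects' reprs, which are identical on both versions. A small stand-alone version of the temp-file pattern:

    from tempfile import NamedTemporaryFile

    tmp = NamedTemporaryFile(delete=True)
    _ = tmp.write(b"+1 1:1.0 3:2.0 5:3.0\n")   # bytes in, return value discarded
    tmp.flush()

    with open(tmp.name, "rb") as f:
        print(f.read())                         # b'+1 1:1.0 3:2.0 5:3.0\n'
    tmp.close()
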
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/profiler.py
----------------------------------------------------------------------
diff --git a/python/pyspark/profiler.py b/python/pyspark/profiler.py
index 4408996..d18daaa 100644
--- a/python/pyspark/profiler.py
+++ b/python/pyspark/profiler.py
@@ -84,11 +84,11 @@ class Profiler(object):
>>> from pyspark import BasicProfiler
>>> class MyCustomProfiler(BasicProfiler):
... def show(self, id):
- ... print "My custom profiles for RDD:%s" % id
+ ... print("My custom profiles for RDD:%s" % id)
...
>>> conf = SparkConf().set("spark.python.profile", "true")
>>> sc = SparkContext('local', 'test', conf=conf, profiler_cls=MyCustomProfiler)
- >>> sc.parallelize(list(range(1000))).map(lambda x: 2 * x).take(10)
+ >>> sc.parallelize(range(1000)).map(lambda x: 2 * x).take(10)
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
>>> sc.show_profiles()
My custom profiles for RDD:1
@@ -111,9 +111,9 @@ class Profiler(object):
""" Print the profile stats to stdout, id is the RDD id """
stats = self.stats()
if stats:
- print "=" * 60
- print "Profile of RDD<id=%d>" % id
- print "=" * 60
+ print("=" * 60)
+ print("Profile of RDD<id=%d>" % id)
+ print("=" * 60)
stats.sort_stats("time", "cumulative").print_stats()
def dump(self, id, path):
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 93e658e..d9cdbb6 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -16,21 +16,29 @@
#
import copy
-from collections import defaultdict
-from itertools import chain, ifilter, imap
-import operator
import sys
+import os
+import re
+import operator
import shlex
-from subprocess import Popen, PIPE
-from tempfile import NamedTemporaryFile
-from threading import Thread
import warnings
import heapq
import bisect
import random
import socket
+from subprocess import Popen, PIPE
+from tempfile import NamedTemporaryFile
+from threading import Thread
+from collections import defaultdict
+from itertools import chain
+from functools import reduce
from math import sqrt, log, isinf, isnan, pow, ceil
+if sys.version > '3':
+ basestring = unicode = str
+else:
+ from itertools import imap as map, ifilter as filter
+
from pyspark.serializers import NoOpSerializer, CartesianDeserializer, \
BatchedSerializer, CloudPickleSerializer, PairDeserializer, \
PickleSerializer, pack_long, AutoBatchedSerializer
@@ -50,20 +58,21 @@ from py4j.java_collections import ListConverter, MapConverter
__all__ = ["RDD"]
-# TODO: for Python 3.3+, PYTHONHASHSEED should be reset to disable randomized
-# hash for string
def portable_hash(x):
"""
- This function returns consistant hash code for builtin types, especially
+ This function returns consistent hash code for builtin types, especially
for None and tuple with None.
- The algrithm is similar to that one used by CPython 2.7
+ The algorithm is similar to that one used by CPython 2.7
>>> portable_hash(None)
0
>>> portable_hash((None, 1)) & 0xffffffff
219750521
"""
+ if sys.version >= '3.3' and 'PYTHONHASHSEED' not in os.environ:
+ raise Exception("Randomness of hash of string should be disabled via PYTHONHASHSEED")
+
if x is None:
return 0
if isinstance(x, tuple):
@@ -71,7 +80,7 @@ def portable_hash(x):
for i in x:
h ^= portable_hash(i)
h *= 1000003
- h &= sys.maxint
+ h &= sys.maxsize
h ^= len(x)
if h == -1:
h = -2
@@ -123,6 +132,19 @@ def _load_from_socket(port, serializer):
sock.close()
+def ignore_unicode_prefix(f):
+ """
+ Ignore the 'u' prefix of string in doc tests, to make it works
+ in both python 2 and 3
+ """
+ if sys.version >= '3':
+ # the representation of unicode string in Python 3 does not have prefix 'u',
+ # so remove the prefix 'u' for doc tests
+ literal_re = re.compile(r"(\W|^)[uU](['])", re.UNICODE)
+ f.__doc__ = literal_re.sub(r'\1\2', f.__doc__)
+ return f
+
+
class Partitioner(object):
def __init__(self, numPartitions, partitionFunc):
self.numPartitions = numPartitions
@@ -251,7 +273,7 @@ class RDD(object):
[('a', 1), ('b', 1), ('c', 1)]
"""
def func(_, iterator):
- return imap(f, iterator)
+ return map(f, iterator)
return self.mapPartitionsWithIndex(func, preservesPartitioning)
def flatMap(self, f, preservesPartitioning=False):
@@ -266,7 +288,7 @@ class RDD(object):
[(2, 2), (2, 2), (3, 3), (3, 3), (4, 4), (4, 4)]
"""
def func(s, iterator):
- return chain.from_iterable(imap(f, iterator))
+ return chain.from_iterable(map(f, iterator))
return self.mapPartitionsWithIndex(func, preservesPartitioning)
def mapPartitions(self, f, preservesPartitioning=False):
@@ -329,7 +351,7 @@ class RDD(object):
[2, 4]
"""
def func(iterator):
- return ifilter(f, iterator)
+ return filter(f, iterator)
return self.mapPartitions(func, True)
def distinct(self, numPartitions=None):
@@ -341,7 +363,7 @@ class RDD(object):
"""
return self.map(lambda x: (x, None)) \
.reduceByKey(lambda x, _: x, numPartitions) \
- .map(lambda (x, _): x)
+ .map(lambda x: x[0])
def sample(self, withReplacement, fraction, seed=None):
"""
@@ -354,8 +376,8 @@ class RDD(object):
:param seed: seed for the random number generator
>>> rdd = sc.parallelize(range(100), 4)
- >>> rdd.sample(False, 0.1, 81).count()
- 10
+ >>> 6 <= rdd.sample(False, 0.1, 81).count() <= 14
+ True
"""
assert fraction >= 0.0, "Negative fraction value: %s" % fraction
return self.mapPartitionsWithIndex(RDDSampler(withReplacement, fraction, seed).func, True)
@@ -368,12 +390,14 @@ class RDD(object):
:param seed: random seed
:return: split RDDs in a list
- >>> rdd = sc.parallelize(range(5), 1)
+ >>> rdd = sc.parallelize(range(500), 1)
>>> rdd1, rdd2 = rdd.randomSplit([2, 3], 17)
- >>> rdd1.collect()
- [1, 3]
- >>> rdd2.collect()
- [0, 2, 4]
+ >>> len(rdd1.collect() + rdd2.collect())
+ 500
+ >>> 150 < rdd1.count() < 250
+ True
+ >>> 250 < rdd2.count() < 350
+ True
"""
s = float(sum(weights))
cweights = [0.0]
@@ -416,7 +440,7 @@ class RDD(object):
rand.shuffle(samples)
return samples
- maxSampleSize = sys.maxint - int(numStDev * sqrt(sys.maxint))
+ maxSampleSize = sys.maxsize - int(numStDev * sqrt(sys.maxsize))
if num > maxSampleSize:
raise ValueError(
"Sample size cannot be greater than %d." % maxSampleSize)
@@ -430,7 +454,7 @@ class RDD(object):
# See: scala/spark/RDD.scala
while len(samples) < num:
# TODO: add log warning for when more than one iteration was run
- seed = rand.randint(0, sys.maxint)
+ seed = rand.randint(0, sys.maxsize)
samples = self.sample(withReplacement, fraction, seed).collect()
rand.shuffle(samples)
@@ -507,7 +531,7 @@ class RDD(object):
"""
return self.map(lambda v: (v, None)) \
.cogroup(other.map(lambda v: (v, None))) \
- .filter(lambda (k, vs): all(vs)) \
+ .filter(lambda k_vs: all(k_vs[1])) \
.keys()
def _reserialize(self, serializer=None):
@@ -549,7 +573,7 @@ class RDD(object):
def sortPartition(iterator):
sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
- return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
+ return iter(sort(iterator, key=lambda k_v: keyfunc(k_v[0]), reverse=(not ascending)))
return self.partitionBy(numPartitions, partitionFunc).mapPartitions(sortPartition, True)
@@ -579,7 +603,7 @@ class RDD(object):
def sortPartition(iterator):
sort = ExternalSorter(memory * 0.9, serializer).sorted if spill else sorted
- return iter(sort(iterator, key=lambda (k, v): keyfunc(k), reverse=(not ascending)))
+ return iter(sort(iterator, key=lambda kv: keyfunc(kv[0]), reverse=(not ascending)))
if numPartitions == 1:
if self.getNumPartitions() > 1:
@@ -594,12 +618,12 @@ class RDD(object):
return self # empty RDD
maxSampleSize = numPartitions * 20.0 # constant from Spark's RangePartitioner
fraction = min(maxSampleSize / max(rddSize, 1), 1.0)
- samples = self.sample(False, fraction, 1).map(lambda (k, v): k).collect()
+ samples = self.sample(False, fraction, 1).map(lambda kv: kv[0]).collect()
samples = sorted(samples, key=keyfunc)
# we have numPartitions many parts but one of the them has
# an implicit boundary
- bounds = [samples[len(samples) * (i + 1) / numPartitions]
+ bounds = [samples[int(len(samples) * (i + 1) / numPartitions)]
for i in range(0, numPartitions - 1)]
def rangePartitioner(k):
@@ -662,12 +686,13 @@ class RDD(object):
"""
return self.map(lambda x: (f(x), x)).groupByKey(numPartitions)
+ @ignore_unicode_prefix
def pipe(self, command, env={}):
"""
Return an RDD created by piping elements to a forked external process.
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
- ['1', '2', '', '3']
+ [u'1', u'2', u'', u'3']
"""
def func(iterator):
pipe = Popen(
@@ -675,17 +700,18 @@ class RDD(object):
def pipe_objs(out):
for obj in iterator:
- out.write(str(obj).rstrip('\n') + '\n')
+ s = str(obj).rstrip('\n') + '\n'
+ out.write(s.encode('utf-8'))
out.close()
Thread(target=pipe_objs, args=[pipe.stdin]).start()
- return (x.rstrip('\n') for x in iter(pipe.stdout.readline, ''))
+ return (x.rstrip(b'\n').decode('utf-8') for x in iter(pipe.stdout.readline, b''))
return self.mapPartitions(func)
def foreach(self, f):
"""
Applies a function to all elements of this RDD.
- >>> def f(x): print x
+ >>> def f(x): print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
"""
def processPartition(iterator):
@@ -700,7 +726,7 @@ class RDD(object):
>>> def f(iterator):
... for x in iterator:
- ... print x
+ ... print(x)
>>> sc.parallelize([1, 2, 3, 4, 5]).foreachPartition(f)
"""
def func(it):
@@ -874,7 +900,7 @@ class RDD(object):
# aggregation.
while numPartitions > scale + numPartitions / scale:
numPartitions /= scale
- curNumPartitions = numPartitions
+ curNumPartitions = int(numPartitions)
def mapPartition(i, iterator):
for obj in iterator:
@@ -984,7 +1010,7 @@ class RDD(object):
(('a', 'b', 'c'), [2, 2])
"""
- if isinstance(buckets, (int, long)):
+ if isinstance(buckets, int):
if buckets < 1:
raise ValueError("number of buckets must be >= 1")
@@ -1020,6 +1046,7 @@ class RDD(object):
raise ValueError("Can not generate buckets with infinite value")
# keep them as integer if possible
+ inc = int(inc)
if inc * buckets != maxv - minv:
inc = (maxv - minv) * 1.0 / buckets
@@ -1137,7 +1164,7 @@ class RDD(object):
yield counts
def mergeMaps(m1, m2):
- for k, v in m2.iteritems():
+ for k, v in m2.items():
m1[k] += v
return m1
return self.mapPartitions(countPartition).reduce(mergeMaps)
@@ -1378,8 +1405,8 @@ class RDD(object):
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
- >>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
- [1, 2, 'rdd', 'spark']
+ >>> sorted(sc.pickleFile(tmpFile.name, 5).map(str).collect())
+ ['1', '2', 'rdd', 'spark']
"""
if batchSize == 0:
ser = AutoBatchedSerializer(PickleSerializer())
@@ -1387,6 +1414,7 @@ class RDD(object):
ser = BatchedSerializer(PickleSerializer(), batchSize)
self._reserialize(ser)._jrdd.saveAsObjectFile(path)
+ @ignore_unicode_prefix
def saveAsTextFile(self, path, compressionCodecClass=None):
"""
Save this RDD as a text file, using string representations of elements.
@@ -1418,12 +1446,13 @@ class RDD(object):
>>> codec = "org.apache.hadoop.io.compress.GzipCodec"
>>> sc.parallelize(['foo', 'bar']).saveAsTextFile(tempFile3.name, codec)
>>> from fileinput import input, hook_compressed
- >>> ''.join(sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed)))
- 'bar\\nfoo\\n'
+ >>> result = sorted(input(glob(tempFile3.name + "/part*.gz"), openhook=hook_compressed))
+ >>> b''.join(result).decode('utf-8')
+ u'bar\\nfoo\\n'
"""
def func(split, iterator):
for x in iterator:
- if not isinstance(x, basestring):
+ if not isinstance(x, (unicode, bytes)):
x = unicode(x)
if isinstance(x, unicode):
x = x.encode("utf-8")
@@ -1458,7 +1487,7 @@ class RDD(object):
>>> m.collect()
[1, 3]
"""
- return self.map(lambda (k, v): k)
+ return self.map(lambda x: x[0])
def values(self):
"""
@@ -1468,7 +1497,7 @@ class RDD(object):
>>> m.collect()
[2, 4]
"""
- return self.map(lambda (k, v): v)
+ return self.map(lambda x: x[1])
def reduceByKey(self, func, numPartitions=None):
"""
@@ -1507,7 +1536,7 @@ class RDD(object):
yield m
def mergeMaps(m1, m2):
- for k, v in m2.iteritems():
+ for k, v in m2.items():
m1[k] = func(m1[k], v) if k in m1 else v
return m1
return self.mapPartitions(reducePartition).reduce(mergeMaps)
@@ -1604,8 +1633,8 @@ class RDD(object):
>>> pairs = sc.parallelize([1, 2, 3, 4, 2, 4, 1]).map(lambda x: (x, x))
>>> sets = pairs.partitionBy(2).glom().collect()
- >>> set(sets[0]).intersection(set(sets[1]))
- set([])
+ >>> len(set(sets[0]).intersection(set(sets[1])))
+ 0
"""
if numPartitions is None:
numPartitions = self._defaultReducePartitions()
@@ -1637,22 +1666,22 @@ class RDD(object):
if (c % 1000 == 0 and get_used_memory() > limit
or c > batch):
n, size = len(buckets), 0
- for split in buckets.keys():
+ for split in list(buckets.keys()):
yield pack_long(split)
d = outputSerializer.dumps(buckets[split])
del buckets[split]
yield d
size += len(d)
- avg = (size / n) >> 20
+ avg = int(size / n) >> 20
# let 1M < avg < 10M
if avg < 1:
batch *= 1.5
elif avg > 10:
- batch = max(batch / 1.5, 1)
+ batch = max(int(batch / 1.5), 1)
c = 0
- for split, items in buckets.iteritems():
+ for split, items in buckets.items():
yield pack_long(split)
yield outputSerializer.dumps(items)
@@ -1707,7 +1736,7 @@ class RDD(object):
merger = ExternalMerger(agg, memory * 0.9, serializer) \
if spill else InMemoryMerger(agg)
merger.mergeValues(iterator)
- return merger.iteritems()
+ return merger.items()
locally_combined = self.mapPartitions(combineLocally, preservesPartitioning=True)
shuffled = locally_combined.partitionBy(numPartitions)
@@ -1716,7 +1745,7 @@ class RDD(object):
merger = ExternalMerger(agg, memory, serializer) \
if spill else InMemoryMerger(agg)
merger.mergeCombiners(iterator)
- return merger.iteritems()
+ return merger.items()
return shuffled.mapPartitions(_mergeCombiners, preservesPartitioning=True)
@@ -1745,7 +1774,7 @@ class RDD(object):
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> from operator import add
- >>> rdd.foldByKey(0, add).collect()
+ >>> sorted(rdd.foldByKey(0, add).collect())
[('a', 2), ('b', 1)]
"""
def createZero():
@@ -1769,10 +1798,10 @@ class RDD(object):
sum or average) over each key, using reduceByKey or aggregateByKey will
provide much better performance.
- >>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
- >>> sorted(x.groupByKey().mapValues(len).collect())
+ >>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
+ >>> sorted(rdd.groupByKey().mapValues(len).collect())
[('a', 2), ('b', 1)]
- >>> sorted(x.groupByKey().mapValues(list).collect())
+ >>> sorted(rdd.groupByKey().mapValues(list).collect())
[('a', [1, 1]), ('b', [1])]
"""
def createCombiner(x):
@@ -1795,7 +1824,7 @@ class RDD(object):
merger = ExternalMerger(agg, memory * 0.9, serializer) \
if spill else InMemoryMerger(agg)
merger.mergeValues(iterator)
- return merger.iteritems()
+ return merger.items()
locally_combined = self.mapPartitions(combine, preservesPartitioning=True)
shuffled = locally_combined.partitionBy(numPartitions)
@@ -1804,7 +1833,7 @@ class RDD(object):
merger = ExternalGroupBy(agg, memory, serializer)\
if spill else InMemoryMerger(agg)
merger.mergeCombiners(it)
- return merger.iteritems()
+ return merger.items()
return shuffled.mapPartitions(groupByKey, True).mapValues(ResultIterable)
@@ -1819,7 +1848,7 @@ class RDD(object):
>>> x.flatMapValues(f).collect()
[('a', 'x'), ('a', 'y'), ('a', 'z'), ('b', 'p'), ('b', 'r')]
"""
- flat_map_fn = lambda (k, v): ((k, x) for x in f(v))
+ flat_map_fn = lambda kv: ((kv[0], x) for x in f(kv[1]))
return self.flatMap(flat_map_fn, preservesPartitioning=True)
def mapValues(self, f):
@@ -1833,7 +1862,7 @@ class RDD(object):
>>> x.mapValues(f).collect()
[('a', 3), ('b', 1)]
"""
- map_values_fn = lambda (k, v): (k, f(v))
+ map_values_fn = lambda kv: (kv[0], f(kv[1]))
return self.map(map_values_fn, preservesPartitioning=True)
def groupWith(self, other, *others):
@@ -1844,8 +1873,7 @@ class RDD(object):
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
>>> z = sc.parallelize([("b", 42)])
- >>> map((lambda (x,y): (x, (list(y[0]), list(y[1]), list(y[2]), list(y[3])))), \
- sorted(list(w.groupWith(x, y, z).collect())))
+ >>> [(x, tuple(map(list, y))) for x, y in sorted(list(w.groupWith(x, y, z).collect()))]
[('a', ([5], [1], [2], [])), ('b', ([6], [4], [], [42]))]
"""
@@ -1860,7 +1888,7 @@ class RDD(object):
>>> x = sc.parallelize([("a", 1), ("b", 4)])
>>> y = sc.parallelize([("a", 2)])
- >>> map((lambda (x,y): (x, (list(y[0]), list(y[1])))), sorted(list(x.cogroup(y).collect())))
+ >>> [(x, tuple(map(list, y))) for x, y in sorted(list(x.cogroup(y).collect()))]
[('a', ([1], [2])), ('b', ([4], []))]
"""
return python_cogroup((self, other), numPartitions)
@@ -1896,8 +1924,9 @@ class RDD(object):
>>> sorted(x.subtractByKey(y).collect())
[('b', 4), ('b', 5)]
"""
- def filter_func((key, vals)):
- return vals[0] and not vals[1]
+ def filter_func(pair):
+ key, (val1, val2) = pair
+ return val1 and not val2
return self.cogroup(other, numPartitions).filter(filter_func).flatMapValues(lambda x: x[0])
def subtract(self, other, numPartitions=None):
@@ -1919,8 +1948,8 @@ class RDD(object):
>>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
>>> y = sc.parallelize(zip(range(0,5), range(0,5)))
- >>> map((lambda (x,y): (x, (list(y[0]), (list(y[1]))))), sorted(x.cogroup(y).collect()))
- [(0, ([0], [0])), (1, ([1], [1])), (2, ([], [2])), (3, ([], [3])), (4, ([2], [4]))]
+ >>> [(x, list(map(list, y))) for x, y in sorted(x.cogroup(y).collect())]
+ [(0, [[0], [0]]), (1, [[1], [1]]), (2, [[], [2]]), (3, [[], [3]]), (4, [[2], [4]])]
"""
return self.map(lambda x: (f(x), x))
@@ -2049,17 +2078,18 @@ class RDD(object):
"""
Return the name of this RDD.
"""
- name_ = self._jrdd.name()
- if name_:
- return name_.encode('utf-8')
+ n = self._jrdd.name()
+ if n:
+ return n
+ @ignore_unicode_prefix
def setName(self, name):
"""
Assign a name to this RDD.
- >>> rdd1 = sc.parallelize([1,2])
+ >>> rdd1 = sc.parallelize([1, 2])
>>> rdd1.setName('RDD1').name()
- 'RDD1'
+ u'RDD1'
"""
self._jrdd.setName(name)
return self
@@ -2121,7 +2151,7 @@ class RDD(object):
>>> sorted.lookup(1024)
[]
"""
- values = self.filter(lambda (k, v): k == key).values()
+ values = self.filter(lambda kv: kv[0] == key).values()
if self.partitioner is not None:
return self.ctx.runJob(values, lambda x: x, [self.partitioner(key)], False)
@@ -2159,7 +2189,7 @@ class RDD(object):
or meet the confidence.
>>> rdd = sc.parallelize(range(1000), 10)
- >>> r = sum(xrange(1000))
+ >>> r = sum(range(1000))
>>> (rdd.sumApprox(1000) - r) / r < 0.05
True
"""
@@ -2176,7 +2206,7 @@ class RDD(object):
or meet the confidence.
>>> rdd = sc.parallelize(range(1000), 10)
- >>> r = sum(xrange(1000)) / 1000.0
+ >>> r = sum(range(1000)) / 1000.0
>>> (rdd.meanApprox(1000) - r) / r < 0.05
True
"""
@@ -2201,10 +2231,10 @@ class RDD(object):
It must be greater than 0.000017.
>>> n = sc.parallelize(range(1000)).map(str).countApproxDistinct()
- >>> 950 < n < 1050
+ >>> 900 < n < 1100
True
>>> n = sc.parallelize([i % 20 for i in range(1000)]).countApproxDistinct()
- >>> 18 < n < 22
+ >>> 16 < n < 24
True
"""
if relativeSD < 0.000017:
@@ -2223,8 +2253,7 @@ class RDD(object):
>>> [x for x in rdd.toLocalIterator()]
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
"""
- partitions = xrange(self.getNumPartitions())
- for partition in partitions:
+ for partition in range(self.getNumPartitions()):
rows = self.context.runJob(self, lambda x: x, [partition])
for row in rows:
yield row
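
The guard added to portable_hash is the most consequential change in rdd.py: from Python 3.3 on, str hashes are salted per process, so without a fixed PYTHONHASHSEED two Python workers would disagree about which partition a key hashes to, silently breaking reduceByKey, distinct and friends; hence the hard failure here in place of the old TODO. Most of the remaining hunks are mechanical (sys.maxint to sys.maxsize, dict.iteritems() to items(), imap/ifilter to the lazy built-ins, tuple-unpacking lambdas to indexed lambdas, and doctests relaxed where exact sampling output differs between versions). A throwaway script (hash_demo.py is just an illustrative name) showing the randomization:

    import os
    import sys

    # Run twice: under Python 3.3+ without PYTHONHASHSEED the two runs usually
    # print different values, which is fatal for hash partitioning.
    #     $ python3 hash_demo.py
    #     $ PYTHONHASHSEED=0 python3 hash_demo.py   # stable across runs
    print(hash("spark") & 0xffffffff)
    if sys.version_info >= (3, 3) and "PYTHONHASHSEED" not in os.environ:
        print("warning: string hashes are randomized per process")
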
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/rddsampler.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rddsampler.py b/python/pyspark/rddsampler.py
index 459e142..fe8f873 100644
--- a/python/pyspark/rddsampler.py
+++ b/python/pyspark/rddsampler.py
@@ -23,7 +23,7 @@ import math
class RDDSamplerBase(object):
def __init__(self, withReplacement, seed=None):
- self._seed = seed if seed is not None else random.randint(0, sys.maxint)
+ self._seed = seed if seed is not None else random.randint(0, sys.maxsize)
self._withReplacement = withReplacement
self._random = None
@@ -31,7 +31,7 @@ class RDDSamplerBase(object):
self._random = random.Random(self._seed ^ split)
# mixing because the initial seeds are close to each other
- for _ in xrange(10):
+ for _ in range(10):
self._random.randint(0, 1)
def getUniformSample(self):
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 4afa82f..d8cdcda 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -49,16 +49,24 @@ which contains two batches of two objects:
>>> sc.stop()
"""
-import cPickle
-from itertools import chain, izip, product
+import sys
+from itertools import chain, product
import marshal
import struct
-import sys
import types
import collections
import zlib
import itertools
+if sys.version < '3':
+ import cPickle as pickle
+ protocol = 2
+ from itertools import izip as zip
+else:
+ import pickle
+ protocol = 3
+ xrange = range
+
from pyspark import cloudpickle
@@ -97,7 +105,7 @@ class Serializer(object):
# subclasses should override __eq__ as appropriate.
def __eq__(self, other):
- return isinstance(other, self.__class__)
+ return isinstance(other, self.__class__) and self.__dict__ == other.__dict__
def __ne__(self, other):
return not self.__eq__(other)
@@ -212,10 +220,6 @@ class BatchedSerializer(Serializer):
def _load_stream_without_unbatching(self, stream):
return self.serializer.load_stream(stream)
- def __eq__(self, other):
- return (isinstance(other, BatchedSerializer) and
- other.serializer == self.serializer and other.batchSize == self.batchSize)
-
def __repr__(self):
return "BatchedSerializer(%s, %d)" % (str(self.serializer), self.batchSize)
@@ -233,14 +237,14 @@ class FlattenedValuesSerializer(BatchedSerializer):
def _batched(self, iterator):
n = self.batchSize
for key, values in iterator:
- for i in xrange(0, len(values), n):
+ for i in range(0, len(values), n):
yield key, values[i:i + n]
def load_stream(self, stream):
return self.serializer.load_stream(stream)
def __repr__(self):
- return "FlattenedValuesSerializer(%d)" % self.batchSize
+ return "FlattenedValuesSerializer(%s, %d)" % (self.serializer, self.batchSize)
class AutoBatchedSerializer(BatchedSerializer):
@@ -270,12 +274,8 @@ class AutoBatchedSerializer(BatchedSerializer):
elif size > best * 10 and batch > 1:
batch /= 2
- def __eq__(self, other):
- return (isinstance(other, AutoBatchedSerializer) and
- other.serializer == self.serializer and other.bestSize == self.bestSize)
-
def __repr__(self):
- return "AutoBatchedSerializer(%s)" % str(self.serializer)
+ return "AutoBatchedSerializer(%s)" % self.serializer
class CartesianDeserializer(FramedSerializer):
@@ -285,6 +285,7 @@ class CartesianDeserializer(FramedSerializer):
"""
def __init__(self, key_ser, val_ser):
+ FramedSerializer.__init__(self)
self.key_ser = key_ser
self.val_ser = val_ser
@@ -293,7 +294,7 @@ class CartesianDeserializer(FramedSerializer):
val_stream = self.val_ser._load_stream_without_unbatching(stream)
key_is_batched = isinstance(self.key_ser, BatchedSerializer)
val_is_batched = isinstance(self.val_ser, BatchedSerializer)
- for (keys, vals) in izip(key_stream, val_stream):
+ for (keys, vals) in zip(key_stream, val_stream):
keys = keys if key_is_batched else [keys]
vals = vals if val_is_batched else [vals]
yield (keys, vals)
@@ -303,10 +304,6 @@ class CartesianDeserializer(FramedSerializer):
for pair in product(keys, vals):
yield pair
- def __eq__(self, other):
- return (isinstance(other, CartesianDeserializer) and
- self.key_ser == other.key_ser and self.val_ser == other.val_ser)
-
def __repr__(self):
return "CartesianDeserializer(%s, %s)" % \
(str(self.key_ser), str(self.val_ser))
@@ -318,22 +315,14 @@ class PairDeserializer(CartesianDeserializer):
Deserializes the JavaRDD zip() of two PythonRDDs.
"""
- def __init__(self, key_ser, val_ser):
- self.key_ser = key_ser
- self.val_ser = val_ser
-
def load_stream(self, stream):
for (keys, vals) in self.prepare_keys_values(stream):
if len(keys) != len(vals):
raise ValueError("Can not deserialize RDD with different number of items"
" in pair: (%d, %d)" % (len(keys), len(vals)))
- for pair in izip(keys, vals):
+ for pair in zip(keys, vals):
yield pair
- def __eq__(self, other):
- return (isinstance(other, PairDeserializer) and
- self.key_ser == other.key_ser and self.val_ser == other.val_ser)
-
def __repr__(self):
return "PairDeserializer(%s, %s)" % (str(self.key_ser), str(self.val_ser))
@@ -382,8 +371,8 @@ def _hijack_namedtuple():
global _old_namedtuple # or it will put in closure
def _copy_func(f):
- return types.FunctionType(f.func_code, f.func_globals, f.func_name,
- f.func_defaults, f.func_closure)
+ return types.FunctionType(f.__code__, f.__globals__, f.__name__,
+ f.__defaults__, f.__closure__)
_old_namedtuple = _copy_func(collections.namedtuple)
@@ -392,15 +381,15 @@ def _hijack_namedtuple():
return _hack_namedtuple(cls)
# replace namedtuple with new one
- collections.namedtuple.func_globals["_old_namedtuple"] = _old_namedtuple
- collections.namedtuple.func_globals["_hack_namedtuple"] = _hack_namedtuple
- collections.namedtuple.func_code = namedtuple.func_code
+ collections.namedtuple.__globals__["_old_namedtuple"] = _old_namedtuple
+ collections.namedtuple.__globals__["_hack_namedtuple"] = _hack_namedtuple
+ collections.namedtuple.__code__ = namedtuple.__code__
collections.namedtuple.__hijack = 1
# hack the cls already generated by namedtuple
# those created in other module can be pickled as normal,
# so only hack those in __main__ module
- for n, o in sys.modules["__main__"].__dict__.iteritems():
+ for n, o in sys.modules["__main__"].__dict__.items():
if (type(o) is type and o.__base__ is tuple
and hasattr(o, "_fields")
and "__reduce__" not in o.__dict__):
@@ -413,7 +402,7 @@ _hijack_namedtuple()
class PickleSerializer(FramedSerializer):
"""
- Serializes objects using Python's cPickle serializer:
+ Serializes objects using Python's pickle serializer:
http://docs.python.org/2/library/pickle.html
@@ -422,10 +411,14 @@ class PickleSerializer(FramedSerializer):
"""
def dumps(self, obj):
- return cPickle.dumps(obj, 2)
+ return pickle.dumps(obj, protocol)
- def loads(self, obj):
- return cPickle.loads(obj)
+ if sys.version >= '3':
+ def loads(self, obj, encoding="bytes"):
+ return pickle.loads(obj, encoding=encoding)
+ else:
+ def loads(self, obj, encoding=None):
+ return pickle.loads(obj)
class CloudPickleSerializer(PickleSerializer):
@@ -454,7 +447,7 @@ class MarshalSerializer(FramedSerializer):
class AutoSerializer(FramedSerializer):
"""
- Choose marshal or cPickle as serialization protocol automatically
+ Choose marshal or pickle as serialization protocol automatically
"""
def __init__(self):
@@ -463,19 +456,19 @@ class AutoSerializer(FramedSerializer):
def dumps(self, obj):
if self._type is not None:
- return 'P' + cPickle.dumps(obj, -1)
+ return b'P' + pickle.dumps(obj, -1)
try:
- return 'M' + marshal.dumps(obj)
+ return b'M' + marshal.dumps(obj)
except Exception:
- self._type = 'P'
- return 'P' + cPickle.dumps(obj, -1)
+ self._type = b'P'
+ return b'P' + pickle.dumps(obj, -1)
def loads(self, obj):
_type = obj[0]
- if _type == 'M':
+ if _type == b'M':
return marshal.loads(obj[1:])
- elif _type == 'P':
- return cPickle.loads(obj[1:])
+ elif _type == b'P':
+ return pickle.loads(obj[1:])
else:
raise ValueError("invalid sevialization type: %s" % _type)
@@ -495,8 +488,8 @@ class CompressedSerializer(FramedSerializer):
def loads(self, obj):
return self.serializer.loads(zlib.decompress(obj))
- def __eq__(self, other):
- return isinstance(other, CompressedSerializer) and self.serializer == other.serializer
+ def __repr__(self):
+ return "CompressedSerializer(%s)" % self.serializer
class UTF8Deserializer(Serializer):
@@ -505,7 +498,7 @@ class UTF8Deserializer(Serializer):
Deserializes streams written by String.getBytes.
"""
- def __init__(self, use_unicode=False):
+ def __init__(self, use_unicode=True):
self.use_unicode = use_unicode
def loads(self, stream):
@@ -526,13 +519,13 @@ class UTF8Deserializer(Serializer):
except EOFError:
return
- def __eq__(self, other):
- return isinstance(other, UTF8Deserializer) and self.use_unicode == other.use_unicode
+ def __repr__(self):
+ return "UTF8Deserializer(%s)" % self.use_unicode
def read_long(stream):
length = stream.read(8)
- if length == "":
+ if not length:
raise EOFError
return struct.unpack("!q", length)[0]
@@ -547,7 +540,7 @@ def pack_long(value):
def read_int(stream):
length = stream.read(4)
- if length == "":
+ if not length:
raise EOFError
return struct.unpack("!i", length)[0]
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/shell.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shell.py b/python/pyspark/shell.py
index 81aa970..144cdf0 100644
--- a/python/pyspark/shell.py
+++ b/python/pyspark/shell.py
@@ -21,13 +21,6 @@ An interactive shell.
This file is designed to be launched as a PYTHONSTARTUP script.
"""
-import sys
-if sys.version_info[0] != 2:
- print("Error: Default Python used is Python%s" % sys.version_info.major)
- print("\tSet env variable PYSPARK_PYTHON to Python2 binary and re-run it.")
- sys.exit(1)
-
-
import atexit
import os
import platform
@@ -53,9 +46,14 @@ atexit.register(lambda: sc.stop())
try:
# Try to access HiveConf, it will raise exception if Hive is not added
sc._jvm.org.apache.hadoop.hive.conf.HiveConf()
- sqlCtx = sqlContext = HiveContext(sc)
+ sqlContext = HiveContext(sc)
except py4j.protocol.Py4JError:
- sqlCtx = sqlContext = SQLContext(sc)
+ sqlContext = SQLContext(sc)
+except TypeError:
+ sqlContext = SQLContext(sc)
+
+# for compatibility
+sqlCtx = sqlContext
print("""Welcome to
____ __
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/shuffle.py
----------------------------------------------------------------------
diff --git a/python/pyspark/shuffle.py b/python/pyspark/shuffle.py
index 8a6fc62..b54baa5 100644
--- a/python/pyspark/shuffle.py
+++ b/python/pyspark/shuffle.py
@@ -78,8 +78,8 @@ def _get_local_dirs(sub):
# global stats
-MemoryBytesSpilled = 0L
-DiskBytesSpilled = 0L
+MemoryBytesSpilled = 0
+DiskBytesSpilled = 0
class Aggregator(object):
@@ -126,7 +126,7 @@ class Merger(object):
""" Merge the combined items by mergeCombiner """
raise NotImplementedError
- def iteritems(self):
+ def items(self):
""" Return the merged items ad iterator """
raise NotImplementedError
@@ -156,9 +156,9 @@ class InMemoryMerger(Merger):
for k, v in iterator:
d[k] = comb(d[k], v) if k in d else v
- def iteritems(self):
- """ Return the merged items as iterator """
- return self.data.iteritems()
+ def items(self):
+ """ Return the merged items ad iterator """
+ return iter(self.data.items())
def _compressed_serializer(self, serializer=None):
@@ -208,15 +208,15 @@ class ExternalMerger(Merger):
>>> agg = SimpleAggregator(lambda x, y: x + y)
>>> merger = ExternalMerger(agg, 10)
>>> N = 10000
- >>> merger.mergeValues(zip(xrange(N), xrange(N)))
+ >>> merger.mergeValues(zip(range(N), range(N)))
>>> assert merger.spills > 0
- >>> sum(v for k,v in merger.iteritems())
+ >>> sum(v for k,v in merger.items())
49995000
>>> merger = ExternalMerger(agg, 10)
- >>> merger.mergeCombiners(zip(xrange(N), xrange(N)))
+ >>> merger.mergeCombiners(zip(range(N), range(N)))
>>> assert merger.spills > 0
- >>> sum(v for k,v in merger.iteritems())
+ >>> sum(v for k,v in merger.items())
49995000
"""
@@ -335,10 +335,10 @@ class ExternalMerger(Merger):
# above limit at the first time.
# open all the files for writing
- streams = [open(os.path.join(path, str(i)), 'w')
+ streams = [open(os.path.join(path, str(i)), 'wb')
for i in range(self.partitions)]
- for k, v in self.data.iteritems():
+ for k, v in self.data.items():
h = self._partition(k)
# put one item in batch, make it compatible with load_stream
# it will increase the memory if dump them in batch
@@ -354,9 +354,9 @@ class ExternalMerger(Merger):
else:
for i in range(self.partitions):
p = os.path.join(path, str(i))
- with open(p, "w") as f:
+ with open(p, "wb") as f:
# dump items in batch
- self.serializer.dump_stream(self.pdata[i].iteritems(), f)
+ self.serializer.dump_stream(iter(self.pdata[i].items()), f)
self.pdata[i].clear()
DiskBytesSpilled += os.path.getsize(p)
@@ -364,10 +364,10 @@ class ExternalMerger(Merger):
gc.collect() # release the memory as much as possible
MemoryBytesSpilled += (used_memory - get_used_memory()) << 20
- def iteritems(self):
+ def items(self):
""" Return all merged items as iterator """
if not self.pdata and not self.spills:
- return self.data.iteritems()
+ return iter(self.data.items())
return self._external_items()
def _external_items(self):
@@ -398,7 +398,8 @@ class ExternalMerger(Merger):
path = self._get_spill_dir(j)
p = os.path.join(path, str(index))
# do not check memory during merging
- self.mergeCombiners(self.serializer.load_stream(open(p)), 0)
+ with open(p, "rb") as f:
+ self.mergeCombiners(self.serializer.load_stream(f), 0)
# limit the total partitions
if (self.scale * self.partitions < self.MAX_TOTAL_PARTITIONS
@@ -408,7 +409,7 @@ class ExternalMerger(Merger):
gc.collect() # release the memory as much as possible
return self._recursive_merged_items(index)
- return self.data.iteritems()
+ return self.data.items()
def _recursive_merged_items(self, index):
"""
@@ -426,7 +427,8 @@ class ExternalMerger(Merger):
for j in range(self.spills):
path = self._get_spill_dir(j)
p = os.path.join(path, str(index))
- m.mergeCombiners(self.serializer.load_stream(open(p)), 0)
+ with open(p, 'rb') as f:
+ m.mergeCombiners(self.serializer.load_stream(f), 0)
if get_used_memory() > limit:
m._spill()
@@ -451,7 +453,7 @@ class ExternalSorter(object):
>>> sorter = ExternalSorter(1) # 1M
>>> import random
- >>> l = range(1024)
+ >>> l = list(range(1024))
>>> random.shuffle(l)
>>> sorted(l) == list(sorter.sorted(l))
True
@@ -499,9 +501,16 @@ class ExternalSorter(object):
# sort them inplace will save memory
current_chunk.sort(key=key, reverse=reverse)
path = self._get_path(len(chunks))
- with open(path, 'w') as f:
+ with open(path, 'wb') as f:
self.serializer.dump_stream(current_chunk, f)
- chunks.append(self.serializer.load_stream(open(path)))
+
+ def load(f):
+ for v in self.serializer.load_stream(f):
+ yield v
+ # close the file explicit once we consume all the items
+ # to avoid ResourceWarning in Python3
+ f.close()
+ chunks.append(load(open(path, 'rb')))
current_chunk = []
gc.collect()
limit = self._next_limit()
@@ -527,7 +536,7 @@ class ExternalList(object):
ExternalList can have many items which cannot be hold in memory in
the same time.
- >>> l = ExternalList(range(100))
+ >>> l = ExternalList(list(range(100)))
>>> len(l)
100
>>> l.append(10)
@@ -555,11 +564,11 @@ class ExternalList(object):
def __getstate__(self):
if self._file is not None:
self._file.flush()
- f = os.fdopen(os.dup(self._file.fileno()))
- f.seek(0)
- serialized = f.read()
+ with os.fdopen(os.dup(self._file.fileno()), "rb") as f:
+ f.seek(0)
+ serialized = f.read()
else:
- serialized = ''
+ serialized = b''
return self.values, self.count, serialized
def __setstate__(self, item):
@@ -575,7 +584,7 @@ class ExternalList(object):
if self._file is not None:
self._file.flush()
# read all items from disks first
- with os.fdopen(os.dup(self._file.fileno()), 'r') as f:
+ with os.fdopen(os.dup(self._file.fileno()), 'rb') as f:
f.seek(0)
for v in self._ser.load_stream(f):
yield v
@@ -598,11 +607,16 @@ class ExternalList(object):
d = dirs[id(self) % len(dirs)]
if not os.path.exists(d):
os.makedirs(d)
- p = os.path.join(d, str(id))
- self._file = open(p, "w+", 65536)
+ p = os.path.join(d, str(id(self)))
+ self._file = open(p, "wb+", 65536)
self._ser = BatchedSerializer(CompressedSerializer(PickleSerializer()), 1024)
os.unlink(p)
+ def __del__(self):
+ if self._file:
+ self._file.close()
+ self._file = None
+
def _spill(self):
""" dump the values into disk """
global MemoryBytesSpilled, DiskBytesSpilled
@@ -651,33 +665,28 @@ class GroupByKey(object):
"""
Group a sorted iterator as [(k1, it1), (k2, it2), ...]
- >>> k = [i/3 for i in range(6)]
+ >>> k = [i // 3 for i in range(6)]
>>> v = [[i] for i in range(6)]
- >>> g = GroupByKey(iter(zip(k, v)))
+ >>> g = GroupByKey(zip(k, v))
>>> [(k, list(it)) for k, it in g]
[(0, [0, 1, 2]), (1, [3, 4, 5])]
"""
def __init__(self, iterator):
- self.iterator = iter(iterator)
- self.next_item = None
+ self.iterator = iterator
def __iter__(self):
- return self
-
- def next(self):
- key, value = self.next_item if self.next_item else next(self.iterator)
- values = ExternalListOfList([value])
- try:
- while True:
- k, v = next(self.iterator)
- if k != key:
- self.next_item = (k, v)
- break
+ key, values = None, None
+ for k, v in self.iterator:
+ if values is not None and k == key:
values.append(v)
- except StopIteration:
- self.next_item = None
- return key, values
+ else:
+ if values is not None:
+ yield (key, values)
+ key = k
+ values = ExternalListOfList([v])
+ if values is not None:
+ yield (key, values)
class ExternalGroupBy(ExternalMerger):
@@ -744,7 +753,7 @@ class ExternalGroupBy(ExternalMerger):
# above limit at the first time.
# open all the files for writing
- streams = [open(os.path.join(path, str(i)), 'w')
+ streams = [open(os.path.join(path, str(i)), 'wb')
for i in range(self.partitions)]
# If the number of keys is small, then the overhead of sort is small
@@ -756,7 +765,7 @@ class ExternalGroupBy(ExternalMerger):
h = self._partition(k)
self.serializer.dump_stream([(k, self.data[k])], streams[h])
else:
- for k, v in self.data.iteritems():
+ for k, v in self.data.items():
h = self._partition(k)
self.serializer.dump_stream([(k, v)], streams[h])
@@ -771,14 +780,14 @@ class ExternalGroupBy(ExternalMerger):
else:
for i in range(self.partitions):
p = os.path.join(path, str(i))
- with open(p, "w") as f:
+ with open(p, "wb") as f:
# dump items in batch
if self._sorted:
# sort by key only (stable)
- sorted_items = sorted(self.pdata[i].iteritems(), key=operator.itemgetter(0))
+ sorted_items = sorted(self.pdata[i].items(), key=operator.itemgetter(0))
self.serializer.dump_stream(sorted_items, f)
else:
- self.serializer.dump_stream(self.pdata[i].iteritems(), f)
+ self.serializer.dump_stream(self.pdata[i].items(), f)
self.pdata[i].clear()
DiskBytesSpilled += os.path.getsize(p)
@@ -792,7 +801,7 @@ class ExternalGroupBy(ExternalMerger):
# if the memory can not hold all the partition,
# then use sort based merge. Because of compression,
# the data on disks will be much smaller than needed memory
- if (size >> 20) >= self.memory_limit / 10:
+ if size >= self.memory_limit << 17: # * 1M / 8
return self._merge_sorted_items(index)
self.data = {}
@@ -800,15 +809,18 @@ class ExternalGroupBy(ExternalMerger):
path = self._get_spill_dir(j)
p = os.path.join(path, str(index))
# do not check memory during merging
- self.mergeCombiners(self.serializer.load_stream(open(p)), 0)
- return self.data.iteritems()
+ with open(p, "rb") as f:
+ self.mergeCombiners(self.serializer.load_stream(f), 0)
+ return self.data.items()
def _merge_sorted_items(self, index):
""" load a partition from disk, then sort and group by key """
def load_partition(j):
path = self._get_spill_dir(j)
p = os.path.join(path, str(index))
- return self.serializer.load_stream(open(p, 'r', 65536))
+ with open(p, 'rb', 65536) as f:
+ for v in self.serializer.load_stream(f):
+ yield v
disk_items = [load_partition(j) for j in range(self.spills)]
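
Beyond the iteritems()/items() rename, shuffle.py opens every spill file in binary mode ('wb'/'rb'), because pickled data is bytes and Python 3 refuses to write bytes to a text-mode file, and closes those files deterministically to avoid ResourceWarning. GroupByKey also turns into a plain generator: walk the key-sorted stream and start a new group whenever the key changes. The same control flow, with a plain list standing in for the spillable ExternalListOfList, looks like this (group_sorted is an illustrative name):

    def group_sorted(iterator):
        # Group a key-sorted iterator as (k1, [v1, ...]), (k2, [...]), ...
        key, values = None, None
        for k, v in iterator:
            if values is not None and k == key:
                values.append(v)
            else:
                if values is not None:
                    yield key, values
                key, values = k, [v]
        if values is not None:
            yield key, values

    k = [i // 3 for i in range(6)]
    v = [[i] for i in range(6)]
    print([(key, list(vals)) for key, vals in group_sorted(zip(k, v))])
    # [(0, [[0], [1], [2]]), (1, [[3], [4], [5]])]
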
http://git-wip-us.apache.org/repos/asf/spark/blob/04e44b37/python/pyspark/sql/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/__init__.py b/python/pyspark/sql/__init__.py
index 65abb24..6d54b9e 100644
--- a/python/pyspark/sql/__init__.py
+++ b/python/pyspark/sql/__init__.py
@@ -37,9 +37,22 @@ Important classes of Spark SQL and DataFrames:
- L{types}
List of data types available.
"""
+from __future__ import absolute_import
+
+# fix the module name conflict for Python 3+
+import sys
+from . import _types as types
+modname = __name__ + '.types'
+types.__name__ = modname
+# update the __module__ for all objects, make them picklable
+for v in types.__dict__.values():
+ if hasattr(v, "__module__") and v.__module__.endswith('._types'):
+ v.__module__ = modname
+sys.modules[modname] = types
+del modname, sys
-from pyspark.sql.context import SQLContext, HiveContext
from pyspark.sql.types import Row
+from pyspark.sql.context import SQLContext, HiveContext
from pyspark.sql.dataframe import DataFrame, GroupedData, Column, SchemaRDD, DataFrameNaFunctions
__all__ = [
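
The __init__.py hunk re-exports the renamed module: the physical file becomes pyspark/sql/_types.py (to sidestep a module-name conflict on Python 3, per the comment in the diff), and the loop patches each class's __module__ so that objects defined there still pickle and unpickle under the public name pyspark.sql.types. The same trick in a hypothetical stand-alone package (mypkg and _types.py are illustrative names, not PySpark's):

    # mypkg/__init__.py -- the real code lives in mypkg/_types.py
    import sys
    from . import _types as types          # import the renamed module

    modname = __name__ + '.types'          # the name users (and pickle) expect
    types.__name__ = modname
    for v in vars(types).values():         # repoint __module__ so classes defined
        if getattr(v, "__module__", "").endswith('._types'):   # there pickle under
            v.__module__ = modname                              # the public name
    sys.modules[modname] = types           # "import mypkg.types" now resolves
    del modname, sys
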