You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/10/16 23:56:55 UTC

git commit: [SPARK-3971] [MLLib] [PySpark] hotfix: Customized pickler should work in cluster mode

Repository: spark
Updated Branches:
  refs/heads/master 4c589cac4 -> 091d32c52


[SPARK-3971] [MLLib] [PySpark] hotfix: Customized pickler should work in cluster mode

Customized picklers should be registered before unpickling, but in executors there is no way to register the picklers before running the tasks.

So we need to register the picklers in the tasks themselves: duplicate javaToPython() and pythonToJava() in MLlib, and call SerDe.initialize() before pickling or unpickling.

Author: Davies Liu <da...@gmail.com>

Closes #2830 from davies/fix_pickle and squashes the following commits:

0c85fb9 [Davies Liu] revert the privacy change
6b94e15 [Davies Liu] use JavaConverters instead of JavaConversions
0f02050 [Davies Liu] hotfix: Customized pickler does not work in cluster


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/091d32c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/091d32c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/091d32c5

Branch: refs/heads/master
Commit: 091d32c52e9d73da95896016c1d920e89858abfa
Parents: 4c589ca
Author: Davies Liu <da...@gmail.com>
Authored: Thu Oct 16 14:56:50 2014 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu Oct 16 14:56:50 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/api/python/PythonRDD.scala |  7 ++-
 .../org/apache/spark/api/python/SerDeUtil.scala | 14 +++++-
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 52 ++++++++++++++++++--
 python/pyspark/context.py                       |  2 -
 python/pyspark/mllib/classification.py          |  4 +-
 python/pyspark/mllib/clustering.py              |  4 +-
 python/pyspark/mllib/feature.py                 |  5 +-
 python/pyspark/mllib/linalg.py                  | 13 +++++
 python/pyspark/mllib/random.py                  |  2 +-
 python/pyspark/mllib/recommendation.py          |  7 +--
 python/pyspark/mllib/regression.py              |  4 +-
 python/pyspark/mllib/stat.py                    |  7 +--
 python/pyspark/mllib/tree.py                    |  8 +--
 python/pyspark/mllib/util.py                    |  6 +--
 14 files changed, 101 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 4acbdf9..29ca751 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -23,6 +23,7 @@ import java.nio.charset.Charset
 import java.util.{List => JList, ArrayList => JArrayList, Map => JMap, Collections}
 
 import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.language.existentials
 
@@ -746,6 +747,7 @@ private[spark] object PythonRDD extends Logging {
   def pythonToJavaMap(pyRDD: JavaRDD[Array[Byte]]): JavaRDD[Map[String, _]] = {
     pyRDD.rdd.mapPartitions { iter =>
       val unpickle = new Unpickler
+      SerDeUtil.initialize()
       iter.flatMap { row =>
         unpickle.loads(row) match {
           // in case of objects are pickled in batch mode
@@ -785,7 +787,7 @@ private[spark] object PythonRDD extends Logging {
     }.toJavaRDD()
   }
 
-  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
+  private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
     private val pickle = new Pickler()
     private var batch = 1
     private val buffer = new mutable.ArrayBuffer[Any]
@@ -822,11 +824,12 @@ private[spark] object PythonRDD extends Logging {
     */
   def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
     pyRDD.rdd.mapPartitions { iter =>
+      SerDeUtil.initialize()
       val unpickle = new Unpickler
       iter.flatMap { row =>
         val obj = unpickle.loads(row)
         if (batched) {
-          obj.asInstanceOf[JArrayList[_]]
+          obj.asInstanceOf[JArrayList[_]].asScala
         } else {
           Seq(obj)
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
index 7903457..ebdc353 100644
--- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala
@@ -29,7 +29,7 @@ import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.RDD
 
 /** Utilities for serialization / deserialization between Python and Java, using Pickle. */
-private[python] object SerDeUtil extends Logging {
+private[spark] object SerDeUtil extends Logging {
   // Unpickle array.array generated by Python 2.6
   class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor {
     //  /* Description of types */
@@ -76,9 +76,18 @@ private[python] object SerDeUtil extends Logging {
     }
   }
 
+  private var initialized = false
+  // This should be called before trying to unpickle array.array from Python
+  // In cluster mode, this should be put in closure
   def initialize() = {
-    Unpickler.registerConstructor("array", "array", new ArrayConstructor())
+    synchronized{
+      if (!initialized) {
+        Unpickler.registerConstructor("array", "array", new ArrayConstructor())
+        initialized = true
+      }
+    }
   }
+  initialize()
 
   private def checkPickle(t: (Any, Any)): (Boolean, Boolean) = {
     val pickle = new Pickler
@@ -143,6 +152,7 @@ private[python] object SerDeUtil extends Logging {
         obj.asInstanceOf[Array[_]].length == 2
     }
     pyRDD.mapPartitions { iter =>
+      initialize()
       val unpickle = new Unpickler
       val unpickled =
         if (batchSerialized) {

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index f7251e6..9a10017 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.api.python
 
 import java.io.OutputStream
+import java.util.{ArrayList => JArrayList}
 
 import scala.collection.JavaConverters._
 import scala.language.existentials
@@ -27,6 +28,7 @@ import net.razorvine.pickle._
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
+import org.apache.spark.api.python.{PythonRDD, SerDeUtil}
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
 import org.apache.spark.mllib.feature.Word2Vec
@@ -639,13 +641,24 @@ private[spark] object SerDe extends Serializable {
     }
   }
 
+  var initialized = false
+  // This should be called before trying to serialize any above classes
+  // In cluster mode, this should be put in the closure
   def initialize(): Unit = {
-    new DenseVectorPickler().register()
-    new DenseMatrixPickler().register()
-    new SparseVectorPickler().register()
-    new LabeledPointPickler().register()
-    new RatingPickler().register()
+    SerDeUtil.initialize()
+    synchronized {
+      if (!initialized) {
+        new DenseVectorPickler().register()
+        new DenseMatrixPickler().register()
+        new SparseVectorPickler().register()
+        new LabeledPointPickler().register()
+        new RatingPickler().register()
+        initialized = true
+      }
+    }
   }
+  // will not called in Executor automatically
+  initialize()
 
   def dumps(obj: AnyRef): Array[Byte] = {
     new Pickler().dumps(obj)
@@ -659,4 +672,33 @@ private[spark] object SerDe extends Serializable {
   def asTupleRDD(rdd: RDD[Array[Any]]): RDD[(Int, Int)] = {
     rdd.map(x => (x(0).asInstanceOf[Int], x(1).asInstanceOf[Int]))
   }
+
+  /**
+   * Convert an RDD of Java objects to an RDD of serialized Python objects, that is usable by
+   * PySpark.
+   */
+  def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
+    jRDD.rdd.mapPartitions { iter =>
+      initialize()  // let it called in executor
+      new PythonRDD.AutoBatchedPickler(iter)
+    }
+  }
+
+  /**
+   * Convert an RDD of serialized Python objects to RDD of objects, that is usable by PySpark.
+   */
+  def pythonToJava(pyRDD: JavaRDD[Array[Byte]], batched: Boolean): JavaRDD[Any] = {
+    pyRDD.rdd.mapPartitions { iter =>
+      initialize()  // let it called in executor
+      val unpickle = new Unpickler
+      iter.flatMap { row =>
+        val obj = unpickle.loads(row)
+        if (batched) {
+          obj.asInstanceOf[JArrayList[_]].asScala
+        } else {
+          Seq(obj)
+        }
+      }
+    }.toJavaRDD()
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/context.py
----------------------------------------------------------------------
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index 89d2e2e..8d27ccb 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -215,8 +215,6 @@ class SparkContext(object):
                 SparkContext._gateway = gateway or launch_gateway()
                 SparkContext._jvm = SparkContext._gateway.jvm
                 SparkContext._writeToFile = SparkContext._jvm.PythonRDD.writeToFile
-                SparkContext._jvm.SerDeUtil.initialize()
-                SparkContext._jvm.SerDe.initialize()
 
             if instance:
                 if (SparkContext._active_spark_context and

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index cd43982..e295c9d 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -21,7 +21,7 @@ import numpy
 from numpy import array
 
 from pyspark import SparkContext, PickleSerializer
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
 from pyspark.mllib.regression import LabeledPoint, LinearModel, _regression_train_wrapper
 
 
@@ -244,7 +244,7 @@ class NaiveBayes(object):
         :param lambda_: The smoothing parameter
         """
         sc = data.context
-        jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(data._to_java_object_rdd(), lambda_)
+        jlist = sc._jvm.PythonMLLibAPI().trainNaiveBayes(_to_java_object_rdd(data), lambda_)
         labels, pi, theta = PickleSerializer().loads(str(sc._jvm.SerDe.dumps(jlist)))
         return NaiveBayesModel(labels.toArray(), pi.toArray(), numpy.array(theta))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
index 12c5602..5ee7997 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -17,7 +17,7 @@
 
 from pyspark import SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
 
 __all__ = ['KMeansModel', 'KMeans']
 
@@ -85,7 +85,7 @@ class KMeans(object):
         # cache serialized data to avoid objects over head in JVM
         cached = rdd.map(_convert_to_vector)._reserialize(AutoBatchedSerializer(ser)).cache()
         model = sc._jvm.PythonMLLibAPI().trainKMeansModel(
-            cached._to_java_object_rdd(), k, maxIterations, runs, initializationMode)
+            _to_java_object_rdd(cached), k, maxIterations, runs, initializationMode)
         bytes = sc._jvm.SerDe.dumps(model.clusterCenters())
         centers = ser.loads(str(bytes))
         return KMeansModel([c.toArray() for c in centers])

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/mllib/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/feature.py b/python/pyspark/mllib/feature.py
index f4cbf31..b5a3f22 100644
--- a/python/pyspark/mllib/feature.py
+++ b/python/pyspark/mllib/feature.py
@@ -19,8 +19,7 @@
 Python package for feature in MLlib.
 """
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-
-from pyspark.mllib.linalg import _convert_to_vector
+from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
 
 __all__ = ['Word2Vec', 'Word2VecModel']
 
@@ -176,7 +175,7 @@ class Word2Vec(object):
         seed = self.seed
 
         model = sc._jvm.PythonMLLibAPI().trainWord2Vec(
-            data._to_java_object_rdd(), vectorSize,
+            _to_java_object_rdd(data), vectorSize,
             learningRate, numPartitions, numIterations, seed)
         return Word2VecModel(sc, model)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/mllib/linalg.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 24c5480..773d8d3 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -29,6 +29,8 @@ import copy_reg
 
 import numpy as np
 
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
+
 __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
 
 
@@ -50,6 +52,17 @@ except:
     _have_scipy = False
 
 
+# this will call the MLlib version of pythonToJava()
+def _to_java_object_rdd(rdd):
+    """ Return an JavaRDD of Object by unpickling
+
+    It will convert each Python object into Java object by Pyrolite, whenever the
+    RDD is serialized in batch or not.
+    """
+    rdd = rdd._reserialize(AutoBatchedSerializer(PickleSerializer()))
+    return rdd.ctx._jvm.SerDe.pythonToJava(rdd._jrdd, True)
+
+
 def _convert_to_vector(l):
     if isinstance(l, Vector):
         return l

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/mllib/random.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index a787e4d..73baba4 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -32,7 +32,7 @@ def serialize(f):
     @wraps(f)
     def func(sc, *a, **kw):
         jrdd = f(sc, *a, **kw)
-        return RDD(sc._jvm.PythonRDD.javaToPython(jrdd), sc,
+        return RDD(sc._jvm.SerDe.javaToPython(jrdd), sc,
                    BatchedSerializer(PickleSerializer(), 1024))
     return func
 

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/mllib/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
index 59c1c5f..17f96b8 100644
--- a/python/pyspark/mllib/recommendation.py
+++ b/python/pyspark/mllib/recommendation.py
@@ -18,6 +18,7 @@
 from pyspark import SparkContext
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
 from pyspark.rdd import RDD
+from pyspark.mllib.linalg import _to_java_object_rdd
 
 __all__ = ['MatrixFactorizationModel', 'ALS']
 
@@ -77,9 +78,9 @@ class MatrixFactorizationModel(object):
             first = tuple(map(int, first))
         assert all(type(x) is int for x in first), "user and product in user_product shoul be int"
         sc = self._context
-        tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
+        tuplerdd = sc._jvm.SerDe.asTupleRDD(_to_java_object_rdd(user_product).rdd())
         jresult = self._java_model.predict(tuplerdd).toJavaRDD()
-        return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
+        return RDD(sc._jvm.SerDe.javaToPython(jresult), sc,
                    AutoBatchedSerializer(PickleSerializer()))
 
 
@@ -97,7 +98,7 @@ class ALS(object):
         # serialize them by AutoBatchedSerializer before cache to reduce the
         # objects overhead in JVM
         cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
-        return cached._to_java_object_rdd()
+        return _to_java_object_rdd(cached)
 
     @classmethod
     def train(cls, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/mllib/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
index 12b322a..93e17fa 100644
--- a/python/pyspark/mllib/regression.py
+++ b/python/pyspark/mllib/regression.py
@@ -19,8 +19,8 @@ import numpy as np
 from numpy import array
 
 from pyspark import SparkContext
-from pyspark.mllib.linalg import SparseVector, _convert_to_vector
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
+from pyspark.mllib.linalg import SparseVector, _convert_to_vector, _to_java_object_rdd
 
 __all__ = ['LabeledPoint', 'LinearModel', 'LinearRegressionModel', 'RidgeRegressionModel',
            'LinearRegressionWithSGD', 'LassoWithSGD', 'RidgeRegressionWithSGD']
@@ -131,7 +131,7 @@ def _regression_train_wrapper(sc, train_func, modelClass, data, initial_weights)
     # use AutoBatchedSerializer before cache to reduce the memory
     # overhead in JVM
     cached = data._reserialize(AutoBatchedSerializer(ser)).cache()
-    ans = train_func(cached._to_java_object_rdd(), initial_bytes)
+    ans = train_func(_to_java_object_rdd(cached), initial_bytes)
     assert len(ans) == 2, "JVM call result had unexpected length"
     weights = ser.loads(str(ans[0]))
     return modelClass(weights, ans[1])

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/mllib/stat.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/stat.py b/python/pyspark/mllib/stat.py
index b9de090..a6019da 100644
--- a/python/pyspark/mllib/stat.py
+++ b/python/pyspark/mllib/stat.py
@@ -22,6 +22,7 @@ Python package for statistical functions in MLlib.
 from functools import wraps
 
 from pyspark import PickleSerializer
+from pyspark.mllib.linalg import _to_java_object_rdd
 
 
 __all__ = ['MultivariateStatisticalSummary', 'Statistics']
@@ -106,7 +107,7 @@ class Statistics(object):
         array([ 2.,  0.,  0., -2.])
         """
         sc = rdd.ctx
-        jrdd = rdd._to_java_object_rdd()
+        jrdd = _to_java_object_rdd(rdd)
         cStats = sc._jvm.PythonMLLibAPI().colStats(jrdd)
         return MultivariateStatisticalSummary(sc, cStats)
 
@@ -162,14 +163,14 @@ class Statistics(object):
         if type(y) == str:
             raise TypeError("Use 'method=' to specify method name.")
 
-        jx = x._to_java_object_rdd()
+        jx = _to_java_object_rdd(x)
         if not y:
             resultMat = sc._jvm.PythonMLLibAPI().corr(jx, method)
             bytes = sc._jvm.SerDe.dumps(resultMat)
             ser = PickleSerializer()
             return ser.loads(str(bytes)).toArray()
         else:
-            jy = y._to_java_object_rdd()
+            jy = _to_java_object_rdd(y)
             return sc._jvm.PythonMLLibAPI().corr(jx, jy, method)
 
 

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/mllib/tree.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tree.py b/python/pyspark/mllib/tree.py
index 5d7abfb..0938eeb 100644
--- a/python/pyspark/mllib/tree.py
+++ b/python/pyspark/mllib/tree.py
@@ -19,7 +19,7 @@ from py4j.java_collections import MapConverter
 
 from pyspark import SparkContext, RDD
 from pyspark.serializers import BatchedSerializer, PickleSerializer
-from pyspark.mllib.linalg import Vector, _convert_to_vector
+from pyspark.mllib.linalg import Vector, _convert_to_vector, _to_java_object_rdd
 from pyspark.mllib.regression import LabeledPoint
 
 __all__ = ['DecisionTreeModel', 'DecisionTree']
@@ -61,8 +61,8 @@ class DecisionTreeModel(object):
                 return self._sc.parallelize([])
             if not isinstance(first[0], Vector):
                 x = x.map(_convert_to_vector)
-            jPred = self._java_model.predict(x._to_java_object_rdd()).toJavaRDD()
-            jpyrdd = self._sc._jvm.PythonRDD.javaToPython(jPred)
+            jPred = self._java_model.predict(_to_java_object_rdd(x)).toJavaRDD()
+            jpyrdd = self._sc._jvm.SerDe.javaToPython(jPred)
             return RDD(jpyrdd, self._sc, BatchedSerializer(ser, 1024))
 
         else:
@@ -104,7 +104,7 @@ class DecisionTree(object):
         first = data.first()
         assert isinstance(first, LabeledPoint), "the data should be RDD of LabeledPoint"
         sc = data.context
-        jrdd = data._to_java_object_rdd()
+        jrdd = _to_java_object_rdd(data)
         cfiMap = MapConverter().convert(categoricalFeaturesInfo,
                                         sc._gateway._gateway_client)
         model = sc._jvm.PythonMLLibAPI().trainDecisionTreeModel(

http://git-wip-us.apache.org/repos/asf/spark/blob/091d32c5/python/pyspark/mllib/util.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 1357fd4..84b39a4 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -19,7 +19,7 @@ import numpy as np
 import warnings
 
 from pyspark.rdd import RDD
-from pyspark.serializers import BatchedSerializer, PickleSerializer
+from pyspark.serializers import AutoBatchedSerializer, PickleSerializer
 from pyspark.mllib.linalg import Vectors, SparseVector, _convert_to_vector
 from pyspark.mllib.regression import LabeledPoint
 
@@ -174,8 +174,8 @@ class MLUtils(object):
         """
         minPartitions = minPartitions or min(sc.defaultParallelism, 2)
         jrdd = sc._jvm.PythonMLLibAPI().loadLabeledPoints(sc._jsc, path, minPartitions)
-        jpyrdd = sc._jvm.PythonRDD.javaToPython(jrdd)
-        return RDD(jpyrdd, sc, BatchedSerializer(PickleSerializer()))
+        jpyrdd = sc._jvm.SerDe.javaToPython(jrdd)
+        return RDD(jpyrdd, sc, AutoBatchedSerializer(PickleSerializer()))
 
 
 def _test():


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org