Posted to reviews@spark.apache.org by davies <gi...@git.apache.org> on 2014/09/13 09:27:16 UTC

[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

GitHub user davies opened a pull request:

    https://github.com/apache/spark/pull/2378

    [SPARK-3491] [WIP] [MLlib] [PySpark] use pickle to serialize data in MLlib

    Currently, we serialize the data between the JVM and Python manually, case by case, which cannot scale to support the many APIs in MLlib.
    
    This patch tries to address the problem by serializing the data with the pickle protocol, using the Pyrolite library to serialize/deserialize in the JVM. The pickle protocol can be easily extended to support custom classes.
    
    As a first step, it supports Double, DenseVector, SparseVector, DenseMatrix, LabeledPoint, Rating, and Tuple2; the recommendation module has been refactored to use this new protocol.
    
    Later, I will refactor all the other modules to use this protocol.
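
To illustrate how the pickle protocol can be extended to a custom class, here is a minimal Python-only sketch. The `DenseVector` class below is a toy stand-in written for this example, not the real `pyspark.mllib.linalg.DenseVector`, and the JVM side (Pyrolite) is not shown. The class describes itself through `__reduce__`, and the resulting byte stream contains a class reference followed by a `REDUCE` opcode, which is the general shape a reader on the other side has to handle.

```python
import pickle
import pickletools


class DenseVector(object):
    """Toy stand-in for a vector class (hypothetical, for illustration only)."""

    def __init__(self, values):
        self.values = list(values)

    def __reduce__(self):
        # Tell pickle to rebuild this object by calling DenseVector(values).
        return (DenseVector, (self.values,))

    def __eq__(self, other):
        return isinstance(other, DenseVector) and self.values == other.values


vec = DenseVector([1.0, 2.0, 3.0])

# Protocol 2 is used here so the class reference is written as a GLOBAL opcode.
data = pickle.dumps(vec, protocol=2)

# Collect the opcode names to inspect the structure of the stream.
opcodes = [op.name for op, arg, pos in pickletools.genops(data)]
restored = pickle.loads(data)

print(opcodes)
print(restored == vec)
```

Because the stream only names a module, a class, and constructor arguments, supporting a new type on either side comes down to registering one writer and one constructor for that name.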

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/davies/spark pickle_mllib

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/2378.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #2378
    
----
commit b30ef35ec7830cee08b4f8d692da26d98cac70e8
Author: Davies Liu <da...@gmail.com>
Date:   2014-09-13T07:18:33Z

    use pickle to serialize data for mllib/recommendation

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17687682
  
    --- Diff: python/pyspark/mllib/recommendation.py ---
    @@ -54,34 +64,51 @@ def __del__(self):
         def predict(self, user, product):
             return self._java_model.predict(user, product)
     
    -    def predictAll(self, usersProducts):
    -        usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
    -        return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
    -                   self._context, RatingDeserializer())
    +    def predictAll(self, user_product):
    +        assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
    +        sc = self._context
    +        tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
    +        jresult = self._java_model.predict(tuplerdd).toJavaRDD()
    +        return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
    +                   AutoBatchedSerializer(PickleSerializer()))
     
     
     class ALS(object):
     
         @classmethod
    +    def _prepare(cls, ratings):
    +        assert isinstance(ratings, RDD), "ratings should be RDD"
    +        first = ratings.first()
    +        if not isinstance(first, Rating):
    +            if isinstance(first, (tuple, list)):
    +                ratings = ratings.map(lambda x: Rating(*x))
    +            else:
    +                raise ValueError("rating should be RDD of Rating or tuple/list")
    +        # serialize them by AutoBatchedSerializer before cache to reduce the
    +        # objects overhead in JVM
    +        cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
    --- End diff --
    
    FWIW it looks like the DecisionTree learner can do multiple reads (of varying types) of its input. In DecisionTree.train, DecisionTreeMetadata.buildMetadata does a count() on the input, DecisionTree.findSplitsBins can do a sampled read, and then TreePoint.convertToTreeRDD will do a mapped read that gets persisted. I'm not knowledgeable enough to know how expensive that initial count() will be without further investigation. But I think for DecisionTree the suggestion was not to cache before learning.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17574784
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -472,214 +452,140 @@ class PythonMLLibAPI extends Serializable {
           numRows: Long,
           numCols: Int,
           numPartitions: java.lang.Integer,
    -      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
    +      seed: java.lang.Long): JavaRDD[Vector] = {
         val parts = getNumPartitionsOrDefault(numPartitions, jsc)
         val s = getSeedOrDefault(seed)
    -    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
    +    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
       }
     
     }
     
     /**
    - * :: DeveloperApi ::
    - * MultivariateStatisticalSummary with Vector fields serialized.
    + * SerDe utility functions for PythonMLLibAPI.
      */
    -@DeveloperApi
    -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
    -  extends Serializable {
    +private[spark] object SerDe extends Serializable {
     
    -  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
    +  private[python] def reduce_object(out: OutputStream, pickler: Pickler,
    +                                    module: String, name: String, objects: Object*) = {
    +    out.write(Opcodes.GLOBAL)
    +    out.write((module + "\n" + name + "\n").getBytes)
    +    out.write(Opcodes.MARK)
    +    objects.foreach(pickler.save(_))
    +    out.write(Opcodes.TUPLE)
    +    out.write(Opcodes.REDUCE)
    +  }
     
    -  def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)
    +  private[python] class DenseVectorPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val vector: DenseVector = obj.asInstanceOf[DenseVector]
    +      reduce_object(out, pickler, "pyspark.mllib.linalg", "DenseVector", vector.toArray)
    +    }
    +  }
     
    -  def count: Long = summary.count
    +  private[python] class DenseVectorConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 1)
    +      new DenseVector(args(0).asInstanceOf[Array[Double]])
    +    }
    +  }
    +
    +  private[python] class DenseMatrixPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
    +      reduce_object(out, pickler, "pyspark.mllib.linalg", "DenseMatrix",
    +        m.numRows.asInstanceOf[Object], m.numCols.asInstanceOf[Object], m.values)
    +    }
    +  }
     
    -  def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros)
    +  private[python] class DenseMatrixConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Array[Double]])
    +    }
    +  }
     
    -  def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max)
    +  private[python] class SparseVectorPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val v: SparseVector = obj.asInstanceOf[SparseVector]
    +      reduce_object(out, pickler, "pyspark.mllib.linalg", "SparseVector",
    +        v.size.asInstanceOf[Object], v.indices, v.values)
    +    }
    +  }
     
    -  def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min)
    -}
    +  private[python] class SparseVectorConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]],
    +        args(2).asInstanceOf[Array[Double]])
    +    }
    +  }
     
    -/**
    - * SerDe utility functions for PythonMLLibAPI.
    - */
    -private[spark] object SerDe extends Serializable {
    -  private val DENSE_VECTOR_MAGIC: Byte = 1
    -  private val SPARSE_VECTOR_MAGIC: Byte = 2
    -  private val DENSE_MATRIX_MAGIC: Byte = 3
    -  private val LABELED_POINT_MAGIC: Byte = 4
    -
    -  private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    require(bytes.length - offset >= 5, "Byte array too short")
    -    val magic = bytes(offset)
    -    if (magic == DENSE_VECTOR_MAGIC) {
    -      deserializeDenseVector(bytes, offset)
    -    } else if (magic == SPARSE_VECTOR_MAGIC) {
    -      deserializeSparseVector(bytes, offset)
    -    } else {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +  private[python] class LabeledPointPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val point: LabeledPoint = obj.asInstanceOf[LabeledPoint]
    +      reduce_object(out, pickler, "pyspark.mllib.regression", "LabeledPoint",
    +        point.label.asInstanceOf[Object], point.features)
         }
       }
     
    -  private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = {
    -    require(bytes.length - offset == 8, "Wrong size byte array for Double")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.getDouble
    -  }
    -
    -  private[python] def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 5, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val length = bb.getInt()
    -    require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Double](length.toInt)
    -    db.get(ans)
    -    Vectors.dense(ans)
    -  }
    -
    -  private[python] def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 9, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val size = bb.getInt()
    -    val nonZeros = bb.getInt()
    -    require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
    -    val ib = bb.asIntBuffer()
    -    val indices = new Array[Int](nonZeros)
    -    ib.get(indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    val values = new Array[Double](nonZeros)
    -    db.get(values)
    -    Vectors.sparse(size, indices, values)
    +  private[python] class LabeledPointConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 2) {
    +        throw new PickleException("should be 2")
    +      }
    +      new LabeledPoint(args(0).asInstanceOf[Double], args(1).asInstanceOf[Vector])
    +    }
       }
     
       /**
    -   * Returns an 8-byte array for the input Double.
    -   *
    -   * Note: we currently do not use a magic byte for double for storage efficiency.
    -   * This should be reconsidered when we add Ser/De for other 8-byte types (e.g. Long), for safety.
    -   * The corresponding deserializer, deserializeDouble, needs to be modified as well if the
    -   * serialization scheme changes.
    +   * Pickle Rating
        */
    -  private[python] def serializeDouble(double: Double): Array[Byte] = {
    -    val bytes = new Array[Byte](8)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.putDouble(double)
    -    bytes
    -  }
    -
    -  private[python] def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
    -    val len = doubles.length
    -    val bytes = new Array[Byte](5 + 8 * len)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_VECTOR_MAGIC)
    -    bb.putInt(len)
    -    val db = bb.asDoubleBuffer()
    -    db.put(doubles)
    -    bytes
    -  }
    -
    -  private[python] def serializeSparseVector(vector: SparseVector): Array[Byte] = {
    -    val nonZeros = vector.indices.length
    -    val bytes = new Array[Byte](9 + 12 * nonZeros)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(SPARSE_VECTOR_MAGIC)
    -    bb.putInt(vector.size)
    -    bb.putInt(nonZeros)
    -    val ib = bb.asIntBuffer()
    -    ib.put(vector.indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    db.put(vector.values)
    -    bytes
    -  }
    -
    -  private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
    -    case s: SparseVector =>
    -      serializeSparseVector(s)
    -    case _ =>
    -      serializeDenseVector(vector.toArray)
    -  }
    -
    -  private[python] def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
    -    val packetLength = bytes.length
    -    if (packetLength < 9) {
    -      throw new IllegalArgumentException("Byte array too short.")
    +  private[python] class RatingPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val rating: Rating = obj.asInstanceOf[Rating]
    +      reduce_object(out, pickler, "pyspark.mllib.recommendation", "Rating",
    +        rating.user.asInstanceOf[Object], rating.product.asInstanceOf[Object],
    +        rating.rating.asInstanceOf[Object])
         }
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    if (magic != DENSE_MATRIX_MAGIC) {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    -    }
    -    val rows = bb.getInt()
    -    val cols = bb.getInt()
    -    if (packetLength != 9 + 8 * rows * cols) {
    -      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
    -    }
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Array[Double]](rows.toInt)
    -    for (i <- 0 until rows.toInt) {
    -      ans(i) = new Array[Double](cols.toInt)
    -      db.get(ans(i))
    -    }
    -    ans
       }
     
    -  private[python] def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
    -    val rows = doubles.length
    -    var cols = 0
    -    if (rows > 0) {
    -      cols = doubles(0).length
    -    }
    -    val bytes = new Array[Byte](9 + 8 * rows * cols)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_MATRIX_MAGIC)
    -    bb.putInt(rows)
    -    bb.putInt(cols)
    -    val db = bb.asDoubleBuffer()
    -    for (i <- 0 until rows) {
    -      db.put(doubles(i))
    +  /**
    +   * Unpickle Rating
    +   */
    +  private[python] class RatingConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 3) {
    +        throw new PickleException("should be 3")
    +      }
    +      new Rating(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Double])
         }
    -    bytes
       }
     
    -  private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
    -    val fb = serializeDoubleVector(p.features)
    -    val bytes = new Array[Byte](1 + 8 + fb.length)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(LABELED_POINT_MAGIC)
    -    bb.putDouble(p.label)
    -    bb.put(fb)
    -    bytes
    +  def initialize() = {
    --- End diff --
    
    Please add return type explicitly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56241679
  
    @davies LGTM except for a few linear algebra operators and caching, but those are orthogonal to this PR. I'm merging this, and we will update the linear algebra ops later.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55497580
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20273/consoleFull) for   PR 2378 at commit [`8fe166a`](https://github.com/apache/spark/commit/8fe166a80c5162914a9393b9526bc14150ce2402).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17631575
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -775,17 +775,38 @@ private[spark] object PythonRDD extends Logging {
         }.toJavaRDD()
       }
     
    +  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
    +    private val pickle = new Pickler()
    +    private var batch = 1
    +    private val buffer = new mutable.ArrayBuffer[Any]
    +
    +    override def hasNext(): Boolean = iter.hasNext
    +
    +    override def next(): Array[Byte] = {
    +      while (iter.hasNext && buffer.length < batch) {
    +        buffer += iter.next()
    +      }
    +      val bytes = pickle.dumps(buffer.toArray)
    +      val size = bytes.length
    +      // let  1M < size < 10M
    +      if (size < 1024 * 100) {
    +        batch = (1024 * 100) / size  // fast grow
    --- End diff --
    
    Good question. Without this fast path, `batch` may need to grow 15 times to become stable, which is fine and safer. I will remove this fast path.
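
For readers following the batching discussion, here is a minimal Python sketch of the slow path on its own. The diff above is cut off before that branch, so the policy here is an assumption: double `batch` while a pickled payload is below a lower bound, and halve it when the payload exceeds an upper bound. The function name `auto_batched_dumps` and the `low`/`high` parameters are hypothetical; the defaults follow the `1M < size < 10M` comment in the diff. Under this doubling assumption, 15 growth steps starting from 1 would reach a batch of 32768 items.

```python
import pickle


def auto_batched_dumps(items, low=1 << 20, high=10 << 20):
    """Yield (item_count, payload) pairs, adapting the batch size as it goes.

    Assumed policy (not taken from the patch): double the batch while a
    payload is under `low` bytes, halve it when a payload is over `high`.
    """
    iterator = iter(items)
    batch = 1
    while True:
        # Pull up to `batch` items from the shared iterator.
        chunk = []
        for item in iterator:
            chunk.append(item)
            if len(chunk) >= batch:
                break
        if not chunk:
            return
        payload = pickle.dumps(chunk, protocol=2)
        yield len(chunk), payload
        if len(payload) < low:
            batch *= 2
        elif len(payload) > high and batch > 1:
            batch //= 2


# Small thresholds so the growth is visible with only 1000 floats.
values = [float(i) for i in range(1000)]
batches = list(auto_batched_dumps(values, low=1024, high=10 * 1024))
sizes = [count for count, _ in batches]
print(sizes)
```

The batch size settles once a payload lands between the two bounds, so the cost of starting at 1 is a handful of small payloads at the beginning of each partition.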




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17574578
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -60,18 +60,18 @@ class PythonMLLibAPI extends Serializable {
       def loadLabeledPoints(
           jsc: JavaSparkContext,
           path: String,
    -      minPartitions: Int): JavaRDD[Array[Byte]] =
    -    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions).map(SerDe.serializeLabeledPoint)
    +      minPartitions: Int): JavaRDD[LabeledPoint] =
    +    MLUtils.loadLabeledPoints(jsc.sc, path, minPartitions)
     
       private def trainRegressionModel(
           trainFunc: (RDD[LabeledPoint], Vector) => GeneralizedLinearModel,
    -      dataBytesJRDD: JavaRDD[Array[Byte]],
    +      dataJRDD: JavaRDD[Any],
           initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
    -    val data = dataBytesJRDD.rdd.map(SerDe.deserializeLabeledPoint)
    -    val initialWeights = SerDe.deserializeDoubleVector(initialWeightsBA)
    +    val data = dataJRDD.rdd.map(_.asInstanceOf[LabeledPoint])
    --- End diff --
    
    Maybe we can try `dataJRDD.rdd.asInstanceOf[RDD[LabeledPoint]]` instead of `map`.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17686887
  
    --- Diff: python/pyspark/mllib/recommendation.py ---
    @@ -54,34 +64,51 @@ def __del__(self):
         def predict(self, user, product):
             return self._java_model.predict(user, product)
     
    -    def predictAll(self, usersProducts):
    -        usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
    -        return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
    -                   self._context, RatingDeserializer())
    +    def predictAll(self, user_product):
    +        assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
    +        sc = self._context
    +        tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
    +        jresult = self._java_model.predict(tuplerdd).toJavaRDD()
    +        return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
    +                   AutoBatchedSerializer(PickleSerializer()))
     
     
     class ALS(object):
     
         @classmethod
    +    def _prepare(cls, ratings):
    +        assert isinstance(ratings, RDD), "ratings should be RDD"
    +        first = ratings.first()
    +        if not isinstance(first, Rating):
    +            if isinstance(first, (tuple, list)):
    +                ratings = ratings.map(lambda x: Rating(*x))
    +            else:
    +                raise ValueError("rating should be RDD of Rating or tuple/list")
    +        # serialize them by AutoBatchedSerializer before cache to reduce the
    +        # objects overhead in JVM
    +        cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
    --- End diff --
    
    @mengxr Any comments about it?




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55706058
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20375/consoleFull) for   PR 2378 at commit [`708dc02`](https://github.com/apache/spark/commit/708dc0288d23385ff3638fd07fdff9efc3ff8272).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17760498
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -476,259 +436,167 @@ class PythonMLLibAPI extends Serializable {
           numRows: Long,
           numCols: Int,
           numPartitions: java.lang.Integer,
    -      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
    +      seed: java.lang.Long): JavaRDD[Vector] = {
         val parts = getNumPartitionsOrDefault(numPartitions, jsc)
         val s = getSeedOrDefault(seed)
    -    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
    +    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
       }
     
     }
     
     /**
    - * :: DeveloperApi ::
    - * MultivariateStatisticalSummary with Vector fields serialized.
    + * SerDe utility functions for PythonMLLibAPI.
      */
    -@DeveloperApi
    -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
    -  extends Serializable {
    +private[spark] object SerDe extends Serializable {
     
    -  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
    +  val PYSPARK_PACKAGE = "pyspark.mllib"
     
    -  def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)
    +  /**
    +   * Base class used for pickle
    +   */
    +  private[python] abstract class BasePickler[T: ClassTag]
    +    extends IObjectPickler with IObjectConstructor {
    +
    +    private val cls = implicitly[ClassTag[T]].runtimeClass
    +    private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4)
    +    private val name = cls.getSimpleName
    +
    +    // register this to Pickler and Unpickler
    +    def register(): Unit = {
    +      Pickler.registerCustomPickler(this.getClass, this)
    +      Pickler.registerCustomPickler(cls, this)
    +      Unpickler.registerConstructor(module, name, this)
    +    }
     
    -  def count: Long = summary.count
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
    +      if (obj == this) {
    +        out.write(Opcodes.GLOBAL)
    +        out.write((module + "\n" + name + "\n").getBytes())
    +      } else {
    +        pickler.save(this)  // it will be memorized by Pickler
    +        saveState(obj, out, pickler)
    +        out.write(Opcodes.REDUCE)
    +      }
    +    }
    +
    +    private[python] def saveObjects(out: OutputStream, pickler: Pickler,
    +                                    objects: Any*) = {
    +      if (objects.length == 0 || objects.length > 3) {
    +        out.write(Opcodes.MARK)
    +      }
    +      objects.foreach(pickler.save(_))
    +      val code = objects.length match {
    +        case 1 => Opcodes.TUPLE1
    +        case 2 => Opcodes.TUPLE2
    +        case 3 => Opcodes.TUPLE3
    +        case _ => Opcodes.TUPLE
    +      }
    +      out.write(code)
    +    }
     
    -  def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros)
    +    private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler)
    +  }
     
    -  def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max)
    +  // Pickler for DenseVector
    +  private[python] class DenseVectorPickler
    +    extends BasePickler[DenseVector] {
     
    -  def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min)
    -}
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val vector: DenseVector = obj.asInstanceOf[DenseVector]
    +      saveObjects(out, pickler, vector.toArray)
    +    }
     
    -/**
    - * SerDe utility functions for PythonMLLibAPI.
    - */
    -private[spark] object SerDe extends Serializable {
    -  private val DENSE_VECTOR_MAGIC: Byte = 1
    -  private val SPARSE_VECTOR_MAGIC: Byte = 2
    -  private val DENSE_MATRIX_MAGIC: Byte = 3
    -  private val LABELED_POINT_MAGIC: Byte = 4
    -
    -  private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    require(bytes.length - offset >= 5, "Byte array too short")
    -    val magic = bytes(offset)
    -    if (magic == DENSE_VECTOR_MAGIC) {
    -      deserializeDenseVector(bytes, offset)
    -    } else if (magic == SPARSE_VECTOR_MAGIC) {
    -      deserializeSparseVector(bytes, offset)
    -    } else {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 1)
    +      new DenseVector(args(0).asInstanceOf[Array[Double]])
         }
       }
     
    -  private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = {
    -    require(bytes.length - offset == 8, "Wrong size byte array for Double")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.getDouble
    -  }
    -
    -  private[python] def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 5, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val length = bb.getInt()
    -    require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Double](length.toInt)
    -    db.get(ans)
    -    Vectors.dense(ans)
    -  }
    -
    -  private[python] def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 9, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val size = bb.getInt()
    -    val nonZeros = bb.getInt()
    -    require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
    -    val ib = bb.asIntBuffer()
    -    val indices = new Array[Int](nonZeros)
    -    ib.get(indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    val values = new Array[Double](nonZeros)
    -    db.get(values)
    -    Vectors.sparse(size, indices, values)
    -  }
    +  // Pickler for DenseMatrix
    +  private[python] class DenseMatrixPickler
    +    extends BasePickler[DenseMatrix] {
     
    -  /**
    -   * Returns an 8-byte array for the input Double.
    -   *
    -   * Note: we currently do not use a magic byte for double for storage efficiency.
    -   * This should be reconsidered when we add Ser/De for other 8-byte types (e.g. Long), for safety.
    -   * The corresponding deserializer, deserializeDouble, needs to be modified as well if the
    -   * serialization scheme changes.
    -   */
    -  private[python] def serializeDouble(double: Double): Array[Byte] = {
    -    val bytes = new Array[Byte](8)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.putDouble(double)
    -    bytes
    -  }
    -
    -  private[python] def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
    -    val len = doubles.length
    -    val bytes = new Array[Byte](5 + 8 * len)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_VECTOR_MAGIC)
    -    bb.putInt(len)
    -    val db = bb.asDoubleBuffer()
    -    db.put(doubles)
    -    bytes
    -  }
    -
    -  private[python] def serializeSparseVector(vector: SparseVector): Array[Byte] = {
    -    val nonZeros = vector.indices.length
    -    val bytes = new Array[Byte](9 + 12 * nonZeros)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(SPARSE_VECTOR_MAGIC)
    -    bb.putInt(vector.size)
    -    bb.putInt(nonZeros)
    -    val ib = bb.asIntBuffer()
    -    ib.put(vector.indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    db.put(vector.values)
    -    bytes
    -  }
    -
    -  private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
    -    case s: SparseVector =>
    -      serializeSparseVector(s)
    -    case _ =>
    -      serializeDenseVector(vector.toArray)
    -  }
    -
    -  private[python] def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
    -    val packetLength = bytes.length
    -    if (packetLength < 9) {
    -      throw new IllegalArgumentException("Byte array too short.")
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
    +      saveObjects(out, pickler, m.numRows, m.numCols, m.values)
         }
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    if (magic != DENSE_MATRIX_MAGIC) {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Array[Double]])
         }
    -    val rows = bb.getInt()
    -    val cols = bb.getInt()
    -    if (packetLength != 9 + 8 * rows * cols) {
    -      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
    +  }
    +
    +  // Pickler for SparseVector
    +  private[python] class SparseVectorPickler
    +    extends BasePickler[SparseVector] {
    +
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val v: SparseVector = obj.asInstanceOf[SparseVector]
    +      saveObjects(out, pickler, v.size, v.indices, v.values)
         }
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Array[Double]](rows.toInt)
    -    for (i <- 0 until rows.toInt) {
    -      ans(i) = new Array[Double](cols.toInt)
    -      db.get(ans(i))
    +
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]],
    +        args(2).asInstanceOf[Array[Double]])
         }
    -    ans
       }
     
    -  private[python] def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
    -    val rows = doubles.length
    -    var cols = 0
    -    if (rows > 0) {
    -      cols = doubles(0).length
    +  // Pickler for LabeledPoint
    +  private[python] class LabeledPointPickler
    +    extends BasePickler[LabeledPoint] {
    +
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val point: LabeledPoint = obj.asInstanceOf[LabeledPoint]
    +      saveObjects(out, pickler, point.label, point.features)
         }
    -    val bytes = new Array[Byte](9 + 8 * rows * cols)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_MATRIX_MAGIC)
    -    bb.putInt(rows)
    -    bb.putInt(cols)
    -    val db = bb.asDoubleBuffer()
    -    for (i <- 0 until rows) {
    -      db.put(doubles(i))
    +
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 2) {
    +        throw new PickleException("should be 2")
    +      }
    +      new LabeledPoint(args(0).asInstanceOf[Double], args(1).asInstanceOf[Vector])
         }
    -    bytes
       }
     
    -  private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
    -    val fb = serializeDoubleVector(p.features)
    -    val bytes = new Array[Byte](1 + 8 + fb.length)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(LABELED_POINT_MAGIC)
    -    bb.putDouble(p.label)
    -    bb.put(fb)
    -    bytes
    -  }
    +  // Pickler for Rating
    +  private[python] class RatingPickler
    +    extends BasePickler[Rating] {
     
    -  private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
    -    require(bytes.length >= 9, "Byte array too short")
    -    val magic = bytes(0)
    -    if (magic != LABELED_POINT_MAGIC) {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val rating: Rating = obj.asInstanceOf[Rating]
    +      saveObjects(out, pickler, rating.user, rating.product, rating.rating)
         }
    -    val labelBytes = ByteBuffer.wrap(bytes, 1, 8)
    -    labelBytes.order(ByteOrder.nativeOrder())
    -    val label = labelBytes.asDoubleBuffer().get(0)
    -    LabeledPoint(label, deserializeDoubleVector(bytes, 9))
    -  }
     
    -  // Reformat a Matrix into Array[Array[Double]] for serialization
    -  private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
    -    val values = matrix.toArray
    -    Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 3) {
    +        throw new PickleException("should be 3")
    +      }
    +      new Rating(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Double])
    +    }
       }
     
    +  def initialize(): Unit = {
    +    new DenseVectorPickler().register()
    +    new DenseMatrixPickler().register()
    +    new SparseVectorPickler().register()
    +    new LabeledPointPickler().register()
    +    new RatingPickler().register()
    +  }
     
    -  /** Unpack a Rating object from an array of bytes */
    -  private[python] def unpackRating(ratingBytes: Array[Byte]): Rating = {
    -    val bb = ByteBuffer.wrap(ratingBytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    val user = bb.getInt()
    -    val product = bb.getInt()
    -    val rating = bb.getDouble()
    -    new Rating(user, product, rating)
    +  def dumps(obj: AnyRef): Array[Byte] = {
    +    new Pickler().dumps(obj)
       }
     
    -  /** Unpack a tuple of Ints from an array of bytes */
    -  def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
    -    val bb = ByteBuffer.wrap(tupleBytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    val v1 = bb.getInt()
    -    val v2 = bb.getInt()
    -    (v1, v2)
    +  def loads(bytes: Array[Byte]): AnyRef = {
    +    new Unpickler().loads(bytes)
       }
     
    -  /**
    -   * Serialize a Rating object into an array of bytes.
    -   * It can be deserialized using RatingDeserializer().
    -   *
    -   * @param rate the Rating object to serialize
    -   * @return
    -   */
    -  def serializeRating(rate: Rating): Array[Byte] = {
    -    val len = 3
    -    val bytes = new Array[Byte](4 + 8 * len)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.putInt(len)
    -    val db = bb.asDoubleBuffer()
    -    db.put(rate.user.toDouble)
    -    db.put(rate.product.toDouble)
    -    db.put(rate.rating)
    -    bytes
    +  /* convert object into Tuple */
    +  def asTupleRDD(rdd: RDD[Array[Any]]): RDD[(Int, Int)] = {
    --- End diff --
    
    OK, sounds good.
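
For readers less familiar with the pickle wire format: the `saveObjects` helper in the diff above chooses the tuple opcode by arity, writing `TUPLE1`, `TUPLE2`, or `TUPLE3` for one to three items and bracketing anything else between `MARK` and `TUPLE`. The Python sketch below is not part of the PR. It uses the standard `pickletools` module to list the opcodes that CPython's own pickler emits at protocol 2, and `tuple_opcodes` is a hypothetical helper name chosen for illustration.

```python
import pickle
import pickletools


def tuple_opcodes(n):
    """Return the opcode names in a protocol-2 pickle of an n-element tuple."""
    payload = pickle.dumps(tuple(range(n)), protocol=2)
    return [opcode.name for opcode, _arg, _pos in pickletools.genops(payload)]


if __name__ == "__main__":
    # Print the opcode stream for tuples of length 0 through 4.
    for n in range(5):
        print(n, tuple_opcodes(n))
```

One difference worth noting: for an empty tuple CPython emits the dedicated `EMPTY_TUPLE` opcode, whereas `saveObjects` writes `MARK` followed by `TUPLE`. Both forms decode to `()`.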




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55851024
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20449/consoleFull) for   PR 2378 at commit [`19d0967`](https://github.com/apache/spark/commit/19d096783b60e741173f48f2944d91f650616140).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55711136
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20375/consoleFull) for   PR 2378 at commit [`708dc02`](https://github.com/apache/spark/commit/708dc0288d23385ff3638fd07fdff9efc3ff8272).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55855022
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/121/consoleFull) for   PR 2378 at commit [`1fccf1a`](https://github.com/apache/spark/commit/1fccf1adc91e78a6c9e65f4ae14ba770a7eecd2c).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55716038
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20377/consoleFull) for   PR 2378 at commit [`44736d7`](https://github.com/apache/spark/commit/44736d7d849a523419006b565cf51fa732e8854c).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55552292
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20320/consoleFull) for   PR 2378 at commit [`722dd96`](https://github.com/apache/spark/commit/722dd96976d6a083b0ddb985ac6c518c791bce39).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55807054
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/117/consoleFull) for   PR 2378 at commit [`9ceff73`](https://github.com/apache/spark/commit/9ceff7360427e9b36d7151c5f296d0ce199610dc).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56216817
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20576/consoleFull) for   PR 2378 at commit [`dffbba2`](https://github.com/apache/spark/commit/dffbba2ba206bbbd3dfc740a55f1b0df341860e7).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55815158
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/117/consoleFull) for   PR 2378 at commit [`9ceff73`](https://github.com/apache/spark/commit/9ceff7360427e9b36d7151c5f296d0ce199610dc).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56211052
  
    @mengxr In this PR I tried to avoid any changes other than serialization; we could change the caching behavior or compression later.
    
    It would be good to have some numbers on the performance regression. I only see a 5% regression in LogisticRegressionWithSGD.train() with a small dataset (locally).
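
One way to get rough numbers on the Python side is a local micro-benchmark. The sketch below is an assumption-laden illustration, not the measurement referred to above: it re-creates the old dense-vector layout from the removed `serializeDenseVector` code (magic byte, int32 length, then float64 values in native byte order) with `struct`, and times a round trip against `pickle` at protocol 2. The helper names `pack_dense`, `unpack_dense`, and `compare` are hypothetical, and the result says nothing about the JVM side.

```python
import pickle
import struct
import timeit

DENSE_VECTOR_MAGIC = 1  # same value as the constant in the removed Scala code


def pack_dense(values):
    """Encode as: 1 magic byte, int32 length, float64 values (native order, no padding)."""
    return struct.pack("=bi%dd" % len(values), DENSE_VECTOR_MAGIC, len(values), *values)


def unpack_dense(data):
    """Decode the layout written by pack_dense back into a list of floats."""
    magic, length = struct.unpack_from("=bi", data, 0)
    if magic != DENSE_VECTOR_MAGIC:
        raise ValueError("Invalid magic: %d" % magic)
    return list(struct.unpack_from("=%dd" % length, data, 5))


def compare(values, repeat=1000):
    """Return (seconds for struct round trips, seconds for pickle round trips)."""
    packed = timeit.timeit(lambda: unpack_dense(pack_dense(values)), number=repeat)
    pickled = timeit.timeit(lambda: pickle.loads(pickle.dumps(values, 2)), number=repeat)
    return packed, pickled


if __name__ == "__main__":
    print(compare([0.5 * i for i in range(100)]))
```

Timings from a loop like this depend heavily on vector length and machine, so they are a starting point for discussion rather than evidence about end-to-end training time.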




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17574385
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -778,8 +778,8 @@ private[spark] object PythonRDD extends Logging {
       def javaToPython(jRDD: JavaRDD[Any]): JavaRDD[Array[Byte]] = {
         jRDD.rdd.mapPartitions { iter =>
           val pickle = new Pickler
    -      iter.map { row =>
    -        pickle.dumps(row)
    +      iter.grouped(1024).map { rows =>
    --- End diff --
    
    Shall we divide groups based on the serialized size?
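
To make the suggestion concrete, here is a minimal Python sketch of size-driven batching, as opposed to the fixed `grouped(1024)` in the diff. It is an illustration under stated assumptions rather than code from this PR: the function name `auto_batched_dumps`, the 64 KiB target, and the doubling and halving rule are all choices made for the example.

```python
import itertools
import pickle


def auto_batched_dumps(items, target_bytes=1 << 16):
    """Yield pickled batches, adapting the batch length to the serialized size."""
    iterator = iter(items)
    batch_len = 1
    while True:
        batch = list(itertools.islice(iterator, batch_len))
        if not batch:
            return
        payload = pickle.dumps(batch, protocol=2)
        yield payload
        if len(payload) < target_bytes:
            # Batches are still small: take twice as many rows next time.
            batch_len *= 2
        elif len(payload) > 10 * target_bytes and batch_len > 1:
            # Batches are far too large: back off.
            batch_len //= 2
```

With this scheme, rows that serialize to a few bytes end up in large batches, while rows holding wide vectors stay in small ones, so the payload size stays near the target without having to know the row size in advance.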




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56104238
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20551/consoleFull) for   PR 2378 at commit [`bd738ab`](https://github.com/apache/spark/commit/bd738abee534f467c0fb707f414d854af128fec5).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17702050
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -257,10 +410,34 @@ def stringify(vector):
             >>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
             '[0.0,1.0]'
             """
    -        if type(vector) == SparseVector:
    -            return str(vector)
    -        else:
    -            return "[" + ",".join([str(v) for v in vector]) + "]"
    +        return str(vector)
    +
    +
    +class Matrix(Vector):
    +    """ the Matrix """
    +
    +
    +class DenseMatrix(Matrix):
    +    def __init__(self, nRow, nCol, values):
    +        assert len(values) == nRow * nCol
    +        self.nRow = nRow
    --- End diff --
    
    Should nRow and nCol not belong to the Matrix class?
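
A sketch of what that could look like, with the shape stored on the base class and `DenseMatrix` adding only the values. This is a hypothetical layout for discussion, not the code in the PR; the `__reduce__` method is likewise an assumption, shown only to indicate how the three constructor arguments would travel through pickle.

```python
class Matrix(object):
    """Base class: holds the shape shared by every matrix type."""

    def __init__(self, nRow, nCol):
        self.nRow = nRow
        self.nCol = nCol


class DenseMatrix(Matrix):
    """Dense matrix stored as a flat list of nRow * nCol values."""

    def __init__(self, nRow, nCol, values):
        assert len(values) == nRow * nCol
        Matrix.__init__(self, nRow, nCol)
        self.values = values

    def __reduce__(self):
        # pickle rebuilds the object by calling DenseMatrix(nRow, nCol, values).
        return DenseMatrix, (self.nRow, self.nCol, self.values)
```

The argument order in `__reduce__` here mirrors the `(numRows, numCols, values)` order that the Scala `DenseMatrixPickler.construct` in this PR expects.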




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55699312
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20371/consoleFull) for   PR 2378 at commit [`df19464`](https://github.com/apache/spark/commit/df194640e7dd72d9c6413ec2935889d422a41de2).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55487217
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20265/consoleFull) for   PR 2378 at commit [`f1544c4`](https://github.com/apache/spark/commit/f1544c47917836d7ef77353c467182cc5cc7addb).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class Vector(object):`
      * `class DenseVector(Vector):`
      * `class SparseVector(Vector):`
      * `class Matrix(object):`
      * `class DenseMatrix(Matrix):`
      * `class Rating(object):`





[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55487600
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20267/consoleFull) for   PR 2378 at commit [`aa2287e`](https://github.com/apache/spark/commit/aa2287ec75998bbc5512a37d5415dc2115615533).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class JavaSparkContext(val sc: SparkContext)`
      * `class TaskCompletionListenerException(errorMessages: Seq[String]) extends Exception `
      * `        class Dummy(object):`
      * `class Vector(object):`
      * `class DenseVector(Vector):`
      * `class SparseVector(Vector):`
      * `class Matrix(object):`
      * `class DenseMatrix(Matrix):`
      * `class Rating(object):`
      * `class JavaStreamingContext(val ssc: StreamingContext) extends Closeable `





[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55485771
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20259/consoleFull) for   PR 2378 at commit [`b30ef35`](https://github.com/apache/spark/commit/b30ef35ec7830cee08b4f8d692da26d98cac70e8).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `class JavaSparkContext(val sc: SparkContext)`
      * `class Rating(object):`
      * `class JavaStreamingContext(val ssc: StreamingContext) extends Closeable `





[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55860805
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20453/consoleFull) for   PR 2378 at commit [`e431377`](https://github.com/apache/spark/commit/e431377170172571974aadcae7ff42d3a79e2cd9).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by staple <gi...@git.apache.org>.
Github user staple commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17686208
  
    --- Diff: python/pyspark/mllib/recommendation.py ---
    @@ -54,34 +64,51 @@ def __del__(self):
         def predict(self, user, product):
             return self._java_model.predict(user, product)
     
    -    def predictAll(self, usersProducts):
    -        usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
    -        return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
    -                   self._context, RatingDeserializer())
    +    def predictAll(self, user_product):
    +        assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
    +        sc = self._context
    +        tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
    +        jresult = self._java_model.predict(tuplerdd).toJavaRDD()
    +        return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
    +                   AutoBatchedSerializer(PickleSerializer()))
     
     
     class ALS(object):
     
         @classmethod
    +    def _prepare(cls, ratings):
    +        assert isinstance(ratings, RDD), "ratings should be RDD"
    +        first = ratings.first()
    +        if not isinstance(first, Rating):
    +            if isinstance(first, (tuple, list)):
    +                ratings = ratings.map(lambda x: Rating(*x))
    +            else:
    +                raise ValueError("rating should be RDD of Rating or tuple/list")
    +        # serialize them by AutoBatchedSerializer before cache to reduce the
    +        # objects overhead in JVM
    +        cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
    --- End diff --
    
    Hi, just wanted to check on your decision to cache for ALS. It looks like in ALS the makeLinkRDDs calls handle persistence for a transformation of the input data. There are two calls to makeLinkRDDs, though, so perhaps the input data is read twice. Are those two reads the reason for caching here?
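
The question about two reads can be illustrated with a toy model in plain Python. This is only a sketch and does not use Spark: `LazyDataset` and `load_ratings` are hypothetical names, and unlike Spark's `cache()` the toy version materializes the data eagerly. It shows only that an uncached lazy source is recomputed on every read, while a cached one is computed once.

```python
class LazyDataset:
    """Toy stand-in for a lazily evaluated dataset (not a Spark RDD)."""

    def __init__(self, compute):
        self._compute = compute
        self._cached = None
        self.compute_calls = 0

    def cache(self):
        # Eager for simplicity; Spark's cache() is lazy.
        self._cached = self.collect()
        return self

    def collect(self):
        if self._cached is not None:
            return self._cached
        self.compute_calls += 1
        return self._compute()


def load_ratings():
    # Made-up (user, product, rating) tuples.
    return [(1, 10, 5.0), (2, 20, 3.0)]


# Two downstream consumers read the input twice.
uncached = LazyDataset(load_ratings)
uncached.collect()
uncached.collect()

cached = LazyDataset(load_ratings).cache()
cached.collect()
cached.collect()

print("computations without cache:", uncached.compute_calls)
print("computations with cache:", cached.compute_calls)
```

Whether the extra computation matters in the real ALS path depends on how expensive it is to re-read and deserialize the Python-side input, which this toy model does not measure.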




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55499182
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20273/consoleFull) for   PR 2378 at commit [`8fe166a`](https://github.com/apache/spark/commit/8fe166a80c5162914a9393b9526bc14150ce2402).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds the following public classes _(experimental)_:
      * `  class ArrayConstructor extends net.razorvine.pickle.objects.ArrayConstructor `
      * `class Vector(object):`
      * `class DenseVector(Vector):`
      * `class SparseVector(Vector):`
      * `class Matrix(object):`
      * `class DenseMatrix(Matrix):`
      * `class Rating(object):`





[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55532844
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20307/consoleFull) for   PR 2378 at commit [`0ee1525`](https://github.com/apache/spark/commit/0ee1525054e6ab75ef4b456fe1de148ef866de4e).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55987147
  
    @davies  This looks like a great PR!  I don’t see major issues, though +1 to the remarks about checking for performance regressions.  Pending performance testing and my small comments, this looks good to me.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17701626
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -61,16 +195,19 @@ def __init__(self, size, *args):
                 if type(pairs) == dict:
                     pairs = pairs.items()
                 pairs = sorted(pairs)
    -            self.indices = array([p[0] for p in pairs], dtype=int32)
    -            self.values = array([p[1] for p in pairs], dtype=float64)
    +            self.indices = array.array('i', [p[0] for p in pairs])
    +            self.values = array.array('d', [p[1] for p in pairs])
             else:
                 assert len(args[0]) == len(args[1]), "index and value arrays not same length"
    -            self.indices = array(args[0], dtype=int32)
    -            self.values = array(args[1], dtype=float64)
    +            self.indices = array.array('i', args[0])
    +            self.values = array.array('d', args[1])
                 for i in xrange(len(self.indices) - 1):
                     if self.indices[i] >= self.indices[i + 1]:
                         raise TypeError("indices array must be sorted")
     
    +    def __reduce__(self):
    +        return (SparseVector, (self.size, self.indices, self.values))
    +
         def dot(self, other):
             """
             Dot product with a SparseVector or 1- or 2-dimensional Numpy array.
    --- End diff --
    
    Update doc to say dot product with each column if given a 2-d array?
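
To make the suggestion concrete, here is a pure-Python sketch of what "dot product with each column" means for a sparse vector and a 2-d array. `sparse_dot_columns` is a hypothetical helper written for illustration; it is not the `SparseVector.dot` implementation from the PR, and the data is made up.

```python
def sparse_dot_columns(indices, values, matrix):
    """Dot a sparse vector (parallel index/value lists) with each column of a 2-d list."""
    num_cols = len(matrix[0])
    result = [0.0] * num_cols
    for i, v in zip(indices, values):
        row = matrix[i]
        for c in range(num_cols):
            result[c] += v * row[c]
    return result


# Sparse vector of size 4 with nonzeros at positions 1 and 3.
indices = [1, 3]
values = [1.0, 3.0]
matrix = [
    [0.0, 1.0],
    [1.0, 2.0],
    [2.0, 3.0],
    [3.0, 4.0],
]

column_dots = sparse_dot_columns(indices, values, matrix)
print(column_dots)  # one dot product per column
```

The result has one entry per column, which is the behavior the docstring would need to spell out for 2-d input.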




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17630886
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -775,17 +775,38 @@ private[spark] object PythonRDD extends Logging {
         }.toJavaRDD()
       }
     
    +  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
    +    private val pickle = new Pickler()
    +    private var batch = 1
    +    private val buffer = new mutable.ArrayBuffer[Any]
    +
    +    override def hasNext(): Boolean = iter.hasNext
    +
    +    override def next(): Array[Byte] = {
    +      while (iter.hasNext && buffer.length < batch) {
    +        buffer += iter.next()
    +      }
    +      val bytes = pickle.dumps(buffer.toArray)
    +      val size = bytes.length
    +      // let  1M < size < 10M
    +      if (size < 1024 * 100) {
    +        batch = (1024 * 100) / size  // fast grow
    --- End diff --
    
    If the first record is small, e.g., a SparseVector with a single nonzero, and the records that follow are large vectors, line 789 may cause memory problems. Does it give a significant performance gain? If so, under what circumstances?
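
The concern can be illustrated with a small pure-Python simulation. This is only a sketch of the "fast grow" update quoted in the diff, not the Scala code itself: `next_batch_size` is a hypothetical helper, the 100 KB threshold is copied from the diff, and the record sizes are assumptions chosen for illustration.

```python
FAST_GROW_LIMIT = 1024 * 100  # threshold taken from the quoted diff


def next_batch_size(batch, pickled_size):
    """Mirror only the 'fast grow' branch of the quoted diff (hypothetical helper)."""
    if pickled_size < FAST_GROW_LIMIT:
        return FAST_GROW_LIMIT // pickled_size
    return batch


# Assumed sizes: one tiny record first, then large records (e.g. big vectors).
tiny_record_bytes = 50
large_record_bytes = 8 * 1024 * 1024

batch = next_batch_size(1, tiny_record_bytes)
# The next call would buffer `batch` records before pickling them together.
buffered_bytes = batch * large_record_bytes

print("batch size after one tiny record:", batch)
print("approximate bytes buffered for the next batch:", buffered_bytes)
```

Under these assumed sizes the batch size jumps from 1 to 2048 after a single small record, so the following batch would try to hold thousands of large records in memory at once.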




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55855377
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20453/consoleFull) for   PR 2378 at commit [`e431377`](https://github.com/apache/spark/commit/e431377170172571974aadcae7ff42d3a79e2cd9).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55829999
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20433/consoleFull) for   PR 2378 at commit [`1fccf1a`](https://github.com/apache/spark/commit/1fccf1adc91e78a6c9e65f4ae14ba770a7eecd2c).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56110037
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20554/consoleFull) for   PR 2378 at commit [`032cd62`](https://github.com/apache/spark/commit/032cd62cee6b2bd134f6b9017a7e68ef333990a5).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55916761
  
    @mengxr it's ready to review now, thanks.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55855348
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/122/consoleFull) for   PR 2378 at commit [`e431377`](https://github.com/apache/spark/commit/e431377170172571974aadcae7ff42d3a79e2cd9).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17697102
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -476,259 +436,167 @@ class PythonMLLibAPI extends Serializable {
           numRows: Long,
           numCols: Int,
           numPartitions: java.lang.Integer,
    -      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
    +      seed: java.lang.Long): JavaRDD[Vector] = {
         val parts = getNumPartitionsOrDefault(numPartitions, jsc)
         val s = getSeedOrDefault(seed)
    -    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
    +    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
       }
     
     }
     
     /**
    - * :: DeveloperApi ::
    - * MultivariateStatisticalSummary with Vector fields serialized.
    + * SerDe utility functions for PythonMLLibAPI.
      */
    -@DeveloperApi
    -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
    -  extends Serializable {
    +private[spark] object SerDe extends Serializable {
     
    -  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
    +  val PYSPARK_PACKAGE = "pyspark.mllib"
     
    -  def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)
    +  /**
    +   * Base class used for pickle
    +   */
    +  private[python] abstract class BasePickler[T: ClassTag]
    +    extends IObjectPickler with IObjectConstructor {
    +
    +    private val cls = implicitly[ClassTag[T]].runtimeClass
    +    private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4)
    +    private val name = cls.getSimpleName
    +
    +    // register this to Pickler and Unpickler
    +    def register(): Unit = {
    +      Pickler.registerCustomPickler(this.getClass, this)
    +      Pickler.registerCustomPickler(cls, this)
    +      Unpickler.registerConstructor(module, name, this)
    +    }
     
    -  def count: Long = summary.count
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
    +      if (obj == this) {
    +        out.write(Opcodes.GLOBAL)
    +        out.write((module + "\n" + name + "\n").getBytes())
    +      } else {
    +        pickler.save(this)  // it will be memorized by Pickler
    +        saveState(obj, out, pickler)
    +        out.write(Opcodes.REDUCE)
    +      }
    +    }
    +
    +    private[python] def saveObjects(out: OutputStream, pickler: Pickler,
    +                                    objects: Any*) = {
    --- End diff --
    
    Can fit def on 1 line.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17574396
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -472,214 +452,140 @@ class PythonMLLibAPI extends Serializable {
           numRows: Long,
           numCols: Int,
           numPartitions: java.lang.Integer,
    -      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
    +      seed: java.lang.Long): JavaRDD[Vector] = {
         val parts = getNumPartitionsOrDefault(numPartitions, jsc)
         val s = getSeedOrDefault(seed)
    -    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
    +    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
       }
     
     }
     
     /**
    - * :: DeveloperApi ::
    - * MultivariateStatisticalSummary with Vector fields serialized.
    + * SerDe utility functions for PythonMLLibAPI.
      */
    -@DeveloperApi
    -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
    -  extends Serializable {
    +private[spark] object SerDe extends Serializable {
     
    -  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
    +  private[python] def reduce_object(out: OutputStream, pickler: Pickler,
    --- End diff --
    
    use camelCase for method names




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55795901
  
    @davies A couple of Python tests failed with this change. Could you fix them?




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55518181
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20304/consoleFull) for   PR 2378 at commit [`b02e34f`](https://github.com/apache/spark/commit/b02e34f53f8e0ba992477b20def58ddf356aa3f1).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55850933
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/121/consoleFull) for   PR 2378 at commit [`1fccf1a`](https://github.com/apache/spark/commit/1fccf1adc91e78a6c9e65f4ae14ba770a7eecd2c).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17632544
  
    --- Diff: core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---
    @@ -775,17 +775,38 @@ private[spark] object PythonRDD extends Logging {
         }.toJavaRDD()
       }
     
    +  private class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
    +    private val pickle = new Pickler()
    +    private var batch = 1
    +    private val buffer = new mutable.ArrayBuffer[Any]
    +
    +    override def hasNext(): Boolean = iter.hasNext
    +
    +    override def next(): Array[Byte] = {
    +      while (iter.hasNext && buffer.length < batch) {
    +        buffer += iter.next()
    +      }
    +      val bytes = pickle.dumps(buffer.toArray)
    +      val size = bytes.length
    +      // let  1M < size < 10M
    +      if (size < 1024 * 100) {
    +        batch = (1024 * 100) / size  // fast grow
    +      } else if (size < 1024 * 1024) {
    +        batch *= 2
    +      } else if (size > 1024 * 1024 * 10) {
    +        batch /= 2
    --- End diff --
    
    If the first record is very large, `batch` will be 0.
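
A sketch of why this happens, written in Python rather than Scala: integer halving of a batch size of 1 yields 0. `shrink_batch` and the `max(1, ...)` guard are assumptions for illustration, not necessarily the fix adopted in the PR, and the 20 MB record size is made up.

```python
TEN_MB = 1024 * 1024 * 10  # upper bound taken from the quoted diff


def shrink_batch(batch, pickled_size, guard=False):
    """Halve the batch when a pickled batch exceeds 10 MB (hypothetical helper)."""
    if pickled_size > TEN_MB:
        batch = batch // 2  # integer division, like `batch /= 2` on a Scala Int
        if guard:
            batch = max(1, batch)  # one possible guard: never drop below 1
    return batch


first_record_size = 20 * 1024 * 1024  # assumed: a single 20 MB record

unguarded = shrink_batch(1, first_record_size)
guarded = shrink_batch(1, first_record_size, guard=True)

print("without guard:", unguarded)
print("with guard:", guarded)
```

In the quoted loop, a batch size of 0 means the condition `buffer.length < batch` is never true, so it appears that no further records would be consumed from the iterator.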




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55699370
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20371/consoleFull) for   PR 2378 at commit [`df19464`](https://github.com/apache/spark/commit/df194640e7dd72d9c6413ec2935889d422a41de2).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56109944
  
    @jkbradley I believe I have addressed all your comments, or left a reply where I have not yet figured out how to address one. Thanks for reviewing this huge PR.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55712664
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20376/consoleFull) for   PR 2378 at commit [`e1d1bfc`](https://github.com/apache/spark/commit/e1d1bfce4b464e6b14f649081155faf7c4d28471).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17700424
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -23,14 +23,148 @@
     SciPy is available in their environment.
     """
     
    -import numpy
    -from numpy import array, array_equal, ndarray, float64, int32
    +import sys
    +import array
    +import copy_reg
     
    +import numpy as np
     
    -__all__ = ['SparseVector', 'Vectors']
    +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
     
     
    -class SparseVector(object):
    +if sys.version_info[:2] == (2, 7):
    +    # speed up pickling array in Python 2.7
    +    def fast_pickle_array(ar):
    +        return array.array, (ar.typecode, ar.tostring())
    +    copy_reg.pickle(array.array, fast_pickle_array)
    +
    +
    +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
    +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
    +
    +try:
    +    import scipy.sparse
    +    _have_scipy = True
    +except:
    +    # No SciPy in environment, but that's okay
    +    _have_scipy = False
    +
    +
    +def _convert_to_vector(l):
    +    if isinstance(l, Vector):
    +        return l
    +    elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
    +        return DenseVector(l)
    +    elif _have_scipy and scipy.sparse.issparse(l):
    +        assert l.shape[1] == 1, "Expected column vector"
    +        csc = l.tocsc()
    +        return SparseVector(l.shape[0], csc.indices, csc.data)
    +    else:
    +        raise TypeError("Cannot convert type %s into Vector" % type(l))
    +
    +
    +class Vector(object):
    +    """
    +    Abstract class for DenseVector and SparseVector
    +    """
    +    def toArray(self):
    +        """
    +        Convert the vector into an numpy.ndarray
    +        :return: numpy.ndarray
    +        """
    +        raise NotImplementedError
    +
    +
    +class DenseVector(Vector):
    +    def __init__(self, ar):
    +        if not isinstance(ar, array.array):
    +            ar = array.array('d', ar)
    +        self.array = ar
    +
    +    def __reduce__(self):
    +        return DenseVector, (self.array,)
    +
    +    def dot(self, other):
    +        """
    +        Compute the dot product of two Vectors. We support
    +        (Numpy array, list, SparseVector, or SciPy sparse)
    +        and a target NumPy array that is either 1- or 2-dimensional.
    +        Equivalent to calling numpy.dot of the two vectors.
    +
    +        >>> dense = DenseVector(array.array('d', [1., 2.]))
    +        >>> dense.dot(dense)
    +        5.0
    +        >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
    +        4.0
    +        >>> dense.dot(range(1, 3))
    +        5.0
    +        >>> dense.dot(np.array(range(1, 3)))
    +        5.0
    +        """
    +        if isinstance(other, SparseVector):
    +            return other.dot(self)
    +        elif _have_scipy and scipy.sparse.issparse(other):
    +            return other.transpose().dot(self.toArray())[0]
    +        elif isinstance(other, Vector):
    +            return np.dot(self.toArray(), other.toArray())
    +        else:
    +            return np.dot(self.toArray(), other)
    +
    +    def squared_distance(self, other):
    +        """
    +        Squared distance of two Vectors.
    +
    +        >>> dense1 = DenseVector(array.array('d', [1., 2.]))
    +        >>> dense1.squared_distance(dense1)
    +        0.0
    +        >>> dense2 = np.array([2., 1.])
    +        >>> dense1.squared_distance(dense2)
    +        2.0
    +        >>> dense3 = [2., 1.]
    +        >>> dense1.squared_distance(dense2)
    --- End diff --
    
    "dense2" --> "dense3"
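
As quoted, the doctest defines `dense3` but never passes it in, so the plain-list input path is not exercised. A minimal sketch of the intended check follows. It uses a hypothetical pure-Python `squared_distance` helper (an assumption for illustration, not the `DenseVector` method from the patch) so that it runs without NumPy:

```python
import array


def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length sequences.

    Hypothetical helper for illustration only; it is not the method
    from the patch and accepts any sequence of numbers.
    """
    if len(a) != len(b):
        raise ValueError("length mismatch: %d vs %d" % (len(a), len(b)))
    return float(sum((x - y) ** 2 for x, y in zip(a, b)))


dense1 = array.array('d', [1., 2.])
dense3 = [2., 1.]

# Pass the list itself, which is what the corrected doctest line would do.
result = squared_distance(dense1, dense3)
```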


---------------------------------------------------------------------


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17700987
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -23,14 +23,148 @@
     SciPy is available in their environment.
     """
     
    -import numpy
    -from numpy import array, array_equal, ndarray, float64, int32
    +import sys
    +import array
    +import copy_reg
     
    +import numpy as np
     
    -__all__ = ['SparseVector', 'Vectors']
    +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
     
     
    -class SparseVector(object):
    +if sys.version_info[:2] == (2, 7):
    +    # speed up pickling array in Python 2.7
    +    def fast_pickle_array(ar):
    +        return array.array, (ar.typecode, ar.tostring())
    +    copy_reg.pickle(array.array, fast_pickle_array)
    +
    +
    +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
    +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
    +
    +try:
    +    import scipy.sparse
    +    _have_scipy = True
    +except:
    +    # No SciPy in environment, but that's okay
    +    _have_scipy = False
    +
    +
    +def _convert_to_vector(l):
    +    if isinstance(l, Vector):
    +        return l
    +    elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
    +        return DenseVector(l)
    +    elif _have_scipy and scipy.sparse.issparse(l):
    +        assert l.shape[1] == 1, "Expected column vector"
    +        csc = l.tocsc()
    +        return SparseVector(l.shape[0], csc.indices, csc.data)
    +    else:
    +        raise TypeError("Cannot convert type %s into Vector" % type(l))
    +
    +
    +class Vector(object):
    +    """
    +    Abstract class for DenseVector and SparseVector
    +    """
    +    def toArray(self):
    +        """
    +        Convert the vector into an numpy.ndarray
    +        :return: numpy.ndarray
    +        """
    +        raise NotImplementedError
    +
    +
    +class DenseVector(Vector):
    +    def __init__(self, ar):
    +        if not isinstance(ar, array.array):
    +            ar = array.array('d', ar)
    +        self.array = ar
    +
    +    def __reduce__(self):
    +        return DenseVector, (self.array,)
    +
    +    def dot(self, other):
    +        """
    +        Compute the dot product of two Vectors. We support
    +        (Numpy array, list, SparseVector, or SciPy sparse)
    +        and a target NumPy array that is either 1- or 2-dimensional.
    +        Equivalent to calling numpy.dot of the two vectors.
    +
    +        >>> dense = DenseVector(array.array('d', [1., 2.]))
    +        >>> dense.dot(dense)
    +        5.0
    +        >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
    +        4.0
    +        >>> dense.dot(range(1, 3))
    +        5.0
    +        >>> dense.dot(np.array(range(1, 3)))
    +        5.0
    +        """
    +        if isinstance(other, SparseVector):
    +            return other.dot(self)
    +        elif _have_scipy and scipy.sparse.issparse(other):
    +            return other.transpose().dot(self.toArray())[0]
    +        elif isinstance(other, Vector):
    +            return np.dot(self.toArray(), other.toArray())
    +        else:
    +            return np.dot(self.toArray(), other)
    +
    +    def squared_distance(self, other):
    +        """
    +        Squared distance of two Vectors.
    +
    +        >>> dense1 = DenseVector(array.array('d', [1., 2.]))
    +        >>> dense1.squared_distance(dense1)
    +        0.0
    +        >>> dense2 = np.array([2., 1.])
    +        >>> dense1.squared_distance(dense2)
    +        2.0
    +        >>> dense3 = [2., 1.]
    +        >>> dense1.squared_distance(dense2)
    +        2.0
    +        >>> sparse1 = SparseVector(2, [0, 1], [2., 1.])
    +        >>> dense1.squared_distance(sparse1)
    +        2.0
    +        """
    +        if isinstance(other, SparseVector):
    +            return other.squared_distance(self)
    +        elif _have_scipy and scipy.sparse.issparse(other):
    +            return _convert_to_vector(other).squared_distance(self)
    +
    +        if isinstance(other, Vector):
    +            other = other.toArray()
    +        elif not isinstance(other, np.ndarray):
    +            other = np.array(other)
    +        diff = self.toArray() - other
    +        return np.dot(diff, diff)
    +
    +    def toArray(self):
    +        return np.array(self.array)
    +
    +    def __getitem__(self, item):
    +        return self.array[item]
    +
    +    def __len__(self):
    +        return len(self.array)
    +
    +    def __str__(self):
    +        return "[" + ",".join([str(v) for v in self.array]) + "]"
    +
    +    def __repr__(self):
    +        return "DenseVector(%r)" % self.array
    +
    +    def __eq__(self, other):
    +        return isinstance(other, DenseVector) and self.array == other.array
    +
    +    def __ne__(self, other):
    +        return not self == other
    +
    +    def __getattr__(self, item):
    +        return getattr(self.array, item)
    +
    +
    +class SparseVector(Vector):
     
         """
         A simple sparse vector class for passing data to MLlib. Users may
    --- End diff --
    
    A few lines below this: doc typo: "tupes" --> "tuples"


---------------------------------------------------------------------


[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55855160
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20449/consoleFull) for   PR 2378 at commit [`19d0967`](https://github.com/apache/spark/commit/19d096783b60e741173f48f2944d91f650616140).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---------------------------------------------------------------------


[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17574404
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -472,214 +452,140 @@ class PythonMLLibAPI extends Serializable {
           numRows: Long,
           numCols: Int,
           numPartitions: java.lang.Integer,
    -      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
    +      seed: java.lang.Long): JavaRDD[Vector] = {
         val parts = getNumPartitionsOrDefault(numPartitions, jsc)
         val s = getSeedOrDefault(seed)
    -    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
    +    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
       }
     
     }
     
     /**
    - * :: DeveloperApi ::
    - * MultivariateStatisticalSummary with Vector fields serialized.
    + * SerDe utility functions for PythonMLLibAPI.
      */
    -@DeveloperApi
    -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
    -  extends Serializable {
    +private[spark] object SerDe extends Serializable {
     
    -  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
    +  private[python] def reduce_object(out: OutputStream, pickler: Pickler,
    +                                    module: String, name: String, objects: Object*) = {
    +    out.write(Opcodes.GLOBAL)
    +    out.write((module + "\n" + name + "\n").getBytes)
    +    out.write(Opcodes.MARK)
    +    objects.foreach(pickler.save(_))
    +    out.write(Opcodes.TUPLE)
    +    out.write(Opcodes.REDUCE)
    +  }
     
    -  def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)
    +  private[python] class DenseVectorPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val vector: DenseVector = obj.asInstanceOf[DenseVector]
    +      reduce_object(out, pickler, "pyspark.mllib.linalg", "DenseVector", vector.toArray)
    --- End diff --
    
    ditto: what is the cost of using class names?
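
One rough way to get a feel for that cost is to measure it on the Python side. The sketch below uses a hypothetical `DenseVec` class (a stand-in, not `pyspark.mllib.linalg.DenseVector`) with the same `__reduce__` shape as the patch, and compares the pickled size of the bare payload against the pickled size of the wrapped object. It only measures Python's own pickler. Whether Pyrolite memoizes class references the same way is not established by the quoted diff; `reduce_object` as written emits the GLOBAL opcode with the full module and class name on each call.

```python
import pickle


class DenseVec(object):
    """Hypothetical stand-in for a vector class, for illustration only."""

    def __init__(self, values):
        self.values = [float(v) for v in values]

    def __reduce__(self):
        # Same shape as the patch: (callable, args). Pickle records the
        # callable by its module name and class name.
        return DenseVec, (self.values,)


def pickled_size(obj, protocol=2):
    """Number of bytes pickle emits for obj with the given protocol."""
    return len(pickle.dumps(obj, protocol))


payload_size = pickled_size([1.0, 2.0])            # the data alone
single_size = pickled_size(DenseVec([1.0, 2.0]))   # data plus class reference
name_overhead = single_size - payload_size

# Python's pickler memoizes the class reference, so within one stream the
# full name is written once and later objects refer back to it.
batch_size = pickled_size([DenseVec([1.0, 2.0]) for _ in range(50)])
```

In this sketch the per-object overhead is dominated by the module and class name when objects are pickled one at a time, and shrinks when many objects share a single pickle stream.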


---------------------------------------------------------------------


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55860743
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/122/consoleFull) for   PR 2378 at commit [`e431377`](https://github.com/apache/spark/commit/e431377170172571974aadcae7ff42d3a79e2cd9).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---------------------------------------------------------------------


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17700471
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -23,14 +23,148 @@
     SciPy is available in their environment.
     """
     
    -import numpy
    -from numpy import array, array_equal, ndarray, float64, int32
    +import sys
    +import array
    +import copy_reg
     
    +import numpy as np
     
    -__all__ = ['SparseVector', 'Vectors']
    +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
     
     
    -class SparseVector(object):
    +if sys.version_info[:2] == (2, 7):
    +    # speed up pickling array in Python 2.7
    +    def fast_pickle_array(ar):
    +        return array.array, (ar.typecode, ar.tostring())
    +    copy_reg.pickle(array.array, fast_pickle_array)
    +
    +
    +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
    +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
    +
    +try:
    +    import scipy.sparse
    +    _have_scipy = True
    +except:
    +    # No SciPy in environment, but that's okay
    +    _have_scipy = False
    +
    +
    +def _convert_to_vector(l):
    +    if isinstance(l, Vector):
    +        return l
    +    elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
    +        return DenseVector(l)
    +    elif _have_scipy and scipy.sparse.issparse(l):
    +        assert l.shape[1] == 1, "Expected column vector"
    +        csc = l.tocsc()
    +        return SparseVector(l.shape[0], csc.indices, csc.data)
    +    else:
    +        raise TypeError("Cannot convert type %s into Vector" % type(l))
    +
    +
    +class Vector(object):
    +    """
    +    Abstract class for DenseVector and SparseVector
    +    """
    +    def toArray(self):
    +        """
    +        Convert the vector into an numpy.ndarray
    +        :return: numpy.ndarray
    +        """
    +        raise NotImplementedError
    +
    +
    +class DenseVector(Vector):
    +    def __init__(self, ar):
    +        if not isinstance(ar, array.array):
    +            ar = array.array('d', ar)
    +        self.array = ar
    +
    +    def __reduce__(self):
    +        return DenseVector, (self.array,)
    +
    +    def dot(self, other):
    +        """
    +        Compute the dot product of two Vectors. We support
    +        (Numpy array, list, SparseVector, or SciPy sparse)
    +        and a target NumPy array that is either 1- or 2-dimensional.
    +        Equivalent to calling numpy.dot of the two vectors.
    +
    +        >>> dense = DenseVector(array.array('d', [1., 2.]))
    +        >>> dense.dot(dense)
    +        5.0
    +        >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
    +        4.0
    +        >>> dense.dot(range(1, 3))
    +        5.0
    +        >>> dense.dot(np.array(range(1, 3)))
    +        5.0
    +        """
    +        if isinstance(other, SparseVector):
    +            return other.dot(self)
    +        elif _have_scipy and scipy.sparse.issparse(other):
    +            return other.transpose().dot(self.toArray())[0]
    +        elif isinstance(other, Vector):
    +            return np.dot(self.toArray(), other.toArray())
    --- End diff --
    
    (This question applies to other functions too.)


---------------------------------------------------------------------


[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55834191
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20433/consoleFull) for   PR 2378 at commit [`1fccf1a`](https://github.com/apache/spark/commit/1fccf1adc91e78a6c9e65f4ae14ba770a7eecd2c).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---------------------------------------------------------------------


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17698673
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala ---
    @@ -64,6 +64,12 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])
       override def toArray: Array[Double] = values
     
       private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values)
    +
    +  override def equals(o: Any) = o match {
    --- End diff --
    
    Shouldn't this also compare values? That would be expensive, but it seems necessary to match the expected behavior of equals().
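
A minimal sketch of the equality contract being asked for, written in Python rather than Scala and using a hypothetical `DenseMatrix` stand-in (the field names are assumptions, not the MLlib class). The cheap shape checks run first, so the element-wise comparison is only paid when the dimensions already match, and the hash is kept consistent with equality:

```python
class DenseMatrix(object):
    """Hypothetical stand-in used only to illustrate value-based equality."""

    def __init__(self, num_rows, num_cols, values):
        values = [float(v) for v in values]
        if len(values) != num_rows * num_cols:
            raise ValueError("expected %d values, got %d"
                             % (num_rows * num_cols, len(values)))
        self.num_rows = num_rows
        self.num_cols = num_cols
        self.values = values

    def __eq__(self, other):
        if not isinstance(other, DenseMatrix):
            return NotImplemented
        # Short-circuit on shape before the O(n) comparison of the values.
        return (self.num_rows == other.num_rows
                and self.num_cols == other.num_cols
                and self.values == other.values)

    def __ne__(self, other):
        result = self.__eq__(other)
        return result if result is NotImplemented else not result

    def __hash__(self):
        # Equal matrices must hash equally, so hash the same fields.
        return hash((self.num_rows, self.num_cols, tuple(self.values)))


a = DenseMatrix(2, 2, [1, 2, 3, 4])
b = DenseMatrix(2, 2, [1, 2, 3, 4])
c = DenseMatrix(2, 2, [1, 2, 3, 5])
```

Here `a == b` holds because shape and values agree, while `a == c` does not because one entry differs.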


---------------------------------------------------------------------


[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55517503
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20304/consoleFull) for   PR 2378 at commit [`b02e34f`](https://github.com/apache/spark/commit/b02e34f53f8e0ba992477b20def58ddf356aa3f1).
     * This patch merges cleanly.


---------------------------------------------------------------------


[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55518966
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20302/consoleFull) for   PR 2378 at commit [`4d7963e`](https://github.com/apache/spark/commit/4d7963ef91851fba280025b0778f0583fe819c55).
     * This patch **fails** unit tests.
     * This patch **does not** merge cleanly!



---------------------------------------------------------------------


[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17574827
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -472,214 +452,140 @@ class PythonMLLibAPI extends Serializable {
           numRows: Long,
           numCols: Int,
           numPartitions: java.lang.Integer,
    -      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
    +      seed: java.lang.Long): JavaRDD[Vector] = {
         val parts = getNumPartitionsOrDefault(numPartitions, jsc)
         val s = getSeedOrDefault(seed)
    -    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
    +    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
       }
     
     }
     
     /**
    - * :: DeveloperApi ::
    - * MultivariateStatisticalSummary with Vector fields serialized.
    + * SerDe utility functions for PythonMLLibAPI.
      */
    -@DeveloperApi
    -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
    -  extends Serializable {
    +private[spark] object SerDe extends Serializable {
     
    -  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
    +  private[python] def reduce_object(out: OutputStream, pickler: Pickler,
    +                                    module: String, name: String, objects: Object*) = {
    +    out.write(Opcodes.GLOBAL)
    +    out.write((module + "\n" + name + "\n").getBytes)
    +    out.write(Opcodes.MARK)
    +    objects.foreach(pickler.save(_))
    +    out.write(Opcodes.TUPLE)
    +    out.write(Opcodes.REDUCE)
    +  }
     
    -  def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)
    +  private[python] class DenseVectorPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val vector: DenseVector = obj.asInstanceOf[DenseVector]
    +      reduce_object(out, pickler, "pyspark.mllib.linalg", "DenseVector", vector.toArray)
    +    }
    +  }
     
    -  def count: Long = summary.count
    +  private[python] class DenseVectorConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 1)
    +      new DenseVector(args(0).asInstanceOf[Array[Double]])
    +    }
    +  }
    +
    +  private[python] class DenseMatrixPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
    +      reduce_object(out, pickler, "pyspark.mllib.linalg", "DenseMatrix",
    +        m.numRows.asInstanceOf[Object], m.numCols.asInstanceOf[Object], m.values)
    +    }
    +  }
     
    -  def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros)
    +  private[python] class DenseMatrixConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Array[Double]])
    +    }
    +  }
     
    -  def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max)
    +  private[python] class SparseVectorPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val v: SparseVector = obj.asInstanceOf[SparseVector]
    +      reduce_object(out, pickler, "pyspark.mllib.linalg", "SparseVector",
    +        v.size.asInstanceOf[Object], v.indices, v.values)
    +    }
    +  }
     
    -  def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min)
    -}
    +  private[python] class SparseVectorConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]],
    +        args(2).asInstanceOf[Array[Double]])
    +    }
    +  }
     
    -/**
    - * SerDe utility functions for PythonMLLibAPI.
    - */
    -private[spark] object SerDe extends Serializable {
    -  private val DENSE_VECTOR_MAGIC: Byte = 1
    -  private val SPARSE_VECTOR_MAGIC: Byte = 2
    -  private val DENSE_MATRIX_MAGIC: Byte = 3
    -  private val LABELED_POINT_MAGIC: Byte = 4
    -
    -  private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    require(bytes.length - offset >= 5, "Byte array too short")
    -    val magic = bytes(offset)
    -    if (magic == DENSE_VECTOR_MAGIC) {
    -      deserializeDenseVector(bytes, offset)
    -    } else if (magic == SPARSE_VECTOR_MAGIC) {
    -      deserializeSparseVector(bytes, offset)
    -    } else {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +  private[python] class LabeledPointPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val point: LabeledPoint = obj.asInstanceOf[LabeledPoint]
    +      reduce_object(out, pickler, "pyspark.mllib.regression", "LabeledPoint",
    +        point.label.asInstanceOf[Object], point.features)
         }
       }
     
    -  private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = {
    -    require(bytes.length - offset == 8, "Wrong size byte array for Double")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.getDouble
    -  }
    -
    -  private[python] def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 5, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val length = bb.getInt()
    -    require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Double](length.toInt)
    -    db.get(ans)
    -    Vectors.dense(ans)
    -  }
    -
    -  private[python] def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 9, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val size = bb.getInt()
    -    val nonZeros = bb.getInt()
    -    require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
    -    val ib = bb.asIntBuffer()
    -    val indices = new Array[Int](nonZeros)
    -    ib.get(indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    val values = new Array[Double](nonZeros)
    -    db.get(values)
    -    Vectors.sparse(size, indices, values)
    +  private[python] class LabeledPointConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 2) {
    +        throw new PickleException("should be 2")
    +      }
    +      new LabeledPoint(args(0).asInstanceOf[Double], args(1).asInstanceOf[Vector])
    +    }
       }
     
       /**
    -   * Returns an 8-byte array for the input Double.
    -   *
    -   * Note: we currently do not use a magic byte for double for storage efficiency.
    -   * This should be reconsidered when we add Ser/De for other 8-byte types (e.g. Long), for safety.
    -   * The corresponding deserializer, deserializeDouble, needs to be modified as well if the
    -   * serialization scheme changes.
    +   * Pickle Rating
        */
    -  private[python] def serializeDouble(double: Double): Array[Byte] = {
    -    val bytes = new Array[Byte](8)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.putDouble(double)
    -    bytes
    -  }
    -
    -  private[python] def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
    -    val len = doubles.length
    -    val bytes = new Array[Byte](5 + 8 * len)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_VECTOR_MAGIC)
    -    bb.putInt(len)
    -    val db = bb.asDoubleBuffer()
    -    db.put(doubles)
    -    bytes
    -  }
    -
    -  private[python] def serializeSparseVector(vector: SparseVector): Array[Byte] = {
    -    val nonZeros = vector.indices.length
    -    val bytes = new Array[Byte](9 + 12 * nonZeros)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(SPARSE_VECTOR_MAGIC)
    -    bb.putInt(vector.size)
    -    bb.putInt(nonZeros)
    -    val ib = bb.asIntBuffer()
    -    ib.put(vector.indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    db.put(vector.values)
    -    bytes
    -  }
    -
    -  private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
    -    case s: SparseVector =>
    -      serializeSparseVector(s)
    -    case _ =>
    -      serializeDenseVector(vector.toArray)
    -  }
    -
    -  private[python] def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
    -    val packetLength = bytes.length
    -    if (packetLength < 9) {
    -      throw new IllegalArgumentException("Byte array too short.")
    +  private[python] class RatingPickler extends IObjectPickler {
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val rating: Rating = obj.asInstanceOf[Rating]
    +      reduce_object(out, pickler, "pyspark.mllib.recommendation", "Rating",
    +        rating.user.asInstanceOf[Object], rating.product.asInstanceOf[Object],
    +        rating.rating.asInstanceOf[Object])
         }
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    if (magic != DENSE_MATRIX_MAGIC) {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    -    }
    -    val rows = bb.getInt()
    -    val cols = bb.getInt()
    -    if (packetLength != 9 + 8 * rows * cols) {
    -      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
    -    }
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Array[Double]](rows.toInt)
    -    for (i <- 0 until rows.toInt) {
    -      ans(i) = new Array[Double](cols.toInt)
    -      db.get(ans(i))
    -    }
    -    ans
       }
     
    -  private[python] def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
    -    val rows = doubles.length
    -    var cols = 0
    -    if (rows > 0) {
    -      cols = doubles(0).length
    -    }
    -    val bytes = new Array[Byte](9 + 8 * rows * cols)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_MATRIX_MAGIC)
    -    bb.putInt(rows)
    -    bb.putInt(cols)
    -    val db = bb.asDoubleBuffer()
    -    for (i <- 0 until rows) {
    -      db.put(doubles(i))
    +  /**
    +   * Unpickle Rating
    +   */
    +  private[python] class RatingConstructor extends IObjectConstructor {
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 3) {
    +        throw new PickleException("should be 3")
    +      }
    +      new Rating(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Double])
         }
    -    bytes
       }
     
    -  private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
    -    val fb = serializeDoubleVector(p.features)
    -    val bytes = new Array[Byte](1 + 8 + fb.length)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(LABELED_POINT_MAGIC)
    -    bb.putDouble(p.label)
    -    bb.put(fb)
    -    bytes
    +  def initialize() = {
    +    Pickler.registerCustomPickler(classOf[DenseVector], new DenseVectorPickler)
    +    Pickler.registerCustomPickler(classOf[DenseMatrix], new DenseMatrixPickler)
    +    Pickler.registerCustomPickler(classOf[SparseVector], new SparseVectorPickler)
    +    Pickler.registerCustomPickler(classOf[LabeledPoint], new LabeledPointPickler)
    +    Pickler.registerCustomPickler(classOf[Rating], new RatingPickler)
    +    Unpickler.registerConstructor("pyspark.mllib.linalg", "DenseVector",
    +      new DenseVectorConstructor)
    +    Unpickler.registerConstructor("pyspark.mllib.linalg", "DenseMatrix",
    +      new DenseMatrixConstructor)
    +    Unpickler.registerConstructor("pyspark.mllib.linalg", "SparseVector",
    +      new SparseVectorConstructor)
    +    Unpickler.registerConstructor("pyspark.mllib.regression", "LabeledPoint",
    +      new LabeledPointConstructor)
    +    Unpickler.registerConstructor("pyspark.mllib.recommendation", "Rating", new RatingConstructor)
       }
     
    -  private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
    -    require(bytes.length >= 9, "Byte array too short")
    -    val magic = bytes(0)
    -    if (magic != LABELED_POINT_MAGIC) {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    -    }
    -    val labelBytes = ByteBuffer.wrap(bytes, 1, 8)
    -    labelBytes.order(ByteOrder.nativeOrder())
    -    val label = labelBytes.asDoubleBuffer().get(0)
    -    LabeledPoint(label, deserializeDoubleVector(bytes, 9))
    +  def dumps(obj: AnyRef): Array[Byte] = {
    +    new Pickler().dumps(obj)
    --- End diff --
    
    In Python, `dumps` and `loads` deal with strings. Do we have `load` and `dump` defined in `Pickler` and `Unpickler`?
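
For reference, Python's own `pickle` module draws the line this way: `dumps`/`loads` work on an in-memory byte string, while `dump`/`load` work on a file-like object. A minimal sketch, written for Python 3 (where the in-memory form is `bytes`) and using a made-up sample tuple:

```python
import io
import pickle

# Hypothetical (user, product, rating) record, for illustration only.
record = (1, 2, 5.0)

# dumps/loads: serialize to and from an in-memory byte string.
payload = pickle.dumps(record, protocol=2)
from_bytes = pickle.loads(payload)

# dump/load: serialize to and from a file-like object.
buf = io.BytesIO()
pickle.dump(record, buf, protocol=2)
buf.seek(0)
restored = pickle.load(buf)
```

Both paths produce the same pickle stream; only the transport differs.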




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56114946
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20554/consoleFull) for   PR 2378 at commit [`032cd62`](https://github.com/apache/spark/commit/032cd62cee6b2bd134f6b9017a7e68ef333990a5).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55709985
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20377/consoleFull) for   PR 2378 at commit [`44736d7`](https://github.com/apache/spark/commit/44736d7d849a523419006b565cf51fa732e8854c).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17752465
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -23,14 +23,148 @@
     SciPy is available in their environment.
     """
     
    -import numpy
    -from numpy import array, array_equal, ndarray, float64, int32
    +import sys
    +import array
    +import copy_reg
     
    +import numpy as np
     
    -__all__ = ['SparseVector', 'Vectors']
    +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
     
     
    -class SparseVector(object):
    +if sys.version_info[:2] == (2, 7):
    +    # speed up pickling array in Python 2.7
    +    def fast_pickle_array(ar):
    +        return array.array, (ar.typecode, ar.tostring())
    +    copy_reg.pickle(array.array, fast_pickle_array)
    +
    +
    +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
    +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
    +
    +try:
    +    import scipy.sparse
    +    _have_scipy = True
    +except:
    +    # No SciPy in environment, but that's okay
    +    _have_scipy = False
    +
    +
    +def _convert_to_vector(l):
    +    if isinstance(l, Vector):
    +        return l
    +    elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
    +        return DenseVector(l)
    +    elif _have_scipy and scipy.sparse.issparse(l):
    +        assert l.shape[1] == 1, "Expected column vector"
    +        csc = l.tocsc()
    +        return SparseVector(l.shape[0], csc.indices, csc.data)
    +    else:
    +        raise TypeError("Cannot convert type %s into Vector" % type(l))
    +
    +
    +class Vector(object):
    +    """
    +    Abstract class for DenseVector and SparseVector
    +    """
    +    def toArray(self):
    +        """
    +        Convert the vector into an numpy.ndarray
    +        :return: numpy.ndarray
    +        """
    +        raise NotImplementedError
    +
    +
    +class DenseVector(Vector):
    +    def __init__(self, ar):
    +        if not isinstance(ar, array.array):
    +            ar = array.array('d', ar)
    +        self.array = ar
    +
    +    def __reduce__(self):
    +        return DenseVector, (self.array,)
    +
    +    def dot(self, other):
    +        """
    +        Compute the dot product of two Vectors. We support
    +        (Numpy array, list, SparseVector, or SciPy sparse)
    +        and a target NumPy array that is either 1- or 2-dimensional.
    +        Equivalent to calling numpy.dot of the two vectors.
    +
    +        >>> dense = DenseVector(array.array('d', [1., 2.]))
    +        >>> dense.dot(dense)
    +        5.0
    +        >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
    +        4.0
    +        >>> dense.dot(range(1, 3))
    +        5.0
    +        >>> dense.dot(np.array(range(1, 3)))
    +        5.0
    +        """
    +        if isinstance(other, SparseVector):
    +            return other.dot(self)
    +        elif _have_scipy and scipy.sparse.issparse(other):
    +            return other.transpose().dot(self.toArray())[0]
    +        elif isinstance(other, Vector):
    +            return np.dot(self.toArray(), other.toArray())
    --- End diff --
    
    Yes, there is a performance penalty right now. Since most of the algorithms run inside Scala, these type conversions should not be the bottleneck. If we do see them slowing something down, we will improve them later.
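
The diff quoted above registers `fast_pickle_array` through `copy_reg` so that, on Python 2.7, an `array.array` pickles as a typecode plus one raw byte string. A rough Python 3 rendering of the same idea is sketched below, assuming the renamed `copyreg` module and `tobytes()` in place of `tostring()`. It only illustrates the mechanism and makes no claim about speed on current interpreters.

```python
import array
import copyreg
import pickle

def fast_pickle_array(ar):
    # Reduce the array to (constructor, args): the typecode plus the raw bytes.
    # array.array(typecode, bytes) rebuilds the array when the pickle is loaded.
    return array.array, (ar.typecode, ar.tobytes())

# Ask pickle to use the reducer above for every array.array instance.
copyreg.pickle(array.array, fast_pickle_array)

values = array.array('d', [1.0, 2.0, 3.0])
restored = pickle.loads(pickle.dumps(values, protocol=2))
```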




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55668739
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/109/consoleFull) for   PR 2378 at commit [`722dd96`](https://github.com/apache/spark/commit/722dd96976d6a083b0ddb985ac6c518c791bce39).
     * This patch **does not** merge cleanly!




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55707384
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20376/consoleFull) for   PR 2378 at commit [`e1d1bfc`](https://github.com/apache/spark/commit/e1d1bfce4b464e6b14f649081155faf7c4d28471).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56147622
  
    @davies Does `PickleSerializer` compress data? If not, maybe we should cache the deserialized RDD instead of the one from `_.reserialize`. They have the same storage. I understand that batch-serialization can help GC. But algorithms like linear methods should only allocate short-lived objects. Is batch-serialization worth the tradeoff?
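
One way to get a feel for the storage side of this question is to compare a pickled batch with the same bytes run through `zlib`. The sketch below uses only the standard `pickle` module, which does not compress its output on its own. It is not PySpark's `PickleSerializer`, and the sample data is invented, so real ratios will differ.

```python
import pickle
import zlib

# Invented sample: 1000 doubles with a lot of repetition.
values = [float(i % 10) for i in range(1000)]

# Plain pickle output, as produced by the standard library.
raw = pickle.dumps(values, protocol=2)

# The same bytes after generic compression.
packed = zlib.compress(raw)

print("pickled bytes:   ", len(raw))
print("compressed bytes:", len(packed))
```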




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by JoshRosen <gi...@git.apache.org>.
Github user JoshRosen commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55685928
  
    Just merged #2365 in case you want to rebase.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55517521
  
    @mengxr The new approach is almost ready, please take a quick look. I will do some refactoring later.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56136476
  
    test this please




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56110091
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20551/consoleFull) for   PR 2378 at commit [`bd738ab`](https://github.com/apache/spark/commit/bd738abee534f467c0fb707f414d854af128fec5).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17686849
  
    --- Diff: python/pyspark/mllib/recommendation.py ---
    @@ -54,34 +64,51 @@ def __del__(self):
         def predict(self, user, product):
             return self._java_model.predict(user, product)
     
    -    def predictAll(self, usersProducts):
    -        usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
    -        return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
    -                   self._context, RatingDeserializer())
    +    def predictAll(self, user_product):
    +        assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
    +        sc = self._context
    +        tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
    +        jresult = self._java_model.predict(tuplerdd).toJavaRDD()
    +        return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
    +                   AutoBatchedSerializer(PickleSerializer()))
     
     
     class ALS(object):
     
         @classmethod
    +    def _prepare(cls, ratings):
    +        assert isinstance(ratings, RDD), "ratings should be RDD"
    +        first = ratings.first()
    +        if not isinstance(first, Rating):
    +            if isinstance(first, (tuple, list)):
    +                ratings = ratings.map(lambda x: Rating(*x))
    +            else:
    +                raise ValueError("rating should be RDD of Rating or tuple/list")
    +        # serialize them by AutoBatchedSerializer before cache to reduce the
    +        # objects overhead in JVM
    +        cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
    --- End diff --
    
    I'm not sure. Caching will be better for small datasets, but worse for large datasets (it may cause OOM in some cases).
    
    In this PR, I will keep the same behavior as before wherever I'm not sure how to deal with it.
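
The code comment in the diff above mentions batching before caching to reduce per-object overhead in the JVM. As a rough, stand-alone illustration of that effect, pickling records in groups yields far fewer serialized blobs than pickling them one by one. This is not PySpark's `AutoBatchedSerializer`; the `batched` helper, the batch size, and the sample ratings are all assumptions.

```python
import pickle

def batched(items, batch_size):
    # Hypothetical helper: yield one pickled blob per group of batch_size items.
    batch = []
    for item in items:
        batch.append(item)
        if len(batch) == batch_size:
            yield pickle.dumps(batch, protocol=2)
            batch = []
    if batch:
        yield pickle.dumps(batch, protocol=2)

# Invented (user, product, rating) tuples.
ratings = [(u, u % 7, float(u % 5)) for u in range(1000)]

per_record = [pickle.dumps(r, protocol=2) for r in ratings]  # one blob per record
per_batch = list(batched(ratings, 100))                      # one blob per 100 records

print(len(per_record), "blobs unbatched,", len(per_batch), "blobs batched")
```

Fewer, larger blobs mean fewer objects for the receiving side to track, at the cost of having to unpickle a whole batch to reach any single record.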




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17694320
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -40,11 +43,11 @@ import org.apache.spark.mllib.util.MLUtils
     import org.apache.spark.rdd.RDD
     import org.apache.spark.util.Utils
     
    +import scala.reflect.ClassTag
    --- End diff --
    
    Imports out of order.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55486079
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20265/consoleFull) for   PR 2378 at commit [`f1544c4`](https://github.com/apache/spark/commit/f1544c47917836d7ef77353c467182cc5cc7addb).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17752055
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -257,10 +410,34 @@ def stringify(vector):
             >>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
             '[0.0,1.0]'
             """
    -        if type(vector) == SparseVector:
    -            return str(vector)
    -        else:
    -            return "[" + ",".join([str(v) for v in vector]) + "]"
    +        return str(vector)
    +
    +
    +class Matrix(Vector):
    --- End diff --
    
    In the past, we used ndarray for both vectors and matrices, and _dot(matrix, matrix) was provided by ndarray.
    
    So how should we handle _dot(vector, matrix) and _dot(matrix, matrix)?
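
For reference, `numpy.dot` on 1-D and 2-D inputs returns a scalar for vector times vector, a vector for vector times matrix, and a matrix for matrix times matrix. A pure-Python sketch of those cases, using nested lists in place of ndarrays, could look like the following. The `dot` and `_is_matrix` helpers are hypothetical and are not what the PR implements.

```python
def _is_matrix(x):
    # Treat a non-empty sequence of sequences as a row-major matrix.
    return len(x) > 0 and isinstance(x[0], (list, tuple))

def dot(a, b):
    if _is_matrix(a):
        # matrix . vector -> vector, matrix . matrix -> matrix (row by row)
        return [dot(row, b) for row in a]
    if _is_matrix(b):
        # vector (n) . matrix (n x m) -> vector (m)
        return [sum(x * row[j] for x, row in zip(a, b)) for j in range(len(b[0]))]
    # vector . vector -> scalar
    return sum(x * y for x, y in zip(a, b))

vector = [1.0, 2.0]
matrix = [[1.0, 0.0], [0.0, 2.0]]

print(dot(vector, vector))
print(dot(vector, matrix))
print(dot(matrix, matrix))
```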




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17693196
  
    --- Diff: python/pyspark/mllib/recommendation.py ---
    @@ -54,34 +64,51 @@ def __del__(self):
         def predict(self, user, product):
             return self._java_model.predict(user, product)
     
    -    def predictAll(self, usersProducts):
    -        usersProductsJRDD = _get_unmangled_rdd(usersProducts, _serialize_tuple)
    -        return RDD(self._java_model.predict(usersProductsJRDD._jrdd),
    -                   self._context, RatingDeserializer())
    +    def predictAll(self, user_product):
    +        assert isinstance(user_product, RDD), "user_product should be RDD of (user, product)"
    +        sc = self._context
    +        tuplerdd = sc._jvm.SerDe.asTupleRDD(user_product._to_java_object_rdd().rdd())
    +        jresult = self._java_model.predict(tuplerdd).toJavaRDD()
    +        return RDD(sc._jvm.PythonRDD.javaToPython(jresult), sc,
    +                   AutoBatchedSerializer(PickleSerializer()))
     
     
     class ALS(object):
     
         @classmethod
    +    def _prepare(cls, ratings):
    +        assert isinstance(ratings, RDD), "ratings should be RDD"
    +        first = ratings.first()
    +        if not isinstance(first, Rating):
    +            if isinstance(first, (tuple, list)):
    +                ratings = ratings.map(lambda x: Rating(*x))
    +            else:
    +                raise ValueError("rating should be RDD of Rating or tuple/list")
    +        # serialize them by AutoBatchedSerializer before cache to reduce the
    +        # objects overhead in JVM
    +        cached = ratings._reserialize(AutoBatchedSerializer(PickleSerializer())).cache()
    --- End diff --
    
    Concerning DecisionTree, this is something to be tested and possibly fixed.  I suspect it will be better to cache, but need to test.  Some tests do show that the final read and persist take much longer than the initial reads, so caching will not help.  But other tests indicate it may be worthwhile.  I think it is data-dependent, as suggested above.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55517418
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20302/consoleFull) for   PR 2378 at commit [`4d7963e`](https://github.com/apache/spark/commit/4d7963ef91851fba280025b0778f0583fe819c55).
     * This patch **does not** merge cleanly!




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55484501
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20259/consoleFull) for   PR 2378 at commit [`b30ef35`](https://github.com/apache/spark/commit/b30ef35ec7830cee08b4f8d692da26d98cac70e8).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55531127
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20307/consoleFull) for   PR 2378 at commit [`0ee1525`](https://github.com/apache/spark/commit/0ee1525054e6ab75ef4b456fe1de148ef866de4e).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56122608
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20560/consoleFull) for   PR 2378 at commit [`810f97f`](https://github.com/apache/spark/commit/810f97f53befdb262e55e736500d909b5f869f1a).
     * This patch **passes** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55795929
  
    test this please




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56117852
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20560/consoleFull) for   PR 2378 at commit [`810f97f`](https://github.com/apache/spark/commit/810f97f53befdb262e55e736500d909b5f869f1a).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17698519
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -476,259 +436,167 @@ class PythonMLLibAPI extends Serializable {
           numRows: Long,
           numCols: Int,
           numPartitions: java.lang.Integer,
    -      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
    +      seed: java.lang.Long): JavaRDD[Vector] = {
         val parts = getNumPartitionsOrDefault(numPartitions, jsc)
         val s = getSeedOrDefault(seed)
    -    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
    +    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
       }
     
     }
     
     /**
    - * :: DeveloperApi ::
    - * MultivariateStatisticalSummary with Vector fields serialized.
    + * SerDe utility functions for PythonMLLibAPI.
      */
    -@DeveloperApi
    -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
    -  extends Serializable {
    +private[spark] object SerDe extends Serializable {
     
    -  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
    +  val PYSPARK_PACKAGE = "pyspark.mllib"
     
    -  def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)
    +  /**
    +   * Base class used for pickle
    +   */
    +  private[python] abstract class BasePickler[T: ClassTag]
    +    extends IObjectPickler with IObjectConstructor {
    +
    +    private val cls = implicitly[ClassTag[T]].runtimeClass
    +    private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4)
    +    private val name = cls.getSimpleName
    +
    +    // register this to Pickler and Unpickler
    +    def register(): Unit = {
    +      Pickler.registerCustomPickler(this.getClass, this)
    +      Pickler.registerCustomPickler(cls, this)
    +      Unpickler.registerConstructor(module, name, this)
    +    }
     
    -  def count: Long = summary.count
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
    +      if (obj == this) {
    +        out.write(Opcodes.GLOBAL)
    +        out.write((module + "\n" + name + "\n").getBytes())
    +      } else {
    +        pickler.save(this)  // it will be memorized by Pickler
    +        saveState(obj, out, pickler)
    +        out.write(Opcodes.REDUCE)
    +      }
    +    }
    +
    +    private[python] def saveObjects(out: OutputStream, pickler: Pickler,
    +                                    objects: Any*) = {
    +      if (objects.length == 0 || objects.length > 3) {
    +        out.write(Opcodes.MARK)
    +      }
    +      objects.foreach(pickler.save(_))
    +      val code = objects.length match {
    +        case 1 => Opcodes.TUPLE1
    +        case 2 => Opcodes.TUPLE2
    +        case 3 => Opcodes.TUPLE3
    +        case _ => Opcodes.TUPLE
    +      }
    +      out.write(code)
    +    }
     
    -  def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros)
    +    private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler)
    +  }
     
    -  def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max)
    +  // Pickler for DenseVector
    +  private[python] class DenseVectorPickler
    +    extends BasePickler[DenseVector] {
     
    -  def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min)
    -}
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val vector: DenseVector = obj.asInstanceOf[DenseVector]
    +      saveObjects(out, pickler, vector.toArray)
    +    }
     
    -/**
    - * SerDe utility functions for PythonMLLibAPI.
    - */
    -private[spark] object SerDe extends Serializable {
    -  private val DENSE_VECTOR_MAGIC: Byte = 1
    -  private val SPARSE_VECTOR_MAGIC: Byte = 2
    -  private val DENSE_MATRIX_MAGIC: Byte = 3
    -  private val LABELED_POINT_MAGIC: Byte = 4
    -
    -  private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    require(bytes.length - offset >= 5, "Byte array too short")
    -    val magic = bytes(offset)
    -    if (magic == DENSE_VECTOR_MAGIC) {
    -      deserializeDenseVector(bytes, offset)
    -    } else if (magic == SPARSE_VECTOR_MAGIC) {
    -      deserializeSparseVector(bytes, offset)
    -    } else {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 1)
    +      new DenseVector(args(0).asInstanceOf[Array[Double]])
         }
       }
     
    -  private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = {
    -    require(bytes.length - offset == 8, "Wrong size byte array for Double")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.getDouble
    -  }
    -
    -  private[python] def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 5, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val length = bb.getInt()
    -    require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Double](length.toInt)
    -    db.get(ans)
    -    Vectors.dense(ans)
    -  }
    -
    -  private[python] def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 9, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val size = bb.getInt()
    -    val nonZeros = bb.getInt()
    -    require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
    -    val ib = bb.asIntBuffer()
    -    val indices = new Array[Int](nonZeros)
    -    ib.get(indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    val values = new Array[Double](nonZeros)
    -    db.get(values)
    -    Vectors.sparse(size, indices, values)
    -  }
    +  // Pickler for DenseMatrix
    +  private[python] class DenseMatrixPickler
    +    extends BasePickler[DenseMatrix] {
     
    -  /**
    -   * Returns an 8-byte array for the input Double.
    -   *
    -   * Note: we currently do not use a magic byte for double for storage efficiency.
    -   * This should be reconsidered when we add Ser/De for other 8-byte types (e.g. Long), for safety.
    -   * The corresponding deserializer, deserializeDouble, needs to be modified as well if the
    -   * serialization scheme changes.
    -   */
    -  private[python] def serializeDouble(double: Double): Array[Byte] = {
    -    val bytes = new Array[Byte](8)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.putDouble(double)
    -    bytes
    -  }
    -
    -  private[python] def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
    -    val len = doubles.length
    -    val bytes = new Array[Byte](5 + 8 * len)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_VECTOR_MAGIC)
    -    bb.putInt(len)
    -    val db = bb.asDoubleBuffer()
    -    db.put(doubles)
    -    bytes
    -  }
    -
    -  private[python] def serializeSparseVector(vector: SparseVector): Array[Byte] = {
    -    val nonZeros = vector.indices.length
    -    val bytes = new Array[Byte](9 + 12 * nonZeros)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(SPARSE_VECTOR_MAGIC)
    -    bb.putInt(vector.size)
    -    bb.putInt(nonZeros)
    -    val ib = bb.asIntBuffer()
    -    ib.put(vector.indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    db.put(vector.values)
    -    bytes
    -  }
    -
    -  private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
    -    case s: SparseVector =>
    -      serializeSparseVector(s)
    -    case _ =>
    -      serializeDenseVector(vector.toArray)
    -  }
    -
    -  private[python] def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
    -    val packetLength = bytes.length
    -    if (packetLength < 9) {
    -      throw new IllegalArgumentException("Byte array too short.")
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
    +      saveObjects(out, pickler, m.numRows, m.numCols, m.values)
         }
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    if (magic != DENSE_MATRIX_MAGIC) {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Array[Double]])
         }
    -    val rows = bb.getInt()
    -    val cols = bb.getInt()
    -    if (packetLength != 9 + 8 * rows * cols) {
    -      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
    +  }
    +
    +  // Pickler for SparseVector
    +  private[python] class SparseVectorPickler
    +    extends BasePickler[SparseVector] {
    +
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val v: SparseVector = obj.asInstanceOf[SparseVector]
    +      saveObjects(out, pickler, v.size, v.indices, v.values)
         }
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Array[Double]](rows.toInt)
    -    for (i <- 0 until rows.toInt) {
    -      ans(i) = new Array[Double](cols.toInt)
    -      db.get(ans(i))
    +
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]],
    +        args(2).asInstanceOf[Array[Double]])
         }
    -    ans
       }
     
    -  private[python] def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
    -    val rows = doubles.length
    -    var cols = 0
    -    if (rows > 0) {
    -      cols = doubles(0).length
    +  // Pickler for LabeledPoint
    +  private[python] class LabeledPointPickler
    +    extends BasePickler[LabeledPoint] {
    +
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val point: LabeledPoint = obj.asInstanceOf[LabeledPoint]
    +      saveObjects(out, pickler, point.label, point.features)
         }
    -    val bytes = new Array[Byte](9 + 8 * rows * cols)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_MATRIX_MAGIC)
    -    bb.putInt(rows)
    -    bb.putInt(cols)
    -    val db = bb.asDoubleBuffer()
    -    for (i <- 0 until rows) {
    -      db.put(doubles(i))
    +
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 2) {
    +        throw new PickleException("should be 2")
    +      }
    +      new LabeledPoint(args(0).asInstanceOf[Double], args(1).asInstanceOf[Vector])
         }
    -    bytes
       }
     
    -  private[python] def serializeLabeledPoint(p: LabeledPoint): Array[Byte] = {
    -    val fb = serializeDoubleVector(p.features)
    -    val bytes = new Array[Byte](1 + 8 + fb.length)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(LABELED_POINT_MAGIC)
    -    bb.putDouble(p.label)
    -    bb.put(fb)
    -    bytes
    -  }
    +  // Pickler for Rating
    +  private[python] class RatingPickler
    +    extends BasePickler[Rating] {
     
    -  private[python] def deserializeLabeledPoint(bytes: Array[Byte]): LabeledPoint = {
    -    require(bytes.length >= 9, "Byte array too short")
    -    val magic = bytes(0)
    -    if (magic != LABELED_POINT_MAGIC) {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val rating: Rating = obj.asInstanceOf[Rating]
    +      saveObjects(out, pickler, rating.user, rating.product, rating.rating)
         }
    -    val labelBytes = ByteBuffer.wrap(bytes, 1, 8)
    -    labelBytes.order(ByteOrder.nativeOrder())
    -    val label = labelBytes.asDoubleBuffer().get(0)
    -    LabeledPoint(label, deserializeDoubleVector(bytes, 9))
    -  }
     
    -  // Reformat a Matrix into Array[Array[Double]] for serialization
    -  private[python] def to2dArray(matrix: Matrix): Array[Array[Double]] = {
    -    val values = matrix.toArray
    -    Array.tabulate(matrix.numRows, matrix.numCols)((i, j) => values(i + j * matrix.numRows))
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 3) {
    +        throw new PickleException("should be 3")
    +      }
    +      new Rating(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Double])
    +    }
       }
     
    +  def initialize(): Unit = {
    +    new DenseVectorPickler().register()
    +    new DenseMatrixPickler().register()
    +    new SparseVectorPickler().register()
    +    new LabeledPointPickler().register()
    +    new RatingPickler().register()
    +  }
     
    -  /** Unpack a Rating object from an array of bytes */
    -  private[python] def unpackRating(ratingBytes: Array[Byte]): Rating = {
    -    val bb = ByteBuffer.wrap(ratingBytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    val user = bb.getInt()
    -    val product = bb.getInt()
    -    val rating = bb.getDouble()
    -    new Rating(user, product, rating)
    +  def dumps(obj: AnyRef): Array[Byte] = {
    +    new Pickler().dumps(obj)
       }
     
    -  /** Unpack a tuple of Ints from an array of bytes */
    -  def unpackTuple(tupleBytes: Array[Byte]): (Int, Int) = {
    -    val bb = ByteBuffer.wrap(tupleBytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    val v1 = bb.getInt()
    -    val v2 = bb.getInt()
    -    (v1, v2)
    +  def loads(bytes: Array[Byte]): AnyRef = {
    +    new Unpickler().loads(bytes)
       }
     
    -  /**
    -   * Serialize a Rating object into an array of bytes.
    -   * It can be deserialized using RatingDeserializer().
    -   *
    -   * @param rate the Rating object to serialize
    -   * @return
    -   */
    -  def serializeRating(rate: Rating): Array[Byte] = {
    -    val len = 3
    -    val bytes = new Array[Byte](4 + 8 * len)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.putInt(len)
    -    val db = bb.asDoubleBuffer()
    -    db.put(rate.user.toDouble)
    -    db.put(rate.product.toDouble)
    -    db.put(rate.rating)
    -    bytes
    +  /* convert object into Tuple */
    +  def asTupleRDD(rdd: RDD[Array[Any]]): RDD[(Int, Int)] = {
    --- End diff --
    
    Should there be some checks here (or in the place it is used) to give better error messages?
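
A minimal sketch, in Python rather than the Scala of the patch, of the byte layout that `BasePickler.pickle` and `saveObjects` in the diff above appear to produce: a GLOBAL opcode naming the class, a tuple of constructor arguments, then REDUCE. The helper names (`save_small_ints`, `save_objects`, `reduce_pickle`) and the use of `fractions.Fraction` as a stand-in class are assumptions for illustration only. The sketch also skips the memoization that the real Pickler performs.

```python
import pickle
from fractions import Fraction

# Pickle protocol 2 opcodes, matching the names used via Opcodes in the diff.
PROTO = b"\x80"
GLOBAL = b"c"
BININT1 = b"K"
MARK = b"("
TUPLE = b"t"
TUPLE1 = b"\x85"
TUPLE2 = b"\x86"
TUPLE3 = b"\x87"
REDUCE = b"R"
STOP = b"."


def save_small_ints(values):
    """Emit one BININT1 opcode per value (hypothetical helper; 0..255 only)."""
    return b"".join(BININT1 + bytes([v]) for v in values)


def save_objects(values):
    """Mirror saveObjects: MARK only for 0 or more than 3 items, then a tuple opcode."""
    n = len(values)
    prefix = MARK if n == 0 or n > 3 else b""
    code = {1: TUPLE1, 2: TUPLE2, 3: TUPLE3}.get(n, TUPLE)
    return prefix + save_small_ints(values) + code


def reduce_pickle(module, name, args):
    """Build 'GLOBAL module\\nname\\n <args tuple> REDUCE', as in BasePickler.pickle."""
    header = PROTO + b"\x02"
    global_ref = GLOBAL + module.encode() + b"\n" + name.encode() + b"\n"
    return header + global_ref + save_objects(args) + REDUCE + STOP


# Stand-in for a registered class: the unpickler calls Fraction(3, 4).
payload = reduce_pickle("fractions", "Fraction", [3, 4])
result = pickle.loads(payload)
print(result)
```

On the unpickling side, the GLOBAL opcode is what lets a registered constructor (here the stand-in `Fraction`) be looked up by module and name, which is the role `Unpickler.registerConstructor(module, name, this)` seems to play in the patch.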




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17757949
  
    
    I have added more type checks in Python; it should be friendlier to show the error message before entering the JVM.
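
A minimal sketch of the kind of Python-side check described here, assuming the data headed for `asTupleRDD` is a collection of integer pairs. The function name `check_int_pair` and the error wording are hypothetical and are not taken from the patch.

```python
def check_int_pair(item):
    """Return item as an (int, int) tuple, or raise TypeError with a readable message."""
    if not isinstance(item, (tuple, list)) or len(item) != 2:
        raise TypeError("expected a pair of ints, got %r" % (item,))
    for value in item:
        # bool is a subclass of int in Python, so reject it explicitly.
        if isinstance(value, bool) or not isinstance(value, int):
            raise TypeError(
                "expected int elements in %r, got %s" % (item, type(value).__name__)
            )
    return tuple(item)


pairs = [check_int_pair(p) for p in [(1, 2), [3, 4]]]
print(pairs)
```

Failing in Python this way means the user sees a `TypeError` naming the offending value, rather than a cast failure raised later inside the JVM.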




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56242298
  
    Merged. Thanks a lot!


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/2378


[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55486352
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20267/consoleFull) for   PR 2378 at commit [`aa2287e`](https://github.com/apache/spark/commit/aa2287ec75998bbc5512a37d5415dc2115615533).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17697397
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -476,259 +436,167 @@ class PythonMLLibAPI extends Serializable {
           numRows: Long,
           numCols: Int,
           numPartitions: java.lang.Integer,
    -      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
    +      seed: java.lang.Long): JavaRDD[Vector] = {
         val parts = getNumPartitionsOrDefault(numPartitions, jsc)
         val s = getSeedOrDefault(seed)
    -    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
    +    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
       }
     
     }
     
     /**
    - * :: DeveloperApi ::
    - * MultivariateStatisticalSummary with Vector fields serialized.
    + * SerDe utility functions for PythonMLLibAPI.
      */
    -@DeveloperApi
    -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
    -  extends Serializable {
    +private[spark] object SerDe extends Serializable {
     
    -  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
    +  val PYSPARK_PACKAGE = "pyspark.mllib"
     
    -  def variance: Array[Byte] = SerDe.serializeDoubleVector(summary.variance)
    +  /**
    +   * Base class used for pickle
    +   */
    +  private[python] abstract class BasePickler[T: ClassTag]
    +    extends IObjectPickler with IObjectConstructor {
    +
    +    private val cls = implicitly[ClassTag[T]].runtimeClass
    +    private val module = PYSPARK_PACKAGE + "." + cls.getName.split('.')(4)
    +    private val name = cls.getSimpleName
    +
    +    // register this to Pickler and Unpickler
    +    def register(): Unit = {
    +      Pickler.registerCustomPickler(this.getClass, this)
    +      Pickler.registerCustomPickler(cls, this)
    +      Unpickler.registerConstructor(module, name, this)
    +    }
     
    -  def count: Long = summary.count
    +    def pickle(obj: Object, out: OutputStream, pickler: Pickler): Unit = {
    +      if (obj == this) {
    +        out.write(Opcodes.GLOBAL)
    +        out.write((module + "\n" + name + "\n").getBytes())
    +      } else {
    +        pickler.save(this)  // it will be memorized by Pickler
    +        saveState(obj, out, pickler)
    +        out.write(Opcodes.REDUCE)
    +      }
    +    }
    +
    +    private[python] def saveObjects(out: OutputStream, pickler: Pickler,
    +                                    objects: Any*) = {
    +      if (objects.length == 0 || objects.length > 3) {
    +        out.write(Opcodes.MARK)
    +      }
    +      objects.foreach(pickler.save(_))
    +      val code = objects.length match {
    +        case 1 => Opcodes.TUPLE1
    +        case 2 => Opcodes.TUPLE2
    +        case 3 => Opcodes.TUPLE3
    +        case _ => Opcodes.TUPLE
    +      }
    +      out.write(code)
    +    }
     
    -  def numNonzeros: Array[Byte] = SerDe.serializeDoubleVector(summary.numNonzeros)
    +    private[python] def saveState(obj: Object, out: OutputStream, pickler: Pickler)
    +  }
     
    -  def max: Array[Byte] = SerDe.serializeDoubleVector(summary.max)
    +  // Pickler for DenseVector
    +  private[python] class DenseVectorPickler
    +    extends BasePickler[DenseVector] {
     
    -  def min: Array[Byte] = SerDe.serializeDoubleVector(summary.min)
    -}
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val vector: DenseVector = obj.asInstanceOf[DenseVector]
    +      saveObjects(out, pickler, vector.toArray)
    +    }
     
    -/**
    - * SerDe utility functions for PythonMLLibAPI.
    - */
    -private[spark] object SerDe extends Serializable {
    -  private val DENSE_VECTOR_MAGIC: Byte = 1
    -  private val SPARSE_VECTOR_MAGIC: Byte = 2
    -  private val DENSE_MATRIX_MAGIC: Byte = 3
    -  private val LABELED_POINT_MAGIC: Byte = 4
    -
    -  private[python] def deserializeDoubleVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    require(bytes.length - offset >= 5, "Byte array too short")
    -    val magic = bytes(offset)
    -    if (magic == DENSE_VECTOR_MAGIC) {
    -      deserializeDenseVector(bytes, offset)
    -    } else if (magic == SPARSE_VECTOR_MAGIC) {
    -      deserializeSparseVector(bytes, offset)
    -    } else {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 1)
    +      new DenseVector(args(0).asInstanceOf[Array[Double]])
         }
       }
     
    -  private[python] def deserializeDouble(bytes: Array[Byte], offset: Int = 0): Double = {
    -    require(bytes.length - offset == 8, "Wrong size byte array for Double")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.getDouble
    -  }
    -
    -  private[python] def deserializeDenseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 5, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == DENSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val length = bb.getInt()
    -    require (packetLength == 5 + 8 * length, "Invalid packet length: " + packetLength)
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Double](length.toInt)
    -    db.get(ans)
    -    Vectors.dense(ans)
    -  }
    -
    -  private[python] def deserializeSparseVector(bytes: Array[Byte], offset: Int = 0): Vector = {
    -    val packetLength = bytes.length - offset
    -    require(packetLength >= 9, "Byte array too short")
    -    val bb = ByteBuffer.wrap(bytes, offset, bytes.length - offset)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    require(magic == SPARSE_VECTOR_MAGIC, "Invalid magic: " + magic)
    -    val size = bb.getInt()
    -    val nonZeros = bb.getInt()
    -    require (packetLength == 9 + 12 * nonZeros, "Invalid packet length: " + packetLength)
    -    val ib = bb.asIntBuffer()
    -    val indices = new Array[Int](nonZeros)
    -    ib.get(indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    val values = new Array[Double](nonZeros)
    -    db.get(values)
    -    Vectors.sparse(size, indices, values)
    -  }
    +  // Pickler for DenseMatrix
    +  private[python] class DenseMatrixPickler
    +    extends BasePickler[DenseMatrix] {
     
    -  /**
    -   * Returns an 8-byte array for the input Double.
    -   *
    -   * Note: we currently do not use a magic byte for double for storage efficiency.
    -   * This should be reconsidered when we add Ser/De for other 8-byte types (e.g. Long), for safety.
    -   * The corresponding deserializer, deserializeDouble, needs to be modified as well if the
    -   * serialization scheme changes.
    -   */
    -  private[python] def serializeDouble(double: Double): Array[Byte] = {
    -    val bytes = new Array[Byte](8)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.putDouble(double)
    -    bytes
    -  }
    -
    -  private[python] def serializeDenseVector(doubles: Array[Double]): Array[Byte] = {
    -    val len = doubles.length
    -    val bytes = new Array[Byte](5 + 8 * len)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_VECTOR_MAGIC)
    -    bb.putInt(len)
    -    val db = bb.asDoubleBuffer()
    -    db.put(doubles)
    -    bytes
    -  }
    -
    -  private[python] def serializeSparseVector(vector: SparseVector): Array[Byte] = {
    -    val nonZeros = vector.indices.length
    -    val bytes = new Array[Byte](9 + 12 * nonZeros)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(SPARSE_VECTOR_MAGIC)
    -    bb.putInt(vector.size)
    -    bb.putInt(nonZeros)
    -    val ib = bb.asIntBuffer()
    -    ib.put(vector.indices)
    -    bb.position(bb.position() + 4 * nonZeros)
    -    val db = bb.asDoubleBuffer()
    -    db.put(vector.values)
    -    bytes
    -  }
    -
    -  private[python] def serializeDoubleVector(vector: Vector): Array[Byte] = vector match {
    -    case s: SparseVector =>
    -      serializeSparseVector(s)
    -    case _ =>
    -      serializeDenseVector(vector.toArray)
    -  }
    -
    -  private[python] def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
    -    val packetLength = bytes.length
    -    if (packetLength < 9) {
    -      throw new IllegalArgumentException("Byte array too short.")
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
    +      saveObjects(out, pickler, m.numRows, m.numCols, m.values)
         }
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    val magic = bb.get()
    -    if (magic != DENSE_MATRIX_MAGIC) {
    -      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
    +
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
    +        args(2).asInstanceOf[Array[Double]])
         }
    -    val rows = bb.getInt()
    -    val cols = bb.getInt()
    -    if (packetLength != 9 + 8 * rows * cols) {
    -      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
    +  }
    +
    +  // Pickler for SparseVector
    +  private[python] class SparseVectorPickler
    +    extends BasePickler[SparseVector] {
    +
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val v: SparseVector = obj.asInstanceOf[SparseVector]
    +      saveObjects(out, pickler, v.size, v.indices, v.values)
         }
    -    val db = bb.asDoubleBuffer()
    -    val ans = new Array[Array[Double]](rows.toInt)
    -    for (i <- 0 until rows.toInt) {
    -      ans(i) = new Array[Double](cols.toInt)
    -      db.get(ans(i))
    +
    +    def construct(args: Array[Object]) :Object = {
    +      require(args.length == 3)
    +      new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]],
    +        args(2).asInstanceOf[Array[Double]])
         }
    -    ans
       }
     
    -  private[python] def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
    -    val rows = doubles.length
    -    var cols = 0
    -    if (rows > 0) {
    -      cols = doubles(0).length
    +  // Pickler for LabeledPoint
    +  private[python] class LabeledPointPickler
    +    extends BasePickler[LabeledPoint] {
    +
    +    def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
    +      val point: LabeledPoint = obj.asInstanceOf[LabeledPoint]
    +      saveObjects(out, pickler, point.label, point.features)
         }
    -    val bytes = new Array[Byte](9 + 8 * rows * cols)
    -    val bb = ByteBuffer.wrap(bytes)
    -    bb.order(ByteOrder.nativeOrder())
    -    bb.put(DENSE_MATRIX_MAGIC)
    -    bb.putInt(rows)
    -    bb.putInt(cols)
    -    val db = bb.asDoubleBuffer()
    -    for (i <- 0 until rows) {
    -      db.put(doubles(i))
    +
    +    def construct(args: Array[Object]) :Object = {
    +      if (args.length != 2) {
    +        throw new PickleException("should be 2")
    --- End diff --
    
    Use consistent Exception type.  (In some other places, require() is used instead.)


[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55550409
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20320/consoleFull) for   PR 2378 at commit [`722dd96`](https://github.com/apache/spark/commit/722dd96976d6a083b0ddb985ac6c518c791bce39).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56207099
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/20576/consoleFull) for   PR 2378 at commit [`dffbba2`](https://github.com/apache/spark/commit/dffbba2ba206bbbd3dfc740a55f1b0df341860e7).
     * This patch merges cleanly.


[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17574390
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -17,16 +17,18 @@
     
     package org.apache.spark.mllib.api.python
     
    -import java.nio.{ByteBuffer, ByteOrder}
    +import java.io.OutputStream
     
     import scala.collection.JavaConverters._
     
    +import net.razorvine.pickle.{Pickler, Unpickler, IObjectConstructor, IObjectPickler, PickleException, Opcodes}
    --- End diff --
    
    use `_`


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17757207
  
    --- Diff: python/pyspark/mllib/tests.py ---
    @@ -198,41 +212,36 @@ def test_serialize(self):
             lil[1, 0] = 1
             lil[3, 0] = 2
             sv = SparseVector(4, {1: 1, 3: 2})
    -        self.assertEquals(sv, _convert_vector(lil))
    -        self.assertEquals(sv, _convert_vector(lil.tocsc()))
    -        self.assertEquals(sv, _convert_vector(lil.tocoo()))
    -        self.assertEquals(sv, _convert_vector(lil.tocsr()))
    -        self.assertEquals(sv, _convert_vector(lil.todok()))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil)))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok())))
    +        self.assertEquals(sv, _convert_to_vector(lil))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocsc()))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocoo()))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocsr()))
    +        self.assertEquals(sv, _convert_to_vector(lil.todok()))
    +
    +        def serialize(l):
    +            return ser.loads(ser.dumps(_convert_to_vector(l)))
    +        self.assertEquals(sv, serialize(lil))
    +        self.assertEquals(sv, serialize(lil.tocsc()))
    +        self.assertEquals(sv, serialize(lil.tocsr()))
    +        self.assertEquals(sv, serialize(lil.todok()))
     
         def test_dot(self):
             from scipy.sparse import lil_matrix
             lil = lil_matrix((4, 1))
             lil[1, 0] = 1
             lil[3, 0] = 2
    -        dv = array([1., 2., 3., 4.])
    -        sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
    -        mat = array([[1., 2., 3., 4.],
    -                     [1., 2., 3., 4.],
    -                     [1., 2., 3., 4.],
    -                     [1., 2., 3., 4.]])
    -        self.assertEquals(10.0, _dot(lil, dv))
    -        self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
    +        dv = DenseVector(array([1., 2., 3., 4.]))
    +        self.assertEquals(10.0, dv.dot(lil))
    --- End diff --
    
    Your comment is good. I am really worried that I may have missed something; a functionality regression would be bad.


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17700232
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -23,14 +23,148 @@
     SciPy is available in their environment.
     """
     
    -import numpy
    -from numpy import array, array_equal, ndarray, float64, int32
    +import sys
    +import array
    +import copy_reg
     
    +import numpy as np
     
    -__all__ = ['SparseVector', 'Vectors']
    +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
     
     
    -class SparseVector(object):
    +if sys.version_info[:2] == (2, 7):
    +    # speed up pickling array in Python 2.7
    +    def fast_pickle_array(ar):
    +        return array.array, (ar.typecode, ar.tostring())
    +    copy_reg.pickle(array.array, fast_pickle_array)
    +
    +
    +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
    +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
    +
    +try:
    +    import scipy.sparse
    +    _have_scipy = True
    +except:
    +    # No SciPy in environment, but that's okay
    +    _have_scipy = False
    +
    +
    +def _convert_to_vector(l):
    +    if isinstance(l, Vector):
    +        return l
    +    elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
    +        return DenseVector(l)
    +    elif _have_scipy and scipy.sparse.issparse(l):
    +        assert l.shape[1] == 1, "Expected column vector"
    +        csc = l.tocsc()
    +        return SparseVector(l.shape[0], csc.indices, csc.data)
    +    else:
    +        raise TypeError("Cannot convert type %s into Vector" % type(l))
    +
    +
    +class Vector(object):
    +    """
    +    Abstract class for DenseVector and SparseVector
    +    """
    +    def toArray(self):
    +        """
    +        Convert the vector into an numpy.ndarray
    +        :return: numpy.ndarray
    +        """
    +        raise NotImplementedError
    +
    +
    +class DenseVector(Vector):
    +    def __init__(self, ar):
    +        if not isinstance(ar, array.array):
    +            ar = array.array('d', ar)
    +        self.array = ar
    +
    +    def __reduce__(self):
    +        return DenseVector, (self.array,)
    +
    +    def dot(self, other):
    +        """
    +        Compute the dot product of two Vectors. We support
    +        (Numpy array, list, SparseVector, or SciPy sparse)
    +        and a target NumPy array that is either 1- or 2-dimensional.
    +        Equivalent to calling numpy.dot of the two vectors.
    +
    +        >>> dense = DenseVector(array.array('d', [1., 2.]))
    +        >>> dense.dot(dense)
    +        5.0
    +        >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
    +        4.0
    +        >>> dense.dot(range(1, 3))
    +        5.0
    +        >>> dense.dot(np.array(range(1, 3)))
    +        5.0
    +        """
    +        if isinstance(other, SparseVector):
    +            return other.dot(self)
    +        elif _have_scipy and scipy.sparse.issparse(other):
    +            return other.transpose().dot(self.toArray())[0]
    +        elif isinstance(other, Vector):
    +            return np.dot(self.toArray(), other.toArray())
    --- End diff --
    
    Just wondering: Is it inefficient to have to convert types by calling toArray()?  Does that mean multiple passes over (or copies of) the data?
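
Whether `toArray()` copies depends on its implementation, which is not shown in this hunk. As a general illustration of the question, and assuming only the standard library, the sketch below contrasts a zero-copy view over an `array.array` buffer with a conversion that builds a new object. The `dot` helper is hypothetical; NumPy's `np.frombuffer` can wrap such a buffer in a similar zero-copy way.

```python
import array

values = array.array("d", [1.0, 2.0, 3.0])

# memoryview exposes the array's buffer without copying it.
view = memoryview(values)

# tolist() builds a new, independent list (one extra pass and a copy).
copied = values.tolist()

# Mutating the original is visible through the view but not the copy.
values[0] = 10.0


def dot(xs, ys):
    """Single-pass dot product over two equal-length sequences."""
    if len(xs) != len(ys):
        raise ValueError("length mismatch")
    return sum(x * y for x, y in zip(xs, ys))
```

If the conversion is a view, the cost is one pass for the dot product itself; if it copies, each call adds a pass and an allocation per operand.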


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17701227
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -61,16 +195,19 @@ def __init__(self, size, *args):
                 if type(pairs) == dict:
                     pairs = pairs.items()
                 pairs = sorted(pairs)
    -            self.indices = array([p[0] for p in pairs], dtype=int32)
    -            self.values = array([p[1] for p in pairs], dtype=float64)
    +            self.indices = array.array('i', [p[0] for p in pairs])
    --- End diff --
    
    Also maybe check that p is a pair.  I could imagine someone writing a mistake like this:
    SparseVector(4, [(1, 2, 3), (1, 5, 0)])
    as in: SparseVector(4, [(indices), (values)])
    I think the code would not catch this.
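
One way the suggested check could look, as a sketch only: the helper name `validate_pairs` is hypothetical and this is not the validation the patch ended up with. It rejects any element that is not a two-item tuple or list before the indices and values are split apart.

```python
def validate_pairs(pairs):
    """Return sorted (index, value) pairs or raise on malformed input."""
    if isinstance(pairs, dict):
        pairs = pairs.items()
    checked = []
    for p in pairs:
        # Each element must be exactly (index, value).
        if not isinstance(p, (tuple, list)) or len(p) != 2:
            raise TypeError("expected (index, value) pair, got %r" % (p,))
        index, value = p
        checked.append((int(index), float(value)))
    return sorted(checked)
```

With this check, the mistaken call from the comment fails on the first three-element tuple instead of silently producing a wrong vector.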


[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-55676382
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/109/consoleFull) for   PR 2378 at commit [`722dd96`](https://github.com/apache/spark/commit/722dd96976d6a083b0ddb985ac6c518c791bce39).
     * This patch **fails** unit tests.
     * This patch **does not** merge cleanly!



[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17703595
  
    --- Diff: python/pyspark/mllib/tree.py ---
    @@ -90,53 +89,24 @@ class DecisionTree(object):
         EXPERIMENTAL: This is an experimental API.
                       It will probably be modified for Spark v1.2.
     
    -    Example usage:
    -
    -    >>> from numpy import array
    -    >>> import sys
    -    >>> from pyspark.mllib.regression import LabeledPoint
    -    >>> from pyspark.mllib.tree import DecisionTree
    -    >>> from pyspark.mllib.linalg import SparseVector
    -    >>>
    -    >>> data = [
    -    ...     LabeledPoint(0.0, [0.0]),
    -    ...     LabeledPoint(1.0, [1.0]),
    -    ...     LabeledPoint(1.0, [2.0]),
    -    ...     LabeledPoint(1.0, [3.0])
    -    ... ]
    -    >>> categoricalFeaturesInfo = {} # no categorical features
    -    >>> model = DecisionTree.trainClassifier(sc.parallelize(data), numClasses=2,
    -    ...                                      categoricalFeaturesInfo=categoricalFeaturesInfo)
    -    >>> sys.stdout.write(model)
    -    DecisionTreeModel classifier
    -      If (feature 0 <= 0.5)
    -       Predict: 0.0
    -      Else (feature 0 > 0.5)
    -       Predict: 1.0
    -    >>> model.predict(array([1.0])) > 0
    -    True
    -    >>> model.predict(array([0.0])) == 0
    -    True
    -    >>> sparse_data = [
    -    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    -    ...     LabeledPoint(1.0, SparseVector(2, {1: 1.0})),
    -    ...     LabeledPoint(0.0, SparseVector(2, {0: 0.0})),
    -    ...     LabeledPoint(1.0, SparseVector(2, {1: 2.0}))
    -    ... ]
    -    >>>
    -    >>> model = DecisionTree.trainRegressor(sc.parallelize(sparse_data),
    -    ...                                     categoricalFeaturesInfo=categoricalFeaturesInfo)
    -    >>> model.predict(array([0.0, 1.0])) == 1
    -    True
    -    >>> model.predict(array([0.0, 0.0])) == 0
    -    True
    -    >>> model.predict(SparseVector(2, {1: 1.0})) == 1
    -    True
    -    >>> model.predict(SparseVector(2, {1: 0.0})) == 0
    -    True
         """
     
         @staticmethod
    +    def _train(data, type, numClasses, categoricalFeaturesInfo,
    +               impurity="gini", maxDepth=5, maxBins=32, minInstancesPerNode=1,
    +               minInfoGain=0.0):
    +        first = data.first()
    +        assert isinstance(first, LabeledPoint), "the data should be RDD of LabeleddPoint"
    --- End diff --
    
    "LabeleddPoint" --> "LabeledPoint"


[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56210084
  
    @mengxr PickleSerializer does not compress data; CompressSerializer can do that using gzip (level 1). Compression helps when the doubles fall in a small range or repeat, but it does worse with random doubles over a large range.
    
    BatchedSerializer can help reduce the overhead of the class name. In the JVM, the memory of short-lived objects cannot be reused without GC, so batched serialization will not increase GC pressure if the batch size is not too large (depending on how GC is configured).
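
A rough sketch of both effects, using only the Python standard library. `Fraction` is just a stand-in for a small record class, and `zlib` at level 1 is an assumption standing in for the compressor mentioned above; this is not PySpark's serializer code.

```python
import pickle
import random
import struct
import zlib
from fractions import Fraction

# Fraction is only a stand-in for a small user-defined record.
items = [Fraction(i, 7) for i in range(1000)]

# Pickling each object separately repeats the class reference every time.
one_by_one = sum(len(pickle.dumps(x, protocol=2)) for x in items)
# Pickling one batch lets the pickler memoize the class after its first use.
batched = len(pickle.dumps(items, protocol=2))

# zlib level 1 (assumed here as the compressor) on repeated vs. random doubles.
repeated = struct.pack("<1000d", *([0.5] * 1000))
rng = random.Random(0)
noisy = struct.pack("<1000d", *[rng.uniform(-1e9, 1e9) for _ in range(1000)])
repeated_size = len(zlib.compress(repeated, 1))
noisy_size = len(zlib.compress(noisy, 1))

print(one_by_one, batched, repeated_size, noisy_size)
```

The batched stream should come out smaller than the sum of the individual pickles, and the repeated doubles should compress far better than the random ones.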




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56112010
  
      [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/132/consoleFull) for   PR 2378 at commit [`032cd62`](https://github.com/apache/spark/commit/032cd62cee6b2bd134f6b9017a7e68ef333990a5).
     * This patch merges cleanly.




[GitHub] spark pull request: [SPARK-3491] [WIP] [MLlib] [PySpark] use pickl...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17574399
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala ---
    @@ -472,214 +452,140 @@ class PythonMLLibAPI extends Serializable {
           numRows: Long,
           numCols: Int,
           numPartitions: java.lang.Integer,
    -      seed: java.lang.Long): JavaRDD[Array[Byte]] = {
    +      seed: java.lang.Long): JavaRDD[Vector] = {
         val parts = getNumPartitionsOrDefault(numPartitions, jsc)
         val s = getSeedOrDefault(seed)
    -    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s).map(SerDe.serializeDoubleVector)
    +    RG.poissonVectorRDD(jsc.sc, mean, numRows, numCols, parts, s)
       }
     
     }
     
     /**
    - * :: DeveloperApi ::
    - * MultivariateStatisticalSummary with Vector fields serialized.
    + * SerDe utility functions for PythonMLLibAPI.
      */
    -@DeveloperApi
    -class MultivariateStatisticalSummarySerialized(val summary: MultivariateStatisticalSummary)
    -  extends Serializable {
    +private[spark] object SerDe extends Serializable {
     
    -  def mean: Array[Byte] = SerDe.serializeDoubleVector(summary.mean)
    +  private[python] def reduce_object(out: OutputStream, pickler: Pickler,
    +                                    module: String, name: String, objects: Object*) = {
    +    out.write(Opcodes.GLOBAL)
    +    out.write((module + "\n" + name + "\n").getBytes)
    --- End diff --
    
    Does it increase the storage cost by a lot for small objects?
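
To make the question concrete, here is a minimal Python-side sketch of what the GLOBAL opcode costs for one small object. `Fraction` is only a stand-in for a small MLlib record (an assumption for illustration); the byte counts apply to this example, not to the Scala `SerDe` code in the diff.

```python
import pickle
import pickletools
from fractions import Fraction

# A small object whose pickle must name its class.
small = Fraction(1, 2)
blob = pickle.dumps(small, protocol=2)

# List the opcodes in the stream; GLOBAL carries "module\nname\n" as text.
ops = [op.name for op, arg, pos in pickletools.genops(blob)]
header = b"fractions\nFraction\n"

print(ops)
print(len(blob), len(header))
```

For an object this small, the module and class name make up a large share of the pickled bytes.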




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17752588
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/Matrices.scala ---
    @@ -64,6 +64,12 @@ class DenseMatrix(val numRows: Int, val numCols: Int, val values: Array[Double])
       override def toArray: Array[Double] = values
     
       private[mllib] override def toBreeze: BM[Double] = new BDM[Double](numRows, numCols, values)
    +
    +  override def equals(o: Any) = o match {
    --- End diff --
    
    good catch!




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17752597
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -23,14 +23,148 @@
     SciPy is available in their environment.
     """
     
    -import numpy
    -from numpy import array, array_equal, ndarray, float64, int32
    +import sys
    +import array
    +import copy_reg
     
    +import numpy as np
     
    -__all__ = ['SparseVector', 'Vectors']
    +__all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors']
     
     
    -class SparseVector(object):
    +if sys.version_info[:2] == (2, 7):
    +    # speed up pickling array in Python 2.7
    +    def fast_pickle_array(ar):
    +        return array.array, (ar.typecode, ar.tostring())
    +    copy_reg.pickle(array.array, fast_pickle_array)
    +
    +
    +# Check whether we have SciPy. MLlib works without it too, but if we have it, some methods,
    +# such as _dot and _serialize_double_vector, start to support scipy.sparse matrices.
    +
    +try:
    +    import scipy.sparse
    +    _have_scipy = True
    +except:
    +    # No SciPy in environment, but that's okay
    +    _have_scipy = False
    +
    +
    +def _convert_to_vector(l):
    +    if isinstance(l, Vector):
    +        return l
    +    elif type(l) in (array.array, np.array, np.ndarray, list, tuple):
    +        return DenseVector(l)
    +    elif _have_scipy and scipy.sparse.issparse(l):
    +        assert l.shape[1] == 1, "Expected column vector"
    +        csc = l.tocsc()
    +        return SparseVector(l.shape[0], csc.indices, csc.data)
    +    else:
    +        raise TypeError("Cannot convert type %s into Vector" % type(l))
    +
    +
    +class Vector(object):
    +    """
    +    Abstract class for DenseVector and SparseVector
    +    """
    +    def toArray(self):
    +        """
    +        Convert the vector into an numpy.ndarray
    +        :return: numpy.ndarray
    +        """
    +        raise NotImplementedError
    +
    +
    +class DenseVector(Vector):
    +    def __init__(self, ar):
    +        if not isinstance(ar, array.array):
    +            ar = array.array('d', ar)
    +        self.array = ar
    +
    +    def __reduce__(self):
    +        return DenseVector, (self.array,)
    +
    +    def dot(self, other):
    +        """
    +        Compute the dot product of two Vectors. We support
    +        (Numpy array, list, SparseVector, or SciPy sparse)
    +        and a target NumPy array that is either 1- or 2-dimensional.
    +        Equivalent to calling numpy.dot of the two vectors.
    +
    +        >>> dense = DenseVector(array.array('d', [1., 2.]))
    +        >>> dense.dot(dense)
    +        5.0
    +        >>> dense.dot(SparseVector(2, [0, 1], [2., 1.]))
    +        4.0
    +        >>> dense.dot(range(1, 3))
    +        5.0
    +        >>> dense.dot(np.array(range(1, 3)))
    +        5.0
    +        """
    +        if isinstance(other, SparseVector):
    +            return other.dot(self)
    +        elif _have_scipy and scipy.sparse.issparse(other):
    +            return other.transpose().dot(self.toArray())[0]
    +        elif isinstance(other, Vector):
    +            return np.dot(self.toArray(), other.toArray())
    +        else:
    +            return np.dot(self.toArray(), other)
    +
    +    def squared_distance(self, other):
    +        """
    +        Squared distance of two Vectors.
    +
    +        >>> dense1 = DenseVector(array.array('d', [1., 2.]))
    +        >>> dense1.squared_distance(dense1)
    +        0.0
    +        >>> dense2 = np.array([2., 1.])
    +        >>> dense1.squared_distance(dense2)
    +        2.0
    +        >>> dense3 = [2., 1.]
    +        >>> dense1.squared_distance(dense2)
    --- End diff --
    
    good catch!
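
For readers on Python 3, the `copy_reg` trick in the hunk above can be sketched roughly as follows. This assumes the Python 3 names `copyreg` and `tobytes()` in place of `copy_reg` and `tostring()`. The raw bytes use the machine's native byte order, so this is an illustration of the reducer idea rather than a portable format.

```python
import array
import copyreg
import pickle

def fast_pickle_array(ar):
    # Reduce to (constructor, args): the raw bytes replace a per-element list.
    return array.array, (ar.typecode, ar.tobytes())

# Register the reducer globally for array.array.
copyreg.pickle(array.array, fast_pickle_array)

original = array.array('d', [1.0, 2.0, 3.0])
restored = pickle.loads(pickle.dumps(original, protocol=2))
print(restored == original)
```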




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17702101
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -257,10 +410,34 @@ def stringify(vector):
             >>> Vectors.stringify(Vectors.dense([0.0, 1.0]))
             '[0.0,1.0]'
             """
    -        if type(vector) == SparseVector:
    -            return str(vector)
    -        else:
    -            return "[" + ",".join([str(v) for v in vector]) + "]"
    +        return str(vector)
    +
    +
    +class Matrix(Vector):
    --- End diff --
    
    I'm not sure this should subclass Vector.  A Matrix is not really a type of Vector.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17756431
  
    --- Diff: python/pyspark/mllib/tests.py ---
    @@ -198,41 +212,36 @@ def test_serialize(self):
             lil[1, 0] = 1
             lil[3, 0] = 2
             sv = SparseVector(4, {1: 1, 3: 2})
    -        self.assertEquals(sv, _convert_vector(lil))
    -        self.assertEquals(sv, _convert_vector(lil.tocsc()))
    -        self.assertEquals(sv, _convert_vector(lil.tocoo()))
    -        self.assertEquals(sv, _convert_vector(lil.tocsr()))
    -        self.assertEquals(sv, _convert_vector(lil.todok()))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil)))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok())))
    +        self.assertEquals(sv, _convert_to_vector(lil))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocsc()))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocoo()))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocsr()))
    +        self.assertEquals(sv, _convert_to_vector(lil.todok()))
    +
    +        def serialize(l):
    +            return ser.loads(ser.dumps(_convert_to_vector(l)))
    +        self.assertEquals(sv, serialize(lil))
    +        self.assertEquals(sv, serialize(lil.tocsc()))
    +        self.assertEquals(sv, serialize(lil.tocsr()))
    +        self.assertEquals(sv, serialize(lil.todok()))
     
         def test_dot(self):
             from scipy.sparse import lil_matrix
             lil = lil_matrix((4, 1))
             lil[1, 0] = 1
             lil[3, 0] = 2
    -        dv = array([1., 2., 3., 4.])
    -        sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
    -        mat = array([[1., 2., 3., 4.],
    -                     [1., 2., 3., 4.],
    -                     [1., 2., 3., 4.],
    -                     [1., 2., 3., 4.]])
    -        self.assertEquals(10.0, _dot(lil, dv))
    -        self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
    +        dv = DenseVector(array([1., 2., 3., 4.]))
    +        self.assertEquals(10.0, dv.dot(lil))
    --- End diff --
    
    I guess the assumption in the old code was that one of the arguments of dot() would be an array represented as a 1-dimensional matrix.  Now that I think about it, that functionality is better left as traditional matrix-vector multiplication.  I revoke my previous comment!
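
For reference, the vector-vector case that the new test keeps can be sketched without NumPy or SciPy. The helper name `sparse_dense_dot` is hypothetical; the data mirrors the test above (non-zeros at indices 1 and 3, dense vector `[1., 2., 3., 4.]`).

```python
import array

def sparse_dense_dot(indices, values, dense):
    """Dot product of a sparse vector (parallel index/value arrays) with a dense sequence."""
    return sum(v * dense[i] for i, v in zip(indices, values))

indices = array.array('i', [1, 3])
values = array.array('d', [1.0, 2.0])
dense = array.array('d', [1.0, 2.0, 3.0, 4.0])

result = sparse_dense_dot(indices, values, dense)
print(result)  # 1.0 * 2.0 + 2.0 * 4.0 = 10.0
```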




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/2378#issuecomment-56116566
  
      [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/132/consoleFull) for   PR 2378 at commit [`032cd62`](https://github.com/apache/spark/commit/032cd62cee6b2bd134f6b9017a7e68ef333990a5).
     * This patch **fails** unit tests.
     * This patch merges cleanly.
     * This patch adds no public classes.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17701086
  
    --- Diff: python/pyspark/mllib/linalg.py ---
    @@ -61,16 +195,19 @@ def __init__(self, size, *args):
                 if type(pairs) == dict:
                     pairs = pairs.items()
                 pairs = sorted(pairs)
    -            self.indices = array([p[0] for p in pairs], dtype=int32)
    -            self.values = array([p[1] for p in pairs], dtype=float64)
    +            self.indices = array.array('i', [p[0] for p in pairs])
    --- End diff --
    
    Check for duplicates?
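
One possible shape for such a check, as a sketch only. `build_sparse_arrays` is a hypothetical helper, not part of the patch, and raising `ValueError` is an assumed policy.

```python
import array

def build_sparse_arrays(pairs):
    """Hypothetical helper: sort (index, value) pairs and reject duplicate indices."""
    if isinstance(pairs, dict):
        pairs = pairs.items()
    pairs = sorted(pairs)
    indices = array.array('i', [p[0] for p in pairs])
    values = array.array('d', [p[1] for p in pairs])
    # After sorting, duplicate indices are adjacent.
    for prev, cur in zip(indices, indices[1:]):
        if prev == cur:
            raise ValueError("duplicate index %d" % cur)
    return indices, values

indices, values = build_sparse_arrays({3: 2, 1: 1})
print(list(indices), list(values))
```

A dict input cannot carry duplicate keys, so the check only matters for the list-of-pairs form.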




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by davies <gi...@git.apache.org>.
Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17751963
  
    --- Diff: python/pyspark/mllib/tests.py ---
    @@ -198,41 +212,36 @@ def test_serialize(self):
             lil[1, 0] = 1
             lil[3, 0] = 2
             sv = SparseVector(4, {1: 1, 3: 2})
    -        self.assertEquals(sv, _convert_vector(lil))
    -        self.assertEquals(sv, _convert_vector(lil.tocsc()))
    -        self.assertEquals(sv, _convert_vector(lil.tocoo()))
    -        self.assertEquals(sv, _convert_vector(lil.tocsr()))
    -        self.assertEquals(sv, _convert_vector(lil.todok()))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil)))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok())))
    +        self.assertEquals(sv, _convert_to_vector(lil))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocsc()))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocoo()))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocsr()))
    +        self.assertEquals(sv, _convert_to_vector(lil.todok()))
    +
    +        def serialize(l):
    +            return ser.loads(ser.dumps(_convert_to_vector(l)))
    +        self.assertEquals(sv, serialize(lil))
    +        self.assertEquals(sv, serialize(lil.tocsc()))
    +        self.assertEquals(sv, serialize(lil.tocsr()))
    +        self.assertEquals(sv, serialize(lil.todok()))
     
         def test_dot(self):
             from scipy.sparse import lil_matrix
             lil = lil_matrix((4, 1))
             lil[1, 0] = 1
             lil[3, 0] = 2
    -        dv = array([1., 2., 3., 4.])
    -        sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
    -        mat = array([[1., 2., 3., 4.],
    -                     [1., 2., 3., 4.],
    -                     [1., 2., 3., 4.],
    -                     [1., 2., 3., 4.]])
    -        self.assertEquals(10.0, _dot(lil, dv))
    -        self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
    +        dv = DenseVector(array([1., 2., 3., 4.]))
    +        self.assertEquals(10.0, dv.dot(lil))
    --- End diff --
    
    lil and mat are both matrices; which module/algorithm needs their dot product?
    
    I have not figured out how to handle them yet.




[GitHub] spark pull request: [SPARK-3491] [MLlib] [PySpark] use pickle to s...

Posted by jkbradley <gi...@git.apache.org>.
Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2378#discussion_r17703466
  
    --- Diff: python/pyspark/mllib/tests.py ---
    @@ -198,41 +212,36 @@ def test_serialize(self):
             lil[1, 0] = 1
             lil[3, 0] = 2
             sv = SparseVector(4, {1: 1, 3: 2})
    -        self.assertEquals(sv, _convert_vector(lil))
    -        self.assertEquals(sv, _convert_vector(lil.tocsc()))
    -        self.assertEquals(sv, _convert_vector(lil.tocoo()))
    -        self.assertEquals(sv, _convert_vector(lil.tocsr()))
    -        self.assertEquals(sv, _convert_vector(lil.todok()))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil)))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsc())))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.tocsr())))
    -        self.assertEquals(sv, _deserialize_double_vector(_serialize_double_vector(lil.todok())))
    +        self.assertEquals(sv, _convert_to_vector(lil))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocsc()))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocoo()))
    +        self.assertEquals(sv, _convert_to_vector(lil.tocsr()))
    +        self.assertEquals(sv, _convert_to_vector(lil.todok()))
    +
    +        def serialize(l):
    +            return ser.loads(ser.dumps(_convert_to_vector(l)))
    +        self.assertEquals(sv, serialize(lil))
    +        self.assertEquals(sv, serialize(lil.tocsc()))
    +        self.assertEquals(sv, serialize(lil.tocsr()))
    +        self.assertEquals(sv, serialize(lil.todok()))
     
         def test_dot(self):
             from scipy.sparse import lil_matrix
             lil = lil_matrix((4, 1))
             lil[1, 0] = 1
             lil[3, 0] = 2
    -        dv = array([1., 2., 3., 4.])
    -        sv = SparseVector(4, {0: 1, 1: 2, 2: 3, 3: 4})
    -        mat = array([[1., 2., 3., 4.],
    -                     [1., 2., 3., 4.],
    -                     [1., 2., 3., 4.],
    -                     [1., 2., 3., 4.]])
    -        self.assertEquals(10.0, _dot(lil, dv))
    -        self.assertTrue(array_equal(array([3., 6., 9., 12.]), _dot(lil, mat)))
    +        dv = DenseVector(array([1., 2., 3., 4.]))
    +        self.assertEquals(10.0, dv.dot(lil))
    --- End diff --
    
    Why remove test of dot(vector, matrix)?

