You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/11/25 01:37:19 UTC

spark git commit: [SPARK-4562] [MLlib] speedup vector

Repository: spark
Updated Branches:
  refs/heads/master cb0e9b098 -> b660de7a9


[SPARK-4562] [MLlib] speedup vector

This PR change the underline array of DenseVector to numpy.ndarray to avoid the conversion, because most of the users will using numpy.array.

It also improve the serialization of DenseVector.

Before this change:

trial	| trainingTime | 	testTime
-------|--------|--------
0	| 5.126 | 	1.786
1	|2.698	|1.693

After the change:

trial	| trainingTime |	testTime
-------|--------|--------
0	|4.692	|0.554
1	|2.307	|0.525

This could partially fix the performance regression during test.

Author: Davies Liu <da...@databricks.com>

Closes #3420 from davies/ser2 and squashes the following commits:

0e1e6f3 [Davies Liu] fix tests
426f5db [Davies Liu] impove toArray()
44707ec [Davies Liu] add name for ISO-8859-1
fa7d791 [Davies Liu] address comments
1cfb137 [Davies Liu] handle zero sparse vector
2548ee2 [Davies Liu] fix tests
9e6389d [Davies Liu] bugfix
470f702 [Davies Liu] speed up DenseMatrix
f0d3c40 [Davies Liu] speedup SparseVector
ef6ce70 [Davies Liu] speed up dense vector


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b660de7a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b660de7a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b660de7a

Branch: refs/heads/master
Commit: b660de7a9cbdea3df4a37fbcf60c1c33c71782b8
Parents: cb0e9b0
Author: Davies Liu <da...@databricks.com>
Authored: Mon Nov 24 16:37:14 2014 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Mon Nov 24 16:37:14 2014 -0800

----------------------------------------------------------------------
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 73 +++++++++++++++++---
 python/pyspark/mllib/linalg.py                  | 73 +++++++++++++-------
 python/pyspark/mllib/tests.py                   |  6 +-
 3 files changed, 118 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b660de7a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index f04df1c..9f20cd5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.api.python
 
 import java.io.OutputStream
+import java.nio.{ByteBuffer, ByteOrder}
 import java.util.{ArrayList => JArrayList, List => JList, Map => JMap}
 
 import scala.collection.JavaConverters._
@@ -684,6 +685,7 @@ class PythonMLLibAPI extends Serializable {
 private[spark] object SerDe extends Serializable {
 
   val PYSPARK_PACKAGE = "pyspark.mllib"
+  val LATIN1 = "ISO-8859-1"
 
   /**
    * Base class used for pickle
@@ -735,7 +737,16 @@ private[spark] object SerDe extends Serializable {
 
     def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
       val vector: DenseVector = obj.asInstanceOf[DenseVector]
-      saveObjects(out, pickler, vector.toArray)
+      val bytes = new Array[Byte](8 * vector.size)
+      val bb = ByteBuffer.wrap(bytes)
+      bb.order(ByteOrder.nativeOrder())
+      val db = bb.asDoubleBuffer()
+      db.put(vector.values)
+
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(bytes.length))
+      out.write(bytes)
+      out.write(Opcodes.TUPLE1)
     }
 
     def construct(args: Array[Object]): Object = {
@@ -743,7 +754,13 @@ private[spark] object SerDe extends Serializable {
       if (args.length != 1) {
         throw new PickleException("should be 1")
       }
-      new DenseVector(args(0).asInstanceOf[Array[Double]])
+      val bytes = args(0).asInstanceOf[String].getBytes(LATIN1)
+      val bb = ByteBuffer.wrap(bytes, 0, bytes.length)
+      bb.order(ByteOrder.nativeOrder())
+      val db = bb.asDoubleBuffer()
+      val ans = new Array[Double](bytes.length / 8)
+      db.get(ans)
+      Vectors.dense(ans)
     }
   }
 
@@ -752,15 +769,30 @@ private[spark] object SerDe extends Serializable {
 
     def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
       val m: DenseMatrix = obj.asInstanceOf[DenseMatrix]
-      saveObjects(out, pickler, m.numRows, m.numCols, m.values)
+      val bytes = new Array[Byte](8 * m.values.size)
+      val order = ByteOrder.nativeOrder()
+      ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().put(m.values)
+
+      out.write(Opcodes.BININT)
+      out.write(PickleUtils.integer_to_bytes(m.numRows))
+      out.write(Opcodes.BININT)
+      out.write(PickleUtils.integer_to_bytes(m.numCols))
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(bytes.length))
+      out.write(bytes)
+      out.write(Opcodes.TUPLE3)
     }
 
     def construct(args: Array[Object]): Object = {
       if (args.length != 3) {
         throw new PickleException("should be 3")
       }
-      new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int],
-        args(2).asInstanceOf[Array[Double]])
+      val bytes = args(2).asInstanceOf[String].getBytes(LATIN1)
+      val n = bytes.length / 8
+      val values = new Array[Double](n)
+      val order = ByteOrder.nativeOrder()
+      ByteBuffer.wrap(bytes).order(order).asDoubleBuffer().get(values)
+      new DenseMatrix(args(0).asInstanceOf[Int], args(1).asInstanceOf[Int], values)
     }
   }
 
@@ -769,15 +801,40 @@ private[spark] object SerDe extends Serializable {
 
     def saveState(obj: Object, out: OutputStream, pickler: Pickler) = {
       val v: SparseVector = obj.asInstanceOf[SparseVector]
-      saveObjects(out, pickler, v.size, v.indices, v.values)
+      val n = v.indices.size
+      val indiceBytes = new Array[Byte](4 * n)
+      val order = ByteOrder.nativeOrder()
+      ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().put(v.indices)
+      val valueBytes = new Array[Byte](8 * n)
+      ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().put(v.values)
+
+      out.write(Opcodes.BININT)
+      out.write(PickleUtils.integer_to_bytes(v.size))
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(indiceBytes.length))
+      out.write(indiceBytes)
+      out.write(Opcodes.BINSTRING)
+      out.write(PickleUtils.integer_to_bytes(valueBytes.length))
+      out.write(valueBytes)
+      out.write(Opcodes.TUPLE3)
     }
 
     def construct(args: Array[Object]): Object = {
       if (args.length != 3) {
         throw new PickleException("should be 3")
       }
-      new SparseVector(args(0).asInstanceOf[Int], args(1).asInstanceOf[Array[Int]],
-        args(2).asInstanceOf[Array[Double]])
+      val size = args(0).asInstanceOf[Int]
+      val indiceBytes = args(1).asInstanceOf[String].getBytes(LATIN1)
+      val valueBytes = args(2).asInstanceOf[String].getBytes(LATIN1)
+      val n = indiceBytes.length / 4
+      val indices = new Array[Int](n)
+      val values = new Array[Double](n)
+      if (n > 0) {
+        val order = ByteOrder.nativeOrder()
+        ByteBuffer.wrap(indiceBytes).order(order).asIntBuffer().get(indices)
+        ByteBuffer.wrap(valueBytes).order(order).asDoubleBuffer().get(values)
+      }
+      new SparseVector(size, indices, values)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b660de7a/python/pyspark/mllib/linalg.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/linalg.py b/python/pyspark/mllib/linalg.py
index 537b176..f7aa2b0 100644
--- a/python/pyspark/mllib/linalg.py
+++ b/python/pyspark/mllib/linalg.py
@@ -30,7 +30,7 @@ import copy_reg
 import numpy as np
 
 from pyspark.sql import UserDefinedType, StructField, StructType, ArrayType, DoubleType, \
-    IntegerType, ByteType, Row
+    IntegerType, ByteType
 
 
 __all__ = ['Vector', 'DenseVector', 'SparseVector', 'Vectors', 'DenseMatrix', 'Matrices']
@@ -173,12 +173,16 @@ class DenseVector(Vector):
     A dense vector represented by a value array.
     """
     def __init__(self, ar):
-        if not isinstance(ar, array.array):
-            ar = array.array('d', ar)
+        if isinstance(ar, basestring):
+            ar = np.frombuffer(ar, dtype=np.float64)
+        elif not isinstance(ar, np.ndarray):
+            ar = np.array(ar, dtype=np.float64)
+        if ar.dtype != np.float64:
+            ar.astype(np.float64)
         self.array = ar
 
     def __reduce__(self):
-        return DenseVector, (self.array,)
+        return DenseVector, (self.array.tostring(),)
 
     def dot(self, other):
         """
@@ -207,9 +211,10 @@ class DenseVector(Vector):
             ...
         AssertionError: dimension mismatch
         """
-        if type(other) == np.ndarray and other.ndim > 1:
-            assert len(self) == other.shape[0], "dimension mismatch"
-            return np.dot(self.toArray(), other)
+        if type(other) == np.ndarray:
+            if other.ndim > 1:
+                assert len(self) == other.shape[0], "dimension mismatch"
+            return np.dot(self.array, other)
         elif _have_scipy and scipy.sparse.issparse(other):
             assert len(self) == other.shape[0], "dimension mismatch"
             return other.transpose().dot(self.toArray())
@@ -261,7 +266,7 @@ class DenseVector(Vector):
         return np.dot(diff, diff)
 
     def toArray(self):
-        return np.array(self.array)
+        return self.array
 
     def __getitem__(self, item):
         return self.array[item]
@@ -276,7 +281,7 @@ class DenseVector(Vector):
         return "DenseVector([%s])" % (', '.join(_format_float(i) for i in self.array))
 
     def __eq__(self, other):
-        return isinstance(other, DenseVector) and self.array == other.array
+        return isinstance(other, DenseVector) and np.array_equal(self.array, other.array)
 
     def __ne__(self, other):
         return not self == other
@@ -314,18 +319,28 @@ class SparseVector(Vector):
             if type(pairs) == dict:
                 pairs = pairs.items()
             pairs = sorted(pairs)
-            self.indices = array.array('i', [p[0] for p in pairs])
-            self.values = array.array('d', [p[1] for p in pairs])
+            self.indices = np.array([p[0] for p in pairs], dtype=np.int32)
+            self.values = np.array([p[1] for p in pairs], dtype=np.float64)
         else:
-            assert len(args[0]) == len(args[1]), "index and value arrays not same length"
-            self.indices = array.array('i', args[0])
-            self.values = array.array('d', args[1])
+            if isinstance(args[0], basestring):
+                assert isinstance(args[1], str), "values should be string too"
+                if args[0]:
+                    self.indices = np.frombuffer(args[0], np.int32)
+                    self.values = np.frombuffer(args[1], np.float64)
+                else:
+                    # np.frombuffer() doesn't work well with empty string in older version
+                    self.indices = np.array([], dtype=np.int32)
+                    self.values = np.array([], dtype=np.float64)
+            else:
+                self.indices = np.array(args[0], dtype=np.int32)
+                self.values = np.array(args[1], dtype=np.float64)
+            assert len(self.indices) == len(self.values), "index and value arrays not same length"
             for i in xrange(len(self.indices) - 1):
                 if self.indices[i] >= self.indices[i + 1]:
                     raise TypeError("indices array must be sorted")
 
     def __reduce__(self):
-        return (SparseVector, (self.size, self.indices, self.values))
+        return (SparseVector, (self.size, self.indices.tostring(), self.values.tostring()))
 
     def dot(self, other):
         """
@@ -461,8 +476,7 @@ class SparseVector(Vector):
         Returns a copy of this SparseVector as a 1-dimensional NumPy array.
         """
         arr = np.zeros((self.size,), dtype=np.float64)
-        for i in xrange(len(self.indices)):
-            arr[self.indices[i]] = self.values[i]
+        arr[self.indices] = self.values
         return arr
 
     def __len__(self):
@@ -493,8 +507,8 @@ class SparseVector(Vector):
         """
         return (isinstance(other, self.__class__)
                 and other.size == self.size
-                and other.indices == self.indices
-                and other.values == self.values)
+                and np.array_equal(other.indices, self.indices)
+                and np.array_equal(other.values, self.values))
 
     def __ne__(self, other):
         return not self.__eq__(other)
@@ -577,25 +591,34 @@ class DenseMatrix(Matrix):
     """
     def __init__(self, numRows, numCols, values):
         Matrix.__init__(self, numRows, numCols)
+        if isinstance(values, basestring):
+            values = np.frombuffer(values, dtype=np.float64)
+        elif not isinstance(values, np.ndarray):
+            values = np.array(values, dtype=np.float64)
         assert len(values) == numRows * numCols
-        if not isinstance(values, array.array):
-            values = array.array('d', values)
+        if values.dtype != np.float64:
+            values.astype(np.float64)
         self.values = values
 
     def __reduce__(self):
-        return DenseMatrix, (self.numRows, self.numCols, self.values)
+        return DenseMatrix, (self.numRows, self.numCols, self.values.tostring())
 
     def toArray(self):
         """
         Return an numpy.ndarray
 
-        >>> arr = array.array('d', [float(i) for i in range(4)])
-        >>> m = DenseMatrix(2, 2, arr)
+        >>> m = DenseMatrix(2, 2, range(4))
         >>> m.toArray()
         array([[ 0.,  2.],
                [ 1.,  3.]])
         """
-        return np.reshape(self.values, (self.numRows, self.numCols), order='F')
+        return self.values.reshape((self.numRows, self.numCols), order='F')
+
+    def __eq__(self, other):
+        return (isinstance(other, DenseMatrix) and
+                self.numRows == other.numRows and
+                self.numCols == other.numCols and
+                all(self.values == other.values))
 
 
 class Matrices(object):

http://git-wip-us.apache.org/repos/asf/spark/blob/b660de7a/python/pyspark/mllib/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/tests.py b/python/pyspark/mllib/tests.py
index 9fa4d6f..8332f8e 100644
--- a/python/pyspark/mllib/tests.py
+++ b/python/pyspark/mllib/tests.py
@@ -33,7 +33,8 @@ if sys.version_info[:2] <= (2, 6):
 else:
     import unittest
 
-from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector
+from pyspark.mllib.linalg import Vector, SparseVector, DenseVector, VectorUDT, _convert_to_vector,\
+    DenseMatrix
 from pyspark.mllib.regression import LabeledPoint
 from pyspark.mllib.random import RandomRDDs
 from pyspark.mllib.stat import Statistics
@@ -62,6 +63,7 @@ def _squared_distance(a, b):
 class VectorTests(PySparkTestCase):
 
     def _test_serialize(self, v):
+        self.assertEqual(v, ser.loads(ser.dumps(v)))
         jvec = self.sc._jvm.SerDe.loads(bytearray(ser.dumps(v)))
         nv = ser.loads(str(self.sc._jvm.SerDe.dumps(jvec)))
         self.assertEqual(v, nv)
@@ -75,6 +77,8 @@ class VectorTests(PySparkTestCase):
         self._test_serialize(DenseVector(array([1., 2., 3., 4.])))
         self._test_serialize(DenseVector(pyarray.array('d', range(10))))
         self._test_serialize(SparseVector(4, {1: 1, 3: 2}))
+        self._test_serialize(SparseVector(3, {}))
+        self._test_serialize(DenseMatrix(2, 3, range(6)))
 
     def test_dot(self):
         sv = SparseVector(4, {1: 1, 3: 2})


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org