You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/26 07:31:18 UTC

[01/28] git commit: First cut at python mllib bindings. Only LinearRegression is supported.

Updated Branches:
  refs/heads/master 56094bcd8 -> c344ed04c


First cut at python mllib bindings.  Only LinearRegression is supported.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/95915f8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/95915f8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/95915f8b

Branch: refs/heads/master
Commit: 95915f8b3b6d07a9dddb09a637aa23c8622bff9b
Parents: d3b1af4
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 01:22:18 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 01:29:09 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/mllib/api/PythonMLLibAPI.scala |  51 +++++++++
 python/pyspark/mllib.py                         | 114 +++++++++++++++++++
 2 files changed, 165 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/95915f8b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
new file mode 100644
index 0000000..19d2e9a
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -0,0 +1,51 @@
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.mllib.regression._
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+import java.nio.DoubleBuffer
+
+class PythonMLLibAPI extends Serializable {
+  def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
+    val packetLength = bytes.length;
+    if (packetLength < 16) {
+      throw new IllegalArgumentException("Byte array too short.");
+    }
+    val bb = ByteBuffer.wrap(bytes);
+    bb.order(ByteOrder.nativeOrder());
+    val magic = bb.getLong();
+    if (magic != 1) {
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.");
+    }
+    val length = bb.getLong();
+    if (packetLength != 16 + 8 * length) {
+      throw new IllegalArgumentException("Length " + length + "is wrong.");
+    }
+    val db = bb.asDoubleBuffer();
+    val ans = new Array[Double](length.toInt);
+    db.get(ans);
+    return ans;
+  }
+
+  def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+    val len = doubles.length;
+    val bytes = new Array[Byte](16 + 8 * len);
+    val bb = ByteBuffer.wrap(bytes);
+    bb.order(ByteOrder.nativeOrder());
+    bb.putLong(1);
+    bb.putLong(len);
+    val db = bb.asDoubleBuffer();
+    db.put(doubles);
+    return bytes;
+  }
+
+  def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]]):
+      java.util.List[java.lang.Object] = {
+    val data = dataBytesJRDD.rdd.map(x => deserializeDoubleVector(x))
+        .map(v => LabeledPoint(v(0), v.slice(1, v.length)));
+    val model = LinearRegressionWithSGD.train(data, 222);
+    val ret = new java.util.LinkedList[java.lang.Object]();
+    ret.add(serializeDoubleVector(model.weights));
+    ret.add(model.intercept: java.lang.Double);
+    return ret;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/95915f8b/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
new file mode 100644
index 0000000..8237f66
--- /dev/null
+++ b/python/pyspark/mllib.py
@@ -0,0 +1,114 @@
+from numpy import *;
+from pyspark.serializers import NoOpSerializer, FramedSerializer, \
+    BatchedSerializer, CloudPickleSerializer, pack_long
+
+#__all__ = ["train_linear_regression_model"];
+
+# Double vector format:
+#
+# [8-byte 1] [8-byte length] [length*8 bytes of data]
+#
+# Double matrix format:
+#
+# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
+# 
+# This is all in machine-endian.  That means that the Java interpreter and the
+# Python interpreter must agree on what endian the machine is.
+
+def deserialize_byte_array(shape, ba, offset):
+  """Implementation detail.  Do not use directly."""
+  ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64", \
+      order='C');
+  return ar.copy();
+
+def serialize_double_vector(v):
+  """Implementation detail.  Do not use directly."""
+  if (type(v) == ndarray and v.dtype == float64 and v.ndim == 1):
+    length = v.shape[0];
+    ba = bytearray(16 + 8*length);
+    header = ndarray(shape=[2], buffer=ba, dtype="int64");
+    header[0] = 1;
+    header[1] = length;
+    copyto(ndarray(shape=[length], buffer=ba, offset=16, dtype="float64"), v);
+    return ba;
+  else:
+    raise TypeError("serialize_double_vector called on a non-double-vector");
+
+def deserialize_double_vector(ba):
+  """Implementation detail.  Do not use directly."""
+  if (type(ba) == bytearray and len(ba) >= 16 and (len(ba) & 7 == 0)):
+    header = ndarray(shape=[2], buffer=ba, dtype="int64");
+    if (header[0] != 1):
+      raise TypeError("deserialize_double_vector called on bytearray with " \
+                      "wrong magic");
+    length = header[1];
+    if (len(ba) != 8*length + 16):
+      raise TypeError("deserialize_double_vector called on bytearray with " \
+                      "wrong length");
+    return deserialize_byte_array([length], ba, 16);
+  else:
+    raise TypeError("deserialize_double_vector called on a non-bytearray");
+
+def serialize_double_matrix(m):
+  """Implementation detail.  Do not use directly."""
+  if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
+    rows = m.shape[0];
+    cols = m.shape[1];
+    ba = bytearray(24 + 8 * rows * cols);
+    header = ndarray(shape=[3], buffer=ba, dtype="int64");
+    header[0] = 2;
+    header[1] = rows;
+    header[2] = cols;
+    copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24, dtype="float64", \
+        order='C'), m);
+    return ba;
+  else:
+    print type(m);
+    print m.dtype;
+    print m.ndim;
+    raise TypeError("serialize_double_matrix called on a non-double-matrix");
+
+def deserialize_double_matrix(ba):
+  """Implementation detail.  Do not use directly."""
+  if (type(ba) == bytearray and len(ba) >= 24 and (len(ba) & 7 == 0)):
+    header = ndarray(shape=[3], buffer=ba, dtype="int64");
+    if (header[0] != 2):
+      raise TypeError("deserialize_double_matrix called on bytearray with " \
+                      "wrong magic");
+    rows = header[1];
+    cols = header[2];
+    if (len(ba) != 8*rows*cols + 24):
+      raise TypeError("deserialize_double_matrix called on bytearray with " \
+                      "wrong length");
+    return deserialize_byte_array([rows, cols], ba, 24);
+  else:
+    raise TypeError("deserialize_double_matrix called on a non-bytearray");
+
+class LinearRegressionModel:
+  _coeff = None;
+  _intercept = None;
+  def __init__(self, coeff, intercept):
+    self._coeff = coeff;
+    self._intercept = intercept;
+  def predict(self, x):
+    if (type(x) == ndarray):
+      if (x.ndim == 1):
+        return dot(_coeff, x) - _intercept;
+      else:
+        raise RuntimeError("Bulk predict not yet supported.");
+    elif (type(x) == RDD):
+      raise RuntimeError("Bulk predict not yet supported.");
+    else:
+      raise TypeError("Bad type argument to LinearRegressionModel::predict");
+
+def train_linear_regression_model(sc, data):
+  """Train a linear regression model on the given data."""
+  dataBytes = data.map(serialize_double_vector);
+  sc.serializer = NoOpSerializer();
+  dataBytes.cache();
+  api = sc._jvm.PythonMLLibAPI();
+  ans = api.trainLinearRegressionModel(dataBytes._jrdd);
+  if (len(ans) != 2 or type(ans[0]) != bytearray or type(ans[1]) != float):
+    raise RuntimeError("train_linear_regression_model received garbage " \
+                       "from JVM");
+  return LinearRegressionModel(deserialize_double_vector(ans[0]), ans[1]);


[23/28] git commit: Split the mllib bindings into a whole bunch of modules and rename some things.

Posted by ma...@apache.org.
Split the mllib bindings into a whole bunch of modules and rename some things.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/05163057
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/05163057
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/05163057

Branch: refs/heads/master
Commit: 05163057a1810f0a32b722e8c93e5435240636d9
Parents: 86e38c4
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Dec 25 00:08:05 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Wed Dec 25 00:08:05 2013 -0500

----------------------------------------------------------------------
 python/pyspark/__init__.py             |   7 +-
 python/pyspark/mllib.py                | 391 ----------------------------
 python/pyspark/mllib/__init__.py       |  46 ++++
 python/pyspark/mllib/_common.py        | 227 ++++++++++++++++
 python/pyspark/mllib/classification.py |  86 ++++++
 python/pyspark/mllib/clustering.py     |  79 ++++++
 python/pyspark/mllib/recommendation.py |  74 ++++++
 python/pyspark/mllib/regression.py     | 110 ++++++++
 8 files changed, 623 insertions(+), 397 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 3d73d95..1f35f6f 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -42,11 +42,6 @@ from pyspark.context import SparkContext
 from pyspark.rdd import RDD
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
-from pyspark.mllib import LinearRegressionModel, LassoModel, \
-    RidgeRegressionModel, LogisticRegressionModel, SVMModel, KMeansModel, \
-    ALSModel
 
 
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel",
-    "LinearRegressionModel", "LassoModel", "RidgeRegressionModel",
-    "LogisticRegressionModel", "SVMModel", "KMeansModel", "ALSModel"];
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
deleted file mode 100644
index 46f368b..0000000
--- a/python/pyspark/mllib.py
+++ /dev/null
@@ -1,391 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from numpy import *
-from pyspark import SparkContext
-
-# Double vector format:
-#
-# [8-byte 1] [8-byte length] [length*8 bytes of data]
-#
-# Double matrix format:
-#
-# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
-#
-# This is all in machine-endian.  That means that the Java interpreter and the
-# Python interpreter must agree on what endian the machine is.
-
-def _deserialize_byte_array(shape, ba, offset):
-    """Wrapper around ndarray aliasing hack.
-
-    >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
-    >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
-    True
-    >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
-    >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
-    True
-    """
-    ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
-            order='C')
-    return ar.copy()
-
-def _serialize_double_vector(v):
-    """Serialize a double vector into a mutually understood format."""
-    if type(v) != ndarray:
-        raise TypeError("_serialize_double_vector called on a %s; "
-                "wanted ndarray" % type(v))
-    if v.dtype != float64:
-        raise TypeError("_serialize_double_vector called on an ndarray of %s; "
-                "wanted ndarray of float64" % v.dtype)
-    if v.ndim != 1:
-        raise TypeError("_serialize_double_vector called on a %ddarray; "
-                "wanted a 1darray" % v.ndim)
-    length = v.shape[0]
-    ba = bytearray(16 + 8*length)
-    header = ndarray(shape=[2], buffer=ba, dtype="int64")
-    header[0] = 1
-    header[1] = length
-    copyto(ndarray(shape=[length], buffer=ba, offset=16,
-            dtype="float64"), v)
-    return ba
-
-def _deserialize_double_vector(ba):
-    """Deserialize a double vector from a mutually understood format.
-
-    >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
-    >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
-    True
-    """
-    if type(ba) != bytearray:
-        raise TypeError("_deserialize_double_vector called on a %s; "
-                "wanted bytearray" % type(ba))
-    if len(ba) < 16:
-        raise TypeError("_deserialize_double_vector called on a %d-byte array, "
-                "which is too short" % len(ba))
-    if (len(ba) & 7) != 0:
-        raise TypeError("_deserialize_double_vector called on a %d-byte array, "
-                "which is not a multiple of 8" % len(ba))
-    header = ndarray(shape=[2], buffer=ba, dtype="int64")
-    if header[0] != 1:
-        raise TypeError("_deserialize_double_vector called on bytearray "
-                        "with wrong magic")
-    length = header[1]
-    if len(ba) != 8*length + 16:
-        raise TypeError("_deserialize_double_vector called on bytearray "
-                        "with wrong length")
-    return _deserialize_byte_array([length], ba, 16)
-
-def _serialize_double_matrix(m):
-    """Serialize a double matrix into a mutually understood format."""
-    if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
-        rows = m.shape[0]
-        cols = m.shape[1]
-        ba = bytearray(24 + 8 * rows * cols)
-        header = ndarray(shape=[3], buffer=ba, dtype="int64")
-        header[0] = 2
-        header[1] = rows
-        header[2] = cols
-        copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24,
-                       dtype="float64", order='C'), m)
-        return ba
-    else:
-        raise TypeError("_serialize_double_matrix called on a "
-                        "non-double-matrix")
-
-def _deserialize_double_matrix(ba):
-    """Deserialize a double matrix from a mutually understood format."""
-    if type(ba) != bytearray:
-        raise TypeError("_deserialize_double_matrix called on a %s; "
-                "wanted bytearray" % type(ba))
-    if len(ba) < 24:
-        raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
-                "which is too short" % len(ba))
-    if (len(ba) & 7) != 0:
-        raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
-                "which is not a multiple of 8" % len(ba))
-    header = ndarray(shape=[3], buffer=ba, dtype="int64")
-    if (header[0] != 2):
-        raise TypeError("_deserialize_double_matrix called on bytearray "
-                        "with wrong magic")
-    rows = header[1]
-    cols = header[2]
-    if (len(ba) != 8*rows*cols + 24):
-        raise TypeError("_deserialize_double_matrix called on bytearray "
-                        "with wrong length")
-    return _deserialize_byte_array([rows, cols], ba, 24)
-
-def _linear_predictor_typecheck(x, coeffs):
-    """Check that x is a one-dimensional vector of the right shape.
-    This is a temporary hackaround until I actually implement bulk predict."""
-    if type(x) == ndarray:
-        if x.ndim == 1:
-            if x.shape == coeffs.shape:
-                pass
-            else:
-                raise RuntimeError("Got array of %d elements; wanted %d"
-                        % shape(x)[0] % shape(coeffs)[0])
-        else:
-            raise RuntimeError("Bulk predict not yet supported.")
-    elif (type(x) == RDD):
-        raise RuntimeError("Bulk predict not yet supported.")
-    else:
-        raise TypeError("Argument of type " + type(x) + " unsupported")
-
-class LinearModel(object):
-    """Something that has a vector of coefficients and an intercept."""
-    def __init__(self, coeff, intercept):
-        self._coeff = coeff
-        self._intercept = intercept
-
-class LinearRegressionModelBase(LinearModel):
-    """A linear regression model.
-
-    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
-    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
-    True
-    """
-    def predict(self, x):
-        """Predict the value of the dependent variable given a vector x"""
-        """containing values for the independent variables."""
-        _linear_predictor_typecheck(x, self._coeff)
-        return dot(self._coeff, x) + self._intercept
-
-def _get_unmangled_rdd(data, serializer):
-    dataBytes = data.map(serializer)
-    dataBytes._bypass_serializer = True
-    dataBytes.cache()
-    return dataBytes
-
-# Map a pickled Python RDD of numpy double vectors to a Java RDD of
-# _serialized_double_vectors
-def _get_unmangled_double_vector_rdd(data):
-    return _get_unmangled_rdd(data, _serialize_double_vector)
-
-# If we weren't given initial weights, take a zero vector of the appropriate
-# length.
-def _get_initial_weights(initial_weights, data):
-    if initial_weights is None:
-        initial_weights = data.first()
-        if type(initial_weights) != ndarray:
-            raise TypeError("At least one data element has type "
-                    + type(initial_weights) + " which is not ndarray")
-        if initial_weights.ndim != 1:
-            raise TypeError("At least one data element has "
-                    + initial_weights.ndim + " dimensions, which is not 1")
-        initial_weights = zeros([initial_weights.shape[0] - 1])
-    return initial_weights
-
-# train_func should take two parameters, namely data and initial_weights, and
-# return the result of a call to the appropriate JVM stub.
-# _regression_train_wrapper is responsible for setup and error checking.
-def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
-    initial_weights = _get_initial_weights(initial_weights, data)
-    dataBytes = _get_unmangled_double_vector_rdd(data)
-    ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
-    if len(ans) != 2:
-        raise RuntimeError("JVM call result had unexpected length")
-    elif type(ans[0]) != bytearray:
-        raise RuntimeError("JVM call result had first element of type "
-                + type(ans[0]) + " which is not bytearray")
-    elif type(ans[1]) != float:
-        raise RuntimeError("JVM call result had second element of type "
-                + type(ans[0]) + " which is not float")
-    return klass(_deserialize_double_vector(ans[0]), ans[1])
-
-class LinearRegressionModel(LinearRegressionModelBase):
-    """A linear regression model derived from a least-squares fit.
-
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
-    >>> lrm = LinearRegressionModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
-    """
-    @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0,
-              mini_batch_fraction=1.0, initial_weights=None):
-        """Train a linear regression model on the given data."""
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainLinearRegressionModel(
-                        d._jrdd, iterations, step, mini_batch_fraction, i),
-                LinearRegressionModel, data, initial_weights)
-
-class LassoModel(LinearRegressionModelBase):
-    """A linear regression model derived from a least-squares fit with an
-    l_1 penalty term.
-
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
-    >>> lrm = LassoModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
-    """
-    @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
-              mini_batch_fraction=1.0, initial_weights=None):
-        """Train a Lasso regression model on the given data."""
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainLassoModel(d._jrdd,
-                        iterations, step, reg_param, mini_batch_fraction, i),
-                LassoModel, data, initial_weights)
-
-class RidgeRegressionModel(LinearRegressionModelBase):
-    """A linear regression model derived from a least-squares fit with an
-    l_2 penalty term.
-
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
-    >>> lrm = RidgeRegressionModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
-    """
-    @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
-              mini_batch_fraction=1.0, initial_weights=None):
-        """Train a ridge regression model on the given data."""
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainRidgeModel(d._jrdd,
-                        iterations, step, reg_param, mini_batch_fraction, i),
-                RidgeRegressionModel, data, initial_weights)
-
-class LogisticRegressionModel(LinearModel):
-    """A linear binary classification model derived from logistic regression.
-
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
-    >>> lrm = LogisticRegressionModel.train(sc, sc.parallelize(data))
-    """
-    def predict(self, x):
-        _linear_predictor_typecheck(x, _coeff)
-        margin = dot(x, _coeff) + intercept
-        prob = 1/(1 + exp(-margin))
-        return 1 if prob > 0.5 else 0
-
-    @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0,
-              mini_batch_fraction=1.0, initial_weights=None):
-        """Train a logistic regression model on the given data."""
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainLogisticRegressionModel(d._jrdd,
-                        iterations, step, mini_batch_fraction, i),
-                LogisticRegressionModel, data, initial_weights)
-
-class SVMModel(LinearModel):
-    """A support vector machine.
-
-    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
-    >>> svm = SVMModel.train(sc, sc.parallelize(data))
-    """
-    def predict(self, x):
-        _linear_predictor_typecheck(x, _coeff)
-        margin = dot(x, _coeff) + intercept
-        return 1 if margin >= 0 else 0
-    @classmethod
-    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
-              mini_batch_fraction=1.0, initial_weights=None):
-        """Train a support vector machine on the given data."""
-        return _regression_train_wrapper(sc, lambda d, i:
-                sc._jvm.PythonMLLibAPI().trainSVMModel(d._jrdd,
-                        iterations, step, reg_param, mini_batch_fraction, i),
-                SVMModel, data, initial_weights)
-
-class KMeansModel(object):
-    """A clustering model derived from the k-means method.
-
-    >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
-    >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
-    >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
-    True
-    >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
-    True
-    >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2)
-    """
-    def __init__(self, centers_):
-        self.centers = centers_
-
-    def predict(self, x):
-        """Find the cluster to which x belongs in this model."""
-        best = 0
-        best_distance = 1e75
-        for i in range(0, self.centers.shape[0]):
-            diff = x - self.centers[i]
-            distance = sqrt(dot(diff, diff))
-            if distance < best_distance:
-                best = i
-                best_distance = distance
-        return best
-
-    @classmethod
-    def train(cls, sc, data, k, maxIterations=100, runs=1,
-            initialization_mode="k-means||"):
-        """Train a k-means clustering model."""
-        dataBytes = _get_unmangled_double_vector_rdd(data)
-        ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
-                k, maxIterations, runs, initialization_mode)
-        if len(ans) != 1:
-            raise RuntimeError("JVM call result had unexpected length")
-        elif type(ans[0]) != bytearray:
-            raise RuntimeError("JVM call result had first element of type "
-                    + type(ans[0]) + " which is not bytearray")
-        return KMeansModel(_deserialize_double_matrix(ans[0]))
-
-def _serialize_rating(r):
-    ba = bytearray(16)
-    intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
-    doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=8)
-    intpart[0], intpart[1], doublepart[0] = r
-    return ba
-
-class ALSModel(object):
-    """A matrix factorisation model trained by regularized alternating
-    least-squares.
-
-    >>> r1 = (1, 1, 1.0)
-    >>> r2 = (1, 2, 2.0)
-    >>> r3 = (2, 1, 2.0)
-    >>> ratings = sc.parallelize([r1, r2, r3])
-    >>> model = ALSModel.trainImplicit(sc, ratings, 1)
-    >>> model.predict(2,2) is not None
-    True
-    """
-
-    def __init__(self, sc, java_model):
-        self._context = sc
-        self._java_model = java_model
-
-    def __del__(self):
-        self._context._gateway.detach(self._java_model)
-
-    def predict(self, user, product):
-        return self._java_model.predict(user, product)
-
-    @classmethod
-    def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
-        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
-        mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
-                rank, iterations, lambda_, blocks)
-        return ALSModel(sc, mod)
-
-    @classmethod
-    def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
-        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
-        mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
-                rank, iterations, lambda_, blocks, alpha)
-        return ALSModel(sc, mod)
-
-def _test():
-    import doctest
-    globs = globals().copy()
-    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
-    (failure_count, test_count) = doctest.testmod(globs=globs,
-            optionflags=doctest.ELLIPSIS)
-    globs['sc'].stop()
-    if failure_count:
-        exit(-1)
-
-if __name__ == "__main__":
-    _test()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py
new file mode 100644
index 0000000..6037a3a
--- /dev/null
+++ b/python/pyspark/mllib/__init__.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+PySpark is the Python API for Spark.
+
+Public classes:
+
+    - L{SparkContext<pyspark.context.SparkContext>}
+        Main entry point for Spark functionality.
+    - L{RDD<pyspark.rdd.RDD>}
+        A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
+    - L{Broadcast<pyspark.broadcast.Broadcast>}
+        A broadcast variable that gets reused across tasks.
+    - L{Accumulator<pyspark.accumulators.Accumulator>}
+        An "add-only" shared variable that tasks can only add values to.
+    - L{SparkFiles<pyspark.files.SparkFiles>}
+        Access files shipped with jobs.
+    - L{StorageLevel<pyspark.storagelevel.StorageLevel>}
+        Finer-grained cache persistence levels.
+"""
+import sys
+import os
+sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg"))
+
+from pyspark.mllib.regression import LinearRegressionModel, LassoModel, RidgeRegressionModel, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD
+from pyspark.mllib.classification import LogisticRegressionModel, SVMModel, LogisticRegressionWithSGD, SVMWithSGD
+from pyspark.mllib.recommendation import MatrixFactorizationModel, ALS
+from pyspark.mllib.clustering import KMeansModel, KMeans
+
+
+__all__ = ["LinearRegressionModel", "LassoModel", "RidgeRegressionModel", "LinearRegressionWithSGD", "LassoWithSGD", "RidgeRegressionWithSGD", "LogisticRegressionModel", "SVMModel", "LogisticRegressionWithSGD", "SVMWithSGD", "MatrixFactorizationModel", "ALS", "KMeansModel", "KMeans"]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/_common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
new file mode 100644
index 0000000..e68bd8a
--- /dev/null
+++ b/python/pyspark/mllib/_common.py
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from numpy import ndarray, copyto, float64, int64, int32, zeros, array_equal, array, dot, shape
+from pyspark import SparkContext
+
+# Double vector format:
+#
+# [8-byte 1] [8-byte length] [length*8 bytes of data]
+#
+# Double matrix format:
+#
+# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
+#
+# This is all in machine-endian.  That means that the Java interpreter and the
+# Python interpreter must agree on what endian the machine is.
+
+def _deserialize_byte_array(shape, ba, offset):
+    """Wrapper around ndarray aliasing hack.
+
+    >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
+    >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+    True
+    >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
+    >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+    True
+    """
+    ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
+            order='C')
+    return ar.copy()
+
+def _serialize_double_vector(v):
+    """Serialize a double vector into a mutually understood format."""
+    if type(v) != ndarray:
+        raise TypeError("_serialize_double_vector called on a %s; "
+                "wanted ndarray" % type(v))
+    if v.dtype != float64:
+        raise TypeError("_serialize_double_vector called on an ndarray of %s; "
+                "wanted ndarray of float64" % v.dtype)
+    if v.ndim != 1:
+        raise TypeError("_serialize_double_vector called on a %ddarray; "
+                "wanted a 1darray" % v.ndim)
+    length = v.shape[0]
+    ba = bytearray(16 + 8*length)
+    header = ndarray(shape=[2], buffer=ba, dtype="int64")
+    header[0] = 1
+    header[1] = length
+    copyto(ndarray(shape=[length], buffer=ba, offset=16,
+            dtype="float64"), v)
+    return ba
+
+def _deserialize_double_vector(ba):
+    """Deserialize a double vector from a mutually understood format.
+
+    >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
+    >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
+    True
+    """
+    if type(ba) != bytearray:
+        raise TypeError("_deserialize_double_vector called on a %s; "
+                "wanted bytearray" % type(ba))
+    if len(ba) < 16:
+        raise TypeError("_deserialize_double_vector called on a %d-byte array, "
+                "which is too short" % len(ba))
+    if (len(ba) & 7) != 0:
+        raise TypeError("_deserialize_double_vector called on a %d-byte array, "
+                "which is not a multiple of 8" % len(ba))
+    header = ndarray(shape=[2], buffer=ba, dtype="int64")
+    if header[0] != 1:
+        raise TypeError("_deserialize_double_vector called on bytearray "
+                        "with wrong magic")
+    length = header[1]
+    if len(ba) != 8*length + 16:
+        raise TypeError("_deserialize_double_vector called on bytearray "
+                        "with wrong length")
+    return _deserialize_byte_array([length], ba, 16)
+
+def _serialize_double_matrix(m):
+    """Serialize a double matrix into a mutually understood format."""
+    if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
+        rows = m.shape[0]
+        cols = m.shape[1]
+        ba = bytearray(24 + 8 * rows * cols)
+        header = ndarray(shape=[3], buffer=ba, dtype="int64")
+        header[0] = 2
+        header[1] = rows
+        header[2] = cols
+        copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24,
+                       dtype="float64", order='C'), m)
+        return ba
+    else:
+        raise TypeError("_serialize_double_matrix called on a "
+                        "non-double-matrix")
+
+def _deserialize_double_matrix(ba):
+    """Deserialize a double matrix from a mutually understood format."""
+    if type(ba) != bytearray:
+        raise TypeError("_deserialize_double_matrix called on a %s; "
+                "wanted bytearray" % type(ba))
+    if len(ba) < 24:
+        raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
+                "which is too short" % len(ba))
+    if (len(ba) & 7) != 0:
+        raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
+                "which is not a multiple of 8" % len(ba))
+    header = ndarray(shape=[3], buffer=ba, dtype="int64")
+    if (header[0] != 2):
+        raise TypeError("_deserialize_double_matrix called on bytearray "
+                        "with wrong magic")
+    rows = header[1]
+    cols = header[2]
+    if (len(ba) != 8*rows*cols + 24):
+        raise TypeError("_deserialize_double_matrix called on bytearray "
+                        "with wrong length")
+    return _deserialize_byte_array([rows, cols], ba, 24)
+
+def _linear_predictor_typecheck(x, coeffs):
+    """Check that x is a one-dimensional vector of the right shape.
+    This is a temporary hackaround until I actually implement bulk predict."""
+    if type(x) == ndarray:
+        if x.ndim == 1:
+            if x.shape == coeffs.shape:
+                pass
+            else:
+                raise RuntimeError("Got array of %d elements; wanted %d"
+                        % (shape(x)[0], shape(coeffs)[0]))
+        else:
+            raise RuntimeError("Bulk predict not yet supported.")
+    elif (type(x) == RDD):
+        raise RuntimeError("Bulk predict not yet supported.")
+    else:
+        raise TypeError("Argument of type " + type(x) + " unsupported")
+
+def _get_unmangled_rdd(data, serializer):
+    dataBytes = data.map(serializer)
+    dataBytes._bypass_serializer = True
+    dataBytes.cache()
+    return dataBytes
+
+# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+# _serialized_double_vectors
+def _get_unmangled_double_vector_rdd(data):
+    return _get_unmangled_rdd(data, _serialize_double_vector)
+
+class LinearModel(object):
+    """Something that has a vector of coefficients and an intercept."""
+    def __init__(self, coeff, intercept):
+        self._coeff = coeff
+        self._intercept = intercept
+
+class LinearRegressionModelBase(LinearModel):
+    """A linear regression model.
+
+    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
+    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
+    True
+    """
+    def predict(self, x):
+        """Predict the value of the dependent variable given a vector x"""
+        """containing values for the independent variables."""
+        _linear_predictor_typecheck(x, self._coeff)
+        return dot(self._coeff, x) + self._intercept
+
+# If we weren't given initial weights, take a zero vector of the appropriate
+# length.
+def _get_initial_weights(initial_weights, data):
+    if initial_weights is None:
+        initial_weights = data.first()
+        if type(initial_weights) != ndarray:
+            raise TypeError("At least one data element has type "
+                    + type(initial_weights) + " which is not ndarray")
+        if initial_weights.ndim != 1:
+            raise TypeError("At least one data element has "
+                    + initial_weights.ndim + " dimensions, which is not 1")
+        initial_weights = zeros([initial_weights.shape[0] - 1])
+    return initial_weights
+
+# train_func should take two parameters, namely data and initial_weights, and
+# return the result of a call to the appropriate JVM stub.
+# _regression_train_wrapper is responsible for setup and error checking.
+def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
+    initial_weights = _get_initial_weights(initial_weights, data)
+    dataBytes = _get_unmangled_double_vector_rdd(data)
+    ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
+    if len(ans) != 2:
+        raise RuntimeError("JVM call result had unexpected length")
+    elif type(ans[0]) != bytearray:
+        raise RuntimeError("JVM call result had first element of type "
+                + type(ans[0]) + " which is not bytearray")
+    elif type(ans[1]) != float:
+        raise RuntimeError("JVM call result had second element of type "
+                + type(ans[0]) + " which is not float")
+    return klass(_deserialize_double_vector(ans[0]), ans[1])
+
+def _serialize_rating(r):
+    ba = bytearray(16)
+    intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
+    doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=8)
+    intpart[0], intpart[1], doublepart[0] = r
+    return ba
+
+def _test():
+    import doctest
+    globs = globals().copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    (failure_count, test_count) = doctest.testmod(globs=globs,
+            optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+if __name__ == "__main__":
+    _test()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
new file mode 100644
index 0000000..70de332
--- /dev/null
+++ b/python/pyspark/mllib/classification.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from numpy import array, dot, shape
+from pyspark import SparkContext
+from pyspark.mllib._common import \
+    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+    _serialize_double_matrix, _deserialize_double_matrix, \
+    _serialize_double_vector, _deserialize_double_vector, \
+    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
+    LinearModel, _linear_predictor_typecheck
+from math import exp, log
+
+class LogisticRegressionModel(LinearModel):
+    """A linear binary classification model derived from logistic regression.
+
+    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+    >>> lrm = LogisticRegressionWithSGD.train(sc, sc.parallelize(data))
+    >>> lrm.predict(array([1.0])) != None
+    True
+    """
+    def predict(self, x):
+        _linear_predictor_typecheck(x, self._coeff)
+        margin = dot(x, self._coeff) + self._intercept
+        prob = 1/(1 + exp(-margin))
+        return 1 if prob > 0.5 else 0
+
+class LogisticRegressionWithSGD(object):
+    @classmethod
+    def train(cls, sc, data, iterations=100, step=1.0,
+              mini_batch_fraction=1.0, initial_weights=None):
+        """Train a logistic regression model on the given data."""
+        return _regression_train_wrapper(sc, lambda d, i:
+                sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd,
+                        iterations, step, mini_batch_fraction, i),
+                LogisticRegressionModel, data, initial_weights)
+
+class SVMModel(LinearModel):
+    """A support vector machine.
+
+    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+    >>> svm = SVMWithSGD.train(sc, sc.parallelize(data))
+    >>> svm.predict(array([1.0])) != None
+    True
+    """
+    def predict(self, x):
+        _linear_predictor_typecheck(x, self._coeff)
+        margin = dot(x, self._coeff) + self._intercept
+        return 1 if margin >= 0 else 0
+
+class SVMWithSGD(object):
+    @classmethod
+    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+              mini_batch_fraction=1.0, initial_weights=None):
+        """Train a support vector machine on the given data."""
+        return _regression_train_wrapper(sc, lambda d, i:
+                sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd,
+                        iterations, step, reg_param, mini_batch_fraction, i),
+                SVMModel, data, initial_weights)
+
+def _test():
+    import doctest
+    globs = globals().copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    (failure_count, test_count) = doctest.testmod(globs=globs,
+            optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+if __name__ == "__main__":
+    _test()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
new file mode 100644
index 0000000..8cf20e5
--- /dev/null
+++ b/python/pyspark/mllib/clustering.py
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from numpy import array, dot
+from math import sqrt
+from pyspark import SparkContext
+from pyspark.mllib._common import \
+    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+    _serialize_double_matrix, _deserialize_double_matrix, \
+    _serialize_double_vector, _deserialize_double_vector, \
+    _get_initial_weights, _serialize_rating, _regression_train_wrapper
+
+class KMeansModel(object):
+    """A clustering model derived from the k-means method.
+
+    >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
+    >>> clusters = KMeans.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
+    >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
+    True
+    >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
+    True
+    >>> clusters = KMeans.train(sc, sc.parallelize(data), 2)
+    """
+    def __init__(self, centers_):
+        self.centers = centers_
+
+    def predict(self, x):
+        """Find the cluster to which x belongs in this model."""
+        best = 0
+        best_distance = 1e75
+        for i in range(0, self.centers.shape[0]):
+            diff = x - self.centers[i]
+            distance = sqrt(dot(diff, diff))
+            if distance < best_distance:
+                best = i
+                best_distance = distance
+        return best
+
+class KMeans(object):
+    @classmethod
+    def train(cls, sc, data, k, maxIterations=100, runs=1,
+            initialization_mode="k-means||"):
+        """Train a k-means clustering model."""
+        dataBytes = _get_unmangled_double_vector_rdd(data)
+        ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
+                k, maxIterations, runs, initialization_mode)
+        if len(ans) != 1:
+            raise RuntimeError("JVM call result had unexpected length")
+        elif type(ans[0]) != bytearray:
+            raise RuntimeError("JVM call result had first element of type "
+                    + type(ans[0]) + " which is not bytearray")
+        return KMeansModel(_deserialize_double_matrix(ans[0]))
+
+def _test():
+    import doctest
+    globs = globals().copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    (failure_count, test_count) = doctest.testmod(globs=globs,
+            optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+if __name__ == "__main__":
+    _test()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
new file mode 100644
index 0000000..14d06cb
--- /dev/null
+++ b/python/pyspark/mllib/recommendation.py
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark import SparkContext
+from pyspark.mllib._common import \
+    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+    _serialize_double_matrix, _deserialize_double_matrix, \
+    _serialize_double_vector, _deserialize_double_vector, \
+    _get_initial_weights, _serialize_rating, _regression_train_wrapper
+
+class MatrixFactorizationModel(object):
+    """A matrix factorisation model trained by regularized alternating
+    least-squares.
+
+    >>> r1 = (1, 1, 1.0)
+    >>> r2 = (1, 2, 2.0)
+    >>> r3 = (2, 1, 2.0)
+    >>> ratings = sc.parallelize([r1, r2, r3])
+    >>> model = ALS.trainImplicit(sc, ratings, 1)
+    >>> model.predict(2,2) is not None
+    True
+    """
+
+    def __init__(self, sc, java_model):
+        self._context = sc
+        self._java_model = java_model
+
+    def __del__(self):
+        self._context._gateway.detach(self._java_model)
+
+    def predict(self, user, product):
+        return self._java_model.predict(user, product)
+
+class ALS(object):
+    @classmethod
+    def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
+        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+        mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
+                rank, iterations, lambda_, blocks)
+        return MatrixFactorizationModel(sc, mod)
+
+    @classmethod
+    def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
+        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+        mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
+                rank, iterations, lambda_, blocks, alpha)
+        return MatrixFactorizationModel(sc, mod)
+
+def _test():
+    import doctest
+    globs = globals().copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    (failure_count, test_count) = doctest.testmod(globs=globs,
+            optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+if __name__ == "__main__":
+    _test()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
new file mode 100644
index 0000000..a3a68b2
--- /dev/null
+++ b/python/pyspark/mllib/regression.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from numpy import array, dot
+from pyspark import SparkContext
+from pyspark.mllib._common import \
+    _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+    _serialize_double_matrix, _deserialize_double_matrix, \
+    _serialize_double_vector, _deserialize_double_vector, \
+    _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
+    _linear_predictor_typecheck
+
+class LinearModel(object):
+    """Something that has a vector of coefficients and an intercept."""
+    def __init__(self, coeff, intercept):
+        self._coeff = coeff
+        self._intercept = intercept
+
+class LinearRegressionModelBase(LinearModel):
+    """A linear regression model.
+
+    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
+    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
+    True
+    """
+    def predict(self, x):
+        """Predict the value of the dependent variable given a vector x"""
+        """containing values for the independent variables."""
+        _linear_predictor_typecheck(x, self._coeff)
+        return dot(self._coeff, x) + self._intercept
+
+class LinearRegressionModel(LinearRegressionModelBase):
+    """A linear regression model derived from a least-squares fit.
+
+    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+    >>> lrm = LinearRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+    """
+
+class LinearRegressionWithSGD(object):
+    @classmethod
+    def train(cls, sc, data, iterations=100, step=1.0,
+              mini_batch_fraction=1.0, initial_weights=None):
+        """Train a linear regression model on the given data."""
+        return _regression_train_wrapper(sc, lambda d, i:
+                sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
+                        d._jrdd, iterations, step, mini_batch_fraction, i),
+                LinearRegressionModel, data, initial_weights)
+
+class LassoModel(LinearRegressionModelBase):
+    """A linear regression model derived from a least-squares fit with an
+    l_1 penalty term.
+
+    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+    >>> lrm = LassoWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+    """
+    
+class LassoWithSGD(object):
+    @classmethod
+    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+              mini_batch_fraction=1.0, initial_weights=None):
+        """Train a Lasso regression model on the given data."""
+        return _regression_train_wrapper(sc, lambda d, i:
+                sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(d._jrdd,
+                        iterations, step, reg_param, mini_batch_fraction, i),
+                LassoModel, data, initial_weights)
+
+class RidgeRegressionModel(LinearRegressionModelBase):
+    """A linear regression model derived from a least-squares fit with an
+    l_2 penalty term.
+
+    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+    >>> lrm = RidgeRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+    """
+
+class RidgeRegressionWithSGD(object):
+    @classmethod
+    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+              mini_batch_fraction=1.0, initial_weights=None):
+        """Train a ridge regression model on the given data."""
+        return _regression_train_wrapper(sc, lambda d, i:
+                sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(d._jrdd,
+                        iterations, step, reg_param, mini_batch_fraction, i),
+                RidgeRegressionModel, data, initial_weights)
+
+def _test():
+    import doctest
+    globs = globals().copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    (failure_count, test_count) = doctest.testmod(globs=globs,
+            optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    if failure_count:
+        exit(-1)
+
+if __name__ == "__main__":
+    _test()


[18/28] git commit: Release JVM reference to the ALSModel when done.

Posted by ma...@apache.org.
Release JVM reference to the ALSModel when done.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/cbb28111
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/cbb28111
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/cbb28111

Branch: refs/heads/master
Commit: cbb28111896844a0fd94346cd9c6f9926c706555
Parents: 20f85ec
Author: Tor Myklebust <tm...@gmail.com>
Authored: Sun Dec 22 15:03:58 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Sun Dec 22 15:03:58 2013 -0500

----------------------------------------------------------------------
 python/pyspark/mllib.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/cbb28111/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 22187eb..1f5a5f6 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -357,8 +357,8 @@ class ALSModel(object):
         self._context = sc
         self._java_model = java_model
 
-    #def __del__(self):
-        #self._gateway.detach(self._java_model)
+    def __del__(self):
+        self._context._gateway.detach(self._java_model)
 
     def predict(self, user, product):
         return self._java_model.predict(user, product)


[07/28] git commit: Scala classification and clustering stubs; matrix serialization/deserialization.

Posted by ma...@apache.org.
Scala classification and clustering stubs; matrix serialization/deserialization.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f99970e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f99970e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f99970e8

Branch: refs/heads/master
Commit: f99970e8cdc85eae33999b57a4c5c1893fe3727a
Parents: 2328bdd
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 00:12:22 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 00:12:22 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/mllib/api/PythonMLLibAPI.scala | 82 +++++++++++++++++++-
 1 file changed, 79 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f99970e8/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index c9bd7c6..bcf2f07 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -1,5 +1,7 @@
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.classification._
+import org.apache.spark.mllib.clustering._
 import org.apache.spark.rdd.RDD
 import java.nio.ByteBuffer
 import java.nio.ByteOrder
@@ -39,6 +41,52 @@ class PythonMLLibAPI extends Serializable {
     return bytes
   }
 
+  def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
+    val packetLength = bytes.length
+    if (packetLength < 24) {
+      throw new IllegalArgumentException("Byte array too short.")
+    }
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.getLong()
+    if (magic != 2) {
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+    }
+    val rows = bb.getLong()
+    val cols = bb.getLong()
+    if (packetLength != 24 + 8 * rows * cols) {
+      throw new IllegalArgumentException("Size " + rows + "x" + cols + "is wrong.")
+    }
+    val db = bb.asDoubleBuffer()
+    val ans = new Array[Array[Double]](rows.toInt)
+    var i = 0
+    for (i <- 0 until rows.toInt) {
+      ans(i) = new Array[Double](cols.toInt)
+      db.get(ans(i))
+    }
+    return ans
+  }
+
+  def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
+    val rows = doubles.length
+    var cols = 0
+    if (rows > 0) {
+      cols = doubles(0).length
+    }
+    val bytes = new Array[Byte](24 + 8 * rows * cols)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.putLong(2)
+    bb.putLong(rows)
+    bb.putLong(cols)
+    val db = bb.asDoubleBuffer()
+    var i = 0
+    for (i <- 0 until rows) {
+      db.put(doubles(i))
+    }
+    return bytes
+  }
+
   def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
       dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
       java.util.LinkedList[java.lang.Object] = {
@@ -60,7 +108,7 @@ class PythonMLLibAPI extends Serializable {
     return trainRegressionModel((data, initialWeights) =>
         LinearRegressionWithSGD.train(data, numIterations, stepSize,
                                       miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA);
+        dataBytesJRDD, initialWeightsBA)
   }
 
   def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
@@ -69,7 +117,7 @@ class PythonMLLibAPI extends Serializable {
     return trainRegressionModel((data, initialWeights) =>
         LassoWithSGD.train(data, numIterations, stepSize, regParam,
                            miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA);
+        dataBytesJRDD, initialWeightsBA)
   }
 
   def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
@@ -78,6 +126,34 @@ class PythonMLLibAPI extends Serializable {
     return trainRegressionModel((data, initialWeights) =>
         RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
                                      miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA);
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  def trainSVMModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        SVMWithSGD.train(data, numIterations, stepSize, regParam,
+                                     miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  def trainLogisticRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        LogisticRegressionWithSGD.train(data, numIterations, stepSize,
+                                     miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
+      maxIterations: Int, runs: Int, initializationMode: String):
+      java.util.List[java.lang.Object] = {
+    val data = dataBytesJRDD.rdd.map(xBytes => deserializeDoubleVector(xBytes))
+    val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
+    val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleMatrix(model.clusterCenters))
+    return ret
   }
 }


[28/28] git commit: Merge pull request #283 from tmyklebu/master

Posted by ma...@apache.org.
Merge pull request #283 from tmyklebu/master

Python bindings for mllib

This pull request contains Python bindings for the regression, clustering, classification, and recommendation tools in mllib.

For each 'train' frontend exposed, there is a Scala stub in PythonMLLibAPI.scala and a Python stub in mllib.py.  The Python stub serialises the input RDD and any vector/matrix arguments into a mutually-understood format and calls the Scala stub.  The Scala stub deserialises the RDD and the vector/matrix arguments, calls the appropriate 'train' function, serialises the resulting model, and returns the serialised model.

ALSModel is slightly different since a MatrixFactorizationModel has RDDs inside.  The Scala stub returns a handle to a Scala MatrixFactorizationModel; prediction is done by calling the Scala predict method.

I have tested these bindings on an x86_64 machine running Linux.  There is a risk that these bindings may fail on some choose-your-own-endian platform if Python's endian differs from java.nio.ByteBuffer's idea of the native byte order.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c344ed04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c344ed04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c344ed04

Branch: refs/heads/master
Commit: c344ed04c7d65d64e87bb50ad6eba57534945398
Parents: 56094bc 9cbcf81
Author: Matei Zaharia <ma...@databricks.com>
Authored: Thu Dec 26 01:31:06 2013 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Dec 26 01:31:06 2013 -0500

----------------------------------------------------------------------
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 232 +++++++++++++++++++
 python/pyspark/java_gateway.py                  |   1 +
 python/pyspark/mllib/__init__.py                |  20 ++
 python/pyspark/mllib/_common.py                 | 227 ++++++++++++++++++
 python/pyspark/mllib/classification.py          |  86 +++++++
 python/pyspark/mllib/clustering.py              |  79 +++++++
 python/pyspark/mllib/recommendation.py          |  74 ++++++
 python/pyspark/mllib/regression.py              | 110 +++++++++
 python/pyspark/serializers.py                   |   2 +-
 9 files changed, 830 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[05/28] git commit: Bindings for linear, Lasso, and ridge regression.

Posted by ma...@apache.org.
Bindings for linear, Lasso, and ridge regression.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ded67ee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ded67ee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ded67ee9

Branch: refs/heads/master
Commit: ded67ee90c2c0b22d67e623156a3f6cce8573abd
Parents: 2a41c9a
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 22:42:12 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 22:42:12 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/mllib/api/PythonMLLibAPI.scala | 42 +++++++++++++++++---
 1 file changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ded67ee9/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index 3daf5dc..c9bd7c6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -1,5 +1,6 @@
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.regression._
+import org.apache.spark.rdd.RDD
 import java.nio.ByteBuffer
 import java.nio.ByteOrder
 import java.nio.DoubleBuffer
@@ -38,14 +39,45 @@ class PythonMLLibAPI extends Serializable {
     return bytes
   }
 
-  def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]]):
-      java.util.List[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(x => deserializeDoubleVector(x))
-        .map(v => LabeledPoint(v(0), v.slice(1, v.length)))
-    val model = LinearRegressionWithSGD.train(data, 222)
+  def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+      dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
+      java.util.LinkedList[java.lang.Object] = {
+    val data = dataBytesJRDD.rdd.map(xBytes => {
+        val x = deserializeDoubleVector(xBytes)
+        LabeledPoint(x(0), x.slice(1, x.length))
+    })
+    val initialWeights = deserializeDoubleVector(initialWeightsBA)
+    val model = trainFunc(data, initialWeights)
     val ret = new java.util.LinkedList[java.lang.Object]()
     ret.add(serializeDoubleVector(model.weights))
     ret.add(model.intercept: java.lang.Double)
     return ret
   }
+
+  def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        LinearRegressionWithSGD.train(data, numIterations, stepSize,
+                                      miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA);
+  }
+
+  def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        LassoWithSGD.train(data, numIterations, stepSize, regParam,
+                           miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA);
+  }
+
+  def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
+                                     miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA);
+  }
 }


[10/28] git commit: Remove gigantic endian-specific test and exception tests.

Posted by ma...@apache.org.
Remove gigantic endian-specific test and exception tests.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/319520b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/319520b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/319520b9

Branch: refs/heads/master
Commit: 319520b9bb0071527a0be1e0e545ca084ac090ee
Parents: 2940201
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 01:48:44 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 01:48:44 2013 -0500

----------------------------------------------------------------------
 python/pyspark/mllib.py | 41 +++--------------------------------------
 1 file changed, 3 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/319520b9/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index aa9fc76..e7e2216 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -27,40 +27,7 @@ def _deserialize_byte_array(shape, ba, offset):
     return ar.copy()
 
 def _serialize_double_vector(v):
-    """Serialize a double vector into a mutually understood format.
-
-    >>> _serialize_double_vector(array([]))
-    bytearray(b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00')
-    >>> _serialize_double_vector(array([0.0, 1.0]))
-    bytearray(b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\xf0?')
-    >>> _serialize_double_vector("hello, world")
-    Traceback (most recent call last):
-      File "/usr/lib/python2.7/doctest.py", line 1289, in __run
-        compileflags, 1) in test.globs
-      File "<doctest __main__._serialize_double_vector[1]>", line 1, in <module>
-        _serialize_double_vector("hello, world")
-      File "python/pyspark/mllib.py", line 41, in _serialize_double_vector
-        raise TypeError("_serialize_double_vector called on a %s; wanted ndarray" % type(v))
-    TypeError: _serialize_double_vector called on a <type 'str'>; wanted ndarray
-    >>> _serialize_double_vector(array([0, 1]))
-    Traceback (most recent call last):
-      File "/usr/lib/python2.7/doctest.py", line 1289, in __run
-        compileflags, 1) in test.globs
-      File "<doctest __main__._serialize_double_vector[2]>", line 1, in <module>
-        _serialize_double_vector(array([0, 1]))
-      File "python/pyspark/mllib.py", line 51, in _serialize_double_vector
-        "wanted ndarray of float64" % v.dtype)
-    TypeError: _serialize_double_vector called on an ndarray of int64; wanted ndarray of float64
-    >>> _serialize_double_vector(array([0.0, 1.0, 2.0, 3.0]).reshape(2,2))
-    Traceback (most recent call last):
-      File "/usr/lib/python2.7/doctest.py", line 1289, in __run
-        compileflags, 1) in test.globs
-      File "<doctest __main__._serialize_double_vector[3]>", line 1, in <module>
-        _serialize_double_vector(array([0.0, 1.0, 2.0, 3.0]).reshape(2,2))
-      File "python/pyspark/mllib.py", line 62, in _serialize_double_vector
-        "wanted a 1darray" % v.ndim)
-    TypeError: _serialize_double_vector called on a 2darray; wanted a 1darray
-    """
+    """Serialize a double vector into a mutually understood format."""
     if type(v) != ndarray:
         raise TypeError("_serialize_double_vector called on a %s; "
                 "wanted ndarray" % type(v))
@@ -106,8 +73,7 @@ def _deserialize_double_vector(ba):
     return _deserialize_byte_array([length], ba, 16)
 
 def _serialize_double_matrix(m):
-    """Serialize a double matrix into a mutually understood format.
-    """
+    """Serialize a double matrix into a mutually understood format."""
     if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
         rows = m.shape[0]
         cols = m.shape[1]
@@ -124,8 +90,7 @@ def _serialize_double_matrix(m):
                         "non-double-matrix")
 
 def _deserialize_double_matrix(ba):
-    """Deserialize a double matrix from a mutually understood format.
-    """
+    """Deserialize a double matrix from a mutually understood format."""
     if type(ba) != bytearray:
         raise TypeError("_deserialize_double_matrix called on a %s; "
                 "wanted bytearray" % type(ba))


[12/28] git commit: Licence notice.

Posted by ma...@apache.org.
Licence notice.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b835ddf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b835ddf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b835ddf3

Branch: refs/heads/master
Commit: b835ddf3dffe8698dab3b42c14a9da472868b13c
Parents: d89cc1e
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 01:55:03 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 01:55:03 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/mllib/api/PythonMLLibAPI.scala    | 17 +++++++++++++++++
 python/pyspark/mllib.py                            | 17 +++++++++++++++++
 2 files changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b835ddf3/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index bcf2f07..bad1f66 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.classification._

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b835ddf3/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 1877404..ce1363f 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
 from numpy import *
 from pyspark import SparkContext
 


[26/28] git commit: Fix copypasta in __init__.py. Don't import anything directly into pyspark.mllib.

Posted by ma...@apache.org.
Fix copypasta in __init__.py.  Don't import anything directly into pyspark.mllib.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5e71354c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5e71354c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5e71354c

Branch: refs/heads/master
Commit: 5e71354cb7ff758d9a70494ca1788aebea1bbb08
Parents: 02208a1
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Dec 25 14:10:55 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Wed Dec 25 14:10:55 2013 -0500

----------------------------------------------------------------------
 python/pyspark/mllib/__init__.py | 34 ++++++++--------------------------
 1 file changed, 8 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e71354c/python/pyspark/mllib/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py
index 6037a3a..e9c62f3 100644
--- a/python/pyspark/mllib/__init__.py
+++ b/python/pyspark/mllib/__init__.py
@@ -16,31 +16,13 @@
 #
 
 """
-PySpark is the Python API for Spark.
-
-Public classes:
-
-    - L{SparkContext<pyspark.context.SparkContext>}
-        Main entry point for Spark functionality.
-    - L{RDD<pyspark.rdd.RDD>}
-        A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
-    - L{Broadcast<pyspark.broadcast.Broadcast>}
-        A broadcast variable that gets reused across tasks.
-    - L{Accumulator<pyspark.accumulators.Accumulator>}
-        An "add-only" shared variable that tasks can only add values to.
-    - L{SparkFiles<pyspark.files.SparkFiles>}
-        Access files shipped with jobs.
-    - L{StorageLevel<pyspark.storagelevel.StorageLevel>}
-        Finer-grained cache persistence levels.
+Python bindings for MLlib.
 """
-import sys
-import os
-sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg"))
-
-from pyspark.mllib.regression import LinearRegressionModel, LassoModel, RidgeRegressionModel, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD
-from pyspark.mllib.classification import LogisticRegressionModel, SVMModel, LogisticRegressionWithSGD, SVMWithSGD
-from pyspark.mllib.recommendation import MatrixFactorizationModel, ALS
-from pyspark.mllib.clustering import KMeansModel, KMeans
 
-
-__all__ = ["LinearRegressionModel", "LassoModel", "RidgeRegressionModel", "LinearRegressionWithSGD", "LassoWithSGD", "RidgeRegressionWithSGD", "LogisticRegressionModel", "SVMModel", "LogisticRegressionWithSGD", "SVMWithSGD", "MatrixFactorizationModel", "ALS", "KMeansModel", "KMeans"]
+#from pyspark.mllib.regression import LinearRegressionModel, LassoModel, RidgeRegressionModel, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD
+#from pyspark.mllib.classification import LogisticRegressionModel, SVMModel, LogisticRegressionWithSGD, SVMWithSGD
+#from pyspark.mllib.recommendation import MatrixFactorizationModel, ALS
+#from pyspark.mllib.clustering import KMeansModel, KMeans
+#
+#
+#__all__ = ["LinearRegressionModel", "LassoModel", "RidgeRegressionModel", "LinearRegressionWithSGD", "LassoWithSGD", "RidgeRegressionWithSGD", "LogisticRegressionModel", "SVMModel", "LogisticRegressionWithSGD", "SVMWithSGD", "MatrixFactorizationModel", "ALS", "KMeansModel", "KMeans"]


[09/28] git commit: Tests for the Python side of the mllib bindings.

Posted by ma...@apache.org.
Tests for the Python side of the mllib bindings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2940201a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2940201a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2940201a

Branch: refs/heads/master
Commit: 2940201ad86e5dee16cf7386b3c934fc75c15582
Parents: 73e1706
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 01:33:32 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 01:33:32 2013 -0500

----------------------------------------------------------------------
 python/pyspark/mllib.py | 224 +++++++++++++++++++++++++++++++++----------
 1 file changed, 172 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2940201a/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 21f3c03..aa9fc76 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -1,4 +1,5 @@
 from numpy import *
+from pyspark import SparkContext
 
 # Double vector format:
 #
@@ -7,44 +8,106 @@ from numpy import *
 # Double matrix format:
 #
 # [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
-# 
+#
 # This is all in machine-endian.  That means that the Java interpreter and the
 # Python interpreter must agree on what endian the machine is.
 
 def _deserialize_byte_array(shape, ba, offset):
+    """Wrapper around ndarray aliasing hack.
+
+    >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
+    >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+    True
+    >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
+    >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+    True
+    """
     ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
             order='C')
     return ar.copy()
 
 def _serialize_double_vector(v):
-    if (type(v) == ndarray and v.dtype == float64 and v.ndim == 1):
-        length = v.shape[0]
-        ba = bytearray(16 + 8*length)
-        header = ndarray(shape=[2], buffer=ba, dtype="int64")
-        header[0] = 1
-        header[1] = length
-        copyto(ndarray(shape=[length], buffer=ba, offset=16,
-                dtype="float64"), v)
-        return ba
-    else:
-        raise TypeError("_serialize_double_vector called on a "
-                        "non-double-vector")
+    """Serialize a double vector into a mutually understood format.
+
+    >>> _serialize_double_vector(array([]))
+    bytearray(b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00')
+    >>> _serialize_double_vector(array([0.0, 1.0]))
+    bytearray(b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\xf0?')
+    >>> _serialize_double_vector("hello, world")
+    Traceback (most recent call last):
+      File "/usr/lib/python2.7/doctest.py", line 1289, in __run
+        compileflags, 1) in test.globs
+      File "<doctest __main__._serialize_double_vector[1]>", line 1, in <module>
+        _serialize_double_vector("hello, world")
+      File "python/pyspark/mllib.py", line 41, in _serialize_double_vector
+        raise TypeError("_serialize_double_vector called on a %s; wanted ndarray" % type(v))
+    TypeError: _serialize_double_vector called on a <type 'str'>; wanted ndarray
+    >>> _serialize_double_vector(array([0, 1]))
+    Traceback (most recent call last):
+      File "/usr/lib/python2.7/doctest.py", line 1289, in __run
+        compileflags, 1) in test.globs
+      File "<doctest __main__._serialize_double_vector[2]>", line 1, in <module>
+        _serialize_double_vector(array([0, 1]))
+      File "python/pyspark/mllib.py", line 51, in _serialize_double_vector
+        "wanted ndarray of float64" % v.dtype)
+    TypeError: _serialize_double_vector called on an ndarray of int64; wanted ndarray of float64
+    >>> _serialize_double_vector(array([0.0, 1.0, 2.0, 3.0]).reshape(2,2))
+    Traceback (most recent call last):
+      File "/usr/lib/python2.7/doctest.py", line 1289, in __run
+        compileflags, 1) in test.globs
+      File "<doctest __main__._serialize_double_vector[3]>", line 1, in <module>
+        _serialize_double_vector(array([0.0, 1.0, 2.0, 3.0]).reshape(2,2))
+      File "python/pyspark/mllib.py", line 62, in _serialize_double_vector
+        "wanted a 1darray" % v.ndim)
+    TypeError: _serialize_double_vector called on a 2darray; wanted a 1darray
+    """
+    if type(v) != ndarray:
+        raise TypeError("_serialize_double_vector called on a %s; "
+                "wanted ndarray" % type(v))
+    if v.dtype != float64:
+        raise TypeError("_serialize_double_vector called on an ndarray of %s; "
+                "wanted ndarray of float64" % v.dtype)
+    if v.ndim != 1:
+        raise TypeError("_serialize_double_vector called on a %ddarray; "
+                "wanted a 1darray" % v.ndim)
+    length = v.shape[0]
+    ba = bytearray(16 + 8*length)
+    header = ndarray(shape=[2], buffer=ba, dtype="int64")
+    header[0] = 1
+    header[1] = length
+    copyto(ndarray(shape=[length], buffer=ba, offset=16,
+            dtype="float64"), v)
+    return ba
 
 def _deserialize_double_vector(ba):
-    if (type(ba) == bytearray and len(ba) >= 16 and (len(ba) & 7 == 0)):
-        header = ndarray(shape=[2], buffer=ba, dtype="int64")
-        if (header[0] != 1):
-            raise TypeError("_deserialize_double_vector called on bytearray "
-                            "with wrong magic")
-        length = header[1]
-        if (len(ba) != 8*length + 16):
-            raise TypeError("_deserialize_double_vector called on bytearray "
-                            "with wrong length")
-        return _deserialize_byte_array([length], ba, 16)
-    else:
-        raise TypeError("_deserialize_double_vector called on a non-bytearray")
+    """Deserialize a double vector from a mutually understood format.
+
+    >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
+    >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
+    True
+    """
+    if type(ba) != bytearray:
+        raise TypeError("_deserialize_double_vector called on a %s; "
+                "wanted bytearray" % type(ba))
+    if len(ba) < 16:
+        raise TypeError("_deserialize_double_vector called on a %d-byte array, "
+                "which is too short" % len(ba))
+    if (len(ba) & 7) != 0:
+        raise TypeError("_deserialize_double_vector called on a %d-byte array, "
+                "which is not a multiple of 8" % len(ba))
+    header = ndarray(shape=[2], buffer=ba, dtype="int64")
+    if header[0] != 1:
+        raise TypeError("_deserialize_double_vector called on bytearray "
+                        "with wrong magic")
+    length = header[1]
+    if len(ba) != 8*length + 16:
+        raise TypeError("_deserialize_double_vector called on bytearray "
+                        "with wrong length")
+    return _deserialize_byte_array([length], ba, 16)
 
 def _serialize_double_matrix(m):
+    """Serialize a double matrix into a mutually understood format.
+    """
     if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
         rows = m.shape[0]
         cols = m.shape[1]
@@ -61,22 +124,31 @@ def _serialize_double_matrix(m):
                         "non-double-matrix")
 
 def _deserialize_double_matrix(ba):
-    if (type(ba) == bytearray and len(ba) >= 24 and (len(ba) & 7 == 0)):
-        header = ndarray(shape=[3], buffer=ba, dtype="int64")
-        if (header[0] != 2):
-            raise TypeError("_deserialize_double_matrix called on bytearray "
-                            "with wrong magic")
-        rows = header[1]
-        cols = header[2]
-        if (len(ba) != 8*rows*cols + 24):
-            raise TypeError("_deserialize_double_matrix called on bytearray "
-                            "with wrong length")
-        return _deserialize_byte_array([rows, cols], ba, 24)
-    else:
-        raise TypeError("_deserialize_double_matrix called on a non-bytearray")
+    """Deserialize a double matrix from a mutually understood format.
+    """
+    if type(ba) != bytearray:
+        raise TypeError("_deserialize_double_matrix called on a %s; "
+                "wanted bytearray" % type(ba))
+    if len(ba) < 24:
+        raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
+                "which is too short" % len(ba))
+    if (len(ba) & 7) != 0:
+        raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
+                "which is not a multiple of 8" % len(ba))
+    header = ndarray(shape=[3], buffer=ba, dtype="int64")
+    if (header[0] != 2):
+        raise TypeError("_deserialize_double_matrix called on bytearray "
+                        "with wrong magic")
+    rows = header[1]
+    cols = header[2]
+    if (len(ba) != 8*rows*cols + 24):
+        raise TypeError("_deserialize_double_matrix called on bytearray "
+                        "with wrong length")
+    return _deserialize_byte_array([rows, cols], ba, 24)
 
 def _linear_predictor_typecheck(x, coeffs):
-    """Predict the class of the vector x."""
+    """Check that x is a one-dimensional vector of the right shape.
+    This is a temporary hackaround until I actually implement bulk predict."""
     if type(x) == ndarray:
         if x.ndim == 1:
             if x.shape == coeffs.shape:
@@ -98,12 +170,17 @@ class LinearModel(object):
         self._intercept = intercept
 
 class LinearRegressionModelBase(LinearModel):
-    """A linear regression model."""
+    """A linear regression model.
+
+    >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
+    >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
+    True
+    """
     def predict(self, x):
         """Predict the value of the dependent variable given a vector x"""
         """containing values for the independent variables."""
-        _linear_predictor_typecheck(x, _coeff)
-        return dot(_coeff, x) + _intercept
+        _linear_predictor_typecheck(x, self._coeff)
+        return dot(self._coeff, x) + self._intercept
 
 # Map a pickled Python RDD of numpy double vectors to a Java RDD of
 # _serialized_double_vectors
@@ -145,7 +222,11 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
     return klass(_deserialize_double_vector(ans[0]), ans[1]);
 
 class LinearRegressionModel(LinearRegressionModelBase):
-    """A linear regression model derived from a least-squares fit."""
+    """A linear regression model derived from a least-squares fit.
+
+    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+    >>> lrm = LinearRegressionModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+    """
     @classmethod
     def train(cls, sc, data, iterations=100, step=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
@@ -156,8 +237,12 @@ class LinearRegressionModel(LinearRegressionModelBase):
                 LinearRegressionModel, data, initial_weights)
 
 class LassoModel(LinearRegressionModelBase):
-    """A linear regression model derived from a least-squares fit with an """
-    """l_1 penalty term."""
+    """A linear regression model derived from a least-squares fit with an
+    l_1 penalty term.
+
+    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+    >>> lrm = LassoModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+    """
     @classmethod
     def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
@@ -168,8 +253,12 @@ class LassoModel(LinearRegressionModelBase):
                 LassoModel, data, initial_weights)
 
 class RidgeRegressionModel(LinearRegressionModelBase):
-    """A linear regression model derived from a least-squares fit with an """
-    """l_2 penalty term."""
+    """A linear regression model derived from a least-squares fit with an
+    l_2 penalty term.
+
+    >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+    >>> lrm = RidgeRegressionModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+    """
     @classmethod
     def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
@@ -180,7 +269,11 @@ class RidgeRegressionModel(LinearRegressionModelBase):
                 RidgeRegressionModel, data, initial_weights)
 
 class LogisticRegressionModel(LinearModel):
-    """A linear binary classification model derived from logistic regression."""
+    """A linear binary classification model derived from logistic regression.
+
+    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+    >>> lrm = LogisticRegressionModel.train(sc, sc.parallelize(data))
+    """
     def predict(self, x):
         _linear_predictor_typecheck(x, _coeff)
         margin = dot(x, _coeff) + intercept
@@ -197,7 +290,11 @@ class LogisticRegressionModel(LinearModel):
                 LogisticRegressionModel, data, initial_weights)
 
 class SVMModel(LinearModel):
-    """A support vector machine."""
+    """A support vector machine.
+
+    >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+    >>> svm = SVMModel.train(sc, sc.parallelize(data))
+    """
     def predict(self, x):
         _linear_predictor_typecheck(x, _coeff)
         margin = dot(x, _coeff) + intercept
@@ -212,15 +309,24 @@ class SVMModel(LinearModel):
                 SVMModel, data, initial_weights)
 
 class KMeansModel(object):
-    """A clustering model derived from the k-means method."""
+    """A clustering model derived from the k-means method.
+
+    >>> data = array([0.0, 0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
+    >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
+    >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
+    True
+    >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
+    True
+    >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2)
+    """
     def __init__(self, centers_):
         self.centers = centers_
 
     def predict(self, x):
         best = 0
         best_distance = 1e75
-        for i in range(0, centers.shape[0]):
-            diff = x - centers[i]
+        for i in range(0, self.centers.shape[0]):
+            diff = x - self.centers[i]
             distance = sqrt(dot(diff, diff))
             if distance < best_distance:
                 best = i
@@ -239,3 +345,17 @@ class KMeansModel(object):
             raise RuntimeError("JVM call result had first element of type "
                     + type(ans[0]) + " which is not bytearray");
         return KMeansModel(_deserialize_double_matrix(ans[0]));
+
+def _test():
+    import doctest
+    globs = globals().copy()
+    globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+    (failure_count, test_count) = doctest.testmod(globs=globs,
+        optionflags=doctest.ELLIPSIS)
+    globs['sc'].stop()
+    print failure_count,"failures among",test_count,"tests"
+    if failure_count:
+        exit(-1)
+
+if __name__ == "__main__":
+    _test()


[15/28] git commit: Javadocs; also, declare some things private.

Posted by ma...@apache.org.
Javadocs; also, declare some things private.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b454fdc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b454fdc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b454fdc2

Branch: refs/heads/master
Commit: b454fdc2ebc495e4d13162f4bea8cf3e33909463
Parents: 0b494c2
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 02:10:21 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 02:10:21 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/mllib/api/PythonMLLibAPI.scala | 31 ++++++++++++++++----
 1 file changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b454fdc2/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index bad1f66..6472bf6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -24,8 +24,11 @@ import java.nio.ByteBuffer
 import java.nio.ByteOrder
 import java.nio.DoubleBuffer
 
+/**
+ * The Java stubs necessary for the Python mllib bindings.
+ */
 class PythonMLLibAPI extends Serializable {
-  def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
+  private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
     val packetLength = bytes.length
     if (packetLength < 16) {
       throw new IllegalArgumentException("Byte array too short.")
@@ -46,7 +49,7 @@ class PythonMLLibAPI extends Serializable {
     return ans
   }
 
-  def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+  private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
     val len = doubles.length
     val bytes = new Array[Byte](16 + 8 * len)
     val bb = ByteBuffer.wrap(bytes)
@@ -58,7 +61,7 @@ class PythonMLLibAPI extends Serializable {
     return bytes
   }
 
-  def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
+  private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
     val packetLength = bytes.length
     if (packetLength < 24) {
       throw new IllegalArgumentException("Byte array too short.")
@@ -84,7 +87,7 @@ class PythonMLLibAPI extends Serializable {
     return ans
   }
 
-  def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
+  private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
     val rows = doubles.length
     var cols = 0
     if (rows > 0) {
@@ -104,7 +107,7 @@ class PythonMLLibAPI extends Serializable {
     return bytes
   }
 
-  def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+  private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
       dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
       java.util.LinkedList[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
@@ -119,6 +122,9 @@ class PythonMLLibAPI extends Serializable {
     return ret
   }
 
+  /**
+   * Java stub for Python mllib LinearRegressionModel.train()
+   */
   def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
       numIterations: Int, stepSize: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
@@ -128,6 +134,9 @@ class PythonMLLibAPI extends Serializable {
         dataBytesJRDD, initialWeightsBA)
   }
 
+  /**
+   * Java stub for Python mllib LassoModel.train()
+   */
   def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
       stepSize: Double, regParam: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
@@ -137,6 +146,9 @@ class PythonMLLibAPI extends Serializable {
         dataBytesJRDD, initialWeightsBA)
   }
 
+  /**
+   * Java stub for Python mllib RidgeRegressionModel.train()
+   */
   def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
       stepSize: Double, regParam: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
@@ -146,6 +158,9 @@ class PythonMLLibAPI extends Serializable {
         dataBytesJRDD, initialWeightsBA)
   }
 
+  /**
+   * Java stub for Python mllib SVMModel.train()
+   */
   def trainSVMModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
       stepSize: Double, regParam: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
@@ -155,6 +170,9 @@ class PythonMLLibAPI extends Serializable {
         dataBytesJRDD, initialWeightsBA)
   }
 
+  /**
+   * Java stub for Python mllib LogisticRegressionModel.train()
+   */
   def trainLogisticRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
       numIterations: Int, stepSize: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
@@ -164,6 +182,9 @@ class PythonMLLibAPI extends Serializable {
         dataBytesJRDD, initialWeightsBA)
   }
 
+  /**
+   * Java stub for Python mllib KMeansModel.train()
+   */
   def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
       maxIterations: Int, runs: Int, initializationMode: String):
       java.util.List[java.lang.Object] = {


[06/28] git commit: Python side of python bindings for linear, Lasso, and ridge regression

Posted by ma...@apache.org.
Python side of python bindings for linear, Lasso, and ridge regression


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2328bdd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2328bdd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2328bdd0

Branch: refs/heads/master
Commit: 2328bdd00f701ca3b1bc7fdf8b2968fafc58fd11
Parents: ded67ee
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 22:45:16 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 22:45:16 2013 -0500

----------------------------------------------------------------------
 python/pyspark/__init__.py |  6 ++-
 python/pyspark/mllib.py    | 81 ++++++++++++++++++++++++++++++++++-------
 2 files changed, 72 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2328bdd0/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 9f71db3..7c8f914 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -42,7 +42,9 @@ from pyspark.context import SparkContext
 from pyspark.rdd import RDD
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
-from pyspark.mllib import LinearRegressionModel
+from pyspark.mllib import LinearRegressionModel, LassoModel, \
+    RidgeRegressionModel
 
 
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", "LinearRegressionModel"];
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", \
+    "LinearRegressionModel", "LassoModel", "RidgeRegressionModel"];

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2328bdd0/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 0dfc490..d312787 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -75,7 +75,7 @@ def _deserialize_double_matrix(ba):
     else:
         raise TypeError("_deserialize_double_matrix called on a non-bytearray")
 
-class LinearRegressionModel(object):
+class LinearModel(object):
     def __init__(self, coeff, intercept):
         self._coeff = coeff
         self._intercept = intercept
@@ -83,7 +83,7 @@ class LinearRegressionModel(object):
     def predict(self, x):
         if (type(x) == ndarray):
             if (x.ndim == 1):
-                return dot(_coeff, x) - _intercept
+                return dot(_coeff, x) + _intercept
             else:
                 raise RuntimeError("Bulk predict not yet supported.")
         elif (type(x) == RDD):
@@ -92,16 +92,71 @@ class LinearRegressionModel(object):
             raise TypeError("Bad type argument to "
                             "LinearRegressionModel::predict")
 
+# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+# _serialized_double_vectors
+def _get_unmangled_double_vector_rdd(data):
+    dataBytes = data.map(_serialize_double_vector)
+    dataBytes._bypass_serializer = True
+    dataBytes.cache()
+    return dataBytes;
+
+# If we weren't given initial weights, take a zero vector of the appropriate
+# length.
+def _get_initial_weights(initial_weights, data):
+    if initial_weights is None:
+        initial_weights = data.first()
+        if type(initial_weights) != ndarray:
+            raise TypeError("At least one data element has type "
+                    + type(initial_weights) + " which is not ndarray")
+        if initial_weights.ndim != 1:
+            raise TypeError("At least one data element has "
+                    + initial_weights.ndim + " dimensions, which is not 1")
+        initial_weights = zeros([initial_weights.shape[0] - 1]);
+    return initial_weights;
+
+# train_func should take two parameters, namely data and initial_weights, and
+# return the result of a call to the appropriate JVM stub.
+# _regression_train_wrapper is responsible for setup and error checking.
+def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
+    initial_weights = _get_initial_weights(initial_weights, data)
+    dataBytes = _get_unmangled_double_vector_rdd(data)
+    ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
+    if len(ans) != 2:
+        raise RuntimeError("JVM call result had unexpected length");
+    elif type(ans[0]) != bytearray:
+        raise RuntimeError("JVM call result had first element of type "
+                + type(ans[0]) + " which is not bytearray");
+    elif type(ans[1]) != float:
+        raise RuntimeError("JVM call result had second element of type "
+                + type(ans[0]) + " which is not float");
+    return klass(_deserialize_double_vector(ans[0]), ans[1]);
+
+class LinearRegressionModel(LinearModel):
     @classmethod
-    def train(cls, sc, data):
+    def train(cls, sc, data, iterations=100, step=1.0,
+              mini_batch_fraction=1.0, initial_weights=None):
         """Train a linear regression model on the given data."""
-        dataBytes = data.map(_serialize_double_vector)
-        dataBytes._bypass_serializer = True
-        dataBytes.cache()
-        api = sc._jvm.PythonMLLibAPI()
-        ans = api.trainLinearRegressionModel(dataBytes._jrdd)
-        if (len(ans) != 2 or type(ans[0]) != bytearray
-                or type(ans[1]) != float):
-            raise RuntimeError("train_linear_regression_model received "
-                               "garbage from JVM")
-        return LinearRegressionModel(_deserialize_double_vector(ans[0]), ans[1])
+        return _regression_train_wrapper(sc, lambda d, i:
+                sc._jvm.PythonMLLibAPI().trainLinearRegressionModel(
+                        d._jrdd, iterations, step, mini_batch_fraction, i),
+                LinearRegressionModel, data, initial_weights)
+
+class LassoModel(LinearModel):
+    @classmethod
+    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+              mini_batch_fraction=1.0, initial_weights=None):
+        """Train a Lasso regression model on the given data."""
+        return _regression_train_wrapper(sc, lambda d, i:
+                sc._jvm.PythonMLLibAPI().trainLassoModel(d._jrdd,
+                        iterations, step, reg_param, mini_batch_fraction, i),
+                LassoModel, data, initial_weights)
+
+class RidgeRegressionModel(LinearModel):
+    @classmethod
+    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+              mini_batch_fraction=1.0, initial_weights=None):
+        """Train a ridge regression model on the given data."""
+        return _regression_train_wrapper(sc, lambda d, i:
+                sc._jvm.PythonMLLibAPI().trainRidgeModel(d._jrdd,
+                        iterations, step, reg_param, mini_batch_fraction, i),
+                RidgeRegressionModel, data, initial_weights)


[22/28] git commit: Remove useless line from test stub.

Posted by ma...@apache.org.
Remove useless line from test stub.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/86e38c49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/86e38c49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/86e38c49

Branch: refs/heads/master
Commit: 86e38c49420098da422a17e7c098efa34c94c35b
Parents: 4efec6e
Author: Tor Myklebust <tm...@gmail.com>
Authored: Tue Dec 24 16:49:31 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Tue Dec 24 16:49:31 2013 -0500

----------------------------------------------------------------------
 python/pyspark/mllib.py | 1 -
 1 file changed, 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/86e38c49/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 1f5a5f6..46f368b 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -384,7 +384,6 @@ def _test():
     (failure_count, test_count) = doctest.testmod(globs=globs,
             optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
-    print failure_count,"failures among",test_count,"tests"
     if failure_count:
         exit(-1)
 


[11/28] git commit: Whitespace.

Posted by ma...@apache.org.
Whitespace.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d89cc1e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d89cc1e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d89cc1e2

Branch: refs/heads/master
Commit: d89cc1e28a88a5f943fed096c4bd647f79753c9f
Parents: 319520b
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 01:50:42 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 01:50:42 2013 -0500

----------------------------------------------------------------------
 python/pyspark/mllib.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d89cc1e2/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index e7e2216..1877404 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -276,7 +276,7 @@ class SVMModel(LinearModel):
 class KMeansModel(object):
     """A clustering model derived from the k-means method.
 
-    >>> data = array([0.0, 0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
+    >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
     >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
     >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
     True


[19/28] git commit: Fix error message ugliness.

Posted by ma...@apache.org.
Fix error message ugliness.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2402180b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2402180b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2402180b

Branch: refs/heads/master
Commit: 2402180b32d530319d0526490afa3cfafc5c36b8
Parents: cbb2811
Author: Tor Myklebust <tm...@gmail.com>
Authored: Tue Dec 24 16:18:33 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Tue Dec 24 16:18:33 2013 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2402180b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index 4620cab..67ec974 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -42,7 +42,7 @@ class PythonMLLibAPI extends Serializable {
     }
     val length = bb.getLong()
     if (packetLength != 16 + 8 * length) {
-      throw new IllegalArgumentException("Length " + length + "is wrong.")
+      throw new IllegalArgumentException("Length " + length + " is wrong.")
     }
     val db = bb.asDoubleBuffer()
     val ans = new Array[Double](length.toInt)
@@ -76,7 +76,7 @@ class PythonMLLibAPI extends Serializable {
     val rows = bb.getLong()
     val cols = bb.getLong()
     if (packetLength != 24 + 8 * rows * cols) {
-      throw new IllegalArgumentException("Size " + rows + "x" + cols + "is wrong.")
+      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
     }
     val db = bb.asDoubleBuffer()
     val ans = new Array[Array[Double]](rows.toInt)


[08/28] git commit: Python stubs for classification and clustering.

Posted by ma...@apache.org.
Python stubs for classification and clustering.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/73e17064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/73e17064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/73e17064

Branch: refs/heads/master
Commit: 73e17064c60c5aa2297dffbeaae4747890da0115
Parents: f99970e
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 00:12:48 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 00:12:48 2013 -0500

----------------------------------------------------------------------
 python/pyspark/__init__.py |   7 +--
 python/pyspark/mllib.py    | 105 +++++++++++++++++++++++++++++++++++-----
 2 files changed, 96 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/73e17064/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 7c8f914..8b5bb79 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -43,8 +43,9 @@ from pyspark.rdd import RDD
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
 from pyspark.mllib import LinearRegressionModel, LassoModel, \
-    RidgeRegressionModel
+    RidgeRegressionModel, LogisticRegressionModel, SVMModel, KMeansModel
 
 
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", \
-    "LinearRegressionModel", "LassoModel", "RidgeRegressionModel"];
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel",
+    "LinearRegressionModel", "LassoModel", "RidgeRegressionModel",
+    "LogisticRegressionModel", "SVMModel", "KMeansModel"];

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/73e17064/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index d312787..21f3c03 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -75,22 +75,35 @@ def _deserialize_double_matrix(ba):
     else:
         raise TypeError("_deserialize_double_matrix called on a non-bytearray")
 
+def _linear_predictor_typecheck(x, coeffs):
+    """Predict the class of the vector x."""
+    if type(x) == ndarray:
+        if x.ndim == 1:
+            if x.shape == coeffs.shape:
+                pass
+            else:
+                raise RuntimeError("Got array of %d elements; wanted %d"
+                        % shape(x)[0] % shape(coeffs)[0])
+        else:
+            raise RuntimeError("Bulk predict not yet supported.")
+    elif (type(x) == RDD):
+        raise RuntimeError("Bulk predict not yet supported.")
+    else:
+        raise TypeError("Argument of type " + type(x) + " unsupported");
+
 class LinearModel(object):
+    """Something containing a vector of coefficients and an intercept."""
     def __init__(self, coeff, intercept):
         self._coeff = coeff
         self._intercept = intercept
 
+class LinearRegressionModelBase(LinearModel):
+    """A linear regression model."""
     def predict(self, x):
-        if (type(x) == ndarray):
-            if (x.ndim == 1):
-                return dot(_coeff, x) + _intercept
-            else:
-                raise RuntimeError("Bulk predict not yet supported.")
-        elif (type(x) == RDD):
-            raise RuntimeError("Bulk predict not yet supported.")
-        else:
-            raise TypeError("Bad type argument to "
-                            "LinearRegressionModel::predict")
+        """Predict the value of the dependent variable given a vector x"""
+        """containing values for the independent variables."""
+        _linear_predictor_typecheck(x, _coeff)
+        return dot(_coeff, x) + _intercept
 
 # Map a pickled Python RDD of numpy double vectors to a Java RDD of
 # _serialized_double_vectors
@@ -131,7 +144,8 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
                 + type(ans[0]) + " which is not float");
     return klass(_deserialize_double_vector(ans[0]), ans[1]);
 
-class LinearRegressionModel(LinearModel):
+class LinearRegressionModel(LinearRegressionModelBase):
+    """A linear regression model derived from a least-squares fit."""
     @classmethod
     def train(cls, sc, data, iterations=100, step=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
@@ -141,7 +155,9 @@ class LinearRegressionModel(LinearModel):
                         d._jrdd, iterations, step, mini_batch_fraction, i),
                 LinearRegressionModel, data, initial_weights)
 
-class LassoModel(LinearModel):
+class LassoModel(LinearRegressionModelBase):
+    """A linear regression model derived from a least-squares fit with an """
+    """l_1 penalty term."""
     @classmethod
     def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
@@ -151,7 +167,9 @@ class LassoModel(LinearModel):
                         iterations, step, reg_param, mini_batch_fraction, i),
                 LassoModel, data, initial_weights)
 
-class RidgeRegressionModel(LinearModel):
+class RidgeRegressionModel(LinearRegressionModelBase):
+    """A linear regression model derived from a least-squares fit with an """
+    """l_2 penalty term."""
     @classmethod
     def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
               mini_batch_fraction=1.0, initial_weights=None):
@@ -160,3 +178,64 @@ class RidgeRegressionModel(LinearModel):
                 sc._jvm.PythonMLLibAPI().trainRidgeModel(d._jrdd,
                         iterations, step, reg_param, mini_batch_fraction, i),
                 RidgeRegressionModel, data, initial_weights)
+
+class LogisticRegressionModel(LinearModel):
+    """A linear binary classification model derived from logistic regression."""
+    def predict(self, x):
+        _linear_predictor_typecheck(x, _coeff)
+        margin = dot(x, _coeff) + intercept
+        prob = 1/(1 + exp(-margin))
+        return 1 if prob > 0.5 else 0
+
+    @classmethod
+    def train(cls, sc, data, iterations=100, step=1.0,
+              mini_batch_fraction=1.0, initial_weights=None):
+        """Train a logistic regression model on the given data."""
+        return _regression_train_wrapper(sc, lambda d, i:
+                sc._jvm.PythonMLLibAPI().trainLogisticRegressionModel(d._jrdd,
+                        iterations, step, mini_batch_fraction, i),
+                LogisticRegressionModel, data, initial_weights)
+
+class SVMModel(LinearModel):
+    """A support vector machine."""
+    def predict(self, x):
+        _linear_predictor_typecheck(x, _coeff)
+        margin = dot(x, _coeff) + intercept
+        return 1 if margin >= 0 else 0
+    @classmethod
+    def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+              mini_batch_fraction=1.0, initial_weights=None):
+        """Train a support vector machine on the given data."""
+        return _regression_train_wrapper(sc, lambda d, i:
+                sc._jvm.PythonMLLibAPI().trainSVMModel(d._jrdd,
+                        iterations, step, reg_param, mini_batch_fraction, i),
+                SVMModel, data, initial_weights)
+
+class KMeansModel(object):
+    """A clustering model derived from the k-means method."""
+    def __init__(self, centers_):
+        self.centers = centers_
+
+    def predict(self, x):
+        best = 0
+        best_distance = 1e75
+        for i in range(0, centers.shape[0]):
+            diff = x - centers[i]
+            distance = sqrt(dot(diff, diff))
+            if distance < best_distance:
+                best = i
+                best_distance = distance
+        return best
+
+    @classmethod
+    def train(cls, sc, data, k, maxIterations = 100, runs = 1,
+            initialization_mode="k-means||"):
+        dataBytes = _get_unmangled_double_vector_rdd(data)
+        ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
+                k, maxIterations, runs, initialization_mode)
+        if len(ans) != 1:
+            raise RuntimeError("JVM call result had unexpected length");
+        elif type(ans[0]) != bytearray:
+            raise RuntimeError("JVM call result had first element of type "
+                    + type(ans[0]) + " which is not bytearray");
+        return KMeansModel(_deserialize_double_matrix(ans[0]));


[21/28] git commit: Python change for move of PythonMLLibAPI.

Posted by ma...@apache.org.
Python change for move of PythonMLLibAPI.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4efec6eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4efec6eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4efec6eb

Branch: refs/heads/master
Commit: 4efec6eb941c1c6cdec884174ea98c040a277cde
Parents: 58e2a7d
Author: Tor Myklebust <tm...@gmail.com>
Authored: Tue Dec 24 16:49:03 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Tue Dec 24 16:49:03 2013 -0500

----------------------------------------------------------------------
 python/pyspark/java_gateway.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4efec6eb/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 2941984..eb79135 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -62,6 +62,6 @@ def launch_gateway():
     # Import the classes used by PySpark
     java_import(gateway.jvm, "org.apache.spark.api.java.*")
     java_import(gateway.jvm, "org.apache.spark.api.python.*")
-    java_import(gateway.jvm, "org.apache.spark.mllib.api.*")
+    java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
     java_import(gateway.jvm, "scala.Tuple2")
     return gateway


[20/28] git commit: Move PythonMLLibAPI into its own package.

Posted by ma...@apache.org.
Move PythonMLLibAPI into its own package.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/58e2a7d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/58e2a7d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/58e2a7d6

Branch: refs/heads/master
Commit: 58e2a7d6d4f036b20896674b1cac076d8daa55e8
Parents: 2402180
Author: Tor Myklebust <tm...@gmail.com>
Authored: Tue Dec 24 16:48:40 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Tue Dec 24 16:48:40 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/mllib/api/PythonMLLibAPI.scala | 231 ------------------
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 232 +++++++++++++++++++
 2 files changed, 232 insertions(+), 231 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/58e2a7d6/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
deleted file mode 100644
index 67ec974..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.classification._
-import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.recommendation._
-import org.apache.spark.rdd.RDD
-import java.nio.ByteBuffer
-import java.nio.ByteOrder
-import java.nio.DoubleBuffer
-
-/**
- * The Java stubs necessary for the Python mllib bindings.
- */
-class PythonMLLibAPI extends Serializable {
-  private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
-    val packetLength = bytes.length
-    if (packetLength < 16) {
-      throw new IllegalArgumentException("Byte array too short.")
-    }
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    val magic = bb.getLong()
-    if (magic != 1) {
-      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
-    }
-    val length = bb.getLong()
-    if (packetLength != 16 + 8 * length) {
-      throw new IllegalArgumentException("Length " + length + " is wrong.")
-    }
-    val db = bb.asDoubleBuffer()
-    val ans = new Array[Double](length.toInt)
-    db.get(ans)
-    return ans
-  }
-
-  private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
-    val len = doubles.length
-    val bytes = new Array[Byte](16 + 8 * len)
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    bb.putLong(1)
-    bb.putLong(len)
-    val db = bb.asDoubleBuffer()
-    db.put(doubles)
-    return bytes
-  }
-
-  private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
-    val packetLength = bytes.length
-    if (packetLength < 24) {
-      throw new IllegalArgumentException("Byte array too short.")
-    }
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    val magic = bb.getLong()
-    if (magic != 2) {
-      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
-    }
-    val rows = bb.getLong()
-    val cols = bb.getLong()
-    if (packetLength != 24 + 8 * rows * cols) {
-      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
-    }
-    val db = bb.asDoubleBuffer()
-    val ans = new Array[Array[Double]](rows.toInt)
-    var i = 0
-    for (i <- 0 until rows.toInt) {
-      ans(i) = new Array[Double](cols.toInt)
-      db.get(ans(i))
-    }
-    return ans
-  }
-
-  private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
-    val rows = doubles.length
-    var cols = 0
-    if (rows > 0) {
-      cols = doubles(0).length
-    }
-    val bytes = new Array[Byte](24 + 8 * rows * cols)
-    val bb = ByteBuffer.wrap(bytes)
-    bb.order(ByteOrder.nativeOrder())
-    bb.putLong(2)
-    bb.putLong(rows)
-    bb.putLong(cols)
-    val db = bb.asDoubleBuffer()
-    var i = 0
-    for (i <- 0 until rows) {
-      db.put(doubles(i))
-    }
-    return bytes
-  }
-
-  private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
-      dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
-      java.util.LinkedList[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(xBytes => {
-        val x = deserializeDoubleVector(xBytes)
-        LabeledPoint(x(0), x.slice(1, x.length))
-    })
-    val initialWeights = deserializeDoubleVector(initialWeightsBA)
-    val model = trainFunc(data, initialWeights)
-    val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleVector(model.weights))
-    ret.add(model.intercept: java.lang.Double)
-    return ret
-  }
-
-  /**
-   * Java stub for Python mllib LinearRegressionModel.train()
-   */
-  def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
-      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
-      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    return trainRegressionModel((data, initialWeights) =>
-        LinearRegressionWithSGD.train(data, numIterations, stepSize,
-                                      miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
-  }
-
-  /**
-   * Java stub for Python mllib LassoModel.train()
-   */
-  def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
-      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    return trainRegressionModel((data, initialWeights) =>
-        LassoWithSGD.train(data, numIterations, stepSize, regParam,
-                           miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
-  }
-
-  /**
-   * Java stub for Python mllib RidgeRegressionModel.train()
-   */
-  def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
-      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    return trainRegressionModel((data, initialWeights) =>
-        RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
-                                     miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
-  }
-
-  /**
-   * Java stub for Python mllib SVMModel.train()
-   */
-  def trainSVMModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
-      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    return trainRegressionModel((data, initialWeights) =>
-        SVMWithSGD.train(data, numIterations, stepSize, regParam,
-                                     miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
-  }
-
-  /**
-   * Java stub for Python mllib LogisticRegressionModel.train()
-   */
-  def trainLogisticRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
-      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
-      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    return trainRegressionModel((data, initialWeights) =>
-        LogisticRegressionWithSGD.train(data, numIterations, stepSize,
-                                     miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
-  }
-
-  /**
-   * Java stub for Python mllib KMeansModel.train()
-   */
-  def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
-      maxIterations: Int, runs: Int, initializationMode: String):
-      java.util.List[java.lang.Object] = {
-    val data = dataBytesJRDD.rdd.map(xBytes => deserializeDoubleVector(xBytes))
-    val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
-    val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleMatrix(model.clusterCenters))
-    return ret
-  }
-
-  private def unpackRating(ratingBytes: Array[Byte]): Rating = {
-    val bb = ByteBuffer.wrap(ratingBytes)
-    bb.order(ByteOrder.nativeOrder())
-    val user = bb.getInt()
-    val product = bb.getInt()
-    val rating = bb.getDouble()
-    return new Rating(user, product, rating)
-  }
-
-  /**
-   * Java stub for Python mllib ALSModel.train().  This stub returns a handle
-   * to the Java object instead of the content of the Java object.  Extra care
-   * needs to be taken in the Python code to ensure it gets freed on exit; see
-   * the Py4J documentation.
-   */
-  def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
-      iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
-    val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
-    return ALS.train(ratings, rank, iterations, lambda, blocks)
-  }
-
-  /**
-   * Java stub for Python mllib ALSModel.trainImplicit().  This stub returns a
-   * handle to the Java object instead of the content of the Java object.
-   * Extra care needs to be taken in the Python code to ensure it gets freed on
-   * exit; see the Py4J documentation.
-   */
-  def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
-      iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
-    val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
-    return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/58e2a7d6/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
new file mode 100644
index 0000000..ca47432
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.api.python
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.classification._
+import org.apache.spark.mllib.clustering._
+import org.apache.spark.mllib.recommendation._
+import org.apache.spark.rdd.RDD
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+import java.nio.DoubleBuffer
+
+/**
+ * The Java stubs necessary for the Python mllib bindings.
+ */
+class PythonMLLibAPI extends Serializable {
+  private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
+    val packetLength = bytes.length
+    if (packetLength < 16) {
+      throw new IllegalArgumentException("Byte array too short.")
+    }
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.getLong()
+    if (magic != 1) {
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+    }
+    val length = bb.getLong()
+    if (packetLength != 16 + 8 * length) {
+      throw new IllegalArgumentException("Length " + length + " is wrong.")
+    }
+    val db = bb.asDoubleBuffer()
+    val ans = new Array[Double](length.toInt)
+    db.get(ans)
+    return ans
+  }
+
+  private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+    val len = doubles.length
+    val bytes = new Array[Byte](16 + 8 * len)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.putLong(1)
+    bb.putLong(len)
+    val db = bb.asDoubleBuffer()
+    db.put(doubles)
+    return bytes
+  }
+
+  private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
+    val packetLength = bytes.length
+    if (packetLength < 24) {
+      throw new IllegalArgumentException("Byte array too short.")
+    }
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.getLong()
+    if (magic != 2) {
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+    }
+    val rows = bb.getLong()
+    val cols = bb.getLong()
+    if (packetLength != 24 + 8 * rows * cols) {
+      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
+    }
+    val db = bb.asDoubleBuffer()
+    val ans = new Array[Array[Double]](rows.toInt)
+    var i = 0
+    for (i <- 0 until rows.toInt) {
+      ans(i) = new Array[Double](cols.toInt)
+      db.get(ans(i))
+    }
+    return ans
+  }
+
+  private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
+    val rows = doubles.length
+    var cols = 0
+    if (rows > 0) {
+      cols = doubles(0).length
+    }
+    val bytes = new Array[Byte](24 + 8 * rows * cols)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.putLong(2)
+    bb.putLong(rows)
+    bb.putLong(cols)
+    val db = bb.asDoubleBuffer()
+    var i = 0
+    for (i <- 0 until rows) {
+      db.put(doubles(i))
+    }
+    return bytes
+  }
+
+  private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+      dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
+      java.util.LinkedList[java.lang.Object] = {
+    val data = dataBytesJRDD.rdd.map(xBytes => {
+        val x = deserializeDoubleVector(xBytes)
+        LabeledPoint(x(0), x.slice(1, x.length))
+    })
+    val initialWeights = deserializeDoubleVector(initialWeightsBA)
+    val model = trainFunc(data, initialWeights)
+    val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleVector(model.weights))
+    ret.add(model.intercept: java.lang.Double)
+    return ret
+  }
+
+  /**
+   * Java stub for Python mllib LinearRegressionModel.train()
+   */
+  def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        LinearRegressionWithSGD.train(data, numIterations, stepSize,
+                                      miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Java stub for Python mllib LassoModel.train()
+   */
+  def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        LassoWithSGD.train(data, numIterations, stepSize, regParam,
+                           miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Java stub for Python mllib RidgeRegressionModel.train()
+   */
+  def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
+                                     miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Java stub for Python mllib SVMModel.train()
+   */
+  def trainSVMModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        SVMWithSGD.train(data, numIterations, stepSize, regParam,
+                                     miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Java stub for Python mllib LogisticRegressionModel.train()
+   */
+  def trainLogisticRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    return trainRegressionModel((data, initialWeights) =>
+        LogisticRegressionWithSGD.train(data, numIterations, stepSize,
+                                     miniBatchFraction, initialWeights),
+        dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Java stub for Python mllib KMeansModel.train()
+   */
+  def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
+      maxIterations: Int, runs: Int, initializationMode: String):
+      java.util.List[java.lang.Object] = {
+    val data = dataBytesJRDD.rdd.map(xBytes => deserializeDoubleVector(xBytes))
+    val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
+    val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleMatrix(model.clusterCenters))
+    return ret
+  }
+
+  private def unpackRating(ratingBytes: Array[Byte]): Rating = {
+    val bb = ByteBuffer.wrap(ratingBytes)
+    bb.order(ByteOrder.nativeOrder())
+    val user = bb.getInt()
+    val product = bb.getInt()
+    val rating = bb.getDouble()
+    return new Rating(user, product, rating)
+  }
+
+  /**
+   * Java stub for Python mllib ALSModel.train().  This stub returns a handle
+   * to the Java object instead of the content of the Java object.  Extra care
+   * needs to be taken in the Python code to ensure it gets freed on exit; see
+   * the Py4J documentation.
+   */
+  def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+      iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
+    val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+    return ALS.train(ratings, rank, iterations, lambda, blocks)
+  }
+
+  /**
+   * Java stub for Python mllib ALSModel.trainImplicit().  This stub returns a
+   * handle to the Java object instead of the content of the Java object.
+   * Extra care needs to be taken in the Python code to ensure it gets freed on
+   * exit; see the Py4J documentation.
+   */
+  def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+      iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
+    val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+    return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
+  }
+}


[25/28] git commit: Initial weights in Scala are ones; do that too. Also fix some errors.

Posted by ma...@apache.org.
Initial weights in Scala are ones; do that too.  Also fix some errors.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/02208a17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/02208a17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/02208a17

Branch: refs/heads/master
Commit: 02208a175c76be111eeb66dc19c7499a6656a067
Parents: 4e82139
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Dec 25 00:53:48 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Wed Dec 25 00:53:48 2013 -0500

----------------------------------------------------------------------
 python/pyspark/mllib/_common.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02208a17/python/pyspark/mllib/_common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index e68bd8a..e74ba0f 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 
-from numpy import ndarray, copyto, float64, int64, int32, zeros, array_equal, array, dot, shape
+from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape
 from pyspark import SparkContext
 
 # Double vector format:
@@ -143,7 +143,7 @@ def _linear_predictor_typecheck(x, coeffs):
     elif (type(x) == RDD):
         raise RuntimeError("Bulk predict not yet supported.")
     else:
-        raise TypeError("Argument of type " + type(x) + " unsupported")
+        raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
 
 def _get_unmangled_rdd(data, serializer):
     dataBytes = data.map(serializer)
@@ -182,11 +182,11 @@ def _get_initial_weights(initial_weights, data):
         initial_weights = data.first()
         if type(initial_weights) != ndarray:
             raise TypeError("At least one data element has type "
-                    + type(initial_weights) + " which is not ndarray")
+                    + type(initial_weights).__name__ + " which is not ndarray")
         if initial_weights.ndim != 1:
             raise TypeError("At least one data element has "
                     + initial_weights.ndim + " dimensions, which is not 1")
-        initial_weights = zeros([initial_weights.shape[0] - 1])
+        initial_weights = ones([initial_weights.shape[0] - 1])
     return initial_weights
 
 # train_func should take two parameters, namely data and initial_weights, and
@@ -200,10 +200,10 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
         raise RuntimeError("JVM call result had unexpected length")
     elif type(ans[0]) != bytearray:
         raise RuntimeError("JVM call result had first element of type "
-                + type(ans[0]) + " which is not bytearray")
+                + type(ans[0]).__name__ + " which is not bytearray")
     elif type(ans[1]) != float:
         raise RuntimeError("JVM call result had second element of type "
-                + type(ans[0]) + " which is not float")
+                + type(ans[0]).__name__ + " which is not float")
     return klass(_deserialize_double_vector(ans[0]), ans[1])
 
 def _serialize_rating(r):


[16/28] git commit: Python stubs for ALSModel.

Posted by ma...@apache.org.
Python stubs for ALSModel.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/076fc162
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/076fc162
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/076fc162

Branch: refs/heads/master
Commit: 076fc1622190d342e20592c00ca19f8c0a56997f
Parents: b454fdc
Author: Tor Myklebust <tm...@gmail.com>
Authored: Sat Dec 21 14:54:01 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Sat Dec 21 14:54:01 2013 -0500

----------------------------------------------------------------------
 python/pyspark/__init__.py |  5 ++--
 python/pyspark/mllib.py    | 59 ++++++++++++++++++++++++++++++++++++-----
 2 files changed, 56 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/076fc162/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 8b5bb79..3d73d95 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -43,9 +43,10 @@ from pyspark.rdd import RDD
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
 from pyspark.mllib import LinearRegressionModel, LassoModel, \
-    RidgeRegressionModel, LogisticRegressionModel, SVMModel, KMeansModel
+    RidgeRegressionModel, LogisticRegressionModel, SVMModel, KMeansModel, \
+    ALSModel
 
 
 __all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel",
     "LinearRegressionModel", "LassoModel", "RidgeRegressionModel",
-    "LogisticRegressionModel", "SVMModel", "KMeansModel"];
+    "LogisticRegressionModel", "SVMModel", "KMeansModel", "ALSModel"];

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/076fc162/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 8848284..22187eb 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -164,14 +164,17 @@ class LinearRegressionModelBase(LinearModel):
         _linear_predictor_typecheck(x, self._coeff)
         return dot(self._coeff, x) + self._intercept
 
-# Map a pickled Python RDD of numpy double vectors to a Java RDD of
-# _serialized_double_vectors
-def _get_unmangled_double_vector_rdd(data):
-    dataBytes = data.map(_serialize_double_vector)
+def _get_unmangled_rdd(data, serializer):
+    dataBytes = data.map(serializer)
     dataBytes._bypass_serializer = True
     dataBytes.cache()
     return dataBytes
 
+# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+# _serialized_double_vectors
+def _get_unmangled_double_vector_rdd(data):
+    return _get_unmangled_rdd(data, _serialize_double_vector)
+
 # If we weren't given initial weights, take a zero vector of the appropriate
 # length.
 def _get_initial_weights(initial_weights, data):
@@ -317,7 +320,7 @@ class KMeansModel(object):
         return best
 
     @classmethod
-    def train(cls, sc, data, k, maxIterations = 100, runs = 1,
+    def train(cls, sc, data, k, maxIterations=100, runs=1,
             initialization_mode="k-means||"):
         """Train a k-means clustering model."""
         dataBytes = _get_unmangled_double_vector_rdd(data)
@@ -330,12 +333,56 @@ class KMeansModel(object):
                     + type(ans[0]) + " which is not bytearray")
         return KMeansModel(_deserialize_double_matrix(ans[0]))
 
+def _serialize_rating(r):
+    ba = bytearray(16)
+    intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
+    doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=8)
+    intpart[0], intpart[1], doublepart[0] = r
+    return ba
+
+class ALSModel(object):
+    """A matrix factorisation model trained by regularized alternating
+    least-squares.
+
+    >>> r1 = (1, 1, 1.0)
+    >>> r2 = (1, 2, 2.0)
+    >>> r3 = (2, 1, 2.0)
+    >>> ratings = sc.parallelize([r1, r2, r3])
+    >>> model = ALSModel.trainImplicit(sc, ratings, 1)
+    >>> model.predict(2,2) is not None
+    True
+    """
+
+    def __init__(self, sc, java_model):
+        self._context = sc
+        self._java_model = java_model
+
+    #def __del__(self):
+        #self._gateway.detach(self._java_model)
+
+    def predict(self, user, product):
+        return self._java_model.predict(user, product)
+
+    @classmethod
+    def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
+        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+        mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
+                rank, iterations, lambda_, blocks)
+        return ALSModel(sc, mod)
+
+    @classmethod
+    def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
+        ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+        mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
+                rank, iterations, lambda_, blocks, alpha)
+        return ALSModel(sc, mod)
+
 def _test():
     import doctest
     globs = globals().copy()
     globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
     (failure_count, test_count) = doctest.testmod(globs=globs,
-        optionflags=doctest.ELLIPSIS)
+            optionflags=doctest.ELLIPSIS)
     globs['sc'].stop()
     print failure_count,"failures among",test_count,"tests"
     if failure_count:


[27/28] git commit: Remove commented code in __init__.py.

Posted by ma...@apache.org.
Remove commented code in __init__.py.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9cbcf814
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9cbcf814
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9cbcf814

Branch: refs/heads/master
Commit: 9cbcf81453a9afca58645969c1bc3ff366392734
Parents: 5e71354
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Dec 25 14:12:42 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Wed Dec 25 14:12:42 2013 -0500

----------------------------------------------------------------------
 python/pyspark/mllib/__init__.py | 8 --------
 1 file changed, 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9cbcf814/python/pyspark/mllib/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py
index e9c62f3..b1a5df1 100644
--- a/python/pyspark/mllib/__init__.py
+++ b/python/pyspark/mllib/__init__.py
@@ -18,11 +18,3 @@
 """
 Python bindings for MLlib.
 """
-
-#from pyspark.mllib.regression import LinearRegressionModel, LassoModel, RidgeRegressionModel, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD
-#from pyspark.mllib.classification import LogisticRegressionModel, SVMModel, LogisticRegressionWithSGD, SVMWithSGD
-#from pyspark.mllib.recommendation import MatrixFactorizationModel, ALS
-#from pyspark.mllib.clustering import KMeansModel, KMeans
-#
-#
-#__all__ = ["LinearRegressionModel", "LassoModel", "RidgeRegressionModel", "LinearRegressionWithSGD", "LassoWithSGD", "RidgeRegressionWithSGD", "LogisticRegressionModel", "SVMModel", "LogisticRegressionWithSGD", "SVMWithSGD", "MatrixFactorizationModel", "ALS", "KMeansModel", "KMeans"]


[17/28] git commit: Java stubs for ALSModel.

Posted by ma...@apache.org.
Java stubs for ALSModel.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/20f85eca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/20f85eca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/20f85eca

Branch: refs/heads/master
Commit: 20f85eca3d924aecd0fcf61cd516d9ac8e369dc1
Parents: 076fc16
Author: Tor Myklebust <tm...@gmail.com>
Authored: Sat Dec 21 14:54:13 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Sat Dec 21 14:54:13 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/mllib/api/PythonMLLibAPI.scala | 34 ++++++++++++++++++++
 1 file changed, 34 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/20f85eca/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index 6472bf6..4620cab 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -19,6 +19,7 @@ import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.classification._
 import org.apache.spark.mllib.clustering._
+import org.apache.spark.mllib.recommendation._
 import org.apache.spark.rdd.RDD
 import java.nio.ByteBuffer
 import java.nio.ByteOrder
@@ -194,4 +195,37 @@ class PythonMLLibAPI extends Serializable {
     ret.add(serializeDoubleMatrix(model.clusterCenters))
     return ret
   }
+
+  private def unpackRating(ratingBytes: Array[Byte]): Rating = {
+    val bb = ByteBuffer.wrap(ratingBytes)
+    bb.order(ByteOrder.nativeOrder())
+    val user = bb.getInt()
+    val product = bb.getInt()
+    val rating = bb.getDouble()
+    return new Rating(user, product, rating)
+  }
+
+  /**
+   * Java stub for Python mllib ALSModel.train().  This stub returns a handle
+   * to the Java object instead of the content of the Java object.  Extra care
+   * needs to be taken in the Python code to ensure it gets freed on exit; see
+   * the Py4J documentation.
+   */
+  def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+      iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
+    val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+    return ALS.train(ratings, rank, iterations, lambda, blocks)
+  }
+
+  /**
+   * Java stub for Python mllib ALSModel.trainImplicit().  This stub returns a
+   * handle to the Java object instead of the content of the Java object.
+   * Extra care needs to be taken in the Python code to ensure it gets freed on
+   * exit; see the Py4J documentation.
+   */
+  def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+      iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
+    val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+    return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
+  }
 }


[02/28] git commit: The rest of the Python side of those bindings.

Posted by ma...@apache.org.
The rest of the Python side of those bindings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bf491bb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bf491bb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bf491bb3

Branch: refs/heads/master
Commit: bf491bb3c0a9008caa4ac112672a4760b3d1c7b8
Parents: 95915f8
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 01:29:51 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 01:29:51 2013 -0500

----------------------------------------------------------------------
 python/pyspark/__init__.py     | 3 ++-
 python/pyspark/java_gateway.py | 1 +
 python/pyspark/serializers.py  | 2 +-
 3 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf491bb3/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 1f35f6f..949406c 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -42,6 +42,7 @@ from pyspark.context import SparkContext
 from pyspark.rdd import RDD
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
+from pyspark.mllib import train_linear_regression_model
 
 
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"]
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", "train_linear_regression_model"]

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf491bb3/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index e615c1e..2941984 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -62,5 +62,6 @@ def launch_gateway():
     # Import the classes used by PySpark
     java_import(gateway.jvm, "org.apache.spark.api.java.*")
     java_import(gateway.jvm, "org.apache.spark.api.python.*")
+    java_import(gateway.jvm, "org.apache.spark.mllib.api.*")
     java_import(gateway.jvm, "scala.Tuple2")
     return gateway

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf491bb3/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 811fa6f..2a500ab 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -308,4 +308,4 @@ def write_int(value, stream):
 
 def write_with_length(obj, stream):
     write_int(len(obj), stream)
-    stream.write(obj)
\ No newline at end of file
+    stream.write(obj)


[24/28] git commit: Scala stubs for updated Python bindings.

Posted by ma...@apache.org.
Scala stubs for updated Python bindings.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4e821390
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4e821390
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4e821390

Branch: refs/heads/master
Commit: 4e821390bca0d1f40b6f2f011bdc71476a1d3aa4
Parents: 0516305
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Dec 25 00:09:00 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Wed Dec 25 00:09:00 2013 -0500

----------------------------------------------------------------------
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 26 ++++++++++----------
 1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e821390/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index ca47432..8247c1e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -125,9 +125,9 @@ class PythonMLLibAPI extends Serializable {
   }
 
   /**
-   * Java stub for Python mllib LinearRegressionModel.train()
+   * Java stub for Python mllib LinearRegressionWithSGD.train()
    */
-  def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+  def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
       numIterations: Int, stepSize: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
     return trainRegressionModel((data, initialWeights) =>
@@ -137,9 +137,9 @@ class PythonMLLibAPI extends Serializable {
   }
 
   /**
-   * Java stub for Python mllib LassoModel.train()
+   * Java stub for Python mllib LassoWithSGD.train()
    */
-  def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+  def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
       stepSize: Double, regParam: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
     return trainRegressionModel((data, initialWeights) =>
@@ -149,9 +149,9 @@ class PythonMLLibAPI extends Serializable {
   }
 
   /**
-   * Java stub for Python mllib RidgeRegressionModel.train()
+   * Java stub for Python mllib RidgeRegressionWithSGD.train()
    */
-  def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+  def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
       stepSize: Double, regParam: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
     return trainRegressionModel((data, initialWeights) =>
@@ -161,9 +161,9 @@ class PythonMLLibAPI extends Serializable {
   }
 
   /**
-   * Java stub for Python mllib SVMModel.train()
+   * Java stub for Python mllib SVMWithSGD.train()
    */
-  def trainSVMModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+  def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
       stepSize: Double, regParam: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
     return trainRegressionModel((data, initialWeights) =>
@@ -173,9 +173,9 @@ class PythonMLLibAPI extends Serializable {
   }
 
   /**
-   * Java stub for Python mllib LogisticRegressionModel.train()
+   * Java stub for Python mllib LogisticRegressionWithSGD.train()
    */
-  def trainLogisticRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+  def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
       numIterations: Int, stepSize: Double, miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
     return trainRegressionModel((data, initialWeights) =>
@@ -185,7 +185,7 @@ class PythonMLLibAPI extends Serializable {
   }
 
   /**
-   * Java stub for Python mllib KMeansModel.train()
+   * Java stub for Python mllib KMeans.train()
    */
   def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
       maxIterations: Int, runs: Int, initializationMode: String):
@@ -207,7 +207,7 @@ class PythonMLLibAPI extends Serializable {
   }
 
   /**
-   * Java stub for Python mllib ALSModel.train().  This stub returns a handle
+   * Java stub for Python mllib ALS.train().  This stub returns a handle
    * to the Java object instead of the content of the Java object.  Extra care
    * needs to be taken in the Python code to ensure it gets freed on exit; see
    * the Py4J documentation.
@@ -219,7 +219,7 @@ class PythonMLLibAPI extends Serializable {
   }
 
   /**
-   * Java stub for Python mllib ALSModel.trainImplicit().  This stub returns a
+   * Java stub for Python mllib ALS.trainImplicit().  This stub returns a
    * handle to the Java object instead of the content of the Java object.
    * Extra care needs to be taken in the Python code to ensure it gets freed on
    * exit; see the Py4J documentation.


[13/28] git commit: Change some docstrings and add some others.

Posted by ma...@apache.org.
Change some docstrings and add some others.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0a5cacb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0a5cacb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0a5cacb9

Branch: refs/heads/master
Commit: 0a5cacb9615d960c93bca8cc3f4ad2a599f94ec0
Parents: b835ddf
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 02:05:15 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 02:05:15 2013 -0500

----------------------------------------------------------------------
 python/pyspark/mllib.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a5cacb9/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index ce1363f..928caa9 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -146,7 +146,7 @@ def _linear_predictor_typecheck(x, coeffs):
         raise TypeError("Argument of type " + type(x) + " unsupported");
 
 class LinearModel(object):
-    """Something containing a vector of coefficients and an intercept."""
+    """Something that has a vector of coefficients and an intercept."""
     def __init__(self, coeff, intercept):
         self._coeff = coeff
         self._intercept = intercept
@@ -305,6 +305,7 @@ class KMeansModel(object):
         self.centers = centers_
 
     def predict(self, x):
+        """Find the cluster to which x belongs in this model."""
         best = 0
         best_distance = 1e75
         for i in range(0, self.centers.shape[0]):
@@ -318,6 +319,7 @@ class KMeansModel(object):
     @classmethod
     def train(cls, sc, data, k, maxIterations = 100, runs = 1,
             initialization_mode="k-means||"):
+        """Train a k-means clustering model."""
         dataBytes = _get_unmangled_double_vector_rdd(data)
         ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
                 k, maxIterations, runs, initialization_mode)


[03/28] git commit: Incorporate most of Josh's style suggestions. I don't want to deal with the type and length checking errors until we've got at least one working stub that we're all happy with.

Posted by ma...@apache.org.
Incorporate most of Josh's style suggestions.  I don't want to deal with the type and length checking errors until we've got at least one working stub that we're all happy with.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bf20591a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bf20591a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bf20591a

Branch: refs/heads/master
Commit: bf20591a006b9d2fdd9a674d637f5e929fd065a2
Parents: bf491bb
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 03:40:57 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 03:40:57 2013 -0500

----------------------------------------------------------------------
 python/pyspark/__init__.py |   4 +-
 python/pyspark/mllib.py    | 185 +++++++++++++++++++---------------------
 2 files changed, 91 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf20591a/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 949406c..9f71db3 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -42,7 +42,7 @@ from pyspark.context import SparkContext
 from pyspark.rdd import RDD
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
-from pyspark.mllib import train_linear_regression_model
+from pyspark.mllib import LinearRegressionModel
 
 
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", "train_linear_regression_model"]
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", "LinearRegressionModel"];

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf20591a/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 8237f66..0dfc490 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -1,8 +1,4 @@
-from numpy import *;
-from pyspark.serializers import NoOpSerializer, FramedSerializer, \
-    BatchedSerializer, CloudPickleSerializer, pack_long
-
-#__all__ = ["train_linear_regression_model"];
+from numpy import *
 
 # Double vector format:
 #
@@ -15,100 +11,97 @@ from pyspark.serializers import NoOpSerializer, FramedSerializer, \
 # This is all in machine-endian.  That means that the Java interpreter and the
 # Python interpreter must agree on what endian the machine is.
 
-def deserialize_byte_array(shape, ba, offset):
-  """Implementation detail.  Do not use directly."""
-  ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64", \
-      order='C');
-  return ar.copy();
-
-def serialize_double_vector(v):
-  """Implementation detail.  Do not use directly."""
-  if (type(v) == ndarray and v.dtype == float64 and v.ndim == 1):
-    length = v.shape[0];
-    ba = bytearray(16 + 8*length);
-    header = ndarray(shape=[2], buffer=ba, dtype="int64");
-    header[0] = 1;
-    header[1] = length;
-    copyto(ndarray(shape=[length], buffer=ba, offset=16, dtype="float64"), v);
-    return ba;
-  else:
-    raise TypeError("serialize_double_vector called on a non-double-vector");
+def _deserialize_byte_array(shape, ba, offset):
+    ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
+            order='C')
+    return ar.copy()
 
-def deserialize_double_vector(ba):
-  """Implementation detail.  Do not use directly."""
-  if (type(ba) == bytearray and len(ba) >= 16 and (len(ba) & 7 == 0)):
-    header = ndarray(shape=[2], buffer=ba, dtype="int64");
-    if (header[0] != 1):
-      raise TypeError("deserialize_double_vector called on bytearray with " \
-                      "wrong magic");
-    length = header[1];
-    if (len(ba) != 8*length + 16):
-      raise TypeError("deserialize_double_vector called on bytearray with " \
-                      "wrong length");
-    return deserialize_byte_array([length], ba, 16);
-  else:
-    raise TypeError("deserialize_double_vector called on a non-bytearray");
+def _serialize_double_vector(v):
+    if (type(v) == ndarray and v.dtype == float64 and v.ndim == 1):
+        length = v.shape[0]
+        ba = bytearray(16 + 8*length)
+        header = ndarray(shape=[2], buffer=ba, dtype="int64")
+        header[0] = 1
+        header[1] = length
+        copyto(ndarray(shape=[length], buffer=ba, offset=16,
+                dtype="float64"), v)
+        return ba
+    else:
+        raise TypeError("_serialize_double_vector called on a "
+                        "non-double-vector")
 
-def serialize_double_matrix(m):
-  """Implementation detail.  Do not use directly."""
-  if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
-    rows = m.shape[0];
-    cols = m.shape[1];
-    ba = bytearray(24 + 8 * rows * cols);
-    header = ndarray(shape=[3], buffer=ba, dtype="int64");
-    header[0] = 2;
-    header[1] = rows;
-    header[2] = cols;
-    copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24, dtype="float64", \
-        order='C'), m);
-    return ba;
-  else:
-    print type(m);
-    print m.dtype;
-    print m.ndim;
-    raise TypeError("serialize_double_matrix called on a non-double-matrix");
+def _deserialize_double_vector(ba):
+    if (type(ba) == bytearray and len(ba) >= 16 and (len(ba) & 7 == 0)):
+        header = ndarray(shape=[2], buffer=ba, dtype="int64")
+        if (header[0] != 1):
+            raise TypeError("_deserialize_double_vector called on bytearray "
+                            "with wrong magic")
+        length = header[1]
+        if (len(ba) != 8*length + 16):
+            raise TypeError("_deserialize_double_vector called on bytearray "
+                            "with wrong length")
+        return _deserialize_byte_array([length], ba, 16)
+    else:
+        raise TypeError("_deserialize_double_vector called on a non-bytearray")
 
-def deserialize_double_matrix(ba):
-  """Implementation detail.  Do not use directly."""
-  if (type(ba) == bytearray and len(ba) >= 24 and (len(ba) & 7 == 0)):
-    header = ndarray(shape=[3], buffer=ba, dtype="int64");
-    if (header[0] != 2):
-      raise TypeError("deserialize_double_matrix called on bytearray with " \
-                      "wrong magic");
-    rows = header[1];
-    cols = header[2];
-    if (len(ba) != 8*rows*cols + 24):
-      raise TypeError("deserialize_double_matrix called on bytearray with " \
-                      "wrong length");
-    return deserialize_byte_array([rows, cols], ba, 24);
-  else:
-    raise TypeError("deserialize_double_matrix called on a non-bytearray");
+def _serialize_double_matrix(m):
+    if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
+        rows = m.shape[0]
+        cols = m.shape[1]
+        ba = bytearray(24 + 8 * rows * cols)
+        header = ndarray(shape=[3], buffer=ba, dtype="int64")
+        header[0] = 2
+        header[1] = rows
+        header[2] = cols
+        copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24,
+                       dtype="float64", order='C'), m)
+        return ba
+    else:
+        raise TypeError("_serialize_double_matrix called on a "
+                        "non-double-matrix")
 
-class LinearRegressionModel:
-  _coeff = None;
-  _intercept = None;
-  def __init__(self, coeff, intercept):
-    self._coeff = coeff;
-    self._intercept = intercept;
-  def predict(self, x):
-    if (type(x) == ndarray):
-      if (x.ndim == 1):
-        return dot(_coeff, x) - _intercept;
-      else:
-        raise RuntimeError("Bulk predict not yet supported.");
-    elif (type(x) == RDD):
-      raise RuntimeError("Bulk predict not yet supported.");
+def _deserialize_double_matrix(ba):
+    if (type(ba) == bytearray and len(ba) >= 24 and (len(ba) & 7 == 0)):
+        header = ndarray(shape=[3], buffer=ba, dtype="int64")
+        if (header[0] != 2):
+            raise TypeError("_deserialize_double_matrix called on bytearray "
+                            "with wrong magic")
+        rows = header[1]
+        cols = header[2]
+        if (len(ba) != 8*rows*cols + 24):
+            raise TypeError("_deserialize_double_matrix called on bytearray "
+                            "with wrong length")
+        return _deserialize_byte_array([rows, cols], ba, 24)
     else:
-      raise TypeError("Bad type argument to LinearRegressionModel::predict");
+        raise TypeError("_deserialize_double_matrix called on a non-bytearray")
+
+class LinearRegressionModel(object):
+    def __init__(self, coeff, intercept):
+        self._coeff = coeff
+        self._intercept = intercept
+
+    def predict(self, x):
+        if (type(x) == ndarray):
+            if (x.ndim == 1):
+                return dot(_coeff, x) - _intercept
+            else:
+                raise RuntimeError("Bulk predict not yet supported.")
+        elif (type(x) == RDD):
+            raise RuntimeError("Bulk predict not yet supported.")
+        else:
+            raise TypeError("Bad type argument to "
+                            "LinearRegressionModel::predict")
 
-def train_linear_regression_model(sc, data):
-  """Train a linear regression model on the given data."""
-  dataBytes = data.map(serialize_double_vector);
-  sc.serializer = NoOpSerializer();
-  dataBytes.cache();
-  api = sc._jvm.PythonMLLibAPI();
-  ans = api.trainLinearRegressionModel(dataBytes._jrdd);
-  if (len(ans) != 2 or type(ans[0]) != bytearray or type(ans[1]) != float):
-    raise RuntimeError("train_linear_regression_model received garbage " \
-                       "from JVM");
-  return LinearRegressionModel(deserialize_double_vector(ans[0]), ans[1]);
+    @classmethod
+    def train(cls, sc, data):
+        """Train a linear regression model on the given data."""
+        dataBytes = data.map(_serialize_double_vector)
+        dataBytes._bypass_serializer = True
+        dataBytes.cache()
+        api = sc._jvm.PythonMLLibAPI()
+        ans = api.trainLinearRegressionModel(dataBytes._jrdd)
+        if (len(ans) != 2 or type(ans[0]) != bytearray
+                or type(ans[1]) != float):
+            raise RuntimeError("train_linear_regression_model received "
+                               "garbage from JVM")
+        return LinearRegressionModel(_deserialize_double_vector(ans[0]), ans[1])


[14/28] git commit: Un-semicolon mllib.py.

Posted by ma...@apache.org.
Un-semicolon mllib.py.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0b494c21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0b494c21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0b494c21

Branch: refs/heads/master
Commit: 0b494c21675b6cc3b5d669dbd9b9a8f277216613
Parents: 0a5cacb
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 02:05:55 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 02:05:55 2013 -0500

----------------------------------------------------------------------
 python/pyspark/mllib.py | 22 +++++++++++-----------
 1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b494c21/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 928caa9..8848284 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -143,7 +143,7 @@ def _linear_predictor_typecheck(x, coeffs):
     elif (type(x) == RDD):
         raise RuntimeError("Bulk predict not yet supported.")
     else:
-        raise TypeError("Argument of type " + type(x) + " unsupported");
+        raise TypeError("Argument of type " + type(x) + " unsupported")
 
 class LinearModel(object):
     """Something that has a vector of coefficients and an intercept."""
@@ -170,7 +170,7 @@ def _get_unmangled_double_vector_rdd(data):
     dataBytes = data.map(_serialize_double_vector)
     dataBytes._bypass_serializer = True
     dataBytes.cache()
-    return dataBytes;
+    return dataBytes
 
 # If we weren't given initial weights, take a zero vector of the appropriate
 # length.
@@ -183,8 +183,8 @@ def _get_initial_weights(initial_weights, data):
         if initial_weights.ndim != 1:
             raise TypeError("At least one data element has "
                     + initial_weights.ndim + " dimensions, which is not 1")
-        initial_weights = zeros([initial_weights.shape[0] - 1]);
-    return initial_weights;
+        initial_weights = zeros([initial_weights.shape[0] - 1])
+    return initial_weights
 
 # train_func should take two parameters, namely data and initial_weights, and
 # return the result of a call to the appropriate JVM stub.
@@ -194,14 +194,14 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
     dataBytes = _get_unmangled_double_vector_rdd(data)
     ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
     if len(ans) != 2:
-        raise RuntimeError("JVM call result had unexpected length");
+        raise RuntimeError("JVM call result had unexpected length")
     elif type(ans[0]) != bytearray:
         raise RuntimeError("JVM call result had first element of type "
-                + type(ans[0]) + " which is not bytearray");
+                + type(ans[0]) + " which is not bytearray")
     elif type(ans[1]) != float:
         raise RuntimeError("JVM call result had second element of type "
-                + type(ans[0]) + " which is not float");
-    return klass(_deserialize_double_vector(ans[0]), ans[1]);
+                + type(ans[0]) + " which is not float")
+    return klass(_deserialize_double_vector(ans[0]), ans[1])
 
 class LinearRegressionModel(LinearRegressionModelBase):
     """A linear regression model derived from a least-squares fit.
@@ -324,11 +324,11 @@ class KMeansModel(object):
         ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
                 k, maxIterations, runs, initialization_mode)
         if len(ans) != 1:
-            raise RuntimeError("JVM call result had unexpected length");
+            raise RuntimeError("JVM call result had unexpected length")
         elif type(ans[0]) != bytearray:
             raise RuntimeError("JVM call result had first element of type "
-                    + type(ans[0]) + " which is not bytearray");
-        return KMeansModel(_deserialize_double_matrix(ans[0]));
+                    + type(ans[0]) + " which is not bytearray")
+        return KMeansModel(_deserialize_double_matrix(ans[0]))
 
 def _test():
     import doctest


[04/28] git commit: Un-semicolon PythonMLLibAPI.

Posted by ma...@apache.org.
Un-semicolon PythonMLLibAPI.


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2a41c9aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2a41c9aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2a41c9aa

Branch: refs/heads/master
Commit: 2a41c9aad3d0a8477a11bf910fa57b49ea4dc6dc
Parents: bf20591
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 21:27:11 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 21:27:11 2013 -0500

----------------------------------------------------------------------
 .../apache/spark/mllib/api/PythonMLLibAPI.scala | 54 ++++++++++----------
 1 file changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a41c9aa/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index 19d2e9a..3daf5dc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -6,46 +6,46 @@ import java.nio.DoubleBuffer
 
 class PythonMLLibAPI extends Serializable {
   def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
-    val packetLength = bytes.length;
+    val packetLength = bytes.length
     if (packetLength < 16) {
-      throw new IllegalArgumentException("Byte array too short.");
+      throw new IllegalArgumentException("Byte array too short.")
     }
-    val bb = ByteBuffer.wrap(bytes);
-    bb.order(ByteOrder.nativeOrder());
-    val magic = bb.getLong();
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.getLong()
     if (magic != 1) {
-      throw new IllegalArgumentException("Magic " + magic + " is wrong.");
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
     }
-    val length = bb.getLong();
+    val length = bb.getLong()
     if (packetLength != 16 + 8 * length) {
-      throw new IllegalArgumentException("Length " + length + "is wrong.");
+      throw new IllegalArgumentException("Length " + length + "is wrong.")
     }
-    val db = bb.asDoubleBuffer();
-    val ans = new Array[Double](length.toInt);
-    db.get(ans);
-    return ans;
+    val db = bb.asDoubleBuffer()
+    val ans = new Array[Double](length.toInt)
+    db.get(ans)
+    return ans
   }
 
   def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
-    val len = doubles.length;
-    val bytes = new Array[Byte](16 + 8 * len);
-    val bb = ByteBuffer.wrap(bytes);
-    bb.order(ByteOrder.nativeOrder());
-    bb.putLong(1);
-    bb.putLong(len);
-    val db = bb.asDoubleBuffer();
-    db.put(doubles);
-    return bytes;
+    val len = doubles.length
+    val bytes = new Array[Byte](16 + 8 * len)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.putLong(1)
+    bb.putLong(len)
+    val db = bb.asDoubleBuffer()
+    db.put(doubles)
+    return bytes
   }
 
   def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]]):
       java.util.List[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(x => deserializeDoubleVector(x))
-        .map(v => LabeledPoint(v(0), v.slice(1, v.length)));
-    val model = LinearRegressionWithSGD.train(data, 222);
-    val ret = new java.util.LinkedList[java.lang.Object]();
-    ret.add(serializeDoubleVector(model.weights));
-    ret.add(model.intercept: java.lang.Double);
-    return ret;
+        .map(v => LabeledPoint(v(0), v.slice(1, v.length)))
+    val model = LinearRegressionWithSGD.train(data, 222)
+    val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleVector(model.weights))
+    ret.add(model.intercept: java.lang.Double)
+    return ret
   }
 }