You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2013/12/26 07:31:18 UTC
[01/28] git commit: First cut at python mllib bindings. Only
LinearRegression is supported.
Updated Branches:
refs/heads/master 56094bcd8 -> c344ed04c
First cut at python mllib bindings. Only LinearRegression is supported.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/95915f8b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/95915f8b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/95915f8b
Branch: refs/heads/master
Commit: 95915f8b3b6d07a9dddb09a637aa23c8622bff9b
Parents: d3b1af4
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 01:22:18 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 01:29:09 2013 -0500
----------------------------------------------------------------------
.../apache/spark/mllib/api/PythonMLLibAPI.scala | 51 +++++++++
python/pyspark/mllib.py | 114 +++++++++++++++++++
2 files changed, 165 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/95915f8b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
new file mode 100644
index 0000000..19d2e9a
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -0,0 +1,51 @@
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.mllib.regression._
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+import java.nio.DoubleBuffer
+
+/** JVM-side helpers that the Python mllib bindings call to exchange packed double vectors. */
+class PythonMLLibAPI extends Serializable {
+  /**
+   * Decodes a packed double vector laid out as
+   * [8-byte magic 1] [8-byte length] [length * 8 bytes of doubles], in native byte order.
+   *
+   * @param bytes the packed vector
+   * @return the decoded doubles
+   * @throws IllegalArgumentException if the array is shorter than the header, the magic is
+   *                                  not 1, or the declared length does not match the payload
+   */
+  def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
+    val packetLength = bytes.length
+    if (packetLength < 16) {
+      throw new IllegalArgumentException("Byte array too short.")
+    }
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.getLong()
+    if (magic != 1) {
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+    }
+    val length = bb.getLong()
+    // Compare in element units: 16 + 8 * length can wrap around for a corrupt 64-bit length,
+    // which would let a bogus header through and then be truncated by length.toInt.
+    val payloadBytes = packetLength - 16
+    if (payloadBytes % 8 != 0 || length != payloadBytes / 8) {
+      throw new IllegalArgumentException("Length " + length + " is wrong.")
+    }
+    val ans = new Array[Double](length.toInt)
+    bb.asDoubleBuffer().get(ans)
+    ans
+  }
+
+  /**
+   * Encodes `doubles` in the packed format read by `deserializeDoubleVector`.
+   *
+   * @param doubles the values to pack
+   * @return a 16-byte header followed by the values, in native byte order
+   */
+  def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+    val len = doubles.length
+    val bytes = new Array[Byte](16 + 8 * len)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.putLong(1)
+    bb.putLong(len)
+    // The double view starts right after the two header longs and writes into `bytes`.
+    bb.asDoubleBuffer().put(doubles)
+    bytes
+  }
+
+  /**
+   * Trains a linear regression model with `LinearRegressionWithSGD.train(data, 222)`.
+   *
+   * @param dataBytesJRDD packed double vectors; element 0 of each vector is the label and
+   *                      the remaining elements are the features
+   * @return a two-element list: the packed weight vector, then the boxed intercept
+   */
+  def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]]):
+      java.util.List[java.lang.Object] = {
+    val data = dataBytesJRDD.rdd.map(x => deserializeDoubleVector(x))
+      .map(v => LabeledPoint(v(0), v.slice(1, v.length)))
+    val model = LinearRegressionWithSGD.train(data, 222)
+    val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleVector(model.weights))
+    ret.add(model.intercept: java.lang.Double)
+    ret
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/95915f8b/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
new file mode 100644
index 0000000..8237f66
--- /dev/null
+++ b/python/pyspark/mllib.py
@@ -0,0 +1,114 @@
+from numpy import *;
+from pyspark.serializers import NoOpSerializer, FramedSerializer, \
+ BatchedSerializer, CloudPickleSerializer, pack_long
+
+#__all__ = ["train_linear_regression_model"];
+
+# Double vector format:
+#
+# [8-byte 1] [8-byte length] [length*8 bytes of data]
+#
+# Double matrix format:
+#
+# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
+#
+# Every field is stored in the machine's native byte order, so the JVM and the
+# Python interpreter must agree on the machine's endianness.
+
+def deserialize_byte_array(shape, ba, offset):
+ """Implementation detail. Do not use directly.
+
+ Views the bytes of ba, starting at byte offset, as a C-ordered float64
+ array of the given shape and returns a copy of that array.
+ """
+ # Copy so the returned array does not alias the caller's buffer.
+ ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64", \
+ order='C');
+ return ar.copy();
+
+def serialize_double_vector(v):
+ """Implementation detail. Do not use directly.
+
+ Packs a one-dimensional float64 ndarray into a bytearray laid out as
+ [8-byte 1] [8-byte length] [length*8 bytes of data].
+ Raises TypeError for any other argument.
+ """
+ if (type(v) == ndarray and v.dtype == float64 and v.ndim == 1):
+ length = v.shape[0];
+ ba = bytearray(16 + 8*length);
+ # header and the payload array below alias ba, so writing to them fills ba.
+ header = ndarray(shape=[2], buffer=ba, dtype="int64");
+ header[0] = 1;
+ header[1] = length;
+ copyto(ndarray(shape=[length], buffer=ba, offset=16, dtype="float64"), v);
+ return ba;
+ else:
+ raise TypeError("serialize_double_vector called on a non-double-vector");
+
+def deserialize_double_vector(ba):
+ """Implementation detail. Do not use directly.
+
+ Unpacks a bytearray laid out as
+ [8-byte 1] [8-byte length] [length*8 bytes of data]
+ into a one-dimensional float64 ndarray.
+ Raises TypeError if the magic, the length or the argument type is wrong.
+ """
+ if (type(ba) == bytearray and len(ba) >= 16 and (len(ba) & 7 == 0)):
+ # header aliases the first 16 bytes of ba: [magic, length].
+ header = ndarray(shape=[2], buffer=ba, dtype="int64");
+ if (header[0] != 1):
+ raise TypeError("deserialize_double_vector called on bytearray with " \
+ "wrong magic");
+ length = header[1];
+ if (len(ba) != 8*length + 16):
+ raise TypeError("deserialize_double_vector called on bytearray with " \
+ "wrong length");
+ return deserialize_byte_array([length], ba, 16);
+ else:
+ # Also reached for a bytearray that is shorter than 16 bytes or whose
+ # size is not a multiple of 8, despite what the message says.
+ raise TypeError("deserialize_double_vector called on a non-bytearray");
+
+def serialize_double_matrix(m):
+    """Implementation detail. Do not use directly.
+
+    Packs a two-dimensional float64 ndarray into a bytearray laid out as
+    [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data],
+    with the data in C (row-major) order.
+    Raises TypeError for any other argument.
+    """
+    if not (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
+        # No debugging prints here: reading m.dtype or m.ndim on a non-ndarray
+        # raised AttributeError and hid the intended TypeError.
+        raise TypeError("serialize_double_matrix called on a non-double-matrix")
+    rows, cols = m.shape
+    ba = bytearray(24 + 8 * rows * cols)
+    # header and the payload array below alias ba, so writing to them fills ba.
+    header = ndarray(shape=[3], buffer=ba, dtype="int64")
+    header[0] = 2
+    header[1] = rows
+    header[2] = cols
+    copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24, dtype="float64",
+                   order='C'), m)
+    return ba
+
+def deserialize_double_matrix(ba):
+ """Implementation detail. Do not use directly.
+
+ Unpacks a bytearray laid out as
+ [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
+ into a two-dimensional float64 ndarray of shape [rows, cols].
+ Raises TypeError if the magic, the length or the argument type is wrong.
+ """
+ if (type(ba) == bytearray and len(ba) >= 24 and (len(ba) & 7 == 0)):
+ # header aliases the first 24 bytes of ba: [magic, rows, cols].
+ header = ndarray(shape=[3], buffer=ba, dtype="int64");
+ if (header[0] != 2):
+ raise TypeError("deserialize_double_matrix called on bytearray with " \
+ "wrong magic");
+ rows = header[1];
+ cols = header[2];
+ if (len(ba) != 8*rows*cols + 24):
+ raise TypeError("deserialize_double_matrix called on bytearray with " \
+ "wrong length");
+ return deserialize_byte_array([rows, cols], ba, 24);
+ else:
+ # Also reached for a bytearray that is shorter than 24 bytes or whose
+ # size is not a multiple of 8, despite what the message says.
+ raise TypeError("deserialize_double_matrix called on a non-bytearray");
+
+class LinearRegressionModel:
+    """A linear regression model: a coefficient vector plus an intercept."""
+    _coeff = None
+    _intercept = None
+
+    def __init__(self, coeff, intercept):
+        """Store the coefficient vector and the intercept of the model."""
+        self._coeff = coeff
+        self._intercept = intercept
+
+    def predict(self, x):
+        """Return dot(coeff, x) + intercept for a one-dimensional ndarray x.
+
+        Raises RuntimeError for higher-dimensional arrays and for RDDs (bulk
+        predict is not supported) and TypeError for any other argument.
+        """
+        if type(x) == ndarray:
+            if x.ndim == 1:
+                # The model state lives on the instance, and the intercept is
+                # added to the dot product, not subtracted from it.
+                return dot(self._coeff, x) + self._intercept
+            raise RuntimeError("Bulk predict not yet supported.")
+        # Imported here because the module's top-level imports do not bring
+        # RDD into scope; the ndarray path above never needs it.
+        from pyspark.rdd import RDD
+        if type(x) == RDD:
+            raise RuntimeError("Bulk predict not yet supported.")
+        raise TypeError("Bad type argument to LinearRegressionModel::predict")
+
+def train_linear_regression_model(sc, data):
+ """Train a linear regression model on the given data.
+
+ Each element of data is packed with serialize_double_vector before it is
+ handed to the JVM, which treats the first entry of every vector as the
+ label and the remaining entries as the features.
+ Returns a LinearRegressionModel built from the weights and intercept that
+ the JVM sends back; raises RuntimeError if the reply has another shape.
+ """
+ dataBytes = data.map(serialize_double_vector);
+ # NOTE(review): this replaces the serializer on the caller's SparkContext
+ # and never restores it -- confirm that this is intended.
+ sc.serializer = NoOpSerializer();
+ dataBytes.cache();
+ api = sc._jvm.PythonMLLibAPI();
+ ans = api.trainLinearRegressionModel(dataBytes._jrdd);
+ # Expected reply: [packed weight vector, intercept].
+ if (len(ans) != 2 or type(ans[0]) != bytearray or type(ans[1]) != float):
+ raise RuntimeError("train_linear_regression_model received garbage " \
+ "from JVM");
+ return LinearRegressionModel(deserialize_double_vector(ans[0]), ans[1]);
[23/28] git commit: Split the mllib bindings into a whole bunch of
modules and rename some things.
Posted by ma...@apache.org.
Split the mllib bindings into a whole bunch of modules and rename some things.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/05163057
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/05163057
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/05163057
Branch: refs/heads/master
Commit: 05163057a1810f0a32b722e8c93e5435240636d9
Parents: 86e38c4
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Dec 25 00:08:05 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Wed Dec 25 00:08:05 2013 -0500
----------------------------------------------------------------------
python/pyspark/__init__.py | 7 +-
python/pyspark/mllib.py | 391 ----------------------------
python/pyspark/mllib/__init__.py | 46 ++++
python/pyspark/mllib/_common.py | 227 ++++++++++++++++
python/pyspark/mllib/classification.py | 86 ++++++
python/pyspark/mllib/clustering.py | 79 ++++++
python/pyspark/mllib/recommendation.py | 74 ++++++
python/pyspark/mllib/regression.py | 110 ++++++++
8 files changed, 623 insertions(+), 397 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 3d73d95..1f35f6f 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -42,11 +42,6 @@ from pyspark.context import SparkContext
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.storagelevel import StorageLevel
-from pyspark.mllib import LinearRegressionModel, LassoModel, \
- RidgeRegressionModel, LogisticRegressionModel, SVMModel, KMeansModel, \
- ALSModel
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel",
- "LinearRegressionModel", "LassoModel", "RidgeRegressionModel",
- "LogisticRegressionModel", "SVMModel", "KMeansModel", "ALSModel"];
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
deleted file mode 100644
index 46f368b..0000000
--- a/python/pyspark/mllib.py
+++ /dev/null
@@ -1,391 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-from numpy import *
-from pyspark import SparkContext
-
-# Double vector format:
-#
-# [8-byte 1] [8-byte length] [length*8 bytes of data]
-#
-# Double matrix format:
-#
-# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
-#
-# This is all in machine-endian. That means that the Java interpreter and the
-# Python interpreter must agree on what endian the machine is.
-
-def _deserialize_byte_array(shape, ba, offset):
- """Wrapper around ndarray aliasing hack.
-
- >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
- >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
- True
- >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
- >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
- True
- """
- ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
- order='C')
- return ar.copy()
-
-def _serialize_double_vector(v):
- """Serialize a double vector into a mutually understood format."""
- if type(v) != ndarray:
- raise TypeError("_serialize_double_vector called on a %s; "
- "wanted ndarray" % type(v))
- if v.dtype != float64:
- raise TypeError("_serialize_double_vector called on an ndarray of %s; "
- "wanted ndarray of float64" % v.dtype)
- if v.ndim != 1:
- raise TypeError("_serialize_double_vector called on a %ddarray; "
- "wanted a 1darray" % v.ndim)
- length = v.shape[0]
- ba = bytearray(16 + 8*length)
- header = ndarray(shape=[2], buffer=ba, dtype="int64")
- header[0] = 1
- header[1] = length
- copyto(ndarray(shape=[length], buffer=ba, offset=16,
- dtype="float64"), v)
- return ba
-
-def _deserialize_double_vector(ba):
- """Deserialize a double vector from a mutually understood format.
-
- >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
- >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
- True
- """
- if type(ba) != bytearray:
- raise TypeError("_deserialize_double_vector called on a %s; "
- "wanted bytearray" % type(ba))
- if len(ba) < 16:
- raise TypeError("_deserialize_double_vector called on a %d-byte array, "
- "which is too short" % len(ba))
- if (len(ba) & 7) != 0:
- raise TypeError("_deserialize_double_vector called on a %d-byte array, "
- "which is not a multiple of 8" % len(ba))
- header = ndarray(shape=[2], buffer=ba, dtype="int64")
- if header[0] != 1:
- raise TypeError("_deserialize_double_vector called on bytearray "
- "with wrong magic")
- length = header[1]
- if len(ba) != 8*length + 16:
- raise TypeError("_deserialize_double_vector called on bytearray "
- "with wrong length")
- return _deserialize_byte_array([length], ba, 16)
-
-def _serialize_double_matrix(m):
- """Serialize a double matrix into a mutually understood format."""
- if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
- rows = m.shape[0]
- cols = m.shape[1]
- ba = bytearray(24 + 8 * rows * cols)
- header = ndarray(shape=[3], buffer=ba, dtype="int64")
- header[0] = 2
- header[1] = rows
- header[2] = cols
- copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24,
- dtype="float64", order='C'), m)
- return ba
- else:
- raise TypeError("_serialize_double_matrix called on a "
- "non-double-matrix")
-
-def _deserialize_double_matrix(ba):
- """Deserialize a double matrix from a mutually understood format."""
- if type(ba) != bytearray:
- raise TypeError("_deserialize_double_matrix called on a %s; "
- "wanted bytearray" % type(ba))
- if len(ba) < 24:
- raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
- "which is too short" % len(ba))
- if (len(ba) & 7) != 0:
- raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
- "which is not a multiple of 8" % len(ba))
- header = ndarray(shape=[3], buffer=ba, dtype="int64")
- if (header[0] != 2):
- raise TypeError("_deserialize_double_matrix called on bytearray "
- "with wrong magic")
- rows = header[1]
- cols = header[2]
- if (len(ba) != 8*rows*cols + 24):
- raise TypeError("_deserialize_double_matrix called on bytearray "
- "with wrong length")
- return _deserialize_byte_array([rows, cols], ba, 24)
-
-def _linear_predictor_typecheck(x, coeffs):
- """Check that x is a one-dimensional vector of the right shape.
- This is a temporary hackaround until I actually implement bulk predict."""
- if type(x) == ndarray:
- if x.ndim == 1:
- if x.shape == coeffs.shape:
- pass
- else:
- raise RuntimeError("Got array of %d elements; wanted %d"
- % shape(x)[0] % shape(coeffs)[0])
- else:
- raise RuntimeError("Bulk predict not yet supported.")
- elif (type(x) == RDD):
- raise RuntimeError("Bulk predict not yet supported.")
- else:
- raise TypeError("Argument of type " + type(x) + " unsupported")
-
-class LinearModel(object):
- """Something that has a vector of coefficients and an intercept."""
- def __init__(self, coeff, intercept):
- self._coeff = coeff
- self._intercept = intercept
-
-class LinearRegressionModelBase(LinearModel):
- """A linear regression model.
-
- >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
- >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
- True
- """
- def predict(self, x):
- """Predict the value of the dependent variable given a vector x"""
- """containing values for the independent variables."""
- _linear_predictor_typecheck(x, self._coeff)
- return dot(self._coeff, x) + self._intercept
-
-def _get_unmangled_rdd(data, serializer):
- dataBytes = data.map(serializer)
- dataBytes._bypass_serializer = True
- dataBytes.cache()
- return dataBytes
-
-# Map a pickled Python RDD of numpy double vectors to a Java RDD of
-# _serialized_double_vectors
-def _get_unmangled_double_vector_rdd(data):
- return _get_unmangled_rdd(data, _serialize_double_vector)
-
-# If we weren't given initial weights, take a zero vector of the appropriate
-# length.
-def _get_initial_weights(initial_weights, data):
- if initial_weights is None:
- initial_weights = data.first()
- if type(initial_weights) != ndarray:
- raise TypeError("At least one data element has type "
- + type(initial_weights) + " which is not ndarray")
- if initial_weights.ndim != 1:
- raise TypeError("At least one data element has "
- + initial_weights.ndim + " dimensions, which is not 1")
- initial_weights = zeros([initial_weights.shape[0] - 1])
- return initial_weights
-
-# train_func should take two parameters, namely data and initial_weights, and
-# return the result of a call to the appropriate JVM stub.
-# _regression_train_wrapper is responsible for setup and error checking.
-def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
- initial_weights = _get_initial_weights(initial_weights, data)
- dataBytes = _get_unmangled_double_vector_rdd(data)
- ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
- if len(ans) != 2:
- raise RuntimeError("JVM call result had unexpected length")
- elif type(ans[0]) != bytearray:
- raise RuntimeError("JVM call result had first element of type "
- + type(ans[0]) + " which is not bytearray")
- elif type(ans[1]) != float:
- raise RuntimeError("JVM call result had second element of type "
- + type(ans[0]) + " which is not float")
- return klass(_deserialize_double_vector(ans[0]), ans[1])
-
-class LinearRegressionModel(LinearRegressionModelBase):
- """A linear regression model derived from a least-squares fit.
-
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
- >>> lrm = LinearRegressionModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
- """
- @classmethod
- def train(cls, sc, data, iterations=100, step=1.0,
- mini_batch_fraction=1.0, initial_weights=None):
- """Train a linear regression model on the given data."""
- return _regression_train_wrapper(sc, lambda d, i:
- sc._jvm.PythonMLLibAPI().trainLinearRegressionModel(
- d._jrdd, iterations, step, mini_batch_fraction, i),
- LinearRegressionModel, data, initial_weights)
-
-class LassoModel(LinearRegressionModelBase):
- """A linear regression model derived from a least-squares fit with an
- l_1 penalty term.
-
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
- >>> lrm = LassoModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
- """
- @classmethod
- def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
- mini_batch_fraction=1.0, initial_weights=None):
- """Train a Lasso regression model on the given data."""
- return _regression_train_wrapper(sc, lambda d, i:
- sc._jvm.PythonMLLibAPI().trainLassoModel(d._jrdd,
- iterations, step, reg_param, mini_batch_fraction, i),
- LassoModel, data, initial_weights)
-
-class RidgeRegressionModel(LinearRegressionModelBase):
- """A linear regression model derived from a least-squares fit with an
- l_2 penalty term.
-
- >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
- >>> lrm = RidgeRegressionModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
- """
- @classmethod
- def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
- mini_batch_fraction=1.0, initial_weights=None):
- """Train a ridge regression model on the given data."""
- return _regression_train_wrapper(sc, lambda d, i:
- sc._jvm.PythonMLLibAPI().trainRidgeModel(d._jrdd,
- iterations, step, reg_param, mini_batch_fraction, i),
- RidgeRegressionModel, data, initial_weights)
-
-class LogisticRegressionModel(LinearModel):
- """A linear binary classification model derived from logistic regression.
-
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
- >>> lrm = LogisticRegressionModel.train(sc, sc.parallelize(data))
- """
- def predict(self, x):
- _linear_predictor_typecheck(x, _coeff)
- margin = dot(x, _coeff) + intercept
- prob = 1/(1 + exp(-margin))
- return 1 if prob > 0.5 else 0
-
- @classmethod
- def train(cls, sc, data, iterations=100, step=1.0,
- mini_batch_fraction=1.0, initial_weights=None):
- """Train a logistic regression model on the given data."""
- return _regression_train_wrapper(sc, lambda d, i:
- sc._jvm.PythonMLLibAPI().trainLogisticRegressionModel(d._jrdd,
- iterations, step, mini_batch_fraction, i),
- LogisticRegressionModel, data, initial_weights)
-
-class SVMModel(LinearModel):
- """A support vector machine.
-
- >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
- >>> svm = SVMModel.train(sc, sc.parallelize(data))
- """
- def predict(self, x):
- _linear_predictor_typecheck(x, _coeff)
- margin = dot(x, _coeff) + intercept
- return 1 if margin >= 0 else 0
- @classmethod
- def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
- mini_batch_fraction=1.0, initial_weights=None):
- """Train a support vector machine on the given data."""
- return _regression_train_wrapper(sc, lambda d, i:
- sc._jvm.PythonMLLibAPI().trainSVMModel(d._jrdd,
- iterations, step, reg_param, mini_batch_fraction, i),
- SVMModel, data, initial_weights)
-
-class KMeansModel(object):
- """A clustering model derived from the k-means method.
-
- >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
- >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
- >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
- True
- >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
- True
- >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2)
- """
- def __init__(self, centers_):
- self.centers = centers_
-
- def predict(self, x):
- """Find the cluster to which x belongs in this model."""
- best = 0
- best_distance = 1e75
- for i in range(0, self.centers.shape[0]):
- diff = x - self.centers[i]
- distance = sqrt(dot(diff, diff))
- if distance < best_distance:
- best = i
- best_distance = distance
- return best
-
- @classmethod
- def train(cls, sc, data, k, maxIterations=100, runs=1,
- initialization_mode="k-means||"):
- """Train a k-means clustering model."""
- dataBytes = _get_unmangled_double_vector_rdd(data)
- ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
- k, maxIterations, runs, initialization_mode)
- if len(ans) != 1:
- raise RuntimeError("JVM call result had unexpected length")
- elif type(ans[0]) != bytearray:
- raise RuntimeError("JVM call result had first element of type "
- + type(ans[0]) + " which is not bytearray")
- return KMeansModel(_deserialize_double_matrix(ans[0]))
-
-def _serialize_rating(r):
- ba = bytearray(16)
- intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
- doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=8)
- intpart[0], intpart[1], doublepart[0] = r
- return ba
-
-class ALSModel(object):
- """A matrix factorisation model trained by regularized alternating
- least-squares.
-
- >>> r1 = (1, 1, 1.0)
- >>> r2 = (1, 2, 2.0)
- >>> r3 = (2, 1, 2.0)
- >>> ratings = sc.parallelize([r1, r2, r3])
- >>> model = ALSModel.trainImplicit(sc, ratings, 1)
- >>> model.predict(2,2) is not None
- True
- """
-
- def __init__(self, sc, java_model):
- self._context = sc
- self._java_model = java_model
-
- def __del__(self):
- self._context._gateway.detach(self._java_model)
-
- def predict(self, user, product):
- return self._java_model.predict(user, product)
-
- @classmethod
- def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
- ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
- mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
- rank, iterations, lambda_, blocks)
- return ALSModel(sc, mod)
-
- @classmethod
- def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
- ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
- mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
- rank, iterations, lambda_, blocks, alpha)
- return ALSModel(sc, mod)
-
-def _test():
- import doctest
- globs = globals().copy()
- globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
- (failure_count, test_count) = doctest.testmod(globs=globs,
- optionflags=doctest.ELLIPSIS)
- globs['sc'].stop()
- if failure_count:
- exit(-1)
-
-if __name__ == "__main__":
- _test()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py
new file mode 100644
index 0000000..6037a3a
--- /dev/null
+++ b/python/pyspark/mllib/__init__.py
@@ -0,0 +1,46 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Python bindings for MLlib.
+
+This package re-exports the classes listed in __all__ from its regression,
+classification, recommendation and clustering submodules.
+"""
+import sys
+import os
+# Put the Py4J egg under $SPARK_HOME at the front of sys.path; os.environ[...]
+# raises KeyError when SPARK_HOME is not set.
+sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg"))
+
+from pyspark.mllib.regression import LinearRegressionModel, LassoModel, RidgeRegressionModel, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD
+from pyspark.mllib.classification import LogisticRegressionModel, SVMModel, LogisticRegressionWithSGD, SVMWithSGD
+from pyspark.mllib.recommendation import MatrixFactorizationModel, ALS
+from pyspark.mllib.clustering import KMeansModel, KMeans
+
+
+__all__ = ["LinearRegressionModel", "LassoModel", "RidgeRegressionModel", "LinearRegressionWithSGD", "LassoWithSGD", "RidgeRegressionWithSGD", "LogisticRegressionModel", "SVMModel", "LogisticRegressionWithSGD", "SVMWithSGD", "MatrixFactorizationModel", "ALS", "KMeansModel", "KMeans"]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/_common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
new file mode 100644
index 0000000..e68bd8a
--- /dev/null
+++ b/python/pyspark/mllib/_common.py
@@ -0,0 +1,227 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from numpy import ndarray, copyto, float64, int64, int32, zeros, array_equal, array, dot, shape
+from pyspark import SparkContext
+
+# Double vector format:
+#
+# [8-byte 1] [8-byte length] [length*8 bytes of data]
+#
+# Double matrix format:
+#
+# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
+#
+# All values are stored in the machine's native byte order, so the JVM and
+# the Python interpreter must agree on the machine's endianness.
+
+def _deserialize_byte_array(shape, ba, offset):
+ """Wrapper around ndarray aliasing hack.
+
+ >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
+ >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+ True
+ >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
+ >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+ True
+ """
+ ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
+ order='C')
+ return ar.copy()
+
+def _serialize_double_vector(v):
+ """Serialize a double vector into a mutually understood format."""
+ if type(v) != ndarray:
+ raise TypeError("_serialize_double_vector called on a %s; "
+ "wanted ndarray" % type(v))
+ if v.dtype != float64:
+ raise TypeError("_serialize_double_vector called on an ndarray of %s; "
+ "wanted ndarray of float64" % v.dtype)
+ if v.ndim != 1:
+ raise TypeError("_serialize_double_vector called on a %ddarray; "
+ "wanted a 1darray" % v.ndim)
+ length = v.shape[0]
+ ba = bytearray(16 + 8*length)
+ header = ndarray(shape=[2], buffer=ba, dtype="int64")
+ header[0] = 1
+ header[1] = length
+ copyto(ndarray(shape=[length], buffer=ba, offset=16,
+ dtype="float64"), v)
+ return ba
+
+def _deserialize_double_vector(ba):
+ """Deserialize a double vector from a mutually understood format.
+
+ >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
+ >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
+ True
+ """
+ if type(ba) != bytearray:
+ raise TypeError("_deserialize_double_vector called on a %s; "
+ "wanted bytearray" % type(ba))
+ if len(ba) < 16:
+ raise TypeError("_deserialize_double_vector called on a %d-byte array, "
+ "which is too short" % len(ba))
+ if (len(ba) & 7) != 0:
+ raise TypeError("_deserialize_double_vector called on a %d-byte array, "
+ "which is not a multiple of 8" % len(ba))
+ header = ndarray(shape=[2], buffer=ba, dtype="int64")
+ if header[0] != 1:
+ raise TypeError("_deserialize_double_vector called on bytearray "
+ "with wrong magic")
+ length = header[1]
+ if len(ba) != 8*length + 16:
+ raise TypeError("_deserialize_double_vector called on bytearray "
+ "with wrong length")
+ return _deserialize_byte_array([length], ba, 16)
+
+def _serialize_double_matrix(m):
+ """Serialize a double matrix into a mutually understood format."""
+ if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
+ rows = m.shape[0]
+ cols = m.shape[1]
+ ba = bytearray(24 + 8 * rows * cols)
+ header = ndarray(shape=[3], buffer=ba, dtype="int64")
+ header[0] = 2
+ header[1] = rows
+ header[2] = cols
+ copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24,
+ dtype="float64", order='C'), m)
+ return ba
+ else:
+ raise TypeError("_serialize_double_matrix called on a "
+ "non-double-matrix")
+
+def _deserialize_double_matrix(ba):
+ """Deserialize a double matrix from a mutually understood format."""
+ if type(ba) != bytearray:
+ raise TypeError("_deserialize_double_matrix called on a %s; "
+ "wanted bytearray" % type(ba))
+ if len(ba) < 24:
+ raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
+ "which is too short" % len(ba))
+ if (len(ba) & 7) != 0:
+ raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
+ "which is not a multiple of 8" % len(ba))
+ header = ndarray(shape=[3], buffer=ba, dtype="int64")
+ if (header[0] != 2):
+ raise TypeError("_deserialize_double_matrix called on bytearray "
+ "with wrong magic")
+ rows = header[1]
+ cols = header[2]
+ if (len(ba) != 8*rows*cols + 24):
+ raise TypeError("_deserialize_double_matrix called on bytearray "
+ "with wrong length")
+ return _deserialize_byte_array([rows, cols], ba, 24)
+
+def _linear_predictor_typecheck(x, coeffs):
+ """Check that x is a one-dimensional vector of the right shape.
+ This is a temporary hackaround until I actually implement bulk predict."""
+ if type(x) == ndarray:
+ if x.ndim == 1:
+ if x.shape == coeffs.shape:
+ pass
+ else:
+ raise RuntimeError("Got array of %d elements; wanted %d"
+ % (shape(x)[0], shape(coeffs)[0]))
+ else:
+ raise RuntimeError("Bulk predict not yet supported.")
+ elif (type(x) == RDD):
+ raise RuntimeError("Bulk predict not yet supported.")
+ else:
+ raise TypeError("Argument of type " + type(x) + " unsupported")
+
+def _get_unmangled_rdd(data, serializer):
+ dataBytes = data.map(serializer)
+ dataBytes._bypass_serializer = True
+ dataBytes.cache()
+ return dataBytes
+
+# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+# _serialized_double_vectors
+def _get_unmangled_double_vector_rdd(data):
+ return _get_unmangled_rdd(data, _serialize_double_vector)
+
+class LinearModel(object):
+ """Something that has a vector of coefficients and an intercept.
+
+ The constructor stores both arguments unchanged on the instance as
+ _coeff and _intercept.
+ """
+ def __init__(self, coeff, intercept):
+ self._coeff = coeff
+ self._intercept = intercept
+
+class LinearRegressionModelBase(LinearModel):
+ """A linear regression model.
+
+ >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
+ >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
+ True
+ """
+ def predict(self, x):
+ """Predict the value of the dependent variable given a vector x"""
+ """containing values for the independent variables."""
+ _linear_predictor_typecheck(x, self._coeff)
+ return dot(self._coeff, x) + self._intercept
+
+# If we weren't given initial weights, take a zero vector of the appropriate
+# length.
+def _get_initial_weights(initial_weights, data):
+ if initial_weights is None:
+ initial_weights = data.first()
+ if type(initial_weights) != ndarray:
+ raise TypeError("At least one data element has type "
+ + type(initial_weights) + " which is not ndarray")
+ if initial_weights.ndim != 1:
+ raise TypeError("At least one data element has "
+ + initial_weights.ndim + " dimensions, which is not 1")
+ initial_weights = zeros([initial_weights.shape[0] - 1])
+ return initial_weights
+
+# train_func should take two parameters, namely data and initial_weights, and
+# return the result of a call to the appropriate JVM stub.
+# _regression_train_wrapper is responsible for setup and error checking.
+def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
+ initial_weights = _get_initial_weights(initial_weights, data)
+ dataBytes = _get_unmangled_double_vector_rdd(data)
+ ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
+ if len(ans) != 2:
+ raise RuntimeError("JVM call result had unexpected length")
+ elif type(ans[0]) != bytearray:
+ raise RuntimeError("JVM call result had first element of type "
+ + type(ans[0]) + " which is not bytearray")
+ elif type(ans[1]) != float:
+ raise RuntimeError("JVM call result had second element of type "
+ + type(ans[0]) + " which is not float")
+ return klass(_deserialize_double_vector(ans[0]), ans[1])
+
+def _serialize_rating(r):
+ ba = bytearray(16)
+ intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
+ doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=8)
+ intpart[0], intpart[1], doublepart[0] = r
+ return ba
+
+def _test():
+ import doctest
+ globs = globals().copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ (failure_count, test_count) = doctest.testmod(globs=globs,
+ optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+if __name__ == "__main__":
+ _test()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
new file mode 100644
index 0000000..70de332
--- /dev/null
+++ b/python/pyspark/mllib/classification.py
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from numpy import array, dot, shape
+from pyspark import SparkContext
+from pyspark.mllib._common import \
+ _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _serialize_double_matrix, _deserialize_double_matrix, \
+ _serialize_double_vector, _deserialize_double_vector, \
+ _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
+ LinearModel, _linear_predictor_typecheck
+from math import exp, log
+
+class LogisticRegressionModel(LinearModel):
+ """A linear binary classification model derived from logistic regression.
+
+ >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> lrm = LogisticRegressionWithSGD.train(sc, sc.parallelize(data))
+ >>> lrm.predict(array([1.0])) != None
+ True
+ """
+ def predict(self, x):
+ _linear_predictor_typecheck(x, self._coeff)
+ margin = dot(x, self._coeff) + self._intercept
+ prob = 1/(1 + exp(-margin))
+ return 1 if prob > 0.5 else 0
+
+class LogisticRegressionWithSGD(object):
+ @classmethod
+ def train(cls, sc, data, iterations=100, step=1.0,
+ mini_batch_fraction=1.0, initial_weights=None):
+ """Train a logistic regression model on the given data."""
+ return _regression_train_wrapper(sc, lambda d, i:
+ sc._jvm.PythonMLLibAPI().trainLogisticRegressionModelWithSGD(d._jrdd,
+ iterations, step, mini_batch_fraction, i),
+ LogisticRegressionModel, data, initial_weights)
+
+class SVMModel(LinearModel):
+ """A support vector machine.
+
+ >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> svm = SVMWithSGD.train(sc, sc.parallelize(data))
+ >>> svm.predict(array([1.0])) != None
+ True
+ """
+ def predict(self, x):
+ _linear_predictor_typecheck(x, self._coeff)
+ margin = dot(x, self._coeff) + self._intercept
+ return 1 if margin >= 0 else 0
+
+class SVMWithSGD(object):
+ @classmethod
+ def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+ mini_batch_fraction=1.0, initial_weights=None):
+ """Train a support vector machine on the given data."""
+ return _regression_train_wrapper(sc, lambda d, i:
+ sc._jvm.PythonMLLibAPI().trainSVMModelWithSGD(d._jrdd,
+ iterations, step, reg_param, mini_batch_fraction, i),
+ SVMModel, data, initial_weights)
+
+def _test():
+ import doctest
+ globs = globals().copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ (failure_count, test_count) = doctest.testmod(globs=globs,
+ optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+if __name__ == "__main__":
+ _test()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/clustering.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py
new file mode 100644
index 0000000..8cf20e5
--- /dev/null
+++ b/python/pyspark/mllib/clustering.py
@@ -0,0 +1,79 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from numpy import array, dot
+from math import sqrt
+from pyspark import SparkContext
+from pyspark.mllib._common import \
+ _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _serialize_double_matrix, _deserialize_double_matrix, \
+ _serialize_double_vector, _deserialize_double_vector, \
+ _get_initial_weights, _serialize_rating, _regression_train_wrapper
+
+class KMeansModel(object):
+ """A clustering model derived from the k-means method.
+
+ >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
+ >>> clusters = KMeans.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
+ >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
+ True
+ >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
+ True
+ >>> clusters = KMeans.train(sc, sc.parallelize(data), 2)
+ """
+ def __init__(self, centers_):
+ self.centers = centers_
+
+ def predict(self, x):
+ """Find the cluster to which x belongs in this model."""
+ best = 0
+ best_distance = 1e75
+ for i in range(0, self.centers.shape[0]):
+ diff = x - self.centers[i]
+ distance = sqrt(dot(diff, diff))
+ if distance < best_distance:
+ best = i
+ best_distance = distance
+ return best
+
+class KMeans(object):
+ @classmethod
+ def train(cls, sc, data, k, maxIterations=100, runs=1,
+ initialization_mode="k-means||"):
+ """Train a k-means clustering model."""
+ dataBytes = _get_unmangled_double_vector_rdd(data)
+ ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
+ k, maxIterations, runs, initialization_mode)
+ if len(ans) != 1:
+ raise RuntimeError("JVM call result had unexpected length")
+ elif type(ans[0]) != bytearray:
+ raise RuntimeError("JVM call result had first element of type "
+ + type(ans[0]) + " which is not bytearray")
+ return KMeansModel(_deserialize_double_matrix(ans[0]))
+
+def _test():
+ import doctest
+ globs = globals().copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ (failure_count, test_count) = doctest.testmod(globs=globs,
+ optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+if __name__ == "__main__":
+ _test()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/recommendation.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/recommendation.py b/python/pyspark/mllib/recommendation.py
new file mode 100644
index 0000000..14d06cb
--- /dev/null
+++ b/python/pyspark/mllib/recommendation.py
@@ -0,0 +1,74 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyspark import SparkContext
+from pyspark.mllib._common import \
+ _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _serialize_double_matrix, _deserialize_double_matrix, \
+ _serialize_double_vector, _deserialize_double_vector, \
+ _get_initial_weights, _serialize_rating, _regression_train_wrapper
+
+class MatrixFactorizationModel(object):
+ """A matrix factorisation model trained by regularized alternating
+ least-squares.
+
+ Instances keep the SparkContext and a handle to a JVM-side model object;
+ predict() is forwarded to that object.
+
+ >>> r1 = (1, 1, 1.0)
+ >>> r2 = (1, 2, 2.0)
+ >>> r3 = (2, 1, 2.0)
+ >>> ratings = sc.parallelize([r1, r2, r3])
+ >>> model = ALS.trainImplicit(sc, ratings, 1)
+ >>> model.predict(2,2) is not None
+ True
+ """
+
+ # Store the context and the JVM model handle for later calls.
+ def __init__(self, sc, java_model):
+ self._context = sc
+ self._java_model = java_model
+
+ # Detach the JVM model handle from the context's gateway when this
+ # wrapper is collected.
+ def __del__(self):
+ self._context._gateway.detach(self._java_model)
+
+ # Forward the (user, product) query to the JVM model and return its
+ # result unchanged.
+ def predict(self, user, product):
+ return self._java_model.predict(user, product)
+
+class ALS(object):
+ @classmethod
+ def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
+ ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+ mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
+ rank, iterations, lambda_, blocks)
+ return MatrixFactorizationModel(sc, mod)
+
+ @classmethod
+ def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
+ ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+ mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
+ rank, iterations, lambda_, blocks, alpha)
+ return MatrixFactorizationModel(sc, mod)
+
+def _test():
+ import doctest
+ globs = globals().copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ (failure_count, test_count) = doctest.testmod(globs=globs,
+ optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+if __name__ == "__main__":
+ _test()
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/05163057/python/pyspark/mllib/regression.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/regression.py b/python/pyspark/mllib/regression.py
new file mode 100644
index 0000000..a3a68b2
--- /dev/null
+++ b/python/pyspark/mllib/regression.py
@@ -0,0 +1,110 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from numpy import array, dot
+from pyspark import SparkContext
+from pyspark.mllib._common import \
+ _get_unmangled_rdd, _get_unmangled_double_vector_rdd, \
+ _serialize_double_matrix, _deserialize_double_matrix, \
+ _serialize_double_vector, _deserialize_double_vector, \
+ _get_initial_weights, _serialize_rating, _regression_train_wrapper, \
+ _linear_predictor_typecheck
+
+class LinearModel(object):
+ """Something that has a vector of coefficients and an intercept.
+
+ The constructor stores both arguments unchanged on the instance as
+ _coeff and _intercept.
+ """
+ # NOTE(review): pyspark.mllib._common defines a LinearModel with the same
+ # body; consider importing that one instead of redefining it here.
+ def __init__(self, coeff, intercept):
+ self._coeff = coeff
+ self._intercept = intercept
+
+class LinearRegressionModelBase(LinearModel):
+ """A linear regression model.
+
+ >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
+ >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
+ True
+ """
+ def predict(self, x):
+ """Predict the value of the dependent variable given a vector x"""
+ """containing values for the independent variables."""
+ _linear_predictor_typecheck(x, self._coeff)
+ return dot(self._coeff, x) + self._intercept
+
+class LinearRegressionModel(LinearRegressionModelBase):
+ """A linear regression model derived from a least-squares fit.
+
+ This class adds no members of its own; everything is inherited from
+ LinearRegressionModelBase.
+
+ >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> lrm = LinearRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+ """
+
+class LinearRegressionWithSGD(object):
+ @classmethod
+ def train(cls, sc, data, iterations=100, step=1.0,
+ mini_batch_fraction=1.0, initial_weights=None):
+ """Train a linear regression model on the given data."""
+ return _regression_train_wrapper(sc, lambda d, i:
+ sc._jvm.PythonMLLibAPI().trainLinearRegressionModelWithSGD(
+ d._jrdd, iterations, step, mini_batch_fraction, i),
+ LinearRegressionModel, data, initial_weights)
+
+class LassoModel(LinearRegressionModelBase):
+ """A linear regression model derived from a least-squares fit with an
+ l_1 penalty term.
+
+ This class adds no members of its own; everything is inherited from
+ LinearRegressionModelBase.
+
+ >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> lrm = LassoWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+ """
+
+class LassoWithSGD(object):
+ @classmethod
+ def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+ mini_batch_fraction=1.0, initial_weights=None):
+ """Train a Lasso regression model on the given data."""
+ return _regression_train_wrapper(sc, lambda d, i:
+ sc._jvm.PythonMLLibAPI().trainLassoModelWithSGD(d._jrdd,
+ iterations, step, reg_param, mini_batch_fraction, i),
+ LassoModel, data, initial_weights)
+
+class RidgeRegressionModel(LinearRegressionModelBase):
+ """A linear regression model derived from a least-squares fit with an
+ l_2 penalty term.
+
+ This class adds no members of its own; everything is inherited from
+ LinearRegressionModelBase.
+
+ >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> lrm = RidgeRegressionWithSGD.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+ """
+
+class RidgeRegressionWithSGD(object):
+ @classmethod
+ def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+ mini_batch_fraction=1.0, initial_weights=None):
+ """Train a ridge regression model on the given data."""
+ return _regression_train_wrapper(sc, lambda d, i:
+ sc._jvm.PythonMLLibAPI().trainRidgeModelWithSGD(d._jrdd,
+ iterations, step, reg_param, mini_batch_fraction, i),
+ RidgeRegressionModel, data, initial_weights)
+
+def _test():
+ import doctest
+ globs = globals().copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ (failure_count, test_count) = doctest.testmod(globs=globs,
+ optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ if failure_count:
+ exit(-1)
+
+if __name__ == "__main__":
+ _test()
[18/28] git commit: Release JVM reference to the ALSModel when done.
Posted by ma...@apache.org.
Release JVM reference to the ALSModel when done.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/cbb28111
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/cbb28111
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/cbb28111
Branch: refs/heads/master
Commit: cbb28111896844a0fd94346cd9c6f9926c706555
Parents: 20f85ec
Author: Tor Myklebust <tm...@gmail.com>
Authored: Sun Dec 22 15:03:58 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Sun Dec 22 15:03:58 2013 -0500
----------------------------------------------------------------------
python/pyspark/mllib.py | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/cbb28111/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 22187eb..1f5a5f6 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -357,8 +357,8 @@ class ALSModel(object):
self._context = sc
self._java_model = java_model
- #def __del__(self):
- #self._gateway.detach(self._java_model)
+ def __del__(self):
+ self._context._gateway.detach(self._java_model)
def predict(self, user, product):
return self._java_model.predict(user, product)
[07/28] git commit: Scala classification and clustering stubs;
matrix serialization/deserialization.
Posted by ma...@apache.org.
Scala classification and clustering stubs; matrix serialization/deserialization.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f99970e8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f99970e8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f99970e8
Branch: refs/heads/master
Commit: f99970e8cdc85eae33999b57a4c5c1893fe3727a
Parents: 2328bdd
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 00:12:22 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 00:12:22 2013 -0500
----------------------------------------------------------------------
.../apache/spark/mllib/api/PythonMLLibAPI.scala | 82 +++++++++++++++++++-
1 file changed, 79 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f99970e8/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index c9bd7c6..bcf2f07 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -1,5 +1,7 @@
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.classification._
+import org.apache.spark.mllib.clustering._
import org.apache.spark.rdd.RDD
import java.nio.ByteBuffer
import java.nio.ByteOrder
@@ -39,6 +41,52 @@ class PythonMLLibAPI extends Serializable {
return bytes
}
+  /**
+   * Decode a matrix from the byte format: an 8-byte magic value 2, an 8-byte
+   * row count, an 8-byte column count, then rows*cols doubles in row order,
+   * all in the native byte order.
+   *
+   * @param bytes the encoded matrix
+   * @return one array per row
+   * @throws IllegalArgumentException if the array is too short, the magic
+   *         value is not 2, or the dimensions disagree with the array length
+   */
+  def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
+    val packetLength = bytes.length
+    if (packetLength < 24) {
+      throw new IllegalArgumentException("Byte array too short.")
+    }
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    val magic = bb.getLong()
+    if (magic != 2) {
+      throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+    }
+    val rows = bb.getLong()
+    val cols = bb.getLong()
+    val payloadBytes = packetLength - 24
+    // Bound both dimensions to the Int range first: this rejects negative
+    // values and keeps rows * cols from overflowing a Long, so a corrupt
+    // header cannot slip past the size check.
+    val dimsInRange =
+      rows >= 0 && rows <= Int.MaxValue && cols >= 0 && cols <= Int.MaxValue
+    if (!dimsInRange || payloadBytes % 8 != 0 || rows * cols != payloadBytes / 8) {
+      throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
+    }
+    val db = bb.asDoubleBuffer()
+    // Each bulk get consumes the next cols doubles from the buffer.
+    Array.fill(rows.toInt) {
+      val row = new Array[Double](cols.toInt)
+      db.get(row)
+      row
+    }
+  }
+
+  /**
+   * Encode a matrix into the byte format: an 8-byte magic value 2, an 8-byte
+   * row count, an 8-byte column count, then the entries row by row, all in
+   * the native byte order.
+   *
+   * @param doubles one array per row; every row must have the same length
+   * @return the encoded matrix
+   * @throws IllegalArgumentException if the rows differ in length
+   */
+  def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
+    val rows = doubles.length
+    val cols = if (rows > 0) doubles(0).length else 0
+    // The header records a single column count, so a ragged input would
+    // either overflow the buffer or produce misaligned data.
+    if (doubles.exists(_.length != cols)) {
+      throw new IllegalArgumentException("Rows of the matrix differ in length.")
+    }
+    val bytes = new Array[Byte](24 + 8 * rows * cols)
+    val bb = ByteBuffer.wrap(bytes)
+    bb.order(ByteOrder.nativeOrder())
+    bb.putLong(2)
+    bb.putLong(rows)
+    bb.putLong(cols)
+    val db = bb.asDoubleBuffer()
+    doubles.foreach(row => db.put(row))
+    bytes
+  }
+
def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
java.util.LinkedList[java.lang.Object] = {
@@ -60,7 +108,7 @@ class PythonMLLibAPI extends Serializable {
return trainRegressionModel((data, initialWeights) =>
LinearRegressionWithSGD.train(data, numIterations, stepSize,
miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA);
+ dataBytesJRDD, initialWeightsBA)
}
def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
@@ -69,7 +117,7 @@ class PythonMLLibAPI extends Serializable {
return trainRegressionModel((data, initialWeights) =>
LassoWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA);
+ dataBytesJRDD, initialWeightsBA)
}
def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
@@ -78,6 +126,34 @@ class PythonMLLibAPI extends Serializable {
return trainRegressionModel((data, initialWeights) =>
RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA);
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+  /** Train an SVM with SGD on the serialized data via trainRegressionModel. */
+  def trainSVMModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    // Bind the hyperparameters so that only data and weights remain open.
+    val train = (points: RDD[LabeledPoint], weights: Array[Double]) =>
+      SVMWithSGD.train(points, numIterations, stepSize, regParam,
+        miniBatchFraction, weights)
+    trainRegressionModel(train, dataBytesJRDD, initialWeightsBA)
+  }
+
+  /** Train logistic regression with SGD on the serialized data via trainRegressionModel. */
+  def trainLogisticRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+      initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+    // Bind the hyperparameters so that only data and weights remain open.
+    val train = (points: RDD[LabeledPoint], weights: Array[Double]) =>
+      LogisticRegressionWithSGD.train(points, numIterations, stepSize,
+        miniBatchFraction, weights)
+    trainRegressionModel(train, dataBytesJRDD, initialWeightsBA)
+  }
+
+  /**
+   * Train a k-means model on serialized double vectors and return a
+   * one-element list holding the serialized matrix of cluster centers.
+   */
+  def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
+      maxIterations: Int, runs: Int, initializationMode: String):
+      java.util.List[java.lang.Object] = {
+    val points = dataBytesJRDD.rdd.map(bytes => deserializeDoubleVector(bytes))
+    val centers =
+      KMeans.train(points, k, maxIterations, runs, initializationMode).clusterCenters
+    val result = new java.util.LinkedList[java.lang.Object]()
+    result.add(serializeDoubleMatrix(centers))
+    result
}
}
[28/28] git commit: Merge pull request #283 from tmyklebu/master
Posted by ma...@apache.org.
Merge pull request #283 from tmyklebu/master
Python bindings for mllib
This pull request contains Python bindings for the regression, clustering, classification, and recommendation tools in mllib.
For each 'train' frontend exposed, there is a Scala stub in PythonMLLibAPI.scala and a Python stub in mllib.py. The Python stub serialises the input RDD and any vector/matrix arguments into a mutually-understood format and calls the Scala stub. The Scala stub deserialises the RDD and the vector/matrix arguments, calls the appropriate 'train' function, serialises the resulting model, and returns the serialised model.
ALSModel is slightly different since a MatrixFactorizationModel has RDDs inside. The Scala stub returns a handle to a Scala MatrixFactorizationModel; prediction is done by calling the Scala predict method.
I have tested these bindings on an x86_64 machine running Linux. There is a risk that these bindings may fail on some choose-your-own-endian platform if Python's endian differs from java.nio.ByteBuffer's idea of the native byte order.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c344ed04
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c344ed04
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c344ed04
Branch: refs/heads/master
Commit: c344ed04c7d65d64e87bb50ad6eba57534945398
Parents: 56094bc 9cbcf81
Author: Matei Zaharia <ma...@databricks.com>
Authored: Thu Dec 26 01:31:06 2013 -0500
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Thu Dec 26 01:31:06 2013 -0500
----------------------------------------------------------------------
.../spark/mllib/api/python/PythonMLLibAPI.scala | 232 +++++++++++++++++++
python/pyspark/java_gateway.py | 1 +
python/pyspark/mllib/__init__.py | 20 ++
python/pyspark/mllib/_common.py | 227 ++++++++++++++++++
python/pyspark/mllib/classification.py | 86 +++++++
python/pyspark/mllib/clustering.py | 79 +++++++
python/pyspark/mllib/recommendation.py | 74 ++++++
python/pyspark/mllib/regression.py | 110 +++++++++
python/pyspark/serializers.py | 2 +-
9 files changed, 830 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
[05/28] git commit: Bindings for linear, Lasso, and ridge regression.
Posted by ma...@apache.org.
Bindings for linear, Lasso, and ridge regression.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/ded67ee9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/ded67ee9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/ded67ee9
Branch: refs/heads/master
Commit: ded67ee90c2c0b22d67e623156a3f6cce8573abd
Parents: 2a41c9a
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 22:42:12 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 22:42:12 2013 -0500
----------------------------------------------------------------------
.../apache/spark/mllib/api/PythonMLLibAPI.scala | 42 +++++++++++++++++---
1 file changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/ded67ee9/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index 3daf5dc..c9bd7c6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -1,5 +1,6 @@
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.regression._
+import org.apache.spark.rdd.RDD
import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.DoubleBuffer
@@ -38,14 +39,45 @@ class PythonMLLibAPI extends Serializable {
return bytes
}
- def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]]):
- java.util.List[java.lang.Object] = {
- val data = dataBytesJRDD.rdd.map(x => deserializeDoubleVector(x))
- .map(v => LabeledPoint(v(0), v.slice(1, v.length)))
- val model = LinearRegressionWithSGD.train(data, 222)
+ def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+ dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
+ java.util.LinkedList[java.lang.Object] = {
+ val data = dataBytesJRDD.rdd.map(xBytes => {
+ val x = deserializeDoubleVector(xBytes)
+ LabeledPoint(x(0), x.slice(1, x.length))
+ })
+ val initialWeights = deserializeDoubleVector(initialWeightsBA)
+ val model = trainFunc(data, initialWeights)
val ret = new java.util.LinkedList[java.lang.Object]()
ret.add(serializeDoubleVector(model.weights))
ret.add(model.intercept: java.lang.Double)
return ret
}
+
+ def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+ numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ LinearRegressionWithSGD.train(data, numIterations, stepSize,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA);
+ }
+
+ def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ LassoWithSGD.train(data, numIterations, stepSize, regParam,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA);
+ }
+
+ def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA);
+ }
}
[10/28] git commit: Remove gigantic endian-specific test and
exception tests.
Posted by ma...@apache.org.
Remove gigantic endian-specific test and exception tests.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/319520b9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/319520b9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/319520b9
Branch: refs/heads/master
Commit: 319520b9bb0071527a0be1e0e545ca084ac090ee
Parents: 2940201
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 01:48:44 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 01:48:44 2013 -0500
----------------------------------------------------------------------
python/pyspark/mllib.py | 41 +++--------------------------------------
1 file changed, 3 insertions(+), 38 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/319520b9/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index aa9fc76..e7e2216 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -27,40 +27,7 @@ def _deserialize_byte_array(shape, ba, offset):
return ar.copy()
def _serialize_double_vector(v):
- """Serialize a double vector into a mutually understood format.
-
- >>> _serialize_double_vector(array([]))
- bytearray(b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00')
- >>> _serialize_double_vector(array([0.0, 1.0]))
- bytearray(b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\xf0?')
- >>> _serialize_double_vector("hello, world")
- Traceback (most recent call last):
- File "/usr/lib/python2.7/doctest.py", line 1289, in __run
- compileflags, 1) in test.globs
- File "<doctest __main__._serialize_double_vector[1]>", line 1, in <module>
- _serialize_double_vector("hello, world")
- File "python/pyspark/mllib.py", line 41, in _serialize_double_vector
- raise TypeError("_serialize_double_vector called on a %s; wanted ndarray" % type(v))
- TypeError: _serialize_double_vector called on a <type 'str'>; wanted ndarray
- >>> _serialize_double_vector(array([0, 1]))
- Traceback (most recent call last):
- File "/usr/lib/python2.7/doctest.py", line 1289, in __run
- compileflags, 1) in test.globs
- File "<doctest __main__._serialize_double_vector[2]>", line 1, in <module>
- _serialize_double_vector(array([0, 1]))
- File "python/pyspark/mllib.py", line 51, in _serialize_double_vector
- "wanted ndarray of float64" % v.dtype)
- TypeError: _serialize_double_vector called on an ndarray of int64; wanted ndarray of float64
- >>> _serialize_double_vector(array([0.0, 1.0, 2.0, 3.0]).reshape(2,2))
- Traceback (most recent call last):
- File "/usr/lib/python2.7/doctest.py", line 1289, in __run
- compileflags, 1) in test.globs
- File "<doctest __main__._serialize_double_vector[3]>", line 1, in <module>
- _serialize_double_vector(array([0.0, 1.0, 2.0, 3.0]).reshape(2,2))
- File "python/pyspark/mllib.py", line 62, in _serialize_double_vector
- "wanted a 1darray" % v.ndim)
- TypeError: _serialize_double_vector called on a 2darray; wanted a 1darray
- """
+ """Serialize a double vector into a mutually understood format."""
if type(v) != ndarray:
raise TypeError("_serialize_double_vector called on a %s; "
"wanted ndarray" % type(v))
@@ -106,8 +73,7 @@ def _deserialize_double_vector(ba):
return _deserialize_byte_array([length], ba, 16)
def _serialize_double_matrix(m):
- """Serialize a double matrix into a mutually understood format.
- """
+ """Serialize a double matrix into a mutually understood format."""
if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
rows = m.shape[0]
cols = m.shape[1]
@@ -124,8 +90,7 @@ def _serialize_double_matrix(m):
"non-double-matrix")
def _deserialize_double_matrix(ba):
- """Deserialize a double matrix from a mutually understood format.
- """
+ """Deserialize a double matrix from a mutually understood format."""
if type(ba) != bytearray:
raise TypeError("_deserialize_double_matrix called on a %s; "
"wanted bytearray" % type(ba))
[12/28] git commit: Licence notice.
Posted by ma...@apache.org.
Licence notice.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b835ddf3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b835ddf3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b835ddf3
Branch: refs/heads/master
Commit: b835ddf3dffe8698dab3b42c14a9da472868b13c
Parents: d89cc1e
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 01:55:03 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 01:55:03 2013 -0500
----------------------------------------------------------------------
.../apache/spark/mllib/api/PythonMLLibAPI.scala | 17 +++++++++++++++++
python/pyspark/mllib.py | 17 +++++++++++++++++
2 files changed, 34 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b835ddf3/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index bcf2f07..bad1f66 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.classification._
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b835ddf3/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 1877404..ce1363f 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -1,3 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
from numpy import *
from pyspark import SparkContext
[26/28] git commit: Fix copypasta in __init__.py. Don't import
anything directly into pyspark.mllib.
Posted by ma...@apache.org.
Fix copypasta in __init__.py. Don't import anything directly into pyspark.mllib.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5e71354c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5e71354c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5e71354c
Branch: refs/heads/master
Commit: 5e71354cb7ff758d9a70494ca1788aebea1bbb08
Parents: 02208a1
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Dec 25 14:10:55 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Wed Dec 25 14:10:55 2013 -0500
----------------------------------------------------------------------
python/pyspark/mllib/__init__.py | 34 ++++++++--------------------------
1 file changed, 8 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5e71354c/python/pyspark/mllib/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py
index 6037a3a..e9c62f3 100644
--- a/python/pyspark/mllib/__init__.py
+++ b/python/pyspark/mllib/__init__.py
@@ -16,31 +16,13 @@
#
"""
-PySpark is the Python API for Spark.
-
-Public classes:
-
- - L{SparkContext<pyspark.context.SparkContext>}
- Main entry point for Spark functionality.
- - L{RDD<pyspark.rdd.RDD>}
- A Resilient Distributed Dataset (RDD), the basic abstraction in Spark.
- - L{Broadcast<pyspark.broadcast.Broadcast>}
- A broadcast variable that gets reused across tasks.
- - L{Accumulator<pyspark.accumulators.Accumulator>}
- An "add-only" shared variable that tasks can only add values to.
- - L{SparkFiles<pyspark.files.SparkFiles>}
- Access files shipped with jobs.
- - L{StorageLevel<pyspark.storagelevel.StorageLevel>}
- Finer-grained cache persistence levels.
+Python bindings for MLlib.
"""
-import sys
-import os
-sys.path.insert(0, os.path.join(os.environ["SPARK_HOME"], "python/lib/py4j0.7.egg"))
-
-from pyspark.mllib.regression import LinearRegressionModel, LassoModel, RidgeRegressionModel, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD
-from pyspark.mllib.classification import LogisticRegressionModel, SVMModel, LogisticRegressionWithSGD, SVMWithSGD
-from pyspark.mllib.recommendation import MatrixFactorizationModel, ALS
-from pyspark.mllib.clustering import KMeansModel, KMeans
-
-__all__ = ["LinearRegressionModel", "LassoModel", "RidgeRegressionModel", "LinearRegressionWithSGD", "LassoWithSGD", "RidgeRegressionWithSGD", "LogisticRegressionModel", "SVMModel", "LogisticRegressionWithSGD", "SVMWithSGD", "MatrixFactorizationModel", "ALS", "KMeansModel", "KMeans"]
+#from pyspark.mllib.regression import LinearRegressionModel, LassoModel, RidgeRegressionModel, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD
+#from pyspark.mllib.classification import LogisticRegressionModel, SVMModel, LogisticRegressionWithSGD, SVMWithSGD
+#from pyspark.mllib.recommendation import MatrixFactorizationModel, ALS
+#from pyspark.mllib.clustering import KMeansModel, KMeans
+#
+#
+#__all__ = ["LinearRegressionModel", "LassoModel", "RidgeRegressionModel", "LinearRegressionWithSGD", "LassoWithSGD", "RidgeRegressionWithSGD", "LogisticRegressionModel", "SVMModel", "LogisticRegressionWithSGD", "SVMWithSGD", "MatrixFactorizationModel", "ALS", "KMeansModel", "KMeans"]
[09/28] git commit: Tests for the Python side of the mllib bindings.
Posted by ma...@apache.org.
Tests for the Python side of the mllib bindings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2940201a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2940201a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2940201a
Branch: refs/heads/master
Commit: 2940201ad86e5dee16cf7386b3c934fc75c15582
Parents: 73e1706
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 01:33:32 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 01:33:32 2013 -0500
----------------------------------------------------------------------
python/pyspark/mllib.py | 224 +++++++++++++++++++++++++++++++++----------
1 file changed, 172 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2940201a/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 21f3c03..aa9fc76 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -1,4 +1,5 @@
from numpy import *
+from pyspark import SparkContext
# Double vector format:
#
@@ -7,44 +8,106 @@ from numpy import *
# Double matrix format:
#
# [8-byte 2] [8-byte rows] [8-byte cols] [rows*cols*8 bytes of data]
-#
+#
# This is all in machine-endian. That means that the Java interpreter and the
# Python interpreter must agree on what endian the machine is.
def _deserialize_byte_array(shape, ba, offset):
+ """Wrapper around ndarray aliasing hack.
+
+ >>> x = array([1.0, 2.0, 3.0, 4.0, 5.0])
+ >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+ True
+ >>> x = array([1.0, 2.0, 3.0, 4.0]).reshape(2,2)
+ >>> array_equal(x, _deserialize_byte_array(x.shape, x.data, 0))
+ True
+ """
ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
order='C')
return ar.copy()
def _serialize_double_vector(v):
- if (type(v) == ndarray and v.dtype == float64 and v.ndim == 1):
- length = v.shape[0]
- ba = bytearray(16 + 8*length)
- header = ndarray(shape=[2], buffer=ba, dtype="int64")
- header[0] = 1
- header[1] = length
- copyto(ndarray(shape=[length], buffer=ba, offset=16,
- dtype="float64"), v)
- return ba
- else:
- raise TypeError("_serialize_double_vector called on a "
- "non-double-vector")
+ """Serialize a double vector into a mutually understood format.
+
+ >>> _serialize_double_vector(array([]))
+ bytearray(b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00')
+ >>> _serialize_double_vector(array([0.0, 1.0]))
+ bytearray(b'\\x01\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x02\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\x00\\xf0?')
+ >>> _serialize_double_vector("hello, world")
+ Traceback (most recent call last):
+ File "/usr/lib/python2.7/doctest.py", line 1289, in __run
+ compileflags, 1) in test.globs
+ File "<doctest __main__._serialize_double_vector[1]>", line 1, in <module>
+ _serialize_double_vector("hello, world")
+ File "python/pyspark/mllib.py", line 41, in _serialize_double_vector
+ raise TypeError("_serialize_double_vector called on a %s; wanted ndarray" % type(v))
+ TypeError: _serialize_double_vector called on a <type 'str'>; wanted ndarray
+ >>> _serialize_double_vector(array([0, 1]))
+ Traceback (most recent call last):
+ File "/usr/lib/python2.7/doctest.py", line 1289, in __run
+ compileflags, 1) in test.globs
+ File "<doctest __main__._serialize_double_vector[2]>", line 1, in <module>
+ _serialize_double_vector(array([0, 1]))
+ File "python/pyspark/mllib.py", line 51, in _serialize_double_vector
+ "wanted ndarray of float64" % v.dtype)
+ TypeError: _serialize_double_vector called on an ndarray of int64; wanted ndarray of float64
+ >>> _serialize_double_vector(array([0.0, 1.0, 2.0, 3.0]).reshape(2,2))
+ Traceback (most recent call last):
+ File "/usr/lib/python2.7/doctest.py", line 1289, in __run
+ compileflags, 1) in test.globs
+ File "<doctest __main__._serialize_double_vector[3]>", line 1, in <module>
+ _serialize_double_vector(array([0.0, 1.0, 2.0, 3.0]).reshape(2,2))
+ File "python/pyspark/mllib.py", line 62, in _serialize_double_vector
+ "wanted a 1darray" % v.ndim)
+ TypeError: _serialize_double_vector called on a 2darray; wanted a 1darray
+ """
+ if type(v) != ndarray:
+ raise TypeError("_serialize_double_vector called on a %s; "
+ "wanted ndarray" % type(v))
+ if v.dtype != float64:
+ raise TypeError("_serialize_double_vector called on an ndarray of %s; "
+ "wanted ndarray of float64" % v.dtype)
+ if v.ndim != 1:
+ raise TypeError("_serialize_double_vector called on a %ddarray; "
+ "wanted a 1darray" % v.ndim)
+ length = v.shape[0]
+ ba = bytearray(16 + 8*length)
+ header = ndarray(shape=[2], buffer=ba, dtype="int64")
+ header[0] = 1
+ header[1] = length
+ copyto(ndarray(shape=[length], buffer=ba, offset=16,
+ dtype="float64"), v)
+ return ba
def _deserialize_double_vector(ba):
- if (type(ba) == bytearray and len(ba) >= 16 and (len(ba) & 7 == 0)):
- header = ndarray(shape=[2], buffer=ba, dtype="int64")
- if (header[0] != 1):
- raise TypeError("_deserialize_double_vector called on bytearray "
- "with wrong magic")
- length = header[1]
- if (len(ba) != 8*length + 16):
- raise TypeError("_deserialize_double_vector called on bytearray "
- "with wrong length")
- return _deserialize_byte_array([length], ba, 16)
- else:
- raise TypeError("_deserialize_double_vector called on a non-bytearray")
+ """Deserialize a double vector from a mutually understood format.
+
+ >>> x = array([1.0, 2.0, 3.0, 4.0, -1.0, 0.0, -0.0])
+ >>> array_equal(x, _deserialize_double_vector(_serialize_double_vector(x)))
+ True
+ """
+ if type(ba) != bytearray:
+ raise TypeError("_deserialize_double_vector called on a %s; "
+ "wanted bytearray" % type(ba))
+ if len(ba) < 16:
+ raise TypeError("_deserialize_double_vector called on a %d-byte array, "
+ "which is too short" % len(ba))
+ if (len(ba) & 7) != 0:
+ raise TypeError("_deserialize_double_vector called on a %d-byte array, "
+ "which is not a multiple of 8" % len(ba))
+ header = ndarray(shape=[2], buffer=ba, dtype="int64")
+ if header[0] != 1:
+ raise TypeError("_deserialize_double_vector called on bytearray "
+ "with wrong magic")
+ length = header[1]
+ if len(ba) != 8*length + 16:
+ raise TypeError("_deserialize_double_vector called on bytearray "
+ "with wrong length")
+ return _deserialize_byte_array([length], ba, 16)
def _serialize_double_matrix(m):
+ """Serialize a double matrix into a mutually understood format.
+ """
if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
rows = m.shape[0]
cols = m.shape[1]
@@ -61,22 +124,31 @@ def _serialize_double_matrix(m):
"non-double-matrix")
def _deserialize_double_matrix(ba):
- if (type(ba) == bytearray and len(ba) >= 24 and (len(ba) & 7 == 0)):
- header = ndarray(shape=[3], buffer=ba, dtype="int64")
- if (header[0] != 2):
- raise TypeError("_deserialize_double_matrix called on bytearray "
- "with wrong magic")
- rows = header[1]
- cols = header[2]
- if (len(ba) != 8*rows*cols + 24):
- raise TypeError("_deserialize_double_matrix called on bytearray "
- "with wrong length")
- return _deserialize_byte_array([rows, cols], ba, 24)
- else:
- raise TypeError("_deserialize_double_matrix called on a non-bytearray")
+ """Deserialize a double matrix from a mutually understood format.
+ """
+ if type(ba) != bytearray:
+ raise TypeError("_deserialize_double_matrix called on a %s; "
+ "wanted bytearray" % type(ba))
+ if len(ba) < 24:
+ raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
+ "which is too short" % len(ba))
+ if (len(ba) & 7) != 0:
+ raise TypeError("_deserialize_double_matrix called on a %d-byte array, "
+ "which is not a multiple of 8" % len(ba))
+ header = ndarray(shape=[3], buffer=ba, dtype="int64")
+ if (header[0] != 2):
+ raise TypeError("_deserialize_double_matrix called on bytearray "
+ "with wrong magic")
+ rows = header[1]
+ cols = header[2]
+ if (len(ba) != 8*rows*cols + 24):
+ raise TypeError("_deserialize_double_matrix called on bytearray "
+ "with wrong length")
+ return _deserialize_byte_array([rows, cols], ba, 24)
def _linear_predictor_typecheck(x, coeffs):
- """Predict the class of the vector x."""
+ """Check that x is a one-dimensional vector of the right shape.
+ This is a temporary hackaround until I actually implement bulk predict."""
if type(x) == ndarray:
if x.ndim == 1:
if x.shape == coeffs.shape:
@@ -98,12 +170,17 @@ class LinearModel(object):
self._intercept = intercept
class LinearRegressionModelBase(LinearModel):
- """A linear regression model."""
+ """A linear regression model.
+
+ >>> lrmb = LinearRegressionModelBase(array([1.0, 2.0]), 0.1)
+ >>> abs(lrmb.predict(array([-1.03, 7.777])) - 14.624) < 1e-6
+ True
+ """
def predict(self, x):
"""Predict the value of the dependent variable given a vector x"""
"""containing values for the independent variables."""
- _linear_predictor_typecheck(x, _coeff)
- return dot(_coeff, x) + _intercept
+ _linear_predictor_typecheck(x, self._coeff)
+ return dot(self._coeff, x) + self._intercept
# Map a pickled Python RDD of numpy double vectors to a Java RDD of
# _serialized_double_vectors
@@ -145,7 +222,11 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
return klass(_deserialize_double_vector(ans[0]), ans[1]);
class LinearRegressionModel(LinearRegressionModelBase):
- """A linear regression model derived from a least-squares fit."""
+ """A linear regression model derived from a least-squares fit.
+
+ >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> lrm = LinearRegressionModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+ """
@classmethod
def train(cls, sc, data, iterations=100, step=1.0,
mini_batch_fraction=1.0, initial_weights=None):
@@ -156,8 +237,12 @@ class LinearRegressionModel(LinearRegressionModelBase):
LinearRegressionModel, data, initial_weights)
class LassoModel(LinearRegressionModelBase):
- """A linear regression model derived from a least-squares fit with an """
- """l_1 penalty term."""
+ """A linear regression model derived from a least-squares fit with an
+ l_1 penalty term.
+
+ >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> lrm = LassoModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+ """
@classmethod
def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
mini_batch_fraction=1.0, initial_weights=None):
@@ -168,8 +253,12 @@ class LassoModel(LinearRegressionModelBase):
LassoModel, data, initial_weights)
class RidgeRegressionModel(LinearRegressionModelBase):
- """A linear regression model derived from a least-squares fit with an """
- """l_2 penalty term."""
+ """A linear regression model derived from a least-squares fit with an
+ l_2 penalty term.
+
+ >>> data = array([0.0, 0.0, 1.0, 1.0, 3.0, 2.0, 2.0, 3.0]).reshape(4,2)
+ >>> lrm = RidgeRegressionModel.train(sc, sc.parallelize(data), initial_weights=array([1.0]))
+ """
@classmethod
def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
mini_batch_fraction=1.0, initial_weights=None):
@@ -180,7 +269,11 @@ class RidgeRegressionModel(LinearRegressionModelBase):
RidgeRegressionModel, data, initial_weights)
class LogisticRegressionModel(LinearModel):
- """A linear binary classification model derived from logistic regression."""
+ """A linear binary classification model derived from logistic regression.
+
+ >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> lrm = LogisticRegressionModel.train(sc, sc.parallelize(data))
+ """
def predict(self, x):
_linear_predictor_typecheck(x, _coeff)
margin = dot(x, _coeff) + intercept
@@ -197,7 +290,11 @@ class LogisticRegressionModel(LinearModel):
LogisticRegressionModel, data, initial_weights)
class SVMModel(LinearModel):
- """A support vector machine."""
+ """A support vector machine.
+
+ >>> data = array([0.0, 0.0, 1.0, 1.0, 1.0, 2.0, 1.0, 3.0]).reshape(4,2)
+ >>> svm = SVMModel.train(sc, sc.parallelize(data))
+ """
def predict(self, x):
_linear_predictor_typecheck(x, _coeff)
margin = dot(x, _coeff) + intercept
@@ -212,15 +309,24 @@ class SVMModel(LinearModel):
SVMModel, data, initial_weights)
class KMeansModel(object):
- """A clustering model derived from the k-means method."""
+ """A clustering model derived from the k-means method.
+
+ >>> data = array([0.0, 0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
+ >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
+ >>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
+ True
+ >>> clusters.predict(array([8.0, 9.0])) == clusters.predict(array([9.0, 8.0]))
+ True
+ >>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2)
+ """
def __init__(self, centers_):
self.centers = centers_
def predict(self, x):
best = 0
best_distance = 1e75
- for i in range(0, centers.shape[0]):
- diff = x - centers[i]
+ for i in range(0, self.centers.shape[0]):
+ diff = x - self.centers[i]
distance = sqrt(dot(diff, diff))
if distance < best_distance:
best = i
@@ -239,3 +345,17 @@ class KMeansModel(object):
raise RuntimeError("JVM call result had first element of type "
+ type(ans[0]) + " which is not bytearray");
return KMeansModel(_deserialize_double_matrix(ans[0]));
+
+def _test():
+ import doctest
+ globs = globals().copy()
+ globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
+ (failure_count, test_count) = doctest.testmod(globs=globs,
+ optionflags=doctest.ELLIPSIS)
+ globs['sc'].stop()
+ print failure_count,"failures among",test_count,"tests"
+ if failure_count:
+ exit(-1)
+
+if __name__ == "__main__":
+ _test()
[15/28] git commit: Javadocs; also, declare some things private.
Posted by ma...@apache.org.
Javadocs; also, declare some things private.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/b454fdc2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/b454fdc2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/b454fdc2
Branch: refs/heads/master
Commit: b454fdc2ebc495e4d13162f4bea8cf3e33909463
Parents: 0b494c2
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 02:10:21 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 02:10:21 2013 -0500
----------------------------------------------------------------------
.../apache/spark/mllib/api/PythonMLLibAPI.scala | 31 ++++++++++++++++----
1 file changed, 26 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/b454fdc2/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index bad1f66..6472bf6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -24,8 +24,11 @@ import java.nio.ByteBuffer
import java.nio.ByteOrder
import java.nio.DoubleBuffer
+/**
+ * The Java stubs necessary for the Python mllib bindings.
+ */
class PythonMLLibAPI extends Serializable {
- def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
+ private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
val packetLength = bytes.length
if (packetLength < 16) {
throw new IllegalArgumentException("Byte array too short.")
@@ -46,7 +49,7 @@ class PythonMLLibAPI extends Serializable {
return ans
}
- def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+ private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
val len = doubles.length
val bytes = new Array[Byte](16 + 8 * len)
val bb = ByteBuffer.wrap(bytes)
@@ -58,7 +61,7 @@ class PythonMLLibAPI extends Serializable {
return bytes
}
- def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
+ private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
val packetLength = bytes.length
if (packetLength < 24) {
throw new IllegalArgumentException("Byte array too short.")
@@ -84,7 +87,7 @@ class PythonMLLibAPI extends Serializable {
return ans
}
- def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
+ private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
val rows = doubles.length
var cols = 0
if (rows > 0) {
@@ -104,7 +107,7 @@ class PythonMLLibAPI extends Serializable {
return bytes
}
- def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+ private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
java.util.LinkedList[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(xBytes => {
@@ -119,6 +122,9 @@ class PythonMLLibAPI extends Serializable {
return ret
}
+ /**
+ * Java stub for Python mllib LinearRegressionModel.train()
+ */
def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int, stepSize: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
@@ -128,6 +134,9 @@ class PythonMLLibAPI extends Serializable {
dataBytesJRDD, initialWeightsBA)
}
+ /**
+ * Java stub for Python mllib LassoModel.train()
+ */
def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
@@ -137,6 +146,9 @@ class PythonMLLibAPI extends Serializable {
dataBytesJRDD, initialWeightsBA)
}
+ /**
+ * Java stub for Python mllib RidgeRegressionModel.train()
+ */
def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
@@ -146,6 +158,9 @@ class PythonMLLibAPI extends Serializable {
dataBytesJRDD, initialWeightsBA)
}
+ /**
+ * Java stub for Python mllib SVMModel.train()
+ */
def trainSVMModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
@@ -155,6 +170,9 @@ class PythonMLLibAPI extends Serializable {
dataBytesJRDD, initialWeightsBA)
}
+ /**
+ * Java stub for Python mllib LogisticRegressionModel.train()
+ */
def trainLogisticRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int, stepSize: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
@@ -164,6 +182,9 @@ class PythonMLLibAPI extends Serializable {
dataBytesJRDD, initialWeightsBA)
}
+ /**
+ * Java stub for Python mllib KMeansModel.train()
+ */
def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
maxIterations: Int, runs: Int, initializationMode: String):
java.util.List[java.lang.Object] = {
[06/28] git commit: Python side of python bindings for linear, Lasso,
and ridge regression
Posted by ma...@apache.org.
Python side of python bindings for linear, Lasso, and ridge regression
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2328bdd0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2328bdd0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2328bdd0
Branch: refs/heads/master
Commit: 2328bdd00f701ca3b1bc7fdf8b2968fafc58fd11
Parents: ded67ee
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 22:45:16 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 22:45:16 2013 -0500
----------------------------------------------------------------------
python/pyspark/__init__.py | 6 ++-
python/pyspark/mllib.py | 81 ++++++++++++++++++++++++++++++++++-------
2 files changed, 72 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2328bdd0/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 9f71db3..7c8f914 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -42,7 +42,9 @@ from pyspark.context import SparkContext
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.storagelevel import StorageLevel
-from pyspark.mllib import LinearRegressionModel
+from pyspark.mllib import LinearRegressionModel, LassoModel, \
+ RidgeRegressionModel
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", "LinearRegressionModel"];
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", \
+ "LinearRegressionModel", "LassoModel", "RidgeRegressionModel"];
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2328bdd0/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 0dfc490..d312787 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -75,7 +75,7 @@ def _deserialize_double_matrix(ba):
else:
raise TypeError("_deserialize_double_matrix called on a non-bytearray")
-class LinearRegressionModel(object):
+class LinearModel(object):
def __init__(self, coeff, intercept):
self._coeff = coeff
self._intercept = intercept
@@ -83,7 +83,7 @@ class LinearRegressionModel(object):
def predict(self, x):
if (type(x) == ndarray):
if (x.ndim == 1):
- return dot(_coeff, x) - _intercept
+ return dot(_coeff, x) + _intercept
else:
raise RuntimeError("Bulk predict not yet supported.")
elif (type(x) == RDD):
@@ -92,16 +92,71 @@ class LinearRegressionModel(object):
raise TypeError("Bad type argument to "
"LinearRegressionModel::predict")
+# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+# _serialized_double_vectors
+def _get_unmangled_double_vector_rdd(data):
+ dataBytes = data.map(_serialize_double_vector)
+ dataBytes._bypass_serializer = True
+ dataBytes.cache()
+ return dataBytes;
+
+# If we weren't given initial weights, take a zero vector of the appropriate
+# length.
+def _get_initial_weights(initial_weights, data):
+ if initial_weights is None:
+ initial_weights = data.first()
+ if type(initial_weights) != ndarray:
+ raise TypeError("At least one data element has type "
+ + type(initial_weights) + " which is not ndarray")
+ if initial_weights.ndim != 1:
+ raise TypeError("At least one data element has "
+ + initial_weights.ndim + " dimensions, which is not 1")
+ initial_weights = zeros([initial_weights.shape[0] - 1]);
+ return initial_weights;
+
+# train_func should take two parameters, namely data and initial_weights, and
+# return the result of a call to the appropriate JVM stub.
+# _regression_train_wrapper is responsible for setup and error checking.
+def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
+ initial_weights = _get_initial_weights(initial_weights, data)
+ dataBytes = _get_unmangled_double_vector_rdd(data)
+ ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
+ if len(ans) != 2:
+ raise RuntimeError("JVM call result had unexpected length");
+ elif type(ans[0]) != bytearray:
+ raise RuntimeError("JVM call result had first element of type "
+ + type(ans[0]) + " which is not bytearray");
+ elif type(ans[1]) != float:
+ raise RuntimeError("JVM call result had second element of type "
+ + type(ans[0]) + " which is not float");
+ return klass(_deserialize_double_vector(ans[0]), ans[1]);
+
+class LinearRegressionModel(LinearModel):
@classmethod
- def train(cls, sc, data):
+ def train(cls, sc, data, iterations=100, step=1.0,
+ mini_batch_fraction=1.0, initial_weights=None):
"""Train a linear regression model on the given data."""
- dataBytes = data.map(_serialize_double_vector)
- dataBytes._bypass_serializer = True
- dataBytes.cache()
- api = sc._jvm.PythonMLLibAPI()
- ans = api.trainLinearRegressionModel(dataBytes._jrdd)
- if (len(ans) != 2 or type(ans[0]) != bytearray
- or type(ans[1]) != float):
- raise RuntimeError("train_linear_regression_model received "
- "garbage from JVM")
- return LinearRegressionModel(_deserialize_double_vector(ans[0]), ans[1])
+ return _regression_train_wrapper(sc, lambda d, i:
+ sc._jvm.PythonMLLibAPI().trainLinearRegressionModel(
+ d._jrdd, iterations, step, mini_batch_fraction, i),
+ LinearRegressionModel, data, initial_weights)
+
+class LassoModel(LinearModel):
+ @classmethod
+ def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+ mini_batch_fraction=1.0, initial_weights=None):
+ """Train a Lasso regression model on the given data."""
+ return _regression_train_wrapper(sc, lambda d, i:
+ sc._jvm.PythonMLLibAPI().trainLassoModel(d._jrdd,
+ iterations, step, reg_param, mini_batch_fraction, i),
+ LassoModel, data, initial_weights)
+
+class RidgeRegressionModel(LinearModel):
+ @classmethod
+ def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+ mini_batch_fraction=1.0, initial_weights=None):
+ """Train a ridge regression model on the given data."""
+ return _regression_train_wrapper(sc, lambda d, i:
+ sc._jvm.PythonMLLibAPI().trainRidgeModel(d._jrdd,
+ iterations, step, reg_param, mini_batch_fraction, i),
+ RidgeRegressionModel, data, initial_weights)
[22/28] git commit: Remove useless line from test stub.
Posted by ma...@apache.org.
Remove useless line from test stub.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/86e38c49
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/86e38c49
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/86e38c49
Branch: refs/heads/master
Commit: 86e38c49420098da422a17e7c098efa34c94c35b
Parents: 4efec6e
Author: Tor Myklebust <tm...@gmail.com>
Authored: Tue Dec 24 16:49:31 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Tue Dec 24 16:49:31 2013 -0500
----------------------------------------------------------------------
python/pyspark/mllib.py | 1 -
1 file changed, 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/86e38c49/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 1f5a5f6..46f368b 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -384,7 +384,6 @@ def _test():
(failure_count, test_count) = doctest.testmod(globs=globs,
optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
- print failure_count,"failures among",test_count,"tests"
if failure_count:
exit(-1)
[11/28] git commit: Whitespace.
Posted by ma...@apache.org.
Whitespace.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/d89cc1e2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/d89cc1e2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/d89cc1e2
Branch: refs/heads/master
Commit: d89cc1e28a88a5f943fed096c4bd647f79753c9f
Parents: 319520b
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 01:50:42 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 01:50:42 2013 -0500
----------------------------------------------------------------------
python/pyspark/mllib.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/d89cc1e2/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index e7e2216..1877404 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -276,7 +276,7 @@ class SVMModel(LinearModel):
class KMeansModel(object):
"""A clustering model derived from the k-means method.
- >>> data = array([0.0, 0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
+ >>> data = array([0.0,0.0, 1.0,1.0, 9.0,8.0, 8.0,9.0]).reshape(4,2)
>>> clusters = KMeansModel.train(sc, sc.parallelize(data), 2, maxIterations=10, runs=30, initialization_mode="random")
>>> clusters.predict(array([0.0, 0.0])) == clusters.predict(array([1.0, 1.0]))
True
[19/28] git commit: Fix error message ugliness.
Posted by ma...@apache.org.
Fix error message ugliness.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2402180b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2402180b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2402180b
Branch: refs/heads/master
Commit: 2402180b32d530319d0526490afa3cfafc5c36b8
Parents: cbb2811
Author: Tor Myklebust <tm...@gmail.com>
Authored: Tue Dec 24 16:18:33 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Tue Dec 24 16:18:33 2013 -0500
----------------------------------------------------------------------
.../main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2402180b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index 4620cab..67ec974 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -42,7 +42,7 @@ class PythonMLLibAPI extends Serializable {
}
val length = bb.getLong()
if (packetLength != 16 + 8 * length) {
- throw new IllegalArgumentException("Length " + length + "is wrong.")
+ throw new IllegalArgumentException("Length " + length + " is wrong.")
}
val db = bb.asDoubleBuffer()
val ans = new Array[Double](length.toInt)
@@ -76,7 +76,7 @@ class PythonMLLibAPI extends Serializable {
val rows = bb.getLong()
val cols = bb.getLong()
if (packetLength != 24 + 8 * rows * cols) {
- throw new IllegalArgumentException("Size " + rows + "x" + cols + "is wrong.")
+ throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
}
val db = bb.asDoubleBuffer()
val ans = new Array[Array[Double]](rows.toInt)
[08/28] git commit: Python stubs for classification and clustering.
Posted by ma...@apache.org.
Python stubs for classification and clustering.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/73e17064
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/73e17064
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/73e17064
Branch: refs/heads/master
Commit: 73e17064c60c5aa2297dffbeaae4747890da0115
Parents: f99970e
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 00:12:48 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 00:12:48 2013 -0500
----------------------------------------------------------------------
python/pyspark/__init__.py | 7 +--
python/pyspark/mllib.py | 105 +++++++++++++++++++++++++++++++++++-----
2 files changed, 96 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/73e17064/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 7c8f914..8b5bb79 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -43,8 +43,9 @@ from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.storagelevel import StorageLevel
from pyspark.mllib import LinearRegressionModel, LassoModel, \
- RidgeRegressionModel
+ RidgeRegressionModel, LogisticRegressionModel, SVMModel, KMeansModel
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", \
- "LinearRegressionModel", "LassoModel", "RidgeRegressionModel"];
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel",
+ "LinearRegressionModel", "LassoModel", "RidgeRegressionModel",
+ "LogisticRegressionModel", "SVMModel", "KMeansModel"];
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/73e17064/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index d312787..21f3c03 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -75,22 +75,35 @@ def _deserialize_double_matrix(ba):
else:
raise TypeError("_deserialize_double_matrix called on a non-bytearray")
+def _linear_predictor_typecheck(x, coeffs):
+ """Predict the class of the vector x."""
+ if type(x) == ndarray:
+ if x.ndim == 1:
+ if x.shape == coeffs.shape:
+ pass
+ else:
+ raise RuntimeError("Got array of %d elements; wanted %d"
+ % shape(x)[0] % shape(coeffs)[0])
+ else:
+ raise RuntimeError("Bulk predict not yet supported.")
+ elif (type(x) == RDD):
+ raise RuntimeError("Bulk predict not yet supported.")
+ else:
+ raise TypeError("Argument of type " + type(x) + " unsupported");
+
class LinearModel(object):
+ """Something containing a vector of coefficients and an intercept."""
def __init__(self, coeff, intercept):
self._coeff = coeff
self._intercept = intercept
+class LinearRegressionModelBase(LinearModel):
+ """A linear regression model."""
def predict(self, x):
- if (type(x) == ndarray):
- if (x.ndim == 1):
- return dot(_coeff, x) + _intercept
- else:
- raise RuntimeError("Bulk predict not yet supported.")
- elif (type(x) == RDD):
- raise RuntimeError("Bulk predict not yet supported.")
- else:
- raise TypeError("Bad type argument to "
- "LinearRegressionModel::predict")
+ """Predict the value of the dependent variable given a vector x"""
+ """containing values for the independent variables."""
+ _linear_predictor_typecheck(x, _coeff)
+ return dot(_coeff, x) + _intercept
# Map a pickled Python RDD of numpy double vectors to a Java RDD of
# _serialized_double_vectors
@@ -131,7 +144,8 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
+ type(ans[0]) + " which is not float");
return klass(_deserialize_double_vector(ans[0]), ans[1]);
-class LinearRegressionModel(LinearModel):
+class LinearRegressionModel(LinearRegressionModelBase):
+ """A linear regression model derived from a least-squares fit."""
@classmethod
def train(cls, sc, data, iterations=100, step=1.0,
mini_batch_fraction=1.0, initial_weights=None):
@@ -141,7 +155,9 @@ class LinearRegressionModel(LinearModel):
d._jrdd, iterations, step, mini_batch_fraction, i),
LinearRegressionModel, data, initial_weights)
-class LassoModel(LinearModel):
+class LassoModel(LinearRegressionModelBase):
+ """A linear regression model derived from a least-squares fit with an """
+ """l_1 penalty term."""
@classmethod
def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
mini_batch_fraction=1.0, initial_weights=None):
@@ -151,7 +167,9 @@ class LassoModel(LinearModel):
iterations, step, reg_param, mini_batch_fraction, i),
LassoModel, data, initial_weights)
-class RidgeRegressionModel(LinearModel):
+class RidgeRegressionModel(LinearRegressionModelBase):
+ """A linear regression model derived from a least-squares fit with an """
+ """l_2 penalty term."""
@classmethod
def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
mini_batch_fraction=1.0, initial_weights=None):
@@ -160,3 +178,64 @@ class RidgeRegressionModel(LinearModel):
sc._jvm.PythonMLLibAPI().trainRidgeModel(d._jrdd,
iterations, step, reg_param, mini_batch_fraction, i),
RidgeRegressionModel, data, initial_weights)
+
+class LogisticRegressionModel(LinearModel):
+ """A linear binary classification model derived from logistic regression."""
+ def predict(self, x):
+ _linear_predictor_typecheck(x, _coeff)
+ margin = dot(x, _coeff) + intercept
+ prob = 1/(1 + exp(-margin))
+ return 1 if prob > 0.5 else 0
+
+ @classmethod
+ def train(cls, sc, data, iterations=100, step=1.0,
+ mini_batch_fraction=1.0, initial_weights=None):
+ """Train a logistic regression model on the given data."""
+ return _regression_train_wrapper(sc, lambda d, i:
+ sc._jvm.PythonMLLibAPI().trainLogisticRegressionModel(d._jrdd,
+ iterations, step, mini_batch_fraction, i),
+ LogisticRegressionModel, data, initial_weights)
+
+class SVMModel(LinearModel):
+ """A support vector machine."""
+ def predict(self, x):
+ _linear_predictor_typecheck(x, _coeff)
+ margin = dot(x, _coeff) + intercept
+ return 1 if margin >= 0 else 0
+ @classmethod
+ def train(cls, sc, data, iterations=100, step=1.0, reg_param=1.0,
+ mini_batch_fraction=1.0, initial_weights=None):
+ """Train a support vector machine on the given data."""
+ return _regression_train_wrapper(sc, lambda d, i:
+ sc._jvm.PythonMLLibAPI().trainSVMModel(d._jrdd,
+ iterations, step, reg_param, mini_batch_fraction, i),
+ SVMModel, data, initial_weights)
+
+class KMeansModel(object):
+ """A clustering model derived from the k-means method."""
+ def __init__(self, centers_):
+ self.centers = centers_
+
+ def predict(self, x):
+ best = 0
+ best_distance = 1e75
+ for i in range(0, centers.shape[0]):
+ diff = x - centers[i]
+ distance = sqrt(dot(diff, diff))
+ if distance < best_distance:
+ best = i
+ best_distance = distance
+ return best
+
+ @classmethod
+ def train(cls, sc, data, k, maxIterations = 100, runs = 1,
+ initialization_mode="k-means||"):
+ dataBytes = _get_unmangled_double_vector_rdd(data)
+ ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
+ k, maxIterations, runs, initialization_mode)
+ if len(ans) != 1:
+ raise RuntimeError("JVM call result had unexpected length");
+ elif type(ans[0]) != bytearray:
+ raise RuntimeError("JVM call result had first element of type "
+ + type(ans[0]) + " which is not bytearray");
+ return KMeansModel(_deserialize_double_matrix(ans[0]));
[21/28] git commit: Python change for move of PythonMLLibAPI.
Posted by ma...@apache.org.
Python change for move of PythonMLLibAPI.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4efec6eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4efec6eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4efec6eb
Branch: refs/heads/master
Commit: 4efec6eb941c1c6cdec884174ea98c040a277cde
Parents: 58e2a7d
Author: Tor Myklebust <tm...@gmail.com>
Authored: Tue Dec 24 16:49:03 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Tue Dec 24 16:49:03 2013 -0500
----------------------------------------------------------------------
python/pyspark/java_gateway.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4efec6eb/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 2941984..eb79135 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -62,6 +62,6 @@ def launch_gateway():
# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
- java_import(gateway.jvm, "org.apache.spark.mllib.api.*")
+ java_import(gateway.jvm, "org.apache.spark.mllib.api.python.*")
java_import(gateway.jvm, "scala.Tuple2")
return gateway
[20/28] git commit: Move PythonMLLibAPI into its own package.
Posted by ma...@apache.org.
Move PythonMLLibAPI into its own package.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/58e2a7d6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/58e2a7d6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/58e2a7d6
Branch: refs/heads/master
Commit: 58e2a7d6d4f036b20896674b1cac076d8daa55e8
Parents: 2402180
Author: Tor Myklebust <tm...@gmail.com>
Authored: Tue Dec 24 16:48:40 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Tue Dec 24 16:48:40 2013 -0500
----------------------------------------------------------------------
.../apache/spark/mllib/api/PythonMLLibAPI.scala | 231 ------------------
.../spark/mllib/api/python/PythonMLLibAPI.scala | 232 +++++++++++++++++++
2 files changed, 232 insertions(+), 231 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/58e2a7d6/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
deleted file mode 100644
index 67ec974..0000000
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import org.apache.spark.api.java.JavaRDD
-import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.classification._
-import org.apache.spark.mllib.clustering._
-import org.apache.spark.mllib.recommendation._
-import org.apache.spark.rdd.RDD
-import java.nio.ByteBuffer
-import java.nio.ByteOrder
-import java.nio.DoubleBuffer
-
-/**
- * The Java stubs necessary for the Python mllib bindings.
- */
-class PythonMLLibAPI extends Serializable {
- private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
- val packetLength = bytes.length
- if (packetLength < 16) {
- throw new IllegalArgumentException("Byte array too short.")
- }
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- val magic = bb.getLong()
- if (magic != 1) {
- throw new IllegalArgumentException("Magic " + magic + " is wrong.")
- }
- val length = bb.getLong()
- if (packetLength != 16 + 8 * length) {
- throw new IllegalArgumentException("Length " + length + " is wrong.")
- }
- val db = bb.asDoubleBuffer()
- val ans = new Array[Double](length.toInt)
- db.get(ans)
- return ans
- }
-
- private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
- val len = doubles.length
- val bytes = new Array[Byte](16 + 8 * len)
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- bb.putLong(1)
- bb.putLong(len)
- val db = bb.asDoubleBuffer()
- db.put(doubles)
- return bytes
- }
-
- private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
- val packetLength = bytes.length
- if (packetLength < 24) {
- throw new IllegalArgumentException("Byte array too short.")
- }
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- val magic = bb.getLong()
- if (magic != 2) {
- throw new IllegalArgumentException("Magic " + magic + " is wrong.")
- }
- val rows = bb.getLong()
- val cols = bb.getLong()
- if (packetLength != 24 + 8 * rows * cols) {
- throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
- }
- val db = bb.asDoubleBuffer()
- val ans = new Array[Array[Double]](rows.toInt)
- var i = 0
- for (i <- 0 until rows.toInt) {
- ans(i) = new Array[Double](cols.toInt)
- db.get(ans(i))
- }
- return ans
- }
-
- private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
- val rows = doubles.length
- var cols = 0
- if (rows > 0) {
- cols = doubles(0).length
- }
- val bytes = new Array[Byte](24 + 8 * rows * cols)
- val bb = ByteBuffer.wrap(bytes)
- bb.order(ByteOrder.nativeOrder())
- bb.putLong(2)
- bb.putLong(rows)
- bb.putLong(cols)
- val db = bb.asDoubleBuffer()
- var i = 0
- for (i <- 0 until rows) {
- db.put(doubles(i))
- }
- return bytes
- }
-
- private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
- dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
- java.util.LinkedList[java.lang.Object] = {
- val data = dataBytesJRDD.rdd.map(xBytes => {
- val x = deserializeDoubleVector(xBytes)
- LabeledPoint(x(0), x.slice(1, x.length))
- })
- val initialWeights = deserializeDoubleVector(initialWeightsBA)
- val model = trainFunc(data, initialWeights)
- val ret = new java.util.LinkedList[java.lang.Object]()
- ret.add(serializeDoubleVector(model.weights))
- ret.add(model.intercept: java.lang.Double)
- return ret
- }
-
- /**
- * Java stub for Python mllib LinearRegressionModel.train()
- */
- def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
- numIterations: Int, stepSize: Double, miniBatchFraction: Double,
- initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- return trainRegressionModel((data, initialWeights) =>
- LinearRegressionWithSGD.train(data, numIterations, stepSize,
- miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA)
- }
-
- /**
- * Java stub for Python mllib LassoModel.train()
- */
- def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
- stepSize: Double, regParam: Double, miniBatchFraction: Double,
- initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- return trainRegressionModel((data, initialWeights) =>
- LassoWithSGD.train(data, numIterations, stepSize, regParam,
- miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA)
- }
-
- /**
- * Java stub for Python mllib RidgeRegressionModel.train()
- */
- def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
- stepSize: Double, regParam: Double, miniBatchFraction: Double,
- initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- return trainRegressionModel((data, initialWeights) =>
- RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
- miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA)
- }
-
- /**
- * Java stub for Python mllib SVMModel.train()
- */
- def trainSVMModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
- stepSize: Double, regParam: Double, miniBatchFraction: Double,
- initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- return trainRegressionModel((data, initialWeights) =>
- SVMWithSGD.train(data, numIterations, stepSize, regParam,
- miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA)
- }
-
- /**
- * Java stub for Python mllib LogisticRegressionModel.train()
- */
- def trainLogisticRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
- numIterations: Int, stepSize: Double, miniBatchFraction: Double,
- initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
- return trainRegressionModel((data, initialWeights) =>
- LogisticRegressionWithSGD.train(data, numIterations, stepSize,
- miniBatchFraction, initialWeights),
- dataBytesJRDD, initialWeightsBA)
- }
-
- /**
- * Java stub for Python mllib KMeansModel.train()
- */
- def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
- maxIterations: Int, runs: Int, initializationMode: String):
- java.util.List[java.lang.Object] = {
- val data = dataBytesJRDD.rdd.map(xBytes => deserializeDoubleVector(xBytes))
- val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
- val ret = new java.util.LinkedList[java.lang.Object]()
- ret.add(serializeDoubleMatrix(model.clusterCenters))
- return ret
- }
-
- private def unpackRating(ratingBytes: Array[Byte]): Rating = {
- val bb = ByteBuffer.wrap(ratingBytes)
- bb.order(ByteOrder.nativeOrder())
- val user = bb.getInt()
- val product = bb.getInt()
- val rating = bb.getDouble()
- return new Rating(user, product, rating)
- }
-
- /**
- * Java stub for Python mllib ALSModel.train(). This stub returns a handle
- * to the Java object instead of the content of the Java object. Extra care
- * needs to be taken in the Python code to ensure it gets freed on exit; see
- * the Py4J documentation.
- */
- def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
- iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
- val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
- return ALS.train(ratings, rank, iterations, lambda, blocks)
- }
-
- /**
- * Java stub for Python mllib ALSModel.trainImplicit(). This stub returns a
- * handle to the Java object instead of the content of the Java object.
- * Extra care needs to be taken in the Python code to ensure it gets freed on
- * exit; see the Py4J documentation.
- */
- def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
- iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
- val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
- return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/58e2a7d6/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
new file mode 100644
index 0000000..ca47432
--- /dev/null
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.api.python
+import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.mllib.regression._
+import org.apache.spark.mllib.classification._
+import org.apache.spark.mllib.clustering._
+import org.apache.spark.mllib.recommendation._
+import org.apache.spark.rdd.RDD
+import java.nio.ByteBuffer
+import java.nio.ByteOrder
+import java.nio.DoubleBuffer
+
+/**
+ * The Java stubs necessary for the Python mllib bindings.
+ */
+class PythonMLLibAPI extends Serializable {
+ private def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
+ val packetLength = bytes.length
+ if (packetLength < 16) {
+ throw new IllegalArgumentException("Byte array too short.")
+ }
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ val magic = bb.getLong()
+ if (magic != 1) {
+ throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+ }
+ val length = bb.getLong()
+ if (packetLength != 16 + 8 * length) {
+ throw new IllegalArgumentException("Length " + length + " is wrong.")
+ }
+ val db = bb.asDoubleBuffer()
+ val ans = new Array[Double](length.toInt)
+ db.get(ans)
+ return ans
+ }
+
+ private def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
+ val len = doubles.length
+ val bytes = new Array[Byte](16 + 8 * len)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ bb.putLong(1)
+ bb.putLong(len)
+ val db = bb.asDoubleBuffer()
+ db.put(doubles)
+ return bytes
+ }
+
+ private def deserializeDoubleMatrix(bytes: Array[Byte]): Array[Array[Double]] = {
+ val packetLength = bytes.length
+ if (packetLength < 24) {
+ throw new IllegalArgumentException("Byte array too short.")
+ }
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ val magic = bb.getLong()
+ if (magic != 2) {
+ throw new IllegalArgumentException("Magic " + magic + " is wrong.")
+ }
+ val rows = bb.getLong()
+ val cols = bb.getLong()
+ if (packetLength != 24 + 8 * rows * cols) {
+ throw new IllegalArgumentException("Size " + rows + "x" + cols + " is wrong.")
+ }
+ val db = bb.asDoubleBuffer()
+ val ans = new Array[Array[Double]](rows.toInt)
+ var i = 0
+ for (i <- 0 until rows.toInt) {
+ ans(i) = new Array[Double](cols.toInt)
+ db.get(ans(i))
+ }
+ return ans
+ }
+
+ private def serializeDoubleMatrix(doubles: Array[Array[Double]]): Array[Byte] = {
+ val rows = doubles.length
+ var cols = 0
+ if (rows > 0) {
+ cols = doubles(0).length
+ }
+ val bytes = new Array[Byte](24 + 8 * rows * cols)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ bb.putLong(2)
+ bb.putLong(rows)
+ bb.putLong(cols)
+ val db = bb.asDoubleBuffer()
+ var i = 0
+ for (i <- 0 until rows) {
+ db.put(doubles(i))
+ }
+ return bytes
+ }
+
+ private def trainRegressionModel(trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
+ dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
+ java.util.LinkedList[java.lang.Object] = {
+ val data = dataBytesJRDD.rdd.map(xBytes => {
+ val x = deserializeDoubleVector(xBytes)
+ LabeledPoint(x(0), x.slice(1, x.length))
+ })
+ val initialWeights = deserializeDoubleVector(initialWeightsBA)
+ val model = trainFunc(data, initialWeights)
+ val ret = new java.util.LinkedList[java.lang.Object]()
+ ret.add(serializeDoubleVector(model.weights))
+ ret.add(model.intercept: java.lang.Double)
+ return ret
+ }
+
+ /**
+ * Java stub for Python mllib LinearRegressionModel.train()
+ */
+ def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+ numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ LinearRegressionWithSGD.train(data, numIterations, stepSize,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+ /**
+ * Java stub for Python mllib LassoModel.train()
+ */
+ def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ LassoWithSGD.train(data, numIterations, stepSize, regParam,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+ /**
+ * Java stub for Python mllib RidgeRegressionModel.train()
+ */
+ def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+ /**
+ * Java stub for Python mllib SVMModel.train()
+ */
+ def trainSVMModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ stepSize: Double, regParam: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ SVMWithSGD.train(data, numIterations, stepSize, regParam,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+ /**
+ * Java stub for Python mllib LogisticRegressionModel.train()
+ */
+ def trainLogisticRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+ numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+ initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
+ return trainRegressionModel((data, initialWeights) =>
+ LogisticRegressionWithSGD.train(data, numIterations, stepSize,
+ miniBatchFraction, initialWeights),
+ dataBytesJRDD, initialWeightsBA)
+ }
+
+ /**
+ * Java stub for Python mllib KMeansModel.train()
+ */
+ def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
+ maxIterations: Int, runs: Int, initializationMode: String):
+ java.util.List[java.lang.Object] = {
+ val data = dataBytesJRDD.rdd.map(xBytes => deserializeDoubleVector(xBytes))
+ val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
+ val ret = new java.util.LinkedList[java.lang.Object]()
+ ret.add(serializeDoubleMatrix(model.clusterCenters))
+ return ret
+ }
+
+ private def unpackRating(ratingBytes: Array[Byte]): Rating = {
+ val bb = ByteBuffer.wrap(ratingBytes)
+ bb.order(ByteOrder.nativeOrder())
+ val user = bb.getInt()
+ val product = bb.getInt()
+ val rating = bb.getDouble()
+ return new Rating(user, product, rating)
+ }
+
+ /**
+ * Java stub for Python mllib ALSModel.train(). This stub returns a handle
+ * to the Java object instead of the content of the Java object. Extra care
+ * needs to be taken in the Python code to ensure it gets freed on exit; see
+ * the Py4J documentation.
+ */
+ def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+ iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
+ val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+ return ALS.train(ratings, rank, iterations, lambda, blocks)
+ }
+
+ /**
+ * Java stub for Python mllib ALSModel.trainImplicit(). This stub returns a
+ * handle to the Java object instead of the content of the Java object.
+ * Extra care needs to be taken in the Python code to ensure it gets freed on
+ * exit; see the Py4J documentation.
+ */
+ def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+ iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
+ val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+ return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
+ }
+}
[25/28] git commit: Initial weights in Scala are ones; do that too. Also fix some errors.
Posted by ma...@apache.org.
Initial weights in Scala are ones; do that too. Also fix some errors.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/02208a17
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/02208a17
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/02208a17
Branch: refs/heads/master
Commit: 02208a175c76be111eeb66dc19c7499a6656a067
Parents: 4e82139
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Dec 25 00:53:48 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Wed Dec 25 00:53:48 2013 -0500
----------------------------------------------------------------------
python/pyspark/mllib/_common.py | 12 ++++++------
1 file changed, 6 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/02208a17/python/pyspark/mllib/_common.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/_common.py b/python/pyspark/mllib/_common.py
index e68bd8a..e74ba0f 100644
--- a/python/pyspark/mllib/_common.py
+++ b/python/pyspark/mllib/_common.py
@@ -15,7 +15,7 @@
# limitations under the License.
#
-from numpy import ndarray, copyto, float64, int64, int32, zeros, array_equal, array, dot, shape
+from numpy import ndarray, copyto, float64, int64, int32, ones, array_equal, array, dot, shape
from pyspark import SparkContext
# Double vector format:
@@ -143,7 +143,7 @@ def _linear_predictor_typecheck(x, coeffs):
elif (type(x) == RDD):
raise RuntimeError("Bulk predict not yet supported.")
else:
- raise TypeError("Argument of type " + type(x) + " unsupported")
+ raise TypeError("Argument of type " + type(x).__name__ + " unsupported")
def _get_unmangled_rdd(data, serializer):
dataBytes = data.map(serializer)
@@ -182,11 +182,11 @@ def _get_initial_weights(initial_weights, data):
initial_weights = data.first()
if type(initial_weights) != ndarray:
raise TypeError("At least one data element has type "
- + type(initial_weights) + " which is not ndarray")
+ + type(initial_weights).__name__ + " which is not ndarray")
if initial_weights.ndim != 1:
raise TypeError("At least one data element has "
+ initial_weights.ndim + " dimensions, which is not 1")
- initial_weights = zeros([initial_weights.shape[0] - 1])
+ initial_weights = ones([initial_weights.shape[0] - 1])
return initial_weights
# train_func should take two parameters, namely data and initial_weights, and
@@ -200,10 +200,10 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
raise RuntimeError("JVM call result had unexpected length")
elif type(ans[0]) != bytearray:
raise RuntimeError("JVM call result had first element of type "
- + type(ans[0]) + " which is not bytearray")
+ + type(ans[0]).__name__ + " which is not bytearray")
elif type(ans[1]) != float:
raise RuntimeError("JVM call result had second element of type "
- + type(ans[0]) + " which is not float")
+ + type(ans[0]).__name__ + " which is not float")
return klass(_deserialize_double_vector(ans[0]), ans[1])
def _serialize_rating(r):
[16/28] git commit: Python stubs for ALSModel.
Posted by ma...@apache.org.
Python stubs for ALSModel.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/076fc162
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/076fc162
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/076fc162
Branch: refs/heads/master
Commit: 076fc1622190d342e20592c00ca19f8c0a56997f
Parents: b454fdc
Author: Tor Myklebust <tm...@gmail.com>
Authored: Sat Dec 21 14:54:01 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Sat Dec 21 14:54:01 2013 -0500
----------------------------------------------------------------------
python/pyspark/__init__.py | 5 ++--
python/pyspark/mllib.py | 59 ++++++++++++++++++++++++++++++++++++-----
2 files changed, 56 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/076fc162/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 8b5bb79..3d73d95 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -43,9 +43,10 @@ from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.storagelevel import StorageLevel
from pyspark.mllib import LinearRegressionModel, LassoModel, \
- RidgeRegressionModel, LogisticRegressionModel, SVMModel, KMeansModel
+ RidgeRegressionModel, LogisticRegressionModel, SVMModel, KMeansModel, \
+ ALSModel
__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel",
"LinearRegressionModel", "LassoModel", "RidgeRegressionModel",
- "LogisticRegressionModel", "SVMModel", "KMeansModel"];
+ "LogisticRegressionModel", "SVMModel", "KMeansModel", "ALSModel"];
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/076fc162/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 8848284..22187eb 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -164,14 +164,17 @@ class LinearRegressionModelBase(LinearModel):
_linear_predictor_typecheck(x, self._coeff)
return dot(self._coeff, x) + self._intercept
-# Map a pickled Python RDD of numpy double vectors to a Java RDD of
-# _serialized_double_vectors
-def _get_unmangled_double_vector_rdd(data):
- dataBytes = data.map(_serialize_double_vector)
+def _get_unmangled_rdd(data, serializer):
+ dataBytes = data.map(serializer)
dataBytes._bypass_serializer = True
dataBytes.cache()
return dataBytes
+# Map a pickled Python RDD of numpy double vectors to a Java RDD of
+# _serialized_double_vectors
+def _get_unmangled_double_vector_rdd(data):
+ return _get_unmangled_rdd(data, _serialize_double_vector)
+
# If we weren't given initial weights, take a zero vector of the appropriate
# length.
def _get_initial_weights(initial_weights, data):
@@ -317,7 +320,7 @@ class KMeansModel(object):
return best
@classmethod
- def train(cls, sc, data, k, maxIterations = 100, runs = 1,
+ def train(cls, sc, data, k, maxIterations=100, runs=1,
initialization_mode="k-means||"):
"""Train a k-means clustering model."""
dataBytes = _get_unmangled_double_vector_rdd(data)
@@ -330,12 +333,56 @@ class KMeansModel(object):
+ type(ans[0]) + " which is not bytearray")
return KMeansModel(_deserialize_double_matrix(ans[0]))
+def _serialize_rating(r):
+ ba = bytearray(16)
+ intpart = ndarray(shape=[2], buffer=ba, dtype=int32)
+ doublepart = ndarray(shape=[1], buffer=ba, dtype=float64, offset=8)
+ intpart[0], intpart[1], doublepart[0] = r
+ return ba
+
+class ALSModel(object):
+ """A matrix factorisation model trained by regularized alternating
+ least-squares.
+
+ >>> r1 = (1, 1, 1.0)
+ >>> r2 = (1, 2, 2.0)
+ >>> r3 = (2, 1, 2.0)
+ >>> ratings = sc.parallelize([r1, r2, r3])
+ >>> model = ALSModel.trainImplicit(sc, ratings, 1)
+ >>> model.predict(2,2) is not None
+ True
+ """
+
+ def __init__(self, sc, java_model):
+ self._context = sc
+ self._java_model = java_model
+
+ #def __del__(self):
+ #self._gateway.detach(self._java_model)
+
+ def predict(self, user, product):
+ return self._java_model.predict(user, product)
+
+ @classmethod
+ def train(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1):
+ ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+ mod = sc._jvm.PythonMLLibAPI().trainALSModel(ratingBytes._jrdd,
+ rank, iterations, lambda_, blocks)
+ return ALSModel(sc, mod)
+
+ @classmethod
+ def trainImplicit(cls, sc, ratings, rank, iterations=5, lambda_=0.01, blocks=-1, alpha=0.01):
+ ratingBytes = _get_unmangled_rdd(ratings, _serialize_rating)
+ mod = sc._jvm.PythonMLLibAPI().trainImplicitALSModel(ratingBytes._jrdd,
+ rank, iterations, lambda_, blocks, alpha)
+ return ALSModel(sc, mod)
+
def _test():
import doctest
globs = globals().copy()
globs['sc'] = SparkContext('local[4]', 'PythonTest', batchSize=2)
(failure_count, test_count) = doctest.testmod(globs=globs,
- optionflags=doctest.ELLIPSIS)
+ optionflags=doctest.ELLIPSIS)
globs['sc'].stop()
print failure_count,"failures among",test_count,"tests"
if failure_count:
[27/28] git commit: Remove commented code in __init__.py.
Posted by ma...@apache.org.
Remove commented code in __init__.py.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/9cbcf814
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/9cbcf814
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/9cbcf814
Branch: refs/heads/master
Commit: 9cbcf81453a9afca58645969c1bc3ff366392734
Parents: 5e71354
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Dec 25 14:12:42 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Wed Dec 25 14:12:42 2013 -0500
----------------------------------------------------------------------
python/pyspark/mllib/__init__.py | 8 --------
1 file changed, 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/9cbcf814/python/pyspark/mllib/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/__init__.py b/python/pyspark/mllib/__init__.py
index e9c62f3..b1a5df1 100644
--- a/python/pyspark/mllib/__init__.py
+++ b/python/pyspark/mllib/__init__.py
@@ -18,11 +18,3 @@
"""
Python bindings for MLlib.
"""
-
-#from pyspark.mllib.regression import LinearRegressionModel, LassoModel, RidgeRegressionModel, LinearRegressionWithSGD, LassoWithSGD, RidgeRegressionWithSGD
-#from pyspark.mllib.classification import LogisticRegressionModel, SVMModel, LogisticRegressionWithSGD, SVMWithSGD
-#from pyspark.mllib.recommendation import MatrixFactorizationModel, ALS
-#from pyspark.mllib.clustering import KMeansModel, KMeans
-#
-#
-#__all__ = ["LinearRegressionModel", "LassoModel", "RidgeRegressionModel", "LinearRegressionWithSGD", "LassoWithSGD", "RidgeRegressionWithSGD", "LogisticRegressionModel", "SVMModel", "LogisticRegressionWithSGD", "SVMWithSGD", "MatrixFactorizationModel", "ALS", "KMeansModel", "KMeans"]
[17/28] git commit: Java stubs for ALSModel.
Posted by ma...@apache.org.
Java stubs for ALSModel.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/20f85eca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/20f85eca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/20f85eca
Branch: refs/heads/master
Commit: 20f85eca3d924aecd0fcf61cd516d9ac8e369dc1
Parents: 076fc16
Author: Tor Myklebust <tm...@gmail.com>
Authored: Sat Dec 21 14:54:13 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Sat Dec 21 14:54:13 2013 -0500
----------------------------------------------------------------------
.../apache/spark/mllib/api/PythonMLLibAPI.scala | 34 ++++++++++++++++++++
1 file changed, 34 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/20f85eca/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index 6472bf6..4620cab 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -19,6 +19,7 @@ import org.apache.spark.api.java.JavaRDD
import org.apache.spark.mllib.regression._
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.clustering._
+import org.apache.spark.mllib.recommendation._
import org.apache.spark.rdd.RDD
import java.nio.ByteBuffer
import java.nio.ByteOrder
@@ -194,4 +195,37 @@ class PythonMLLibAPI extends Serializable {
ret.add(serializeDoubleMatrix(model.clusterCenters))
return ret
}
+
+ private def unpackRating(ratingBytes: Array[Byte]): Rating = {
+ val bb = ByteBuffer.wrap(ratingBytes)
+ bb.order(ByteOrder.nativeOrder())
+ val user = bb.getInt()
+ val product = bb.getInt()
+ val rating = bb.getDouble()
+ return new Rating(user, product, rating)
+ }
+
+ /**
+ * Java stub for Python mllib ALSModel.train(). This stub returns a handle
+ * to the Java object instead of the content of the Java object. Extra care
+ * needs to be taken in the Python code to ensure it gets freed on exit; see
+ * the Py4J documentation.
+ */
+ def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+ iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
+ val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+ return ALS.train(ratings, rank, iterations, lambda, blocks)
+ }
+
+ /**
+ * Java stub for Python mllib ALSModel.trainImplicit(). This stub returns a
+ * handle to the Java object instead of the content of the Java object.
+ * Extra care needs to be taken in the Python code to ensure it gets freed on
+ * exit; see the Py4J documentation.
+ */
+ def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
+ iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
+ val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
+ return ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
+ }
}
[02/28] git commit: The rest of the Python side of those bindings.
Posted by ma...@apache.org.
The rest of the Python side of those bindings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bf491bb3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bf491bb3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bf491bb3
Branch: refs/heads/master
Commit: bf491bb3c0a9008caa4ac112672a4760b3d1c7b8
Parents: 95915f8
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 01:29:51 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 01:29:51 2013 -0500
----------------------------------------------------------------------
python/pyspark/__init__.py | 3 ++-
python/pyspark/java_gateway.py | 1 +
python/pyspark/serializers.py | 2 +-
3 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf491bb3/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 1f35f6f..949406c 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -42,6 +42,7 @@ from pyspark.context import SparkContext
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.storagelevel import StorageLevel
+from pyspark.mllib import train_linear_regression_model
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel"]
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", "train_linear_regression_model"]
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf491bb3/python/pyspark/java_gateway.py
----------------------------------------------------------------------
diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index e615c1e..2941984 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -62,5 +62,6 @@ def launch_gateway():
# Import the classes used by PySpark
java_import(gateway.jvm, "org.apache.spark.api.java.*")
java_import(gateway.jvm, "org.apache.spark.api.python.*")
+ java_import(gateway.jvm, "org.apache.spark.mllib.api.*")
java_import(gateway.jvm, "scala.Tuple2")
return gateway
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf491bb3/python/pyspark/serializers.py
----------------------------------------------------------------------
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index 811fa6f..2a500ab 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -308,4 +308,4 @@ def write_int(value, stream):
def write_with_length(obj, stream):
write_int(len(obj), stream)
- stream.write(obj)
\ No newline at end of file
+ stream.write(obj)
[24/28] git commit: Scala stubs for updated Python bindings.
Posted by ma...@apache.org.
Scala stubs for updated Python bindings.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/4e821390
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/4e821390
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/4e821390
Branch: refs/heads/master
Commit: 4e821390bca0d1f40b6f2f011bdc71476a1d3aa4
Parents: 0516305
Author: Tor Myklebust <tm...@gmail.com>
Authored: Wed Dec 25 00:09:00 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Wed Dec 25 00:09:00 2013 -0500
----------------------------------------------------------------------
.../spark/mllib/api/python/PythonMLLibAPI.scala | 26 ++++++++++----------
1 file changed, 13 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/4e821390/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index ca47432..8247c1e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -125,9 +125,9 @@ class PythonMLLibAPI extends Serializable {
}
/**
- * Java stub for Python mllib LinearRegressionModel.train()
+ * Java stub for Python mllib LinearRegressionWithSGD.train()
*/
- def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+ def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int, stepSize: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
return trainRegressionModel((data, initialWeights) =>
@@ -137,9 +137,9 @@ class PythonMLLibAPI extends Serializable {
}
/**
- * Java stub for Python mllib LassoModel.train()
+ * Java stub for Python mllib LassoWithSGD.train()
*/
- def trainLassoModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
return trainRegressionModel((data, initialWeights) =>
@@ -149,9 +149,9 @@ class PythonMLLibAPI extends Serializable {
}
/**
- * Java stub for Python mllib RidgeRegressionModel.train()
+ * Java stub for Python mllib RidgeRegressionWithSGD.train()
*/
- def trainRidgeModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
return trainRegressionModel((data, initialWeights) =>
@@ -161,9 +161,9 @@ class PythonMLLibAPI extends Serializable {
}
/**
- * Java stub for Python mllib SVMModel.train()
+ * Java stub for Python mllib SVMWithSGD.train()
*/
- def trainSVMModel(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
+ def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
stepSize: Double, regParam: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
return trainRegressionModel((data, initialWeights) =>
@@ -173,9 +173,9 @@ class PythonMLLibAPI extends Serializable {
}
/**
- * Java stub for Python mllib LogisticRegressionModel.train()
+ * Java stub for Python mllib LogisticRegressionWithSGD.train()
*/
- def trainLogisticRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]],
+ def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
numIterations: Int, stepSize: Double, miniBatchFraction: Double,
initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
return trainRegressionModel((data, initialWeights) =>
@@ -185,7 +185,7 @@ class PythonMLLibAPI extends Serializable {
}
/**
- * Java stub for Python mllib KMeansModel.train()
+ * Java stub for Python mllib KMeans.train()
*/
def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
maxIterations: Int, runs: Int, initializationMode: String):
@@ -207,7 +207,7 @@ class PythonMLLibAPI extends Serializable {
}
/**
- * Java stub for Python mllib ALSModel.train(). This stub returns a handle
+ * Java stub for Python mllib ALS.train(). This stub returns a handle
* to the Java object instead of the content of the Java object. Extra care
* needs to be taken in the Python code to ensure it gets freed on exit; see
* the Py4J documentation.
@@ -219,7 +219,7 @@ class PythonMLLibAPI extends Serializable {
}
/**
- * Java stub for Python mllib ALSModel.trainImplicit(). This stub returns a
+ * Java stub for Python mllib ALS.trainImplicit(). This stub returns a
* handle to the Java object instead of the content of the Java object.
* Extra care needs to be taken in the Python code to ensure it gets freed on
* exit; see the Py4J documentation.
[13/28] git commit: Change some docstrings and add some others.
Posted by ma...@apache.org.
Change some docstrings and add some others.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0a5cacb9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0a5cacb9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0a5cacb9
Branch: refs/heads/master
Commit: 0a5cacb9615d960c93bca8cc3f4ad2a599f94ec0
Parents: b835ddf
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 02:05:15 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 02:05:15 2013 -0500
----------------------------------------------------------------------
python/pyspark/mllib.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0a5cacb9/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index ce1363f..928caa9 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -146,7 +146,7 @@ def _linear_predictor_typecheck(x, coeffs):
raise TypeError("Argument of type " + type(x) + " unsupported");
class LinearModel(object):
- """Something containing a vector of coefficients and an intercept."""
+ """Something that has a vector of coefficients and an intercept."""
def __init__(self, coeff, intercept):
self._coeff = coeff
self._intercept = intercept
@@ -305,6 +305,7 @@ class KMeansModel(object):
self.centers = centers_
def predict(self, x):
+ """Find the cluster to which x belongs in this model."""
best = 0
best_distance = 1e75
for i in range(0, self.centers.shape[0]):
@@ -318,6 +319,7 @@ class KMeansModel(object):
@classmethod
def train(cls, sc, data, k, maxIterations = 100, runs = 1,
initialization_mode="k-means||"):
+ """Train a k-means clustering model."""
dataBytes = _get_unmangled_double_vector_rdd(data)
ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
k, maxIterations, runs, initialization_mode)
[03/28] git commit: Incorporate most of Josh's style suggestions. I
don't want to deal with the type and length checking errors until we've got
at least one working stub that we're all happy with.
Posted by ma...@apache.org.
Incorporate most of Josh's style suggestions. I don't want to deal with the type and length checking errors until we've got at least one working stub that we're all happy with.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/bf20591a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/bf20591a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/bf20591a
Branch: refs/heads/master
Commit: bf20591a006b9d2fdd9a674d637f5e929fd065a2
Parents: bf491bb
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 03:40:57 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 03:40:57 2013 -0500
----------------------------------------------------------------------
python/pyspark/__init__.py | 4 +-
python/pyspark/mllib.py | 185 +++++++++++++++++++---------------------
2 files changed, 91 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf20591a/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 949406c..9f71db3 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -42,7 +42,7 @@ from pyspark.context import SparkContext
from pyspark.rdd import RDD
from pyspark.files import SparkFiles
from pyspark.storagelevel import StorageLevel
-from pyspark.mllib import train_linear_regression_model
+from pyspark.mllib import LinearRegressionModel
-__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", "train_linear_regression_model"]
+__all__ = ["SparkContext", "RDD", "SparkFiles", "StorageLevel", "LinearRegressionModel"];
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/bf20591a/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 8237f66..0dfc490 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -1,8 +1,4 @@
-from numpy import *;
-from pyspark.serializers import NoOpSerializer, FramedSerializer, \
- BatchedSerializer, CloudPickleSerializer, pack_long
-
-#__all__ = ["train_linear_regression_model"];
+from numpy import *
# Double vector format:
#
@@ -15,100 +11,97 @@ from pyspark.serializers import NoOpSerializer, FramedSerializer, \
# This is all in machine-endian. That means that the Java interpreter and the
# Python interpreter must agree on what endian the machine is.
-def deserialize_byte_array(shape, ba, offset):
- """Implementation detail. Do not use directly."""
- ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64", \
- order='C');
- return ar.copy();
-
-def serialize_double_vector(v):
- """Implementation detail. Do not use directly."""
- if (type(v) == ndarray and v.dtype == float64 and v.ndim == 1):
- length = v.shape[0];
- ba = bytearray(16 + 8*length);
- header = ndarray(shape=[2], buffer=ba, dtype="int64");
- header[0] = 1;
- header[1] = length;
- copyto(ndarray(shape=[length], buffer=ba, offset=16, dtype="float64"), v);
- return ba;
- else:
- raise TypeError("serialize_double_vector called on a non-double-vector");
+def _deserialize_byte_array(shape, ba, offset):
+ ar = ndarray(shape=shape, buffer=ba, offset=offset, dtype="float64",
+ order='C')
+ return ar.copy()
-def deserialize_double_vector(ba):
- """Implementation detail. Do not use directly."""
- if (type(ba) == bytearray and len(ba) >= 16 and (len(ba) & 7 == 0)):
- header = ndarray(shape=[2], buffer=ba, dtype="int64");
- if (header[0] != 1):
- raise TypeError("deserialize_double_vector called on bytearray with " \
- "wrong magic");
- length = header[1];
- if (len(ba) != 8*length + 16):
- raise TypeError("deserialize_double_vector called on bytearray with " \
- "wrong length");
- return deserialize_byte_array([length], ba, 16);
- else:
- raise TypeError("deserialize_double_vector called on a non-bytearray");
+def _serialize_double_vector(v):
+ if (type(v) == ndarray and v.dtype == float64 and v.ndim == 1):
+ length = v.shape[0]
+ ba = bytearray(16 + 8*length)
+ header = ndarray(shape=[2], buffer=ba, dtype="int64")
+ header[0] = 1
+ header[1] = length
+ copyto(ndarray(shape=[length], buffer=ba, offset=16,
+ dtype="float64"), v)
+ return ba
+ else:
+ raise TypeError("_serialize_double_vector called on a "
+ "non-double-vector")
-def serialize_double_matrix(m):
- """Implementation detail. Do not use directly."""
- if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
- rows = m.shape[0];
- cols = m.shape[1];
- ba = bytearray(24 + 8 * rows * cols);
- header = ndarray(shape=[3], buffer=ba, dtype="int64");
- header[0] = 2;
- header[1] = rows;
- header[2] = cols;
- copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24, dtype="float64", \
- order='C'), m);
- return ba;
- else:
- print type(m);
- print m.dtype;
- print m.ndim;
- raise TypeError("serialize_double_matrix called on a non-double-matrix");
+def _deserialize_double_vector(ba):
+ if (type(ba) == bytearray and len(ba) >= 16 and (len(ba) & 7 == 0)):
+ header = ndarray(shape=[2], buffer=ba, dtype="int64")
+ if (header[0] != 1):
+ raise TypeError("_deserialize_double_vector called on bytearray "
+ "with wrong magic")
+ length = header[1]
+ if (len(ba) != 8*length + 16):
+ raise TypeError("_deserialize_double_vector called on bytearray "
+ "with wrong length")
+ return _deserialize_byte_array([length], ba, 16)
+ else:
+ raise TypeError("_deserialize_double_vector called on a non-bytearray")
-def deserialize_double_matrix(ba):
- """Implementation detail. Do not use directly."""
- if (type(ba) == bytearray and len(ba) >= 24 and (len(ba) & 7 == 0)):
- header = ndarray(shape=[3], buffer=ba, dtype="int64");
- if (header[0] != 2):
- raise TypeError("deserialize_double_matrix called on bytearray with " \
- "wrong magic");
- rows = header[1];
- cols = header[2];
- if (len(ba) != 8*rows*cols + 24):
- raise TypeError("deserialize_double_matrix called on bytearray with " \
- "wrong length");
- return deserialize_byte_array([rows, cols], ba, 24);
- else:
- raise TypeError("deserialize_double_matrix called on a non-bytearray");
+def _serialize_double_matrix(m):
+ if (type(m) == ndarray and m.dtype == float64 and m.ndim == 2):
+ rows = m.shape[0]
+ cols = m.shape[1]
+ ba = bytearray(24 + 8 * rows * cols)
+ header = ndarray(shape=[3], buffer=ba, dtype="int64")
+ header[0] = 2
+ header[1] = rows
+ header[2] = cols
+ copyto(ndarray(shape=[rows, cols], buffer=ba, offset=24,
+ dtype="float64", order='C'), m)
+ return ba
+ else:
+ raise TypeError("_serialize_double_matrix called on a "
+ "non-double-matrix")
-class LinearRegressionModel:
- _coeff = None;
- _intercept = None;
- def __init__(self, coeff, intercept):
- self._coeff = coeff;
- self._intercept = intercept;
- def predict(self, x):
- if (type(x) == ndarray):
- if (x.ndim == 1):
- return dot(_coeff, x) - _intercept;
- else:
- raise RuntimeError("Bulk predict not yet supported.");
- elif (type(x) == RDD):
- raise RuntimeError("Bulk predict not yet supported.");
+def _deserialize_double_matrix(ba):
+ if (type(ba) == bytearray and len(ba) >= 24 and (len(ba) & 7 == 0)):
+ header = ndarray(shape=[3], buffer=ba, dtype="int64")
+ if (header[0] != 2):
+ raise TypeError("_deserialize_double_matrix called on bytearray "
+ "with wrong magic")
+ rows = header[1]
+ cols = header[2]
+ if (len(ba) != 8*rows*cols + 24):
+ raise TypeError("_deserialize_double_matrix called on bytearray "
+ "with wrong length")
+ return _deserialize_byte_array([rows, cols], ba, 24)
else:
- raise TypeError("Bad type argument to LinearRegressionModel::predict");
+ raise TypeError("_deserialize_double_matrix called on a non-bytearray")
+
+class LinearRegressionModel(object):
+ def __init__(self, coeff, intercept):
+ self._coeff = coeff
+ self._intercept = intercept
+
+ def predict(self, x):
+ if (type(x) == ndarray):
+ if (x.ndim == 1):
+ return dot(_coeff, x) - _intercept
+ else:
+ raise RuntimeError("Bulk predict not yet supported.")
+ elif (type(x) == RDD):
+ raise RuntimeError("Bulk predict not yet supported.")
+ else:
+ raise TypeError("Bad type argument to "
+ "LinearRegressionModel::predict")
-def train_linear_regression_model(sc, data):
- """Train a linear regression model on the given data."""
- dataBytes = data.map(serialize_double_vector);
- sc.serializer = NoOpSerializer();
- dataBytes.cache();
- api = sc._jvm.PythonMLLibAPI();
- ans = api.trainLinearRegressionModel(dataBytes._jrdd);
- if (len(ans) != 2 or type(ans[0]) != bytearray or type(ans[1]) != float):
- raise RuntimeError("train_linear_regression_model received garbage " \
- "from JVM");
- return LinearRegressionModel(deserialize_double_vector(ans[0]), ans[1]);
+ @classmethod
+ def train(cls, sc, data):
+ """Train a linear regression model on the given data."""
+ dataBytes = data.map(_serialize_double_vector)
+ dataBytes._bypass_serializer = True
+ dataBytes.cache()
+ api = sc._jvm.PythonMLLibAPI()
+ ans = api.trainLinearRegressionModel(dataBytes._jrdd)
+ if (len(ans) != 2 or type(ans[0]) != bytearray
+ or type(ans[1]) != float):
+ raise RuntimeError("train_linear_regression_model received "
+ "garbage from JVM")
+ return LinearRegressionModel(_deserialize_double_vector(ans[0]), ans[1])
[14/28] git commit: Un-semicolon mllib.py.
Posted by ma...@apache.org.
Un-semicolon mllib.py.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/0b494c21
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/0b494c21
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/0b494c21
Branch: refs/heads/master
Commit: 0b494c21675b6cc3b5d669dbd9b9a8f277216613
Parents: 0a5cacb
Author: Tor Myklebust <tm...@gmail.com>
Authored: Fri Dec 20 02:05:55 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Fri Dec 20 02:05:55 2013 -0500
----------------------------------------------------------------------
python/pyspark/mllib.py | 22 +++++++++++-----------
1 file changed, 11 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/0b494c21/python/pyspark/mllib.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib.py b/python/pyspark/mllib.py
index 928caa9..8848284 100644
--- a/python/pyspark/mllib.py
+++ b/python/pyspark/mllib.py
@@ -143,7 +143,7 @@ def _linear_predictor_typecheck(x, coeffs):
elif (type(x) == RDD):
raise RuntimeError("Bulk predict not yet supported.")
else:
- raise TypeError("Argument of type " + type(x) + " unsupported");
+ raise TypeError("Argument of type " + type(x) + " unsupported")
class LinearModel(object):
"""Something that has a vector of coefficients and an intercept."""
@@ -170,7 +170,7 @@ def _get_unmangled_double_vector_rdd(data):
dataBytes = data.map(_serialize_double_vector)
dataBytes._bypass_serializer = True
dataBytes.cache()
- return dataBytes;
+ return dataBytes
# If we weren't given initial weights, take a zero vector of the appropriate
# length.
@@ -183,8 +183,8 @@ def _get_initial_weights(initial_weights, data):
if initial_weights.ndim != 1:
raise TypeError("At least one data element has "
+ initial_weights.ndim + " dimensions, which is not 1")
- initial_weights = zeros([initial_weights.shape[0] - 1]);
- return initial_weights;
+ initial_weights = zeros([initial_weights.shape[0] - 1])
+ return initial_weights
# train_func should take two parameters, namely data and initial_weights, and
# return the result of a call to the appropriate JVM stub.
@@ -194,14 +194,14 @@ def _regression_train_wrapper(sc, train_func, klass, data, initial_weights):
dataBytes = _get_unmangled_double_vector_rdd(data)
ans = train_func(dataBytes, _serialize_double_vector(initial_weights))
if len(ans) != 2:
- raise RuntimeError("JVM call result had unexpected length");
+ raise RuntimeError("JVM call result had unexpected length")
elif type(ans[0]) != bytearray:
raise RuntimeError("JVM call result had first element of type "
- + type(ans[0]) + " which is not bytearray");
+ + type(ans[0]) + " which is not bytearray")
elif type(ans[1]) != float:
raise RuntimeError("JVM call result had second element of type "
- + type(ans[0]) + " which is not float");
- return klass(_deserialize_double_vector(ans[0]), ans[1]);
+ + type(ans[0]) + " which is not float")
+ return klass(_deserialize_double_vector(ans[0]), ans[1])
class LinearRegressionModel(LinearRegressionModelBase):
"""A linear regression model derived from a least-squares fit.
@@ -324,11 +324,11 @@ class KMeansModel(object):
ans = sc._jvm.PythonMLLibAPI().trainKMeansModel(dataBytes._jrdd,
k, maxIterations, runs, initialization_mode)
if len(ans) != 1:
- raise RuntimeError("JVM call result had unexpected length");
+ raise RuntimeError("JVM call result had unexpected length")
elif type(ans[0]) != bytearray:
raise RuntimeError("JVM call result had first element of type "
- + type(ans[0]) + " which is not bytearray");
- return KMeansModel(_deserialize_double_matrix(ans[0]));
+ + type(ans[0]) + " which is not bytearray")
+ return KMeansModel(_deserialize_double_matrix(ans[0]))
def _test():
import doctest
[04/28] git commit: Un-semicolon PythonMLLibAPI.
Posted by ma...@apache.org.
Un-semicolon PythonMLLibAPI.
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2a41c9aa
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2a41c9aa
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2a41c9aa
Branch: refs/heads/master
Commit: 2a41c9aad3d0a8477a11bf910fa57b49ea4dc6dc
Parents: bf20591
Author: Tor Myklebust <tm...@gmail.com>
Authored: Thu Dec 19 21:27:11 2013 -0500
Committer: Tor Myklebust <tm...@gmail.com>
Committed: Thu Dec 19 21:27:11 2013 -0500
----------------------------------------------------------------------
.../apache/spark/mllib/api/PythonMLLibAPI.scala | 54 ++++++++++----------
1 file changed, 27 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2a41c9aa/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
index 19d2e9a..3daf5dc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/PythonMLLibAPI.scala
@@ -6,46 +6,46 @@ import java.nio.DoubleBuffer
class PythonMLLibAPI extends Serializable {
def deserializeDoubleVector(bytes: Array[Byte]): Array[Double] = {
- val packetLength = bytes.length;
+ val packetLength = bytes.length
if (packetLength < 16) {
- throw new IllegalArgumentException("Byte array too short.");
+ throw new IllegalArgumentException("Byte array too short.")
}
- val bb = ByteBuffer.wrap(bytes);
- bb.order(ByteOrder.nativeOrder());
- val magic = bb.getLong();
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ val magic = bb.getLong()
if (magic != 1) {
- throw new IllegalArgumentException("Magic " + magic + " is wrong.");
+ throw new IllegalArgumentException("Magic " + magic + " is wrong.")
}
- val length = bb.getLong();
+ val length = bb.getLong()
if (packetLength != 16 + 8 * length) {
- throw new IllegalArgumentException("Length " + length + "is wrong.");
+ throw new IllegalArgumentException("Length " + length + "is wrong.")
}
- val db = bb.asDoubleBuffer();
- val ans = new Array[Double](length.toInt);
- db.get(ans);
- return ans;
+ val db = bb.asDoubleBuffer()
+ val ans = new Array[Double](length.toInt)
+ db.get(ans)
+ return ans
}
def serializeDoubleVector(doubles: Array[Double]): Array[Byte] = {
- val len = doubles.length;
- val bytes = new Array[Byte](16 + 8 * len);
- val bb = ByteBuffer.wrap(bytes);
- bb.order(ByteOrder.nativeOrder());
- bb.putLong(1);
- bb.putLong(len);
- val db = bb.asDoubleBuffer();
- db.put(doubles);
- return bytes;
+ val len = doubles.length
+ val bytes = new Array[Byte](16 + 8 * len)
+ val bb = ByteBuffer.wrap(bytes)
+ bb.order(ByteOrder.nativeOrder())
+ bb.putLong(1)
+ bb.putLong(len)
+ val db = bb.asDoubleBuffer()
+ db.put(doubles)
+ return bytes
}
def trainLinearRegressionModel(dataBytesJRDD: JavaRDD[Array[Byte]]):
java.util.List[java.lang.Object] = {
val data = dataBytesJRDD.rdd.map(x => deserializeDoubleVector(x))
- .map(v => LabeledPoint(v(0), v.slice(1, v.length)));
- val model = LinearRegressionWithSGD.train(data, 222);
- val ret = new java.util.LinkedList[java.lang.Object]();
- ret.add(serializeDoubleVector(model.weights));
- ret.add(model.intercept: java.lang.Double);
- return ret;
+ .map(v => LabeledPoint(v(0), v.slice(1, v.length)))
+ val model = LinearRegressionWithSGD.train(data, 222)
+ val ret = new java.util.LinkedList[java.lang.Object]()
+ ret.add(serializeDoubleVector(model.weights))
+ ret.add(model.intercept: java.lang.Double)
+ return ret
}
}