You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ma...@apache.org on 2014/04/02 23:01:20 UTC

[1/2] [SPARK-1212, Part II] Support sparse data in MLlib

Repository: spark
Updated Branches:
  refs/heads/master ed730c950 -> 9c65fa76f


http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index 08cd9ab..cb85e43 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -17,15 +17,13 @@
 
 package org.apache.spark.mllib.util
 
+import breeze.linalg.{Vector => BV, DenseVector => BDV, SparseVector => BSV,
+  squaredDistance => breezeSquaredDistance}
+
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.SparkContext._
-
-import org.jblas.DoubleMatrix
-
 import org.apache.spark.mllib.regression.LabeledPoint
-
-import breeze.linalg.{Vector => BV, SparseVector => BSV, squaredDistance => breezeSquaredDistance}
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 
 /**
  * Helper methods to load, save and pre-process data used in ML Lib.
@@ -41,6 +39,107 @@ object MLUtils {
   }
 
   /**
+   * Multiclass label parser, which parses a string into double.
+   */
+  val multiclassLabelParser: String => Double = _.toDouble
+
+  /**
+   * Binary label parser, which outputs 1.0 (positive) if the value is greater than 0.5,
+   * or 0.0 (negative) otherwise.
+   */
+  val binaryLabelParser: String => Double = label => if (label.toDouble > 0.5) 1.0 else 0.0
+
+  /**
+   * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint].
+   * The LIBSVM format is a text-based format used by LIBSVM and LIBLINEAR.
+   * Each line represents a labeled sparse feature vector using the following format:
+   * {{{label index1:value1 index2:value2 ...}}}
+   * where the indices are one-based and in ascending order.
+   * This method parses each line into a [[org.apache.spark.mllib.regression.LabeledPoint]],
+   * where the feature indices are converted to zero-based.
+   *
+   * @param sc Spark context
+   * @param path file or directory path in any Hadoop-supported file system URI
+   * @param labelParser parser for labels, default: 1.0 if label > 0.5 or 0.0 otherwise
+   * @param numFeatures number of features, which will be determined from the input data if a
+   *                    negative value is given. The default value is -1.
+   * @param minSplits min number of partitions, default: sc.defaultMinSplits
+   * @return labeled data stored as an RDD[LabeledPoint]
+   */
+  def loadLibSVMData(
+      sc: SparkContext,
+      path: String,
+      labelParser: String => Double,
+      numFeatures: Int,
+      minSplits: Int): RDD[LabeledPoint] = {
+    val parsed = sc.textFile(path, minSplits)
+      .map(_.trim)
+      .filter(!_.isEmpty)
+      .map(_.split(' '))
+    // Determine number of features.
+    val d = if (numFeatures >= 0) {
+      numFeatures
+    } else {
+      parsed.map { items =>
+        if (items.length > 1) {
+          items.last.split(':')(0).toInt
+        } else {
+          0
+        }
+      }.reduce(math.max)
+    }
+    parsed.map { items =>
+      val label = labelParser(items.head)
+      val (indices, values) = items.tail.map { item =>
+        val indexAndValue = item.split(':')
+        val index = indexAndValue(0).toInt - 1
+        val value = indexAndValue(1).toDouble
+        (index, value)
+      }.unzip
+      LabeledPoint(label, Vectors.sparse(d, indices.toArray, values.toArray))
+    }
+  }
+
+  // Convenient methods for calling from Java.
+
+  /**
+   * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with number of features determined automatically and the default number of partitions.
+   */
+  def loadLibSVMData(sc: SparkContext, path: String): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, binaryLabelParser, -1, sc.defaultMinSplits)
+
+  /**
+   * Loads binary labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with number of features specified explicitly and the default number of partitions.
+   */
+  def loadLibSVMData(sc: SparkContext, path: String, numFeatures: Int): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, binaryLabelParser, numFeatures, sc.defaultMinSplits)
+
+  /**
+   * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with the given label parser, number of features determined automatically,
+   * and the default number of partitions.
+   */
+  def loadLibSVMData(
+      sc: SparkContext,
+      path: String,
+      labelParser: String => Double): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, labelParser, -1, sc.defaultMinSplits)
+
+  /**
+   * Loads labeled data in the LIBSVM format into an RDD[LabeledPoint],
+   * with the given label parser, number of features specified explicitly,
+   * and the default number of partitions.
+   */
+  def loadLibSVMData(
+      sc: SparkContext,
+      path: String,
+      labelParser: String => Double,
+      numFeatures: Int): RDD[LabeledPoint] =
+    loadLibSVMData(sc, path, labelParser, numFeatures, sc.defaultMinSplits)
+
+  /**
    * Load labeled data from a file. The data format used here is
    * <L>, <f1> <f2> ...
    * where <f1>, <f2> are feature values in Double and <L> is the corresponding label as Double.
@@ -54,7 +153,7 @@ object MLUtils {
     sc.textFile(dir).map { line =>
       val parts = line.split(',')
       val label = parts(0).toDouble
-      val features = parts(1).trim().split(' ').map(_.toDouble)
+      val features = Vectors.dense(parts(1).trim().split(' ').map(_.toDouble))
       LabeledPoint(label, features)
     }
   }
@@ -68,7 +167,7 @@ object MLUtils {
    * @param dir Directory to save the data.
    */
   def saveLabeledData(data: RDD[LabeledPoint], dir: String) {
-    val dataStr = data.map(x => x.label + "," + x.features.mkString(" "))
+    val dataStr = data.map(x => x.label + "," + x.features.toArray.mkString(" "))
     dataStr.saveAsTextFile(dir)
   }
 
@@ -76,44 +175,52 @@ object MLUtils {
    * Utility function to compute mean and standard deviation on a given dataset.
    *
    * @param data - input data set whose statistics are computed
-   * @param nfeatures - number of features
-   * @param nexamples - number of examples in input dataset
+   * @param numFeatures - number of features
+   * @param numExamples - number of examples in input dataset
    *
    * @return (yMean, xColMean, xColSd) - Tuple consisting of
    *     yMean - mean of the labels
    *     xColMean - Row vector with mean for every column (or feature) of the input data
    *     xColSd - Row vector standard deviation for every column (or feature) of the input data.
    */
-  def computeStats(data: RDD[LabeledPoint], nfeatures: Int, nexamples: Long):
-      (Double, DoubleMatrix, DoubleMatrix) = {
-    val yMean: Double = data.map { labeledPoint => labeledPoint.label }.reduce(_ + _) / nexamples
-
-    // NOTE: We shuffle X by column here to compute column sum and sum of squares.
-    val xColSumSq: RDD[(Int, (Double, Double))] = data.flatMap { labeledPoint =>
-      val nCols = labeledPoint.features.length
-      // Traverse over every column and emit (col, value, value^2)
-      Iterator.tabulate(nCols) { i =>
-        (i, (labeledPoint.features(i), labeledPoint.features(i)*labeledPoint.features(i)))
-      }
-    }.reduceByKey { case(x1, x2) =>
-      (x1._1 + x2._1, x1._2 + x2._2)
+  def computeStats(
+      data: RDD[LabeledPoint],
+      numFeatures: Int,
+      numExamples: Long): (Double, Vector, Vector) = {
+    val brzData = data.map { case LabeledPoint(label, features) =>
+      (label, features.toBreeze)
     }
-    val xColSumsMap = xColSumSq.collectAsMap()
-
-    val xColMean = DoubleMatrix.zeros(nfeatures, 1)
-    val xColSd = DoubleMatrix.zeros(nfeatures, 1)
-
-    // Compute mean and unbiased variance using column sums
-    var col = 0
-    while (col < nfeatures) {
-      xColMean.put(col, xColSumsMap(col)._1 / nexamples)
-      val variance =
-        (xColSumsMap(col)._2 - (math.pow(xColSumsMap(col)._1, 2) / nexamples)) / nexamples
-      xColSd.put(col, math.sqrt(variance))
-      col += 1
+    val aggStats = brzData.aggregate(
+      (0L, 0.0, BDV.zeros[Double](numFeatures), BDV.zeros[Double](numFeatures))
+    )(
+      seqOp = (c, v) => (c, v) match {
+        case ((n, sumLabel, sum, sumSq), (label, features)) =>
+          features.activeIterator.foreach { case (i, x) =>
+            sumSq(i) += x * x
+          }
+          (n + 1L, sumLabel + label, sum += features, sumSq)
+      },
+      combOp = (c1, c2) => (c1, c2) match {
+        case ((n1, sumLabel1, sum1, sumSq1), (n2, sumLabel2, sum2, sumSq2)) =>
+          (n1 + n2, sumLabel1 + sumLabel2, sum1 += sum2, sumSq1 += sumSq2)
+      }
+    )
+    val (nl, sumLabel, sum, sumSq) = aggStats
+
+    require(nl > 0, "Input data is empty.")
+    require(nl == numExamples)
+
+    val n = nl.toDouble
+    val yMean = sumLabel / n
+    val mean = sum / n
+    val std = new Array[Double](sum.length)
+    var i = 0
+    while (i < numFeatures) {
+      std(i) = sumSq(i) / n - mean(i) * mean(i)
+      i += 1
     }
 
-    (yMean, xColMean, xColSd)
+    (yMean, Vectors.fromBreeze(mean), Vectors.dense(std))
   }
 
   /**
@@ -144,6 +251,18 @@ object MLUtils {
     val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
     val normDiff = norm1 - norm2
     var sqDist = 0.0
+    /*
+     * The relative error is
+     * <pre>
+     * EPSILON * ( \|a\|_2^2 + \|b\\_2^2 + 2 |a^T b|) / ( \|a - b\|_2^2 ),
+     * </pre>
+     * which is bounded by
+     * <pre>
+     * 2.0 * EPSILON * ( \|a\|_2^2 + \|b\|_2^2 ) / ( (\|a\|_2 - \|b\|_2)^2 ).
+     * </pre>
+     * The bound doesn't need the inner product, so we can use it as a sufficient condition to
+     * check quickly whether the inner product approach is accurate.
+     */
     val precisionBound1 = 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
     if (precisionBound1 < precision) {
       sqDist = sumSquaredNorm - 2.0 * v1.dot(v2)

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
index c96c94f..e300c3d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/SVMDataGenerator.scala
@@ -23,6 +23,7 @@ import org.jblas.DoubleMatrix
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 
 /**
@@ -58,7 +59,7 @@ object SVMDataGenerator {
       }
       val yD = new DoubleMatrix(1, x.length, x: _*).dot(trueWeights) + rnd.nextGaussian() * 0.1
       val y = if (yD < 0) 0.0 else 1.0
-      LabeledPoint(y, x)
+      LabeledPoint(y, Vectors.dense(x))
     }
 
     MLUtils.saveLabeledData(data, outputPath)

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
index 073ded6..c80b113 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaNaiveBayesSuite.java
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.classification;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 import org.junit.After;
 import org.junit.Assert;
@@ -45,12 +46,12 @@ public class JavaNaiveBayesSuite implements Serializable {
   }
 
   private static final List<LabeledPoint> POINTS = Arrays.asList(
-    new LabeledPoint(0, new double[] {1.0, 0.0, 0.0}),
-    new LabeledPoint(0, new double[] {2.0, 0.0, 0.0}),
-    new LabeledPoint(1, new double[] {0.0, 1.0, 0.0}),
-    new LabeledPoint(1, new double[] {0.0, 2.0, 0.0}),
-    new LabeledPoint(2, new double[] {0.0, 0.0, 1.0}),
-    new LabeledPoint(2, new double[] {0.0, 0.0, 2.0})
+    new LabeledPoint(0, Vectors.dense(1.0, 0.0, 0.0)),
+    new LabeledPoint(0, Vectors.dense(2.0, 0.0, 0.0)),
+    new LabeledPoint(1, Vectors.dense(0.0, 1.0, 0.0)),
+    new LabeledPoint(1, Vectors.dense(0.0, 2.0, 0.0)),
+    new LabeledPoint(2, Vectors.dense(0.0, 0.0, 1.0)),
+    new LabeledPoint(2, Vectors.dense(0.0, 0.0, 2.0))
   );
 
   private int validatePrediction(List<LabeledPoint> points, NaiveBayesModel model) {

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
index 117e5ea..4701a5e 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/classification/JavaSVMSuite.java
@@ -17,7 +17,6 @@
 
 package org.apache.spark.mllib.classification;
 
-
 import java.io.Serializable;
 import java.util.List;
 
@@ -28,7 +27,6 @@ import org.junit.Test;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-
 import org.apache.spark.mllib.regression.LabeledPoint;
 
 public class JavaSVMSuite implements Serializable {
@@ -94,5 +92,4 @@ public class JavaSVMSuite implements Serializable {
     int numAccurate = validatePrediction(validationData, model);
     Assert.assertTrue(numAccurate > nPoints * 4.0 / 5.0);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
index 2c4d795..c6d8425 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/linalg/JavaVectorsSuite.java
@@ -19,10 +19,10 @@ package org.apache.spark.mllib.linalg;
 
 import java.io.Serializable;
 
-import com.google.common.collect.Lists;
-
 import scala.Tuple2;
 
+import com.google.common.collect.Lists;
+
 import org.junit.Test;
 import static org.junit.Assert.*;
 
@@ -36,7 +36,7 @@ public class JavaVectorsSuite implements Serializable {
 
   @Test
   public void sparseArrayConstruction() {
-    Vector v = Vectors.sparse(3, Lists.newArrayList(
+    Vector v = Vectors.sparse(3, Lists.<Tuple2<Integer, Double>>newArrayList(
         new Tuple2<Integer, Double>(0, 2.0),
         new Tuple2<Integer, Double>(2, 3.0)));
     assertArrayEquals(new double[]{2.0, 0.0, 3.0}, v.toArray(), 0.0);

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
index f44b25c..f725924 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaLassoSuite.java
@@ -59,7 +59,7 @@ public class JavaLassoSuite implements Serializable {
   @Test
   public void runLassoUsingConstructor() {
     int nPoints = 10000;
-    double A = 2.0;
+    double A = 0.0;
     double[] weights = {-1.5, 1.0e-2};
 
     JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A,
@@ -80,7 +80,7 @@ public class JavaLassoSuite implements Serializable {
   @Test
   public void runLassoUsingStaticMethods() {
     int nPoints = 10000;
-    double A = 2.0;
+    double A = 0.0;
     double[] weights = {-1.5, 1.0e-2};
 
     JavaRDD<LabeledPoint> testRDD = sc.parallelize(LinearDataGenerator.generateLinearInputAsList(A,

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
index 2fdd5fc..03714ae 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/regression/JavaRidgeRegressionSuite.java
@@ -55,30 +55,27 @@ public class JavaRidgeRegressionSuite implements Serializable {
     return errorSum / validationData.size();
   }
 
-  List<LabeledPoint> generateRidgeData(int numPoints, int nfeatures, double eps) {
+  List<LabeledPoint> generateRidgeData(int numPoints, int numFeatures, double std) {
     org.jblas.util.Random.seed(42);
     // Pick weights as random values distributed uniformly in [-0.5, 0.5]
-    DoubleMatrix w = DoubleMatrix.rand(nfeatures, 1).subi(0.5);
-    // Set first two weights to eps
-    w.put(0, 0, eps);
-    w.put(1, 0, eps);
-    return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, numPoints, 42, eps);
+    DoubleMatrix w = DoubleMatrix.rand(numFeatures, 1).subi(0.5);
+    return LinearDataGenerator.generateLinearInputAsList(0.0, w.data, numPoints, 42, std);
   }
 
   @Test
   public void runRidgeRegressionUsingConstructor() {
-    int nexamples = 200;
-    int nfeatures = 20;
-    double eps = 10.0;
-    List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps);
+    int numExamples = 50;
+    int numFeatures = 20;
+    List<LabeledPoint> data = generateRidgeData(2*numExamples, numFeatures, 10.0);
 
-    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples));
-    List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples);
+    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, numExamples));
+    List<LabeledPoint> validationData = data.subList(numExamples, 2 * numExamples);
 
     RidgeRegressionWithSGD ridgeSGDImpl = new RidgeRegressionWithSGD();
-    ridgeSGDImpl.optimizer().setStepSize(1.0)
-                            .setRegParam(0.0)
-                            .setNumIterations(200);
+    ridgeSGDImpl.optimizer()
+      .setStepSize(1.0)
+      .setRegParam(0.0)
+      .setNumIterations(200);
     RidgeRegressionModel model = ridgeSGDImpl.run(testRDD.rdd());
     double unRegularizedErr = predictionError(validationData, model);
 
@@ -91,13 +88,12 @@ public class JavaRidgeRegressionSuite implements Serializable {
 
   @Test
   public void runRidgeRegressionUsingStaticMethods() {
-    int nexamples = 200;
-    int nfeatures = 20;
-    double eps = 10.0;
-    List<LabeledPoint> data = generateRidgeData(2*nexamples, nfeatures, eps);
+    int numExamples = 50;
+    int numFeatures = 20;
+    List<LabeledPoint> data = generateRidgeData(2 * numExamples, numFeatures, 10.0);
 
-    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, nexamples));
-    List<LabeledPoint> validationData = data.subList(nexamples, 2*nexamples);
+    JavaRDD<LabeledPoint> testRDD = sc.parallelize(data.subList(0, numExamples));
+    List<LabeledPoint> validationData = data.subList(numExamples, 2 * numExamples);
 
     RidgeRegressionModel model = RidgeRegressionWithSGD.train(testRDD.rdd(), 200, 1.0, 0.0);
     double unRegularizedErr = predictionError(validationData, model);

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
index 05322b0..1e03c9d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala
@@ -20,11 +20,10 @@ package org.apache.spark.mllib.classification
 import scala.util.Random
 import scala.collection.JavaConversions._
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.LocalSparkContext
 
@@ -61,7 +60,7 @@ object LogisticRegressionSuite {
       if (yVal > 0) 1 else 0
     }
 
-    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
+    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(Array(x1(i)))))
     testData
   }
 
@@ -113,7 +112,7 @@ class LogisticRegressionSuite extends FunSuite with LocalSparkContext with Shoul
     val testData = LogisticRegressionSuite.generateLogisticInput(A, B, nPoints, 42)
 
     val initialB = -1.0
-    val initialWeights = Array(initialB)
+    val initialWeights = Vectors.dense(initialB)
 
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
index 9dd6c79..516895d 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/NaiveBayesSuite.scala
@@ -19,9 +19,9 @@ package org.apache.spark.mllib.classification
 
 import scala.util.Random
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.LocalSparkContext
 
@@ -54,7 +54,7 @@ object NaiveBayesSuite {
         if (rnd.nextDouble() < _theta(y)(j)) 1 else 0
       }
 
-      LabeledPoint(y, xi)
+      LabeledPoint(y, Vectors.dense(xi))
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
index bc7abb5..dfacbfe 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala
@@ -20,7 +20,6 @@ package org.apache.spark.mllib.classification
 import scala.util.Random
 import scala.collection.JavaConversions._
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 
 import org.jblas.DoubleMatrix
@@ -28,6 +27,7 @@ import org.jblas.DoubleMatrix
 import org.apache.spark.SparkException
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
 
 object SVMSuite {
 
@@ -54,7 +54,7 @@ object SVMSuite {
         intercept + 0.01 * rnd.nextGaussian()
       if (yD < 0) 0.0 else 1.0
     }
-    y.zip(x).map(p => LabeledPoint(p._1, p._2))
+    y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
   }
 
 }
@@ -110,7 +110,7 @@ class SVMSuite extends FunSuite with LocalSparkContext {
 
     val initialB = -1.0
     val initialC = -1.0
-    val initialWeights = Array(initialB,initialC)
+    val initialWeights = Vectors.dense(initialB, initialC)
 
     val testRDD = sc.parallelize(testData, 2)
     testRDD.cache()
@@ -150,10 +150,10 @@ class SVMSuite extends FunSuite with LocalSparkContext {
     }
 
     intercept[SparkException] {
-      val model = SVMWithSGD.train(testRDDInvalid, 100)
+      SVMWithSGD.train(testRDDInvalid, 100)
     }
 
     // Turning off data validation should not throw an exception
-    val noValidationModel = new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
+    new SVMWithSGD().setValidateData(false).run(testRDDInvalid)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
index 631d0e2..c4b4334 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala
@@ -20,13 +20,12 @@ package org.apache.spark.mllib.optimization
 import scala.util.Random
 import scala.collection.JavaConversions._
 
-import org.scalatest.BeforeAndAfterAll
 import org.scalatest.FunSuite
 import org.scalatest.matchers.ShouldMatchers
 
-import org.apache.spark.SparkContext
 import org.apache.spark.mllib.regression._
 import org.apache.spark.mllib.util.LocalSparkContext
+import org.apache.spark.mllib.linalg.Vectors
 
 object GradientDescentSuite {
 
@@ -58,8 +57,7 @@ object GradientDescentSuite {
       if (yVal > 0) 1 else 0
     }
 
-    val testData = (0 until nPoints).map(i => LabeledPoint(y(i), Array(x1(i))))
-    testData
+    (0 until nPoints).map(i => LabeledPoint(y(i), Vectors.dense(x1(i))))
   }
 }
 
@@ -83,11 +81,11 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMa
     // Add a extra variable consisting of all 1.0's for the intercept.
     val testData = GradientDescentSuite.generateGDInput(A, B, nPoints, 42)
     val data = testData.map { case LabeledPoint(label, features) =>
-      label -> Array(1.0, features: _*)
+      label -> Vectors.dense(1.0, features.toArray: _*)
     }
 
     val dataRDD = sc.parallelize(data, 2).cache()
-    val initialWeightsWithIntercept = Array(1.0, initialWeights: _*)
+    val initialWeightsWithIntercept = Vectors.dense(1.0, initialWeights: _*)
 
     val (_, loss) = GradientDescent.runMiniBatchSGD(
       dataRDD,
@@ -113,13 +111,13 @@ class GradientDescentSuite extends FunSuite with LocalSparkContext with ShouldMa
     // Add a extra variable consisting of all 1.0's for the intercept.
     val testData = GradientDescentSuite.generateGDInput(2.0, -1.5, 10000, 42)
     val data = testData.map { case LabeledPoint(label, features) =>
-      label -> Array(1.0, features: _*)
+      label -> Vectors.dense(1.0, features.toArray: _*)
     }
 
     val dataRDD = sc.parallelize(data, 2).cache()
 
     // Prepare non-zero weights
-    val initialWeightsWithIntercept = Array(1.0, 0.5)
+    val initialWeightsWithIntercept = Vectors.dense(1.0, 0.5)
 
     val regParam0 = 0
     val (newWeights0, loss0) = GradientDescent.runMiniBatchSGD(

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
index 2cebac9..6aad9eb 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LassoSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.regression
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 
 class LassoSuite extends FunSuite with LocalSparkContext {
@@ -33,29 +34,33 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
     val C = 1.0e-2
 
-    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
-
-    val testRDD = sc.parallelize(testData, 2)
-    testRDD.cache()
+    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, C), nPoints, 42)
+      .map { case LabeledPoint(label, features) =>
+      LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+    }
+    val testRDD = sc.parallelize(testData, 2).cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD)
-
     val weight0 = model.weights(0)
     val weight1 = model.weights(1)
-    assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
-    assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
-    assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
+    val weight2 = model.weights(2)
+    assert(weight0 >= 1.9 && weight0 <= 2.1, weight0 + " not in [1.9, 2.1]")
+    assert(weight1 >= -1.60 && weight1 <= -1.40, weight1 + " not in [-1.6, -1.4]")
+    assert(weight2 >= -1.0e-3 && weight2 <= 1.0e-3, weight2 + " not in [-0.001, 0.001]")
 
     val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17)
+      .map { case LabeledPoint(label, features) =>
+      LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+    }
     val validationRDD  = sc.parallelize(validationData, 2)
 
     // Test prediction on RDD.
@@ -66,33 +71,39 @@ class LassoSuite extends FunSuite with LocalSparkContext {
   }
 
   test("Lasso local random SGD with initial weights") {
-    val nPoints = 10000
+    val nPoints = 1000
 
     val A = 2.0
     val B = -1.5
     val C = 1.0e-2
 
-    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 42)
+    val testData = LinearDataGenerator.generateLinearInput(A, Array[Double](B, C), nPoints, 42)
+      .map { case LabeledPoint(label, features) =>
+      LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+    }
 
+    val initialA = -1.0
     val initialB = -1.0
     val initialC = -1.0
-    val initialWeights = Array(initialB,initialC)
+    val initialWeights = Vectors.dense(initialA, initialB, initialC)
 
-    val testRDD = sc.parallelize(testData, 2)
-    testRDD.cache()
+    val testRDD = sc.parallelize(testData, 2).cache()
 
     val ls = new LassoWithSGD()
-    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(20)
+    ls.optimizer.setStepSize(1.0).setRegParam(0.01).setNumIterations(40)
 
     val model = ls.run(testRDD, initialWeights)
-
     val weight0 = model.weights(0)
     val weight1 = model.weights(1)
-    assert(model.intercept >= 1.9 && model.intercept <= 2.1, model.intercept + " not in [1.9, 2.1]")
-    assert(weight0 >= -1.60 && weight0 <= -1.40, weight0 + " not in [-1.6, -1.4]")
-    assert(weight1 >= -1.0e-3 && weight1 <= 1.0e-3, weight1 + " not in [-0.001, 0.001]")
+    val weight2 = model.weights(2)
+    assert(weight0 >= 1.9 && weight0 <= 2.1, weight0 + " not in [1.9, 2.1]")
+    assert(weight1 >= -1.60 && weight1 <= -1.40, weight1 + " not in [-1.6, -1.4]")
+    assert(weight2 >= -1.0e-3 && weight2 <= 1.0e-3, weight2 + " not in [-0.001, 0.001]")
 
     val validationData = LinearDataGenerator.generateLinearInput(A, Array[Double](B,C), nPoints, 17)
+      .map { case LabeledPoint(label, features) =>
+      LabeledPoint(label, Vectors.dense(1.0 +: features.toArray))
+    }
     val validationRDD  = sc.parallelize(validationData,2)
 
     // Test prediction on RDD.

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
index 5d251bc..2f7d307 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/LinearRegressionSuite.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.regression
 
 import org.scalatest.FunSuite
 
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 
 class LinearRegressionSuite extends FunSuite with LocalSparkContext {
@@ -40,11 +41,12 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
     linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
 
     val model = linReg.run(testRDD)
-
     assert(model.intercept >= 2.5 && model.intercept <= 3.5)
-    assert(model.weights.length === 2)
-    assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
-    assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+
+    val weights = model.weights
+    assert(weights.size === 2)
+    assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+    assert(weights(1) >= 9.0 && weights(1) <= 11.0)
 
     val validationData = LinearDataGenerator.generateLinearInput(
       3.0, Array(10.0, 10.0), 100, 17)
@@ -67,9 +69,11 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
     val model = linReg.run(testRDD)
 
     assert(model.intercept === 0.0)
-    assert(model.weights.length === 2)
-    assert(model.weights(0) >= 9.0 && model.weights(0) <= 11.0)
-    assert(model.weights(1) >= 9.0 && model.weights(1) <= 11.0)
+
+    val weights = model.weights
+    assert(weights.size === 2)
+    assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+    assert(weights(1) >= 9.0 && weights(1) <= 11.0)
 
     val validationData = LinearDataGenerator.generateLinearInput(
       0.0, Array(10.0, 10.0), 100, 17)
@@ -81,4 +85,40 @@ class LinearRegressionSuite extends FunSuite with LocalSparkContext {
     // Test prediction on Array.
     validatePrediction(validationData.map(row => model.predict(row.features)), validationData)
   }
+
+  // Test if we can correctly learn Y = 10*X1 + 10*X10000
+  test("sparse linear regression without intercept") {
+    val denseRDD = sc.parallelize(
+      LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42), 2)
+    val sparseRDD = denseRDD.map { case LabeledPoint(label, v) =>
+      val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+      LabeledPoint(label, sv)
+    }.cache()
+    val linReg = new LinearRegressionWithSGD().setIntercept(false)
+    linReg.optimizer.setNumIterations(1000).setStepSize(1.0)
+
+    val model = linReg.run(sparseRDD)
+
+    assert(model.intercept === 0.0)
+
+    val weights = model.weights
+    assert(weights.size === 10000)
+    assert(weights(0) >= 9.0 && weights(0) <= 11.0)
+    assert(weights(9999) >= 9.0 && weights(9999) <= 11.0)
+
+    val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17)
+    val sparseValidationData = validationData.map { case LabeledPoint(label, v) =>
+      val sv = Vectors.sparse(10000, Seq((0, v(0)), (9999, v(1))))
+      LabeledPoint(label, sv)
+    }
+    val sparseValidationRDD = sc.parallelize(sparseValidationData, 2)
+
+      // Test prediction on RDD.
+    validatePrediction(
+      model.predict(sparseValidationRDD.map(_.features)).collect(), sparseValidationData)
+
+    // Test prediction on Array.
+    validatePrediction(
+      sparseValidationData.map(row => model.predict(row.features)), sparseValidationData)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
index b2044ed..f66fc6e 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/regression/RidgeRegressionSuite.scala
@@ -17,9 +17,10 @@
 
 package org.apache.spark.mllib.regression
 
-import org.jblas.DoubleMatrix
 import org.scalatest.FunSuite
 
+import org.jblas.DoubleMatrix
+
 import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext}
 
 class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
@@ -30,22 +31,22 @@ class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
     }.reduceLeft(_ + _) / predictions.size
   }
 
-  test("regularization with skewed weights") {
-    val nexamples = 200
-    val nfeatures = 20
-    val eps = 10
+  test("ridge regression can help avoid overfitting") {
+
+    // For small number of examples and large variance of error distribution,
+    // ridge regression should give smaller generalization error that linear regression.
+
+    val numExamples = 50
+    val numFeatures = 20
 
     org.jblas.util.Random.seed(42)
     // Pick weights as random values distributed uniformly in [-0.5, 0.5]
-    val w = DoubleMatrix.rand(nfeatures, 1).subi(0.5)
-    // Set first two weights to eps
-    w.put(0, 0, eps)
-    w.put(1, 0, eps)
+    val w = DoubleMatrix.rand(numFeatures, 1).subi(0.5)
 
     // Use half of data for training and other half for validation
-    val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2*nexamples, 42, eps)
-    val testData = data.take(nexamples)
-    val validationData = data.takeRight(nexamples)
+    val data = LinearDataGenerator.generateLinearInput(3.0, w.toArray, 2 * numExamples, 42, 10.0)
+    val testData = data.take(numExamples)
+    val validationData = data.takeRight(numExamples)
 
     val testRDD = sc.parallelize(testData, 2).cache()
     val validationRDD = sc.parallelize(validationData, 2).cache()
@@ -67,7 +68,7 @@ class RidgeRegressionSuite extends FunSuite with LocalSparkContext {
     val ridgeErr = predictionError(
         ridgeModel.predict(validationRDD.map(_.features)).collect(), validationData)
 
-    // Ridge CV-error should be lower than linear regression
+    // Ridge validation error should be lower than linear regression.
     assert(ridgeErr < linearErr,
       "ridgeError (" + ridgeErr + ") was not less than linearError(" + linearErr + ")")
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
index 4349c70..350130c 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala
@@ -27,6 +27,7 @@ import org.apache.spark.mllib.tree.model.Filter
 import org.apache.spark.mllib.tree.configuration.Strategy
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.mllib.linalg.Vectors
 
 class DecisionTreeSuite extends FunSuite with BeforeAndAfterAll {
 
@@ -396,7 +397,7 @@ object DecisionTreeSuite {
   def generateOrderedLabeledPointsWithLabel0(): Array[LabeledPoint] = {
     val arr = new Array[LabeledPoint](1000)
     for (i <- 0 until 1000){
-      val lp = new LabeledPoint(0.0,Array(i.toDouble,1000.0-i))
+      val lp = new LabeledPoint(0.0, Vectors.dense(i.toDouble, 1000.0 - i))
       arr(i) = lp
     }
     arr
@@ -405,7 +406,7 @@ object DecisionTreeSuite {
   def generateOrderedLabeledPointsWithLabel1(): Array[LabeledPoint] = {
     val arr = new Array[LabeledPoint](1000)
     for (i <- 0 until 1000){
-      val lp = new LabeledPoint(1.0,Array(i.toDouble,999.0-i))
+      val lp = new LabeledPoint(1.0, Vectors.dense(i.toDouble, 999.0 - i))
       arr(i) = lp
     }
     arr
@@ -415,9 +416,9 @@ object DecisionTreeSuite {
     val arr = new Array[LabeledPoint](1000)
     for (i <- 0 until 1000){
       if (i < 600){
-        arr(i) = new LabeledPoint(1.0,Array(0.0,1.0))
+        arr(i) = new LabeledPoint(1.0, Vectors.dense(0.0, 1.0))
       } else {
-        arr(i) = new LabeledPoint(0.0,Array(1.0,0.0))
+        arr(i) = new LabeledPoint(0.0, Vectors.dense(1.0, 0.0))
       }
     }
     arr

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
index 60f053b..27d41c7 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/util/MLUtilsSuite.scala
@@ -17,14 +17,20 @@
 
 package org.apache.spark.mllib.util
 
+import java.io.File
+
 import org.scalatest.FunSuite
 
 import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, norm => breezeNorm,
   squaredDistance => breezeSquaredDistance}
+import com.google.common.base.Charsets
+import com.google.common.io.Files
 
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.util.MLUtils._
 
-class MLUtilsSuite extends FunSuite {
+class MLUtilsSuite extends FunSuite with LocalSparkContext {
 
   test("epsilon computation") {
     assert(1.0 + EPSILON > 1.0, s"EPSILON is too small: $EPSILON.")
@@ -49,4 +55,55 @@ class MLUtilsSuite extends FunSuite {
       assert((fastSquaredDist2 - squaredDist) <= precision * squaredDist, s"failed with m = $m")
     }
   }
+
+  test("compute stats") {
+    val data = Seq.fill(3)(Seq(
+      LabeledPoint(1.0, Vectors.dense(1.0, 2.0, 3.0)),
+      LabeledPoint(0.0, Vectors.dense(3.0, 4.0, 5.0))
+    )).flatten
+    val rdd = sc.parallelize(data, 2)
+    val (meanLabel, mean, std) = MLUtils.computeStats(rdd, 3, 6)
+    assert(meanLabel === 0.5)
+    assert(mean === Vectors.dense(2.0, 3.0, 4.0))
+    assert(std === Vectors.dense(1.0, 1.0, 1.0))
+  }
+
+  test("loadLibSVMData") {
+    val lines =
+      """
+        |+1 1:1.0 3:2.0 5:3.0
+        |-1
+        |-1 2:4.0 4:5.0 6:6.0
+      """.stripMargin
+    val tempDir = Files.createTempDir()
+    val file = new File(tempDir.getPath, "part-00000")
+    Files.write(lines, file, Charsets.US_ASCII)
+    val path = tempDir.toURI.toString
+
+    val pointsWithNumFeatures = MLUtils.loadLibSVMData(sc, path, 6).collect()
+    val pointsWithoutNumFeatures = MLUtils.loadLibSVMData(sc, path).collect()
+
+    for (points <- Seq(pointsWithNumFeatures, pointsWithoutNumFeatures)) {
+      assert(points.length === 3)
+      assert(points(0).label === 1.0)
+      assert(points(0).features === Vectors.sparse(6, Seq((0, 1.0), (2, 2.0), (4, 3.0))))
+      assert(points(1).label == 0.0)
+      assert(points(1).features == Vectors.sparse(6, Seq()))
+      assert(points(2).label === 0.0)
+      assert(points(2).features === Vectors.sparse(6, Seq((1, 4.0), (3, 5.0), (5, 6.0))))
+    }
+
+    val multiclassPoints = MLUtils.loadLibSVMData(sc, path, MLUtils.multiclassLabelParser).collect()
+    assert(multiclassPoints.length === 3)
+    assert(multiclassPoints(0).label === 1.0)
+    assert(multiclassPoints(1).label === -1.0)
+    assert(multiclassPoints(2).label === -1.0)
+
+    try {
+      file.delete()
+      tempDir.delete()
+    } catch {
+      case t: Throwable =>
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/python/pyspark/mllib/classification.py
----------------------------------------------------------------------
diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py
index 19b90df..d2f9cdb 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -87,18 +87,19 @@ class NaiveBayesModel(object):
     >>> data = array([0.0, 0.0, 1.0, 0.0, 0.0, 2.0, 1.0, 1.0, 0.0]).reshape(3,3)
     >>> model = NaiveBayes.train(sc.parallelize(data))
     >>> model.predict(array([0.0, 1.0]))
-    0
+    0.0
     >>> model.predict(array([1.0, 0.0]))
-    1
+    1.0
     """
 
-    def __init__(self, pi, theta):
+    def __init__(self, labels, pi, theta):
+        self.labels = labels
         self.pi = pi
         self.theta = theta
 
     def predict(self, x):
         """Return the most likely class for a data vector x"""
-        return numpy.argmax(self.pi + dot(x, self.theta))
+        return self.labels[numpy.argmax(self.pi + dot(x, self.theta))]
 
 class NaiveBayes(object):
     @classmethod
@@ -122,7 +123,8 @@ class NaiveBayes(object):
         ans = sc._jvm.PythonMLLibAPI().trainNaiveBayes(dataBytes._jrdd, lambda_)
         return NaiveBayesModel(
             _deserialize_double_vector(ans[0]),
-            _deserialize_double_matrix(ans[1]))
+            _deserialize_double_vector(ans[1]),
+            _deserialize_double_matrix(ans[2]))
 
 
 def _test():


[2/2] git commit: [SPARK-1212, Part II] Support sparse data in MLlib

Posted by ma...@apache.org.
[SPARK-1212, Part II] Support sparse data in MLlib

In PR https://github.com/apache/spark/pull/117, we added dense/sparse vector data model and updated KMeans to support sparse input. This PR is to replace all other `Array[Double]` usage by `Vector` in generalized linear models (GLMs) and Naive Bayes. Major changes:

1. `LabeledPoint` becomes `LabeledPoint(Double, Vector)`.
2. Methods that accept `RDD[Array[Double]]` now accept `RDD[Vector]`. We cannot support both in an elegant way because of type erasure.
3. Mark 'createModel' and 'predictPoint' protected because they are not for end users.
4. Add libSVMFile to MLContext.
5. NaiveBayes can accept arbitrary labels (introducing a breaking change to Python's `NaiveBayesModel`).
6. Gradient computation no longer creates temp vectors.
7. Column normalization and centering are removed from Lasso and Ridge because the operation will densify the data. Simple feature transformation can be done before training.

TODO:
1. ~~Use axpy when possible.~~
2. ~~Optimize Naive Bayes.~~

Author: Xiangrui Meng <me...@databricks.com>

Closes #245 from mengxr/vector and squashes the following commits:

eb6e793 [Xiangrui Meng] move libSVMFile to MLUtils and rename to loadLibSVMData
c26c4fc [Xiangrui Meng] update DecisionTree to use RDD[Vector]
11999c7 [Xiangrui Meng] Merge branch 'master' into vector
f7da54b [Xiangrui Meng] add minSplits to libSVMFile
da25e24 [Xiangrui Meng] revert the change to default addIntercept because it might change the behavior of existing code without warning
493f26f [Xiangrui Meng] Merge branch 'master' into vector
7c1bc01 [Xiangrui Meng] add a TODO to NB
b9b7ef7 [Xiangrui Meng] change default value of addIntercept to false
b01df54 [Xiangrui Meng] allow to change or clear threshold in LR and SVM
4addc50 [Xiangrui Meng] merge master
4ca5b1b [Xiangrui Meng] remove normalization from Lasso and update tests
f04fe8a [Xiangrui Meng] remove normalization from RidgeRegression and update tests
d088552 [Xiangrui Meng] use static constructor for MLContext
6f59eed [Xiangrui Meng] update libSVMFile to determine number of features automatically
3432e84 [Xiangrui Meng] update NaiveBayes to support sparse data
0f8759b [Xiangrui Meng] minor updates to NB
b11659c [Xiangrui Meng] style update
78c4671 [Xiangrui Meng] add libSVMFile to MLContext
f0fe616 [Xiangrui Meng] add a test for sparse linear regression
44733e1 [Xiangrui Meng] use in-place gradient computation
e981396 [Xiangrui Meng] use axpy in Updater
db808a1 [Xiangrui Meng] update JavaLR example
befa592 [Xiangrui Meng] passed scala/java tests
75c83a4 [Xiangrui Meng] passed test compile
1859701 [Xiangrui Meng] passed compile
834ada2 [Xiangrui Meng] optimized MLUtils.computeStats update some ml algorithms to use Vector (cont.)
135ab72 [Xiangrui Meng] merge glm
0e57aa4 [Xiangrui Meng] update Lasso and RidgeRegression to parse the weights correctly from GLM mark createModel protected mark predictPoint protected
d7f629f [Xiangrui Meng] fix a bug in GLM when intercept is not used
3f346ba [Xiangrui Meng] update some ml algorithms to use Vector


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9c65fa76
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9c65fa76
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9c65fa76

Branch: refs/heads/master
Commit: 9c65fa76f9d413e311a80f29d35d3ff7722e9476
Parents: ed730c9
Author: Xiangrui Meng <me...@databricks.com>
Authored: Wed Apr 2 14:01:12 2014 -0700
Committer: Matei Zaharia <ma...@databricks.com>
Committed: Wed Apr 2 14:01:12 2014 -0700

----------------------------------------------------------------------
 .../org/apache/spark/mllib/examples/JavaLR.java |  14 +-
 .../spark/mllib/api/python/PythonMLLibAPI.scala | 154 ++++++++++-----
 .../classification/ClassificationModel.scala    |  13 +-
 .../classification/LogisticRegression.scala     |  84 ++++----
 .../spark/mllib/classification/NaiveBayes.scala | 116 +++++------
 .../apache/spark/mllib/classification/SVM.scala |  69 ++++---
 .../apache/spark/mllib/clustering/KMeans.scala  |   3 +-
 .../org/apache/spark/mllib/linalg/Vectors.scala |   8 +
 .../spark/mllib/optimization/Gradient.scala     | 117 ++++++++---
 .../mllib/optimization/GradientDescent.scala    |  58 +++---
 .../spark/mllib/optimization/Optimizer.scala    |   7 +-
 .../spark/mllib/optimization/Updater.scala      |  69 +++++--
 .../regression/GeneralizedLinearAlgorithm.scala |  69 +++----
 .../spark/mllib/regression/LabeledPoint.scala   |   6 +-
 .../apache/spark/mllib/regression/Lasso.scala   |  79 ++------
 .../mllib/regression/LinearRegression.scala     |  37 ++--
 .../mllib/regression/RegressionModel.scala      |   5 +-
 .../mllib/regression/RidgeRegression.scala      |  75 ++-----
 .../apache/spark/mllib/tree/DecisionTree.scala  |   9 +-
 .../mllib/tree/model/DecisionTreeModel.scala    |   5 +-
 .../apache/spark/mllib/tree/model/Node.scala    |   7 +-
 .../spark/mllib/util/LinearDataGenerator.scala  |   3 +-
 .../util/LogisticRegressionDataGenerator.scala  |   3 +-
 .../org/apache/spark/mllib/util/MLUtils.scala   | 193 +++++++++++++++----
 .../spark/mllib/util/SVMDataGenerator.scala     |   3 +-
 .../classification/JavaNaiveBayesSuite.java     |  13 +-
 .../mllib/classification/JavaSVMSuite.java      |   3 -
 .../spark/mllib/linalg/JavaVectorsSuite.java    |   6 +-
 .../spark/mllib/regression/JavaLassoSuite.java  |   4 +-
 .../regression/JavaRidgeRegressionSuite.java    |  38 ++--
 .../LogisticRegressionSuite.scala               |   7 +-
 .../mllib/classification/NaiveBayesSuite.scala  |   4 +-
 .../spark/mllib/classification/SVMSuite.scala   |  10 +-
 .../optimization/GradientDescentSuite.scala     |  14 +-
 .../spark/mllib/regression/LassoSuite.scala     |  51 +++--
 .../regression/LinearRegressionSuite.scala      |  54 +++++-
 .../mllib/regression/RidgeRegressionSuite.scala |  27 +--
 .../spark/mllib/tree/DecisionTreeSuite.scala    |   9 +-
 .../apache/spark/mllib/util/MLUtilsSuite.scala  |  59 +++++-
 python/pyspark/mllib/classification.py          |  12 +-
 40 files changed, 926 insertions(+), 591 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
----------------------------------------------------------------------
diff --git a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
index 667c72f..cd8879f 100644
--- a/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
+++ b/examples/src/main/java/org/apache/spark/mllib/examples/JavaLR.java
@@ -17,6 +17,7 @@
 
 package org.apache.spark.mllib.examples;
 
+import java.util.regex.Pattern;
 
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -24,11 +25,9 @@ import org.apache.spark.api.java.function.Function;
 
 import org.apache.spark.mllib.classification.LogisticRegressionWithSGD;
 import org.apache.spark.mllib.classification.LogisticRegressionModel;
+import org.apache.spark.mllib.linalg.Vectors;
 import org.apache.spark.mllib.regression.LabeledPoint;
 
-import java.util.Arrays;
-import java.util.regex.Pattern;
-
 /**
  * Logistic regression based classification using ML Lib.
  */
@@ -47,14 +46,10 @@ public final class JavaLR {
       for (int i = 0; i < tok.length; ++i) {
         x[i] = Double.parseDouble(tok[i]);
       }
-      return new LabeledPoint(y, x);
+      return new LabeledPoint(y, Vectors.dense(x));
     }
   }
 
-  public static void printWeights(double[] a) {
-    System.out.println(Arrays.toString(a));
-  }
-
   public static void main(String[] args) {
     if (args.length != 4) {
       System.err.println("Usage: JavaLR <master> <input_dir> <step_size> <niters>");
@@ -80,8 +75,7 @@ public final class JavaLR {
     LogisticRegressionModel model = LogisticRegressionWithSGD.train(points.rdd(),
         iterations, stepSize);
 
-    System.out.print("Final w: ");
-    printWeights(model.weights());
+    System.out.print("Final w: " + model.weights());
 
     System.exit(0);
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
index 3449c69..2df5b0d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/api/python/PythonMLLibAPI.scala
@@ -110,16 +110,16 @@ class PythonMLLibAPI extends Serializable {
 
   private def trainRegressionModel(
       trainFunc: (RDD[LabeledPoint], Array[Double]) => GeneralizedLinearModel,
-      dataBytesJRDD: JavaRDD[Array[Byte]], initialWeightsBA: Array[Byte]):
-      java.util.LinkedList[java.lang.Object] = {
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      initialWeightsBA: Array[Byte]): java.util.LinkedList[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
         val x = deserializeDoubleVector(xBytes)
-        LabeledPoint(x(0), x.slice(1, x.length))
+        LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
     })
     val initialWeights = deserializeDoubleVector(initialWeightsBA)
     val model = trainFunc(data, initialWeights)
     val ret = new java.util.LinkedList[java.lang.Object]()
-    ret.add(serializeDoubleVector(model.weights))
+    ret.add(serializeDoubleVector(model.weights.toArray))
     ret.add(model.intercept: java.lang.Double)
     ret
   }
@@ -127,75 +127,127 @@ class PythonMLLibAPI extends Serializable {
   /**
    * Java stub for Python mllib LinearRegressionWithSGD.train()
    */
-  def trainLinearRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
-      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+  def trainLinearRegressionModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-        LinearRegressionWithSGD.train(data, numIterations, stepSize,
-                                      miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LinearRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib LassoWithSGD.train()
    */
-  def trainLassoModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainLassoModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-        LassoWithSGD.train(data, numIterations, stepSize, regParam,
-                           miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LassoWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib RidgeRegressionWithSGD.train()
    */
-  def trainRidgeModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainRidgeModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-        RidgeRegressionWithSGD.train(data, numIterations, stepSize, regParam,
-                                     miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        RidgeRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib SVMWithSGD.train()
    */
-  def trainSVMModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]], numIterations: Int,
-      stepSize: Double, regParam: Double, miniBatchFraction: Double,
+  def trainSVMModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      regParam: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-        SVMWithSGD.train(data, numIterations, stepSize, regParam,
-                                     miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        SVMWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          regParam,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for Python mllib LogisticRegressionWithSGD.train()
    */
-  def trainLogisticRegressionModelWithSGD(dataBytesJRDD: JavaRDD[Array[Byte]],
-      numIterations: Int, stepSize: Double, miniBatchFraction: Double,
+  def trainLogisticRegressionModelWithSGD(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      numIterations: Int,
+      stepSize: Double,
+      miniBatchFraction: Double,
       initialWeightsBA: Array[Byte]): java.util.List[java.lang.Object] = {
-    trainRegressionModel((data, initialWeights) =>
-        LogisticRegressionWithSGD.train(data, numIterations, stepSize,
-                                     miniBatchFraction, initialWeights),
-        dataBytesJRDD, initialWeightsBA)
+    trainRegressionModel(
+      (data, initialWeights) =>
+        LogisticRegressionWithSGD.train(
+          data,
+          numIterations,
+          stepSize,
+          miniBatchFraction,
+          Vectors.dense(initialWeights)),
+      dataBytesJRDD,
+      initialWeightsBA)
   }
 
   /**
    * Java stub for NaiveBayes.train()
    */
-  def trainNaiveBayes(dataBytesJRDD: JavaRDD[Array[Byte]], lambda: Double)
-      : java.util.List[java.lang.Object] =
-  {
+  def trainNaiveBayes(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      lambda: Double): java.util.List[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => {
       val x = deserializeDoubleVector(xBytes)
-      LabeledPoint(x(0), x.slice(1, x.length))
+      LabeledPoint(x(0), Vectors.dense(x.slice(1, x.length)))
     })
     val model = NaiveBayes.train(data, lambda)
     val ret = new java.util.LinkedList[java.lang.Object]()
+    ret.add(serializeDoubleVector(model.labels))
     ret.add(serializeDoubleVector(model.pi))
     ret.add(serializeDoubleMatrix(model.theta))
     ret
@@ -204,9 +256,12 @@ class PythonMLLibAPI extends Serializable {
   /**
    * Java stub for Python mllib KMeans.train()
    */
-  def trainKMeansModel(dataBytesJRDD: JavaRDD[Array[Byte]], k: Int,
-      maxIterations: Int, runs: Int, initializationMode: String):
-      java.util.List[java.lang.Object] = {
+  def trainKMeansModel(
+      dataBytesJRDD: JavaRDD[Array[Byte]],
+      k: Int,
+      maxIterations: Int,
+      runs: Int,
+      initializationMode: String): java.util.List[java.lang.Object] = {
     val data = dataBytesJRDD.rdd.map(xBytes => Vectors.dense(deserializeDoubleVector(xBytes)))
     val model = KMeans.train(data, k, maxIterations, runs, initializationMode)
     val ret = new java.util.LinkedList[java.lang.Object]()
@@ -259,8 +314,12 @@ class PythonMLLibAPI extends Serializable {
    * needs to be taken in the Python code to ensure it gets freed on exit; see
    * the Py4J documentation.
    */
-  def trainALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
-      iterations: Int, lambda: Double, blocks: Int): MatrixFactorizationModel = {
+  def trainALSModel(
+      ratingsBytesJRDD: JavaRDD[Array[Byte]],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int): MatrixFactorizationModel = {
     val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
     ALS.train(ratings, rank, iterations, lambda, blocks)
   }
@@ -271,8 +330,13 @@ class PythonMLLibAPI extends Serializable {
    * Extra care needs to be taken in the Python code to ensure it gets freed on
    * exit; see the Py4J documentation.
    */
-  def trainImplicitALSModel(ratingsBytesJRDD: JavaRDD[Array[Byte]], rank: Int,
-      iterations: Int, lambda: Double, blocks: Int, alpha: Double): MatrixFactorizationModel = {
+  def trainImplicitALSModel(
+      ratingsBytesJRDD: JavaRDD[Array[Byte]],
+      rank: Int,
+      iterations: Int,
+      lambda: Double,
+      blocks: Int,
+      alpha: Double): MatrixFactorizationModel = {
     val ratings = ratingsBytesJRDD.rdd.map(unpackRating)
     ALS.trainImplicit(ratings, rank, iterations, lambda, blocks, alpha)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
index 391f5b9..bd10e2e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/ClassificationModel.scala
@@ -17,22 +17,27 @@
 
 package org.apache.spark.mllib.classification
 
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.rdd.RDD
 
+/**
+ * Represents a classification model that predicts to which of a set of categories an example
+ * belongs. The categories are represented by double values: 0.0, 1.0, 2.0, etc.
+ */
 trait ClassificationModel extends Serializable {
   /**
    * Predict values for the given data set using the model trained.
    *
    * @param testData RDD representing data points to be predicted
-   * @return RDD[Int] where each entry contains the corresponding prediction
+   * @return an RDD[Double] where each entry contains the corresponding prediction
    */
-  def predict(testData: RDD[Array[Double]]): RDD[Double]
+  def predict(testData: RDD[Vector]): RDD[Double]
 
   /**
    * Predict values for a single data point using the model trained.
    *
    * @param testData array representing a single data point
-   * @return Int prediction from the trained model
+   * @return predicted category from the trained model
    */
-  def predict(testData: Array[Double]): Double
+  def predict(testData: Vector): Double
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
index a481f52..798f3a5 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/LogisticRegression.scala
@@ -17,16 +17,12 @@
 
 package org.apache.spark.mllib.classification
 
-import scala.math.round
-
 import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.mllib.util.DataValidators
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.util.{DataValidators, MLUtils}
+import org.apache.spark.rdd.RDD
 
 /**
  * Classification model trained using Logistic Regression.
@@ -35,15 +31,38 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class LogisticRegressionModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
-  extends GeneralizedLinearModel(weights, intercept)
-  with ClassificationModel with Serializable {
+  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
+
+  private var threshold: Option[Double] = Some(0.5)
+
+  /**
+   * Sets the threshold that separates positive predictions from negative predictions. An example
+   * with prediction score greater than or equal to this threshold is identified as an positive,
+   * and negative otherwise. The default value is 0.5.
+   */
+  def setThreshold(threshold: Double): this.type = {
+    this.threshold = Some(threshold)
+    this
+  }
 
-  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+  /**
+   * Clears the threshold so that `predict` will output raw prediction scores.
+   */
+  def clearThreshold(): this.type = {
+    threshold = None
+    this
+  }
+
+  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
       intercept: Double) = {
-    val margin = dataMatrix.mmul(weightMatrix).get(0) + intercept
-    round(1.0/ (1.0 + math.exp(margin * -1)))
+    val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
+    val score = 1.0/ (1.0 + math.exp(-margin))
+    threshold match {
+      case Some(t) => if (score < t) 0.0 else 1.0
+      case None => score
+    }
   }
 }
 
@@ -56,16 +75,15 @@ class LogisticRegressionWithSGD private (
     var numIterations: Int,
     var regParam: Double,
     var miniBatchFraction: Double)
-  extends GeneralizedLinearAlgorithm[LogisticRegressionModel]
-  with Serializable {
+  extends GeneralizedLinearAlgorithm[LogisticRegressionModel] with Serializable {
 
   val gradient = new LogisticGradient()
   val updater = new SimpleUpdater()
   override val optimizer = new GradientDescent(gradient, updater)
-      .setStepSize(stepSize)
-      .setNumIterations(numIterations)
-      .setRegParam(regParam)
-      .setMiniBatchFraction(miniBatchFraction)
+    .setStepSize(stepSize)
+    .setNumIterations(numIterations)
+    .setRegParam(regParam)
+    .setMiniBatchFraction(miniBatchFraction)
   override val validators = List(DataValidators.classificationLabels)
 
   /**
@@ -73,7 +91,7 @@ class LogisticRegressionWithSGD private (
    */
   def this() = this(1.0, 100, 0.0, 1.0)
 
-  def createModel(weights: Array[Double], intercept: Double) = {
+  def createModel(weights: Vector, intercept: Double) = {
     new LogisticRegressionModel(weights, intercept)
   }
 }
@@ -105,11 +123,9 @@ object LogisticRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : LogisticRegressionModel =
-  {
-    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
-      input, initialWeights)
+      initialWeights: Vector): LogisticRegressionModel = {
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -128,11 +144,9 @@ object LogisticRegressionWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      miniBatchFraction: Double)
-    : LogisticRegressionModel =
-  {
-    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction).run(
-      input)
+      miniBatchFraction: Double): LogisticRegressionModel = {
+    new LogisticRegressionWithSGD(stepSize, numIterations, 0.0, miniBatchFraction)
+      .run(input)
   }
 
   /**
@@ -150,9 +164,7 @@ object LogisticRegressionWithSGD {
   def train(
       input: RDD[LabeledPoint],
       numIterations: Int,
-      stepSize: Double)
-    : LogisticRegressionModel =
-  {
+      stepSize: Double): LogisticRegressionModel = {
     train(input, numIterations, stepSize, 1.0)
   }
 
@@ -168,9 +180,7 @@ object LogisticRegressionWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LogisticRegressionModel =
-  {
+      numIterations: Int): LogisticRegressionModel = {
     train(input, numIterations, 1.0, 1.0)
   }
 
@@ -183,7 +193,7 @@ object LogisticRegressionWithSGD {
     val sc = new SparkContext(args(0), "LogisticRegression")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = LogisticRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
index 6539b2f..e956185 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/NaiveBayes.scala
@@ -17,14 +17,14 @@
 
 package org.apache.spark.mllib.classification
 
-import scala.collection.mutable
+import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, argmax => brzArgmax, sum => brzSum}
 
-import org.jblas.DoubleMatrix
-
-import org.apache.spark.{SparkContext, Logging}
+import org.apache.spark.{Logging, SparkContext}
+import org.apache.spark.SparkContext._
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.regression.LabeledPoint
-import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.util.MLUtils
+import org.apache.spark.rdd.RDD
 
 /**
  * Model for Naive Bayes Classifiers.
@@ -32,19 +32,28 @@ import org.apache.spark.mllib.util.MLUtils
  * @param pi Log of class priors, whose dimension is C.
  * @param theta Log of class conditional probabilities, whose dimension is CxD.
  */
-class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
-  extends ClassificationModel with Serializable {
-
-  // Create a column vector that can be used for predictions
-  private val _pi = new DoubleMatrix(pi.length, 1, pi: _*)
-  private val _theta = new DoubleMatrix(theta)
+class NaiveBayesModel(
+    val labels: Array[Double],
+    val pi: Array[Double],
+    val theta: Array[Array[Double]]) extends ClassificationModel with Serializable {
+
+  private val brzPi = new BDV[Double](pi)
+  private val brzTheta = new BDM[Double](theta.length, theta(0).length)
+
+  var i = 0
+  while (i < theta.length) {
+    var j = 0
+    while (j < theta(i).length) {
+      brzTheta(i, j) = theta(i)(j)
+      j += 1
+    }
+    i += 1
+  }
 
-  def predict(testData: RDD[Array[Double]]): RDD[Double] = testData.map(predict)
+  override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict)
 
-  def predict(testData: Array[Double]): Double = {
-    val dataMatrix = new DoubleMatrix(testData.length, 1, testData: _*)
-    val result = _pi.add(_theta.mmul(dataMatrix))
-    result.argmax()
+  override def predict(testData: Vector): Double = {
+    labels(brzArgmax(brzPi + brzTheta * testData.toBreeze))
   }
 }
 
@@ -56,9 +65,8 @@ class NaiveBayesModel(val pi: Array[Double], val theta: Array[Array[Double]])
  * document classification.  By making every vector a 0-1 vector, it can also be used as
  * Bernoulli NB ([[http://tinyurl.com/p7c96j6]]).
  */
-class NaiveBayes private (var lambda: Double)
-  extends Serializable with Logging
-{
+class NaiveBayes private (var lambda: Double) extends Serializable with Logging {
+
   def this() = this(1.0)
 
   /** Set the smoothing parameter. Default: 1.0. */
@@ -70,45 +78,42 @@ class NaiveBayes private (var lambda: Double)
   /**
    * Run the algorithm with the configured parameters on an input RDD of LabeledPoint entries.
    *
-   * @param data RDD of (label, array of features) pairs.
+   * @param data RDD of [[org.apache.spark.mllib.regression.LabeledPoint]].
    */
   def run(data: RDD[LabeledPoint]) = {
-    // Aggregates all sample points to driver side to get sample count and summed feature vector
-    // for each label.  The shape of `zeroCombiner` & `aggregated` is:
-    //
-    //    label: Int -> (count: Int, featuresSum: DoubleMatrix)
-    val zeroCombiner = mutable.Map.empty[Int, (Int, DoubleMatrix)]
-    val aggregated = data.aggregate(zeroCombiner)({ (combiner, point) =>
-      point match {
-        case LabeledPoint(label, features) =>
-          val (count, featuresSum) = combiner.getOrElse(label.toInt, (0, DoubleMatrix.zeros(1)))
-          val fs = new DoubleMatrix(features.length, 1, features: _*)
-          combiner += label.toInt -> (count + 1, featuresSum.addi(fs))
-      }
-    }, { (lhs, rhs) =>
-      for ((label, (c, fs)) <- rhs) {
-        val (count, featuresSum) = lhs.getOrElse(label, (0, DoubleMatrix.zeros(1)))
-        lhs(label) = (count + c, featuresSum.addi(fs))
+    // Aggregates term frequencies per label.
+    // TODO: Calling combineByKey and collect creates two stages, we can implement something
+    // TODO: similar to reduceByKeyLocally to save one stage.
+    val aggregated = data.map(p => (p.label, p.features)).combineByKey[(Long, BDV[Double])](
+      createCombiner = (v: Vector) => (1L, v.toBreeze.toDenseVector),
+      mergeValue = (c: (Long, BDV[Double]), v: Vector) => (c._1 + 1L, c._2 += v.toBreeze),
+      mergeCombiners = (c1: (Long, BDV[Double]), c2: (Long, BDV[Double])) =>
+        (c1._1 + c2._1, c1._2 += c2._2)
+    ).collect()
+    val numLabels = aggregated.length
+    var numDocuments = 0L
+    aggregated.foreach { case (_, (n, _)) =>
+      numDocuments += n
+    }
+    val numFeatures = aggregated.head match { case (_, (_, v)) => v.size }
+    val labels = new Array[Double](numLabels)
+    val pi = new Array[Double](numLabels)
+    val theta = Array.fill(numLabels)(new Array[Double](numFeatures))
+    val piLogDenom = math.log(numDocuments + numLabels * lambda)
+    var i = 0
+    aggregated.foreach { case (label, (n, sumTermFreqs)) =>
+      labels(i) = label
+      val thetaLogDenom = math.log(brzSum(sumTermFreqs) + numFeatures * lambda)
+      pi(i) = math.log(n + lambda) - piLogDenom
+      var j = 0
+      while (j < numFeatures) {
+        theta(i)(j) = math.log(sumTermFreqs(j) + lambda) - thetaLogDenom
+        j += 1
       }
-      lhs
-    })
-
-    // Kinds of label
-    val C = aggregated.size
-    // Total sample count
-    val N = aggregated.values.map(_._1).sum
-
-    val pi = new Array[Double](C)
-    val theta = new Array[Array[Double]](C)
-    val piLogDenom = math.log(N + C * lambda)
-
-    for ((label, (count, fs)) <- aggregated) {
-      val thetaLogDenom = math.log(fs.sum() + fs.length * lambda)
-      pi(label) = math.log(count + lambda) - piLogDenom
-      theta(label) = fs.toArray.map(f => math.log(f + lambda) - thetaLogDenom)
+      i += 1
     }
 
-    new NaiveBayesModel(pi, theta)
+    new NaiveBayesModel(labels, pi, theta)
   }
 }
 
@@ -158,8 +163,9 @@ object NaiveBayes {
     } else {
       NaiveBayes.train(data, args(2).toDouble)
     }
-    println("Pi: " + model.pi.mkString("[", ", ", "]"))
-    println("Theta:\n" + model.theta.map(_.mkString("[", ", ", "]")).mkString("[", "\n ", "]"))
+
+    println("Pi\n: " + model.pi)
+    println("Theta:\n" + model.theta)
 
     sc.stop()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
index 6dff29d..e31a088 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/SVM.scala
@@ -18,13 +18,11 @@
 package org.apache.spark.mllib.classification
 
 import org.apache.spark.SparkContext
-import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.regression._
-import org.apache.spark.mllib.util.MLUtils
-import org.apache.spark.mllib.util.DataValidators
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.util.{DataValidators, MLUtils}
+import org.apache.spark.rdd.RDD
 
 /**
  * Model for Support Vector Machines (SVMs).
@@ -33,15 +31,37 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class SVMModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
-  extends GeneralizedLinearModel(weights, intercept)
-  with ClassificationModel with Serializable {
+  extends GeneralizedLinearModel(weights, intercept) with ClassificationModel with Serializable {
+
+  private var threshold: Option[Double] = Some(0.0)
+
+  /**
+   * Sets the threshold that separates positive predictions from negative predictions. An example
+   * with prediction score greater than or equal to this threshold is identified as an positive,
+   * and negative otherwise. The default value is 0.0.
+   */
+  def setThreshold(threshold: Double): this.type = {
+    this.threshold = Some(threshold)
+    this
+  }
 
-  override def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
+  /**
+   * Clears the threshold so that `predict` will output raw prediction scores.
+   */
+  def clearThreshold(): this.type = {
+    threshold = None
+    this
+  }
+
+  override def predictPoint(dataMatrix: Vector, weightMatrix: Vector,
       intercept: Double) = {
-    val margin = dataMatrix.dot(weightMatrix) + intercept
-    if (margin < 0) 0.0 else 1.0
+    val margin = weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
+    threshold match {
+      case Some(t) => if (margin < 0) 0.0 else 1.0
+      case None => margin
+    }
   }
 }
 
@@ -71,7 +91,7 @@ class SVMWithSGD private (
    */
   def this() = this(1.0, 100, 1.0, 1.0)
 
-  def createModel(weights: Array[Double], intercept: Double) = {
+  def createModel(weights: Vector, intercept: Double) = {
     new SVMModel(weights, intercept)
   }
 }
@@ -103,11 +123,9 @@ object SVMWithSGD {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : SVMModel =
-  {
-    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
-      initialWeights)
+      initialWeights: Vector): SVMModel = {
+    new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -127,9 +145,7 @@ object SVMWithSGD {
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
-      miniBatchFraction: Double)
-    : SVMModel =
-  {
+      miniBatchFraction: Double): SVMModel = {
     new SVMWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
   }
 
@@ -149,9 +165,7 @@ object SVMWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      regParam: Double)
-    : SVMModel =
-  {
+      regParam: Double): SVMModel = {
     train(input, numIterations, stepSize, regParam, 1.0)
   }
 
@@ -165,11 +179,7 @@ object SVMWithSGD {
    * @param numIterations Number of iterations of gradient descent to run.
    * @return a SVMModel which has the weights and offset from training.
    */
-  def train(
-      input: RDD[LabeledPoint],
-      numIterations: Int)
-    : SVMModel =
-  {
+  def train(input: RDD[LabeledPoint], numIterations: Int): SVMModel = {
     train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
@@ -181,7 +191,8 @@ object SVMWithSGD {
     val sc = new SparkContext(args(0), "SVM")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = SVMWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index b412738..a78503d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -42,8 +42,7 @@ class KMeans private (
     var runs: Int,
     var initializationMode: String,
     var initializationSteps: Int,
-    var epsilon: Double)
-  extends Serializable with Logging {
+    var epsilon: Double) extends Serializable with Logging {
   def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
 
   /** Set the number of clusters to create (k). Default: 2. */

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
index 01c1501..2cea58c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/Vectors.scala
@@ -54,6 +54,12 @@ trait Vector extends Serializable {
    * Converts the instance to a breeze vector.
    */
   private[mllib] def toBreeze: BV[Double]
+
+  /**
+   * Gets the value of the ith element.
+   * @param i index
+   */
+  private[mllib] def apply(i: Int): Double = toBreeze(i)
 }
 
 /**
@@ -145,6 +151,8 @@ class DenseVector(val values: Array[Double]) extends Vector {
   override def toArray: Array[Double] = values
 
   private[mllib] override def toBreeze: BV[Double] = new BDV[Double](values)
+
+  override def apply(i: Int) = values(i)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
index 8212470..2065428 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Gradient.scala
@@ -17,7 +17,9 @@
 
 package org.apache.spark.mllib.optimization
 
-import org.jblas.DoubleMatrix
+import breeze.linalg.{axpy => brzAxpy}
+
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * Class used to compute the gradient for a loss function, given a single data point.
@@ -26,17 +28,26 @@ abstract class Gradient extends Serializable {
   /**
    * Compute the gradient and loss given the features of a single data point.
    *
-   * @param data - Feature values for one data point. Column matrix of size dx1
-   *               where d is the number of features.
-   * @param label - Label for this data item.
-   * @param weights - Column matrix containing weights for every feature.
+   * @param data features for one data point
+   * @param label label for this data point
+   * @param weights weights/coefficients corresponding to features
    *
-   * @return A tuple of 2 elements. The first element is a column matrix containing the computed
-   *         gradient and the second element is the loss computed at this data point.
+   * @return (gradient: Vector, loss: Double)
+   */
+  def compute(data: Vector, label: Double, weights: Vector): (Vector, Double)
+
+  /**
+   * Compute the gradient and loss given the features of a single data point,
+   * add the gradient to a provided vector to avoid creating new objects, and return loss.
    *
+   * @param data features for one data point
+   * @param label label for this data point
+   * @param weights weights/coefficients corresponding to features
+   * @param cumGradient the computed gradient will be added to this vector
+   *
+   * @return loss
    */
-  def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): 
-      (DoubleMatrix, Double)
+  def compute(data: Vector, label: Double, weights: Vector, cumGradient: Vector): Double
 }
 
 /**
@@ -44,12 +55,12 @@ abstract class Gradient extends Serializable {
  * See also the documentation for the precise formulation.
  */
 class LogisticGradient extends Gradient {
-  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): 
-      (DoubleMatrix, Double) = {
-    val margin: Double = -1.0 * data.dot(weights)
+  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val margin: Double = -1.0 * brzWeights.dot(brzData)
     val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
-
-    val gradient = data.mul(gradientMultiplier)
+    val gradient = brzData * gradientMultiplier
     val loss =
       if (label > 0) {
         math.log(1 + math.exp(margin))
@@ -57,7 +68,26 @@ class LogisticGradient extends Gradient {
         math.log(1 + math.exp(margin)) - margin
       }
 
-    (gradient, loss)
+    (Vectors.fromBreeze(gradient), loss)
+  }
+
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val margin: Double = -1.0 * brzWeights.dot(brzData)
+    val gradientMultiplier = (1.0 / (1.0 + math.exp(margin))) - label
+
+    brzAxpy(gradientMultiplier, brzData, cumGradient.toBreeze)
+
+    if (label > 0) {
+      math.log(1 + math.exp(margin))
+    } else {
+      math.log(1 + math.exp(margin)) - margin
+    }
   }
 }
 
@@ -68,14 +98,28 @@ class LogisticGradient extends Gradient {
  * See also the documentation for the precise formulation.
  */
 class LeastSquaresGradient extends Gradient {
-  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix): 
-      (DoubleMatrix, Double) = {
-    val diff: Double = data.dot(weights) - label
-
+  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val diff = brzWeights.dot(brzData) - label
     val loss = diff * diff
-    val gradient =  data.mul(2.0 * diff)
+    val gradient = brzData * (2.0 * diff)
 
-    (gradient, loss)
+    (Vectors.fromBreeze(gradient), loss)
+  }
+
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val diff = brzWeights.dot(brzData) - label
+
+    brzAxpy(2.0 * diff, brzData, cumGradient.toBreeze)
+
+    diff * diff
   }
 }
 
@@ -85,19 +129,40 @@ class LeastSquaresGradient extends Gradient {
  * NOTE: This assumes that the labels are {0,1}
  */
 class HingeGradient extends Gradient {
-  override def compute(data: DoubleMatrix, label: Double, weights: DoubleMatrix):
-      (DoubleMatrix, Double) = {
+  override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val dotProduct = brzWeights.dot(brzData)
+
+    // Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x)))
+    // Therefore the gradient is -(2y - 1)*x
+    val labelScaled = 2 * label - 1.0
+
+    if (1.0 > labelScaled * dotProduct) {
+      (Vectors.fromBreeze(brzData * (-labelScaled)), 1.0 - labelScaled * dotProduct)
+    } else {
+      (Vectors.dense(new Array[Double](weights.size)), 0.0)
+    }
+  }
 
-    val dotProduct = data.dot(weights)
+  override def compute(
+      data: Vector,
+      label: Double,
+      weights: Vector,
+      cumGradient: Vector): Double = {
+    val brzData = data.toBreeze
+    val brzWeights = weights.toBreeze
+    val dotProduct = brzWeights.dot(brzData)
 
     // Our loss function with {0, 1} labels is max(0, 1 - (2y – 1) (f_w(x)))
     // Therefore the gradient is -(2y - 1)*x
     val labelScaled = 2 * label - 1.0
 
     if (1.0 > labelScaled * dotProduct) {
-      (data.mul(-labelScaled), 1.0 - labelScaled * dotProduct)
+      brzAxpy(-labelScaled, brzData, cumGradient.toBreeze)
+      1.0 - labelScaled * dotProduct
     } else {
-      (DoubleMatrix.zeros(1, weights.length), 0.0)
+      0.0
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
index b967b22..d0777ff 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala
@@ -17,12 +17,13 @@
 
 package org.apache.spark.mllib.optimization
 
-import org.apache.spark.Logging
-import org.apache.spark.rdd.RDD
+import scala.collection.mutable.ArrayBuffer
 
-import org.jblas.DoubleMatrix
+import breeze.linalg.{Vector => BV, DenseVector => BDV}
 
-import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * Class used to solve an optimization problem using Gradient Descent.
@@ -91,18 +92,16 @@ class GradientDescent(var gradient: Gradient, var updater: Updater)
     this
   }
 
-  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double])
-    : Array[Double] = {
-
-    val (weights, stochasticLossHistory) = GradientDescent.runMiniBatchSGD(
-        data,
-        gradient,
-        updater,
-        stepSize,
-        numIterations,
-        regParam,
-        miniBatchFraction,
-        initialWeights)
+  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector = {
+    val (weights, _) = GradientDescent.runMiniBatchSGD(
+      data,
+      gradient,
+      updater,
+      stepSize,
+      numIterations,
+      regParam,
+      miniBatchFraction,
+      initialWeights)
     weights
   }
 
@@ -133,14 +132,14 @@ object GradientDescent extends Logging {
    *         stochastic loss computed for every iteration.
    */
   def runMiniBatchSGD(
-    data: RDD[(Double, Array[Double])],
+    data: RDD[(Double, Vector)],
     gradient: Gradient,
     updater: Updater,
     stepSize: Double,
     numIterations: Int,
     regParam: Double,
     miniBatchFraction: Double,
-    initialWeights: Array[Double]) : (Array[Double], Array[Double]) = {
+    initialWeights: Vector): (Vector, Array[Double]) = {
 
     val stochasticLossHistory = new ArrayBuffer[Double](numIterations)
 
@@ -148,24 +147,27 @@ object GradientDescent extends Logging {
     val miniBatchSize = nexamples * miniBatchFraction
 
     // Initialize weights as a column vector
-    var weights = new DoubleMatrix(initialWeights.length, 1, initialWeights:_*)
+    var weights = Vectors.dense(initialWeights.toArray)
 
     /**
      * For the first iteration, the regVal will be initialized as sum of sqrt of
      * weights if it's L2 update; for L1 update; the same logic is followed.
      */
     var regVal = updater.compute(
-      weights, new DoubleMatrix(initialWeights.length, 1), 0, 1, regParam)._2
+      weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
 
     for (i <- 1 to numIterations) {
       // Sample a subset (fraction miniBatchFraction) of the total data
       // compute and sum up the subgradients on this subset (this is one map-reduce)
-      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i).map {
-        case (y, features) =>
-          val featuresCol = new DoubleMatrix(features.length, 1, features:_*)
-          val (grad, loss) = gradient.compute(featuresCol, y, weights)
-          (grad, loss)
-      }.reduce((a, b) => (a._1.addi(b._1), a._2 + b._2))
+      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
+        .aggregate((BDV.zeros[Double](weights.size), 0.0))(
+          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
+            val l = gradient.compute(features, label, weights, Vectors.fromBreeze(grad))
+            (grad, loss + l)
+          },
+          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
+            (grad1 += grad2, loss1 + loss2)
+          })
 
       /**
        * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
@@ -173,7 +175,7 @@ object GradientDescent extends Logging {
        */
       stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
       val update = updater.compute(
-        weights, gradientSum.div(miniBatchSize), stepSize, i, regParam)
+        weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
       weights = update._1
       regVal = update._2
     }
@@ -181,6 +183,6 @@ object GradientDescent extends Logging {
     logInfo("GradientDescent.runMiniBatchSGD finished. Last 10 stochastic losses %s".format(
       stochasticLossHistory.takeRight(10).mkString(", ")))
 
-    (weights.toArray, stochasticLossHistory.toArray)
+    (weights, stochasticLossHistory.toArray)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
index 94d30b5..f9ce908 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Optimizer.scala
@@ -19,11 +19,12 @@ package org.apache.spark.mllib.optimization
 
 import org.apache.spark.rdd.RDD
 
-trait Optimizer {
+import org.apache.spark.mllib.linalg.Vector
+
+trait Optimizer extends Serializable {
 
   /**
    * Solve the provided convex optimization problem. 
    */
-  def optimize(data: RDD[(Double, Array[Double])], initialWeights: Array[Double]): Array[Double]
-
+  def optimize(data: RDD[(Double, Vector)], initialWeights: Vector): Vector
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
index bf8f731..3b7754c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/optimization/Updater.scala
@@ -18,7 +18,10 @@
 package org.apache.spark.mllib.optimization
 
 import scala.math._
-import org.jblas.DoubleMatrix
+
+import breeze.linalg.{norm => brzNorm, axpy => brzAxpy, Vector => BV}
+
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * Class used to perform steps (weight update) using Gradient Descent methods.
@@ -47,8 +50,12 @@ abstract class Updater extends Serializable {
    * @return A tuple of 2 elements. The first element is a column matrix containing updated weights,
    *         and the second element is the regularization value computed using updated weights.
    */
-  def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix, stepSize: Double, iter: Int,
-      regParam: Double): (DoubleMatrix, Double)
+  def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double)
 }
 
 /**
@@ -56,11 +63,17 @@ abstract class Updater extends Serializable {
  * Uses a step-size decreasing with the square root of the number of iterations.
  */
 class SimpleUpdater extends Updater {
-  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
-      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     val thisIterStepSize = stepSize / math.sqrt(iter)
-    val step = gradient.mul(thisIterStepSize)
-    (weightsOld.sub(step), 0)
+    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
+    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
+
+    (Vectors.fromBreeze(brzWeights), 0)
   }
 }
 
@@ -83,19 +96,26 @@ class SimpleUpdater extends Updater {
  * Equivalently, set weight component to signum(w) * max(0.0, abs(w) - shrinkageVal)
  */
 class L1Updater extends Updater {
-  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
-      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     val thisIterStepSize = stepSize / math.sqrt(iter)
-    val step = gradient.mul(thisIterStepSize)
     // Take gradient step
-    val newWeights = weightsOld.sub(step)
+    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
+    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
     // Apply proximal operator (soft thresholding)
     val shrinkageVal = regParam * thisIterStepSize
-    (0 until newWeights.length).foreach { i =>
-      val wi = newWeights.get(i)
-      newWeights.put(i, signum(wi) * max(0.0, abs(wi) - shrinkageVal))
+    var i = 0
+    while (i < brzWeights.length) {
+      val wi = brzWeights(i)
+      brzWeights(i) = signum(wi) * max(0.0, abs(wi) - shrinkageVal)
+      i += 1
     }
-    (newWeights, newWeights.norm1 * regParam)
+
+    (Vectors.fromBreeze(brzWeights), brzNorm(brzWeights, 1.0) * regParam)
   }
 }
 
@@ -105,16 +125,23 @@ class L1Updater extends Updater {
  * Uses a step-size decreasing with the square root of the number of iterations.
  */
 class SquaredL2Updater extends Updater {
-  override def compute(weightsOld: DoubleMatrix, gradient: DoubleMatrix,
-      stepSize: Double, iter: Int, regParam: Double): (DoubleMatrix, Double) = {
-    val thisIterStepSize = stepSize / math.sqrt(iter)
-    val step = gradient.mul(thisIterStepSize)
+  override def compute(
+      weightsOld: Vector,
+      gradient: Vector,
+      stepSize: Double,
+      iter: Int,
+      regParam: Double): (Vector, Double) = {
     // add up both updates from the gradient of the loss (= step) as well as
     // the gradient of the regularizer (= regParam * weightsOld)
     // w' = w - thisIterStepSize * (gradient + regParam * w)
     // w' = (1 - thisIterStepSize * regParam) * w - thisIterStepSize * gradient
-    val newWeights = weightsOld.mul(1.0 - thisIterStepSize * regParam).sub(step)
-    (newWeights, 0.5 * pow(newWeights.norm2, 2.0) * regParam)
+    val thisIterStepSize = stepSize / math.sqrt(iter)
+    val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector
+    brzWeights :*= (1.0 - thisIterStepSize * regParam)
+    brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights)
+    val norm = brzNorm(brzWeights, 2.0)
+
+    (Vectors.fromBreeze(brzWeights), 0.5 * regParam * norm * norm)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
index 3e1ed91..80dc0f1 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/GeneralizedLinearAlgorithm.scala
@@ -17,11 +17,12 @@
 
 package org.apache.spark.mllib.regression
 
+import breeze.linalg.{DenseVector => BDV, SparseVector => BSV}
+
 import org.apache.spark.{Logging, SparkException}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.optimization._
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.linalg.{Vectors, Vector}
 
 /**
  * GeneralizedLinearModel (GLM) represents a model trained using 
@@ -31,12 +32,9 @@ import org.jblas.DoubleMatrix
  * @param weights Weights computed for every feature.
  * @param intercept Intercept computed for this model.
  */
-abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept: Double)
+abstract class GeneralizedLinearModel(val weights: Vector, val intercept: Double)
   extends Serializable {
 
-  // Create a column vector that can be used for predictions
-  private val weightsMatrix = new DoubleMatrix(weights.length, 1, weights:_*)
-
   /**
    * Predict the result given a data point and the weights learned.
    * 
@@ -44,8 +42,7 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
    * @param weightMatrix Column vector containing the weights of the model
    * @param intercept Intercept of the model.
    */
-  def predictPoint(dataMatrix: DoubleMatrix, weightMatrix: DoubleMatrix,
-    intercept: Double): Double
+  protected def predictPoint(dataMatrix: Vector, weightMatrix: Vector, intercept: Double): Double
 
   /**
    * Predict values for the given data set using the model trained.
@@ -53,16 +50,13 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
    * @param testData RDD representing data points to be predicted
    * @return RDD[Double] where each entry contains the corresponding prediction
    */
-  def predict(testData: RDD[Array[Double]]): RDD[Double] = {
+  def predict(testData: RDD[Vector]): RDD[Double] = {
     // A small optimization to avoid serializing the entire model. Only the weightsMatrix
     // and intercept is needed.
-    val localWeights = weightsMatrix
+    val localWeights = weights
     val localIntercept = intercept
 
-    testData.map { x =>
-      val dataMatrix = new DoubleMatrix(1, x.length, x:_*)
-      predictPoint(dataMatrix, localWeights, localIntercept)
-    }
+    testData.map(v => predictPoint(v, localWeights, localIntercept))
   }
 
   /**
@@ -71,14 +65,13 @@ abstract class GeneralizedLinearModel(val weights: Array[Double], val intercept:
    * @param testData array representing a single data point
    * @return Double prediction from the trained model
    */
-  def predict(testData: Array[Double]): Double = {
-    val dataMat = new DoubleMatrix(1, testData.length, testData:_*)
-    predictPoint(dataMat, weightsMatrix, intercept)
+  def predict(testData: Vector): Double = {
+    predictPoint(testData, weights, intercept)
   }
 }
 
 /**
- * GeneralizedLinearAlgorithm implements methods to train a Genearalized Linear Model (GLM).
+ * GeneralizedLinearAlgorithm implements methods to train a Generalized Linear Model (GLM).
  * This class should be extended with an Optimizer to create a new GLM.
  */
 abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
@@ -88,6 +81,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
 
   val optimizer: Optimizer
 
+  /** Whether to add intercept (default: true). */
   protected var addIntercept: Boolean = true
 
   protected var validateData: Boolean = true
@@ -95,7 +89,7 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
   /**
    * Create a model given the weights and intercept
    */
-  protected def createModel(weights: Array[Double], intercept: Double): M
+  protected def createModel(weights: Vector, intercept: Double): M
 
   /**
    * Set if the algorithm should add an intercept. Default true.
@@ -117,17 +111,27 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
    * Run the algorithm with the configured parameters on an input
    * RDD of LabeledPoint entries.
    */
-  def run(input: RDD[LabeledPoint]) : M = {
-    val nfeatures: Int = input.first().features.length
-    val initialWeights = new Array[Double](nfeatures)
+  def run(input: RDD[LabeledPoint]): M = {
+    val numFeatures: Int = input.first().features.size
+    val initialWeights = Vectors.dense(new Array[Double](numFeatures))
     run(input, initialWeights)
   }
 
+  /** Prepends one to the input vector. */
+  private def prependOne(vector: Vector): Vector = {
+    val vector1 = vector.toBreeze match {
+      case dv: BDV[Double] => BDV.vertcat(BDV.ones[Double](1), dv)
+      case sv: BSV[Double] => BSV.vertcat(new BSV[Double](Array(0), Array(1.0), 1), sv)
+      case v: Any => throw new IllegalArgumentException("Do not support vector type " + v.getClass)
+    }
+    Vectors.fromBreeze(vector1)
+  }
+
   /**
    * Run the algorithm with the configured parameters on an input RDD
    * of LabeledPoint entries starting from the initial weights provided.
    */
-  def run(input: RDD[LabeledPoint], initialWeights: Array[Double]) : M = {
+  def run(input: RDD[LabeledPoint], initialWeights: Vector): M = {
 
     // Check the data properties before running the optimizer
     if (validateData && !validators.forall(func => func(input))) {
@@ -136,27 +140,26 @@ abstract class GeneralizedLinearAlgorithm[M <: GeneralizedLinearModel]
 
     // Prepend an extra variable consisting of all 1.0's for the intercept.
     val data = if (addIntercept) {
-      input.map(labeledPoint => (labeledPoint.label, 1.0 +: labeledPoint.features))
+      input.map(labeledPoint => (labeledPoint.label, prependOne(labeledPoint.features)))
     } else {
       input.map(labeledPoint => (labeledPoint.label, labeledPoint.features))
     }
 
     val initialWeightsWithIntercept = if (addIntercept) {
-      0.0 +: initialWeights
+      prependOne(initialWeights)
     } else {
       initialWeights
     }
 
     val weightsWithIntercept = optimizer.optimize(data, initialWeightsWithIntercept)
 
-    val (intercept, weights) = if (addIntercept) {
-      (weightsWithIntercept(0), weightsWithIntercept.tail)
-    } else {
-      (0.0, weightsWithIntercept)
-    }
-
-    logInfo("Final weights " + weights.mkString(","))
-    logInfo("Final intercept " + intercept)
+    val intercept = if (addIntercept) weightsWithIntercept(0) else 0.0
+    val weights =
+      if (addIntercept) {
+        Vectors.dense(weightsWithIntercept.toArray.slice(1, weightsWithIntercept.size))
+      } else {
+        weightsWithIntercept
+      }
 
     createModel(weights, intercept)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
index 1a18292..3deab1a 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LabeledPoint.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.mllib.regression
 
+import org.apache.spark.mllib.linalg.Vector
+
 /**
  * Class that represents the features and labels of a data point.
  *
  * @param label Label for this data point.
  * @param features List of features for this data point.
  */
-case class LabeledPoint(label: Double, features: Array[Double]) {
+case class LabeledPoint(label: Double, features: Vector) {
   override def toString: String = {
-    "LabeledPoint(%s, %s)".format(label, features.mkString("[", ", ", "]"))
+    "LabeledPoint(%s, %s)".format(label, features)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
index be63ce8..25920d0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/Lasso.scala
@@ -17,12 +17,11 @@
 
 package org.apache.spark.mllib.regression
 
-import org.apache.spark.{Logging, SparkContext}
-import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.util.MLUtils
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.rdd.RDD
 
 /**
  * Regression model trained using Lasso.
@@ -31,16 +30,16 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class LassoModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
   extends GeneralizedLinearModel(weights, intercept)
   with RegressionModel with Serializable {
 
-  override def predictPoint(
-      dataMatrix: DoubleMatrix,
-      weightMatrix: DoubleMatrix,
+  override protected def predictPoint(
+      dataMatrix: Vector,
+      weightMatrix: Vector,
       intercept: Double): Double = {
-    dataMatrix.dot(weightMatrix) + intercept
+    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
   }
 }
 
@@ -57,8 +56,7 @@ class LassoWithSGD private (
     var numIterations: Int,
     var regParam: Double,
     var miniBatchFraction: Double)
-  extends GeneralizedLinearAlgorithm[LassoModel]
-  with Serializable {
+  extends GeneralizedLinearAlgorithm[LassoModel] with Serializable {
 
   val gradient = new LeastSquaresGradient()
   val updater = new L1Updater()
@@ -70,10 +68,6 @@ class LassoWithSGD private (
   // We don't want to penalize the intercept, so set this to false.
   super.setIntercept(false)
 
-  var yMean = 0.0
-  var xColMean: DoubleMatrix = _
-  var xColSd: DoubleMatrix = _
-
   /**
    * Construct a Lasso object with default parameters
    */
@@ -85,36 +79,8 @@ class LassoWithSGD private (
     this
   }
 
-  override def createModel(weights: Array[Double], intercept: Double) = {
-    val weightsMat = new DoubleMatrix(weights.length, 1, weights: _*)
-    val weightsScaled = weightsMat.div(xColSd)
-    val interceptScaled = yMean - weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)
-
-    new LassoModel(weightsScaled.data, interceptScaled)
-  }
-
-  override def run(
-      input: RDD[LabeledPoint],
-      initialWeights: Array[Double])
-    : LassoModel =
-  {
-    val nfeatures: Int = input.first.features.length
-    val nexamples: Long = input.count()
-
-    // To avoid penalizing the intercept, we center and scale the data.
-    val stats = MLUtils.computeStats(input, nfeatures, nexamples)
-    yMean = stats._1
-    xColMean = stats._2
-    xColSd = stats._3
-
-    val normalizedData = input.map { point =>
-      val yNormalized = point.label - yMean
-      val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*)
-      val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
-      LabeledPoint(yNormalized, featuresNormalized.toArray)
-    }
-
-    super.run(normalizedData, initialWeights)
+  override protected def createModel(weights: Vector, intercept: Double) = {
+    new LassoModel(weights, intercept)
   }
 }
 
@@ -144,11 +110,9 @@ object LassoWithSGD {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : LassoModel =
-  {
-    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input,
-        initialWeights)
+      initialWeights: Vector): LassoModel = {
+    new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -168,9 +132,7 @@ object LassoWithSGD {
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
-      miniBatchFraction: Double)
-    : LassoModel =
-  {
+      miniBatchFraction: Double): LassoModel = {
     new LassoWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
   }
 
@@ -190,9 +152,7 @@ object LassoWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      regParam: Double)
-    : LassoModel =
-  {
+      regParam: Double): LassoModel = {
     train(input, numIterations, stepSize, regParam, 1.0)
   }
 
@@ -208,9 +168,7 @@ object LassoWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LassoModel =
-  {
+      numIterations: Int): LassoModel = {
     train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
@@ -222,7 +180,8 @@ object LassoWithSGD {
     val sc = new SparkContext(args(0), "Lasso")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = LassoWithSGD.train(data, args(4).toInt, args(2).toDouble, args(3).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
index f5f15d1..9ed9279 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/LinearRegression.scala
@@ -19,11 +19,10 @@ package org.apache.spark.mllib.regression
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.util.MLUtils
 
-import org.jblas.DoubleMatrix
-
 /**
  * Regression model trained using LinearRegression.
  *
@@ -31,15 +30,15 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class LinearRegressionModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
   extends GeneralizedLinearModel(weights, intercept) with RegressionModel with Serializable {
 
-  override def predictPoint(
-      dataMatrix: DoubleMatrix,
-      weightMatrix: DoubleMatrix,
+  override protected def predictPoint(
+      dataMatrix: Vector,
+      weightMatrix: Vector,
       intercept: Double): Double = {
-    dataMatrix.dot(weightMatrix) + intercept
+    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
   }
 }
 
@@ -69,7 +68,7 @@ class LinearRegressionWithSGD private (
    */
   def this() = this(1.0, 100, 1.0)
 
-  override def createModel(weights: Array[Double], intercept: Double) = {
+  override protected def createModel(weights: Vector, intercept: Double) = {
     new LinearRegressionModel(weights, intercept)
   }
 }
@@ -98,11 +97,9 @@ object LinearRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : LinearRegressionModel =
-  {
-    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input,
-      initialWeights)
+      initialWeights: Vector): LinearRegressionModel = {
+    new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
+      .run(input, initialWeights)
   }
 
   /**
@@ -120,9 +117,7 @@ object LinearRegressionWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      miniBatchFraction: Double)
-    : LinearRegressionModel =
-  {
+      miniBatchFraction: Double): LinearRegressionModel = {
     new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction).run(input)
   }
 
@@ -140,9 +135,7 @@ object LinearRegressionWithSGD {
   def train(
       input: RDD[LabeledPoint],
       numIterations: Int,
-      stepSize: Double)
-    : LinearRegressionModel =
-  {
+      stepSize: Double): LinearRegressionModel = {
     train(input, numIterations, stepSize, 1.0)
   }
 
@@ -158,9 +151,7 @@ object LinearRegressionWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : LinearRegressionModel =
-  {
+      numIterations: Int): LinearRegressionModel = {
     train(input, numIterations, 1.0, 1.0)
   }
 
@@ -172,7 +163,7 @@ object LinearRegressionWithSGD {
     val sc = new SparkContext(args(0), "LinearRegression")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = LinearRegressionWithSGD.train(data, args(3).toInt, args(2).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
index 423afc3..5e4b8a3 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RegressionModel.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.mllib.regression
 
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 
 trait RegressionModel extends Serializable {
   /**
@@ -26,7 +27,7 @@ trait RegressionModel extends Serializable {
    * @param testData RDD representing data points to be predicted
    * @return RDD[Double] where each entry contains the corresponding prediction
    */
-  def predict(testData: RDD[Array[Double]]): RDD[Double]
+  def predict(testData: RDD[Vector]): RDD[Double]
 
   /**
    * Predict values for a single data point using the model trained.
@@ -34,5 +35,5 @@ trait RegressionModel extends Serializable {
    * @param testData array representing a single data point
    * @return Double prediction from the trained model
    */
-  def predict(testData: Array[Double]): Double
+  def predict(testData: Vector): Double
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
index feb100f..1f17d21 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/regression/RidgeRegression.scala
@@ -21,8 +21,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.optimization._
 import org.apache.spark.mllib.util.MLUtils
-
-import org.jblas.DoubleMatrix
+import org.apache.spark.mllib.linalg.Vector
 
 /**
  * Regression model trained using RidgeRegression.
@@ -31,16 +30,16 @@ import org.jblas.DoubleMatrix
  * @param intercept Intercept computed for this model.
  */
 class RidgeRegressionModel(
-    override val weights: Array[Double],
+    override val weights: Vector,
     override val intercept: Double)
   extends GeneralizedLinearModel(weights, intercept)
   with RegressionModel with Serializable {
 
-  override def predictPoint(
-      dataMatrix: DoubleMatrix,
-      weightMatrix: DoubleMatrix,
+  override protected def predictPoint(
+      dataMatrix: Vector,
+      weightMatrix: Vector,
       intercept: Double): Double = {
-    dataMatrix.dot(weightMatrix) + intercept
+    weightMatrix.toBreeze.dot(dataMatrix.toBreeze) + intercept
   }
 }
 
@@ -57,8 +56,7 @@ class RidgeRegressionWithSGD private (
     var numIterations: Int,
     var regParam: Double,
     var miniBatchFraction: Double)
-    extends GeneralizedLinearAlgorithm[RidgeRegressionModel]
-  with Serializable {
+    extends GeneralizedLinearAlgorithm[RidgeRegressionModel] with Serializable {
 
   val gradient = new LeastSquaresGradient()
   val updater = new SquaredL2Updater()
@@ -71,10 +69,6 @@ class RidgeRegressionWithSGD private (
   // We don't want to penalize the intercept in RidgeRegression, so set this to false.
   super.setIntercept(false)
 
-  var yMean = 0.0
-  var xColMean: DoubleMatrix = _
-  var xColSd: DoubleMatrix = _
-
   /**
    * Construct a RidgeRegression object with default parameters
    */
@@ -86,36 +80,8 @@ class RidgeRegressionWithSGD private (
     this
   }
 
-  override def createModel(weights: Array[Double], intercept: Double) = {
-    val weightsMat = new DoubleMatrix(weights.length, 1, weights: _*)
-    val weightsScaled = weightsMat.div(xColSd)
-    val interceptScaled = yMean - weightsMat.transpose().mmul(xColMean.div(xColSd)).get(0)
-
-    new RidgeRegressionModel(weightsScaled.data, interceptScaled)
-  }
-
-  override def run(
-      input: RDD[LabeledPoint],
-      initialWeights: Array[Double])
-    : RidgeRegressionModel =
-  {
-    val nfeatures: Int = input.first().features.length
-    val nexamples: Long = input.count()
-
-    // To avoid penalizing the intercept, we center and scale the data.
-    val stats = MLUtils.computeStats(input, nfeatures, nexamples)
-    yMean = stats._1
-    xColMean = stats._2
-    xColSd = stats._3
-
-    val normalizedData = input.map { point =>
-      val yNormalized = point.label - yMean
-      val featuresMat = new DoubleMatrix(nfeatures, 1, point.features:_*)
-      val featuresNormalized = featuresMat.sub(xColMean).divi(xColSd)
-      LabeledPoint(yNormalized, featuresNormalized.toArray)
-    }
-
-    super.run(normalizedData, initialWeights)
+  override protected def createModel(weights: Vector, intercept: Double) = {
+    new RidgeRegressionModel(weights, intercept)
   }
 }
 
@@ -144,9 +110,7 @@ object RidgeRegressionWithSGD {
       stepSize: Double,
       regParam: Double,
       miniBatchFraction: Double,
-      initialWeights: Array[Double])
-    : RidgeRegressionModel =
-  {
+      initialWeights: Vector): RidgeRegressionModel = {
     new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(
       input, initialWeights)
   }
@@ -167,9 +131,7 @@ object RidgeRegressionWithSGD {
       numIterations: Int,
       stepSize: Double,
       regParam: Double,
-      miniBatchFraction: Double)
-    : RidgeRegressionModel =
-  {
+      miniBatchFraction: Double): RidgeRegressionModel = {
     new RidgeRegressionWithSGD(stepSize, numIterations, regParam, miniBatchFraction).run(input)
   }
 
@@ -188,9 +150,7 @@ object RidgeRegressionWithSGD {
       input: RDD[LabeledPoint],
       numIterations: Int,
       stepSize: Double,
-      regParam: Double)
-    : RidgeRegressionModel =
-  {
+      regParam: Double): RidgeRegressionModel = {
     train(input, numIterations, stepSize, regParam, 1.0)
   }
 
@@ -205,23 +165,22 @@ object RidgeRegressionWithSGD {
    */
   def train(
       input: RDD[LabeledPoint],
-      numIterations: Int)
-    : RidgeRegressionModel =
-  {
+      numIterations: Int): RidgeRegressionModel = {
     train(input, numIterations, 1.0, 1.0, 1.0)
   }
 
   def main(args: Array[String]) {
     if (args.length != 5) {
-      println("Usage: RidgeRegression <master> <input_dir> <step_size> <regularization_parameter>" +
-        " <niters>")
+      println("Usage: RidgeRegression <master> <input_dir> <step_size> " +
+        "<regularization_parameter> <niters>")
       System.exit(1)
     }
     val sc = new SparkContext(args(0), "RidgeRegression")
     val data = MLUtils.loadLabeledData(sc, args(1))
     val model = RidgeRegressionWithSGD.train(data, args(4).toInt, args(2).toDouble,
         args(3).toDouble)
-    println("Weights: " + model.weights.mkString("[", ", ", "]"))
+
+    println("Weights: " + model.weights)
     println("Intercept: " + model.intercept)
 
     sc.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
index 33205b9..dee9594 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/DecisionTree.scala
@@ -30,6 +30,7 @@ import org.apache.spark.mllib.tree.impurity.{Entropy, Gini, Impurity, Variance}
 import org.apache.spark.mllib.tree.model._
 import org.apache.spark.rdd.RDD
 import org.apache.spark.util.random.XORShiftRandom
+import org.apache.spark.mllib.linalg.{Vector, Vectors}
 
 /**
  * A class that implements a decision tree algorithm for classification and regression. It
@@ -295,7 +296,7 @@ object DecisionTree extends Serializable with Logging {
     val numNodes = scala.math.pow(2, level).toInt
     logDebug("numNodes = " + numNodes)
     // Find the number of features by looking at the first sample.
-    val numFeatures = input.first().features.length
+    val numFeatures = input.first().features.size
     logDebug("numFeatures = " + numFeatures)
     val numBins = bins(0).length
     logDebug("numBins = " + numBins)
@@ -902,7 +903,7 @@ object DecisionTree extends Serializable with Logging {
     val count = input.count()
 
     // Find the number of features by looking at the first sample
-    val numFeatures = input.take(1)(0).features.length
+    val numFeatures = input.take(1)(0).features.size
 
     val maxBins = strategy.maxBins
     val numBins = if (maxBins <= count) maxBins else count.toInt
@@ -1116,7 +1117,7 @@ object DecisionTree extends Serializable with Logging {
     sc.textFile(dir).map { line =>
       val parts = line.trim().split(",")
       val label = parts(0).toDouble
-      val features = parts.slice(1,parts.length).map(_.toDouble)
+      val features = Vectors.dense(parts.slice(1,parts.length).map(_.toDouble))
       LabeledPoint(label, features)
     }
   }
@@ -1127,7 +1128,7 @@ object DecisionTree extends Serializable with Logging {
    */
   private def accuracyScore(model: DecisionTreeModel, data: RDD[LabeledPoint],
       threshold: Double = 0.5): Double = {
-    def predictedValue(features: Array[Double]) = {
+    def predictedValue(features: Vector) = {
       if (model.predict(features) < threshold) 0.0 else 1.0
     }
     val correctCount = data.filter(y => predictedValue(y.features) == y.label).count()

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
index a8bbf21..a6dca84 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/DecisionTreeModel.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.tree.model
 
 import org.apache.spark.mllib.tree.configuration.Algo._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vector
 
 /**
  * Model to store the decision tree parameters
@@ -33,7 +34,7 @@ class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable
    * @param features array representing a single data point
    * @return Double prediction from the trained model
    */
-  def predict(features: Array[Double]): Double = {
+  def predict(features: Vector): Double = {
     topNode.predictIfLeaf(features)
   }
 
@@ -43,7 +44,7 @@ class DecisionTreeModel(val topNode: Node, val algo: Algo) extends Serializable
    * @param features RDD representing data points to be predicted
    * @return RDD[Int] where each entry contains the corresponding prediction
    */
-  def predict(features: RDD[Array[Double]]): RDD[Double] = {
+  def predict(features: RDD[Vector]): RDD[Double] = {
     features.map(x => predict(x))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
index ea4693c..aac3f9c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/model/Node.scala
@@ -19,6 +19,7 @@ package org.apache.spark.mllib.tree.model
 
 import org.apache.spark.Logging
 import org.apache.spark.mllib.tree.configuration.FeatureType._
+import org.apache.spark.mllib.linalg.Vector
 
 /**
  * Node in a decision tree
@@ -54,8 +55,8 @@ class Node (
     logDebug("stats = " + stats)
     logDebug("predict = " + predict)
     if (!isLeaf) {
-      val leftNodeIndex = id*2 + 1
-      val rightNodeIndex = id*2 + 2
+      val leftNodeIndex = id * 2 + 1
+      val rightNodeIndex = id * 2 + 2
       leftNode = Some(nodes(leftNodeIndex))
       rightNode = Some(nodes(rightNodeIndex))
       leftNode.get.build(nodes)
@@ -68,7 +69,7 @@ class Node (
    * @param feature feature value
    * @return predicted value
    */
-  def predictIfLeaf(feature: Array[Double]) : Double = {
+  def predictIfLeaf(feature: Vector) : Double = {
     if (isLeaf) {
       predict
     } else{

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
index 2e03684..81e4eda 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala
@@ -24,6 +24,7 @@ import org.jblas.DoubleMatrix
 
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.regression.LabeledPoint
 
 /**
@@ -74,7 +75,7 @@ object LinearDataGenerator {
     val y = x.map { xi =>
       new DoubleMatrix(1, xi.length, xi: _*).dot(weightsMat) + intercept + eps * rnd.nextGaussian()
     }
-    y.zip(x).map(p => LabeledPoint(p._1, p._2))
+    y.zip(x).map(p => LabeledPoint(p._1, Vectors.dense(p._2)))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/9c65fa76/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
index 52c4a71..61498dc 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LogisticRegressionDataGenerator.scala
@@ -22,6 +22,7 @@ import scala.util.Random
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.linalg.Vectors
 
 /**
  * Generate test data for LogisticRegression. This class chooses positive labels
@@ -54,7 +55,7 @@ object LogisticRegressionDataGenerator {
       val x = Array.fill[Double](nfeatures) {
         rnd.nextGaussian() + (y * eps)
       }
-      LabeledPoint(y, x)
+      LabeledPoint(y, Vectors.dense(x))
     }
     data
   }