You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by yinxusen <gi...@git.apache.org> on 2014/03/29 04:42:56 UTC

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

GitHub user yinxusen opened a pull request:

    https://github.com/apache/spark/pull/268

    [WIP] [SPARK-1328] Add vector statistics

    As with the new vector system in MLlib, we find that it is good to add some new APIs to precess the `RDD[Vector]`. Beside, the former implementation of `computeStat` is not stable which could loss precision, and has the possibility to cause `Nan` in scientific computing, just as said in the [SPARK-1328](https://spark-project.atlassian.net/browse/SPARK-1328).
    
    APIs contain:
    
    * rowMeans(): RDD[Double]
    * rowNorm2(): RDD[Double]
    * rowSDs(): RDD[Double]
    * colMeans(): Vector
    * colMeans(size: Int): Vector
    * colNorm2(): Vector
    * colNorm2(size: Int): Vector
    * colSDs(): Vector
    * colSDs(size: Int): Vector
    * maxOption((Vector, Vector) => Boolean): Option[Vector]
    * minOption((Vector, Vector) => Boolean): Option[Vector]
    * rowShrink(): RDD[Vector]
    * colShrink(): RDD[Vector]
    
    This is working in process now, and some more APIs will add to `LabeledPoint`. Moreover, the implicit declaration will move from `MLUtils` to `MLContext` later.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/yinxusen/spark vector-statistics

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/268.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #268
    
----
commit cae6c9e0a9307c9102fddd864f879ef1f11407b2
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-03-28T03:40:43Z

    add basic statistics

commit 317f2c1e52b3f3eb91ecf685faeb30790045b803
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-03-28T10:23:54Z

    add new API to shrink RDD[Vector]

commit 6243332579a33fee58aed1cd6c35c525aef5b90c
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-03-29T01:25:35Z

    fix error of column means

commit 6f07e17f680d4d8e4d190e265f058178b90138d0
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-03-29T01:42:39Z

    pass all tests

commit 2a5ed37cf5b2b26dbf1c94203e419171d664b86d
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-03-29T02:48:44Z

    add scala docs and refine shrink method

commit 95dbc6e1288914234bafa43b6ace662847b9242c
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-03-29T03:08:33Z

    add shrink test

commit ed6fdf836275832e167025d848eaeb28a2538cfa
Author: Xusen Yin <yi...@gmail.com>
Date:   2014-03-29T03:40:03Z

    refine the code style

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39206333
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40152554
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39346844
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40031809
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39174557
  
    @mengxr Ah... I totally understand your mean. Code is on the way.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39527117
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39034276
  
    @yinxusen I want to see whether we can improve the performance. First of all, the complexity should be `O(nnz)` instead of `O(n d)`. This is a little bit tricky. Basically, we don't need to update zeros for a sparse vector. Suppose we have the following column:
    
    ~~~
    1.0
    0.0
    2.0
    0.0
    3.0
    0.0
    0.0
    ~~~
    
    To apply the formula for mean/variance, you need to do the update at the second row. This is actually not necessary because the summary statistics are invariant to the ordering. Imagine we have this column re-arranged:
    ~~~
    1.0
    2.0
    3.0
    0.0
    0.0
    0.0
    0.0
    ~~~
    which still have the same summary statistics. We can skip all zeros and do mean/variance updates based on the non-zero count instead of global count. After `aggregate`, we know we need to accumulate 4 zeros to the column, which is a constant-time operation.
    
    I'm okay if you don't have enough time to make the change in this PR. Put a TODO and we can fix it later.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40005107
  
    Btw, I used equal in tests because the results should be exact with the numbers there. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39941590
  
    @yinxusen #296 was merged. Could you move the method `computeSummaryStatistics` to `RowMatrix`? The return type should be renamed to either `MultivariateStatisticalSummary` or `MultivariateSummaryStatistics`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39116485
  
    @yinxusen I mean a column. You don't need to check the type as you already use `activeIterator` to get the non-zero elements. The approach I suggested is to only update mean/variance/min/max for non-zeros. After we scan all the items, we know how many zeros we need to append to each column, and then we do a constant-time update for each column.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11196822
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.mllib.util.MLUtils._
    +
    +/**
    + * Test suite for the summary statistics of RDD[Vector]. Both the accuracy and the time consuming
    + * between dense and sparse vector are tested.
    + */
    +class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
    +  import VectorRDDFunctionsSuite._
    +
    +  val localData = Array(
    +    Vectors.dense(1.0, 2.0, 3.0),
    +    Vectors.dense(4.0, 5.0, 6.0),
    +    Vectors.dense(7.0, 8.0, 9.0)
    +  )
    +
    +  val sparseData = ArrayBuffer(Vectors.sparse(20, Seq((0, 1.0), (9, 2.0), (10, 7.0))))
    +  for (i <- 0 until 10000) sparseData += Vectors.sparse(20, Seq((9, 0.0)))
    +  sparseData += Vectors.sparse(20, Seq((0, 5.0), (9, 13.0), (16, 2.0)))
    +  sparseData += Vectors.sparse(20, Seq((3, 5.0), (9, 13.0), (18, 2.0)))
    +
    +  test("full-statistics") {
    +    val data = sc.parallelize(localData, 2)
    +    val (VectorRDDStatisticalAggregator(mean, variance, cnt, nnz, max, min), denseTime) =
    +      time(data.summarizeStatistics())
    +
    +    assert(equivVector(Vectors.fromBreeze(mean), Vectors.dense(4.0, 5.0, 6.0)),
    +      "Column mean do not match.")
    +    assert(equivVector(Vectors.fromBreeze(variance), Vectors.dense(6.0, 6.0, 6.0)),
    +      "Column variance do not match.")
    +    assert(cnt === 3.0, "Column cnt do not match.")
    +    assert(equivVector(Vectors.fromBreeze(nnz), Vectors.dense(3.0, 3.0, 3.0)),
    +      "Column nnz do not match.")
    +    assert(equivVector(Vectors.fromBreeze(max), Vectors.dense(7.0, 8.0, 9.0)),
    +      "Column max do not match.")
    +    assert(equivVector(Vectors.fromBreeze(min), Vectors.dense(1.0, 2.0, 3.0)),
    +      "Column min do not match.")
    +
    +    val dataForSparse = sc.parallelize(sparseData.toSeq, 2)
    +    val (_, sparseTime) = time(dataForSparse.summarizeStatistics())
    +
    +    println(s"dense time is $denseTime, sparse time is $sparseTime.")
    +    assert(relativeTime(denseTime, sparseTime),
    --- End diff --
    
    If I do both "remove the `println()`" and "do not `assert()` on running times", then I can also remove the computing of `denseTime` and `sparseTime`. How to assert the running time for computing statistical summary of sparse vectors to be O(nnz) but not O(n d) in this way?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11235878
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    --- End diff --
    
    Use BDV[Double] because you know they are dense. It might affect which implicit the compiler picks up.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39284940
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11193353
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalAggregator(
    +    mean: BV[Double],
    +    statCounter: BV[Double],
    +    totalCount: Double,
    +    numNonZeros: BV[Double],
    +    max: BV[Double],
    +    min: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    +      aggregator: VectorRDDStatisticalAggregator,
    +      currData: BV[Double]): VectorRDDStatisticalAggregator = {
    +    aggregator match {
    +      case VectorRDDStatisticalAggregator(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
    +        currData.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            if (maxVec(id) < value) maxVec(id) = value
    +            if (minVec(id) > value) minVec(id) = value
    +
    +            val tmpPrevMean = prevMean(id)
    +            prevMean(id) = (prevMean(id) * cnt + value) / (cnt + 1.0)
    +            prevM2n(id) += (value - prevMean(id)) * (value - tmpPrevMean)
    +
    +            nnzVec(id) += 1.0
    +        }
    +
    +        VectorRDDStatisticalAggregator(
    +          prevMean,
    +          prevM2n,
    +          cnt + 1.0,
    +          nnzVec,
    +          maxVec,
    +          minVec)
    +    }
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  private def combOp(
    +      statistics1: VectorRDDStatisticalAggregator,
    +      statistics2: VectorRDDStatisticalAggregator): VectorRDDStatisticalAggregator = {
    +    (statistics1, statistics2) match {
    +      case (VectorRDDStatisticalAggregator(mean1, m2n1, cnt1, nnz1, max1, min1),
    +            VectorRDDStatisticalAggregator(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
    +        val totalCnt = cnt1 + cnt2
    +        val deltaMean = mean2 - mean1
    +
    +        mean2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            mean1(id) = (mean1(id) * nnz1(id) + mean2(id) * nnz2(id)) / (nnz1(id) + nnz2(id))
    +        }
    +
    +        m2n2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            m2n1(id) +=
    +              value + deltaMean(id) * deltaMean(id) * nnz1(id) * nnz2(id) / (nnz1(id)+nnz2(id))
    +        }
    +
    +        max2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (max1(id) < value) max1(id) = value
    +        }
    +
    +        min2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (min1(id) > value) min1(id) = value
    +        }
    +
    +        axpy(1.0, nnz2, nnz1)
    +        VectorRDDStatisticalAggregator(mean1, m2n1, totalCnt, nnz1, max1, min1)
    +    }
    +  }
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
    +   */
    +  def summarizeStatistics(): VectorRDDStatisticalAggregator = {
    +    val size = self.take(1).head.size
    +    val zeroValue = VectorRDDStatisticalAggregator(
    +      BV.zeros[Double](size),
    +      BV.zeros[Double](size),
    +      0.0,
    +      BV.zeros[Double](size),
    +      BV.fill(size)(Double.MinValue),
    +      BV.fill(size)(Double.MaxValue))
    +
    +    val VectorRDDStatisticalAggregator(currMean, currM2n, totalCnt, nnz, currMax, currMin) =
    +      self.map(_.toBreeze).aggregate(zeroValue)(seqOp, combOp)
    +
    +    // solve real mean
    +    val realMean = currMean :* nnz :/ totalCnt
    +
    +    // solve real m2n
    +    val deltaMean = currMean
    +    val realM2n = currM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)
    +
    +    // remove the initial value in max and min, i.e. the Double.MaxValue or Double.MinValue.
    +    nnz.activeIterator.foreach {
    +      case (id, 0.0) =>
    +        currMax(id) = 0.0
    +        currMin(id) = 0.0
    +      case _ =>
    +    }
    +
    +    // get variance
    +    realM2n :/= totalCnt
    +
    +    VectorRDDStatisticalAggregator(
    --- End diff --
    
    Do not expose breeze types in public APIs, and don't worry about `toBreeze`, which is a very light operation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39346866
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11236213
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    +      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
    +      currM2n(i) /= totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currM2n)
    +  }
    +
    +  override lazy val totalCount: Long = totalCnt.toLong
    +
    +  override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override lazy val max: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMax(id) < 0.0)))  currMax(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMax)
    +  }
    +
    +  override lazy val min: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMin(id) > 0.0))) currMin(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMin)
    +  }
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  def add(currData: BV[Double]): this.type = {
    +    currData.activeIterator.foreach {
    +      // this case is used for filtering the zero elements if the vector is a dense one.
    +      case (id, 0.0) =>
    +      case (id, value) =>
    +        if (currMax(id) < value) currMax(id) = value
    +        if (currMin(id) > value) currMin(id) = value
    +
    +        val tmpPrevMean = currMean(id)
    +        currMean(id) = (currMean(id) * nnz(id) + value) / (nnz(id) + 1.0)
    +        currM2n(id) += (value - currMean(id)) * (value - tmpPrevMean)
    +
    +        nnz(id) += 1.0
    +    }
    +
    +    totalCnt += 1.0
    +    this
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  def merge(other: Aggregator): this.type = {
    +
    +    totalCnt += other.totalCnt
    +
    +    val deltaMean = currMean - other.currMean
    +
    +    other.currMean.activeIterator.foreach {
    +      case (id, 0.0) =>
    +      case (id, value) =>
    +        currMean(id) =
    +          (currMean(id) * nnz(id) + other.currMean(id) * other.nnz(id)) / (nnz(id) + other.nnz(id))
    +    }
    +
    +    var i = 0
    +    while(i < currM2n.size) {
    +      (nnz(i), other.nnz(i)) match {
    +        case (0.0, 0.0) =>
    +        case _ => currM2n(i) +=
    +          other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / (nnz(i)+other.nnz(i))
    +      }
    +      i += 1
    +    }
    +
    +    other.currMax.activeIterator.foreach {
    +      case (id, value) =>
    +        if (currMax(id) < value) currMax(id) = value
    +    }
    +
    +    other.currMin.activeIterator.foreach {
    +      case (id, value) =>
    +        if (currMin(id) > value) currMin(id) = value
    +    }
    +
    +    axpy(1.0, other.nnz, nnz)
    +    this
    +  }
    +}
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
    +   */
    +  def summarizeStatistics(): VectorRDDStatisticalSummary = {
    --- End diff --
    
    Do you think `computeSummaryStatistics` is a better name?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11161910
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala ---
    @@ -0,0 +1,84 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.mllib.util.MLUtils._
    +
    +/**
    + * Test suite for the summary statistics of RDD[Vector]. Both the accuracy and the time consuming
    + * between dense and sparse vector are tested.
    + */
    +class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
    +  import VectorRDDFunctionsSuite._
    +
    +  val localData = Array(
    +    Vectors.dense(1.0, 2.0, 3.0),
    +    Vectors.dense(4.0, 5.0, 6.0),
    +    Vectors.dense(7.0, 8.0, 9.0)
    +  )
    +
    +  val sparseData = ArrayBuffer(Vectors.sparse(20, Seq((0, 1.0), (9, 2.0), (10, 7.0))))
    +  for (i <- 0 until 10000) sparseData += Vectors.sparse(20, Seq((9, 0.0)))
    +  sparseData += Vectors.sparse(20, Seq((0, 5.0), (9, 13.0), (16, 2.0)))
    +  sparseData += Vectors.sparse(20, Seq((3, 5.0), (9, 13.0), (18, 2.0)))
    +
    +  test("full-statistics") {
    +    val data = sc.parallelize(localData, 2)
    +    val (VectorRDDStatisticalSummary(mean, variance, cnt, nnz, max, min), denseTime) =
    +      time(data.summarizeStatistics(3))
    +
    +    assert(equivVector(mean, Vectors.dense(4.0, 5.0, 6.0)), "Column mean do not match.")
    +    assert(equivVector(variance, Vectors.dense(6.0, 6.0, 6.0)), "Column variance do not match.")
    +    assert(cnt === 3, "Column cnt do not match.")
    +    assert(equivVector(nnz, Vectors.dense(3.0, 3.0, 3.0)), "Column nnz do not match.")
    +    assert(equivVector(max, Vectors.dense(7.0, 8.0, 9.0)), "Column max do not match.")
    +    assert(equivVector(min, Vectors.dense(1.0, 2.0, 3.0)), "Column min do not match.")
    +
    +    val dataForSparse = sc.parallelize(sparseData.toSeq, 2)
    +    val (_, sparseTime) = time(dataForSparse.summarizeStatistics(20))
    +
    +    println(s"dense time is $denseTime, sparse time is $sparseTime.")
    +    assert(relativeTime(denseTime, sparseTime),
    +      "Relative time between dense and sparse vector doesn't match.")
    +  }
    +}
    +
    +object VectorRDDFunctionsSuite {
    +  def time[R](block: => R): (R, Double) = {
    +    val t0 = System.nanoTime()
    +    val result = block
    +    val t1 = System.nanoTime()
    +    (result, (t1 - t0).toDouble / 1.0e9)
    +  }
    +
    +  def equivVector(lhs: Vector, rhs: Vector): Boolean = {
    +    (lhs.toBreeze - rhs.toBreeze).norm(2) < 1e-9
    +  }
    +
    +  def relativeTime(lhs: Double, rhs: Double): Boolean = {
    +    val denominator = math.max(lhs, rhs)
    +    math.abs(lhs - rhs) / denominator < 0.3
    +  }
    +}
    +
    --- End diff --
    
    Build failed might be caught by this empty line in the end. I'll fix it in next commit with other problems.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40045593
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39285028
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13661/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40031814
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11271166
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    --- End diff --
    
    Add docs to each method and put one line between method declaration.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/268


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11431836
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---
    @@ -28,6 +28,171 @@ import org.apache.spark.rdd.RDD
     import org.apache.spark.Logging
     
     /**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +
    +  /**
    +   * Computes the mean of columns in RDD[Vector].
    +   */
    +  def mean: Vector
    +
    +  /**
    +   * Computes the sample variance of columns in RDD[Vector].
    +   */
    +  def variance: Vector
    +
    +  /**
    +   * Computes number of vectors in RDD[Vector].
    +   */
    +  def count: Long
    +
    +  /**
    +   * Computes the number of non-zero elements in each column of RDD[Vector].
    +   */
    +  def numNonZeros: Vector
    +
    +  /**
    +   * Computes the maximum of each column in RDD[Vector].
    +   */
    +  def max: Vector
    +
    +  /**
    +   * Computes the minimum of each column in RDD[Vector].
    +   */
    +  def min: Vector
    +}
    +
    +
    --- End diff --
    
    I'll remove the extra blank line in next push.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40045601
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40051673
  
    @mateiz I have fixed the issues. You can merge it if looks good to you.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39285024
  
    Build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40044964
  
    Build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40152058
  
    Jenkins, test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39427670
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11470050
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---
    @@ -19,13 +19,144 @@ package org.apache.spark.mllib.linalg.distributed
     
     import java.util
     
    -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
    +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
     import breeze.numerics.{sqrt => brzSqrt}
     import com.github.fommil.netlib.BLAS.{getInstance => blas}
     
     import org.apache.spark.mllib.linalg._
     import org.apache.spark.rdd.RDD
     import org.apache.spark.Logging
    +import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
    +
    +/**
    + * Column statistics aggregator implementing
    + * [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]]
    + * together with add() and merge() function.
    + * A numerically stable algorithm is implemented to compute sample mean and variance:
    +  *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
    + * Zero elements (including explicit zero values) are skipped when calling add() and merge(),
    + * to have time complexity O(nnz) instead of O(n) for each column.
    + */
    +private class ColumnStatisticsAggregator(private val n: Int)
    +    extends MultivariateStatisticalSummary with Serializable {
    +
    +  private val currMean: BDV[Double] = BDV.zeros[Double](n)
    +  private val currM2n: BDV[Double] = BDV.zeros[Double](n)
    +  private var totalCnt = 0.0
    +  private val nnz: BDV[Double] = BDV.zeros[Double](n)
    +  private val currMax: BDV[Double] = BDV.fill(n)(Double.MinValue)
    +  private val currMin: BDV[Double] = BDV.fill(n)(Double.MaxValue)
    +
    +  override def mean: Vector = {
    +    val realMean = BDV.zeros[Double](n)
    +    var i = 0
    +    while (i < n) {
    +      realMean(i) = currMean(i) * nnz(i) / totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(realMean)
    +  }
    +
    +  override def variance: Vector = {
    +    val realVariance = BDV.zeros[Double](n)
    +
    +    val denominator = totalCnt - 1.0
    +
    +    // Sample variance is computed, if the denominator is 0, the variance is just 0.
    +    if (denominator != 0.0) {
    +      val deltaMean = currMean
    +      var i = 0
    +      while (i < currM2n.size) {
    +        realVariance(i) =
    +          currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
    +        realVariance(i) /= denominator
    +        i += 1
    +      }
    +    }
    +
    +    Vectors.fromBreeze(realVariance)
    +  }
    +
    +  override def count: Long = totalCnt.toLong
    +
    +  override def numNonzeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override def max: Vector = {
    +    var i = 0
    +    while (i < n) {
    +      if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currMax)
    +  }
    +
    +  override def min: Vector = {
    +    var i = 0
    +    while (i < n) {
    +      if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currMin)
    +  }
    +
    +  /**
    +   * Aggregates a row.
    +   */
    +  def add(currData: BV[Double]): this.type = {
    +    currData.activeIterator.foreach {
    +      case (_, 0.0) => // Skip explicit zero elements.
    +      case (i, value) =>
    +        if (currMax(i) < value) currMax(i) = value
    +        if (currMin(i) > value) currMin(i) = value
    +
    +        val tmpPrevMean = currMean(i)
    +        currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0)
    +        currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean)
    +
    +        nnz(i) += 1.0
    +    }
    +
    +    totalCnt += 1.0
    +    this
    +  }
    +
    +  /**
    +   * Merges another aggregator.
    +   */
    +  def merge(other: ColumnStatisticsAggregator): this.type = {
    +
    +    require(n == other.n, s"Dimensions mismatch. Expecting $n but got ${other.n}.")
    +
    +    totalCnt += other.totalCnt
    +
    +    val deltaMean = currMean - other.currMean
    +
    +    var i = 0
    +    while (i < n) {
    +      // merge mean together
    +      if (other.currMean(i) != 0.0) {
    +        currMean(i) = (currMean(i) * nnz(i) + other.currMean(i) * other.nnz(i)) /
    +          (nnz(i) + other.nnz(i))
    +      }
    +
    +      // merge m2n together
    +      if (nnz(i) + other.nnz(i) != 0.0) {
    +        currM2n(i) += other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) /
    +          (nnz(i) + other.nnz(i))
    +      }
    +
    +      if (currMax(i) < other.currMax(i)) currMax(i) = other.currMax(i)
    --- End diff --
    
    Use curly braces around the if bodies in case we want to extend them later, like this:
    ```
    if (currMax(i) < other.currMax(i)) {
      currMax(i) = other.currMax(i)
    }
    ```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11468581
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---
    @@ -19,13 +19,144 @@ package org.apache.spark.mllib.linalg.distributed
     
     import java.util
     
    -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
    +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
     import breeze.numerics.{sqrt => brzSqrt}
     import com.github.fommil.netlib.BLAS.{getInstance => blas}
     
     import org.apache.spark.mllib.linalg._
     import org.apache.spark.rdd.RDD
     import org.apache.spark.Logging
    +import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
    +
    +/**
    + * Column statistics aggregator implementing
    + * [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]]
    + * together with add() and merge() function.
    + * A numerically stable algorithm is implemented to compute sample mean and variance:
    +  *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
    + * Zero elements (including explicit zero values) are skipped when calling add() and merge(),
    + * to have time complexity O(nnz) instead of O(n) for each column.
    + */
    +private class ColumnStatisticsAggregator(private val n: Int)
    +    extends MultivariateStatisticalSummary with Serializable {
    +
    +  private val currMean: BDV[Double] = BDV.zeros[Double](n)
    +  private val currM2n: BDV[Double] = BDV.zeros[Double](n)
    +  private var totalCnt: Double = 0.0
    --- End diff --
    
    remove ": Double"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11235843
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    --- End diff --
    
    `count` should be sufficient here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11169770
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalSummary(
    --- End diff --
    
    Instead of having two classes, we only need one accumulator class. `variance` can be computed on demand. Use `StatCounter` as a reference implementation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39822477
  
    Btw, can we use sample variance instead of population variance?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39415213
  
    Build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39347104
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13685/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40031013
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13969/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11238923
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    --- End diff --
    
    If we remove `lazy`, it is dangerous for end-user to call this method more than once. Should we add a warning in the comment?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11169142
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalSummary(
    +    mean: Vector,
    +    variance: Vector,
    +    count: Long,
    +    max: Vector,
    +    min: Vector,
    +    nonZeroCnt: Vector) extends Serializable
    +
    +/**
    + * Case class of the aggregate value for collecting summary statistics from RDD[Vector]. These
    + * values are relatively with
    + * [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]], the
    + * latter is computed from the former.
    + */
    +private case class VectorRDDStatisticalRing(
    +    fakeMean: BV[Double],
    --- End diff --
    
    `fakeMean` is a weird name here. `curMean` looks better to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40269790
  
    Thanks, merged!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11427570
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +
    +  /**
    +   * Computes the mean of columns in RDD[Vector].
    +   */
    +  def mean: Vector
    +
    +  /**
    +   * Computes the sample variance of columns in RDD[Vector].
    +   */
    +  def variance: Vector
    +
    +  /**
    +   * Computes number of vectors in RDD[Vector].
    +   */
    +  def count: Long
    +
    +  /**
    +   * Computes the number of non-zero elements in each column of RDD[Vector].
    +   */
    +  def numNonZeros: Vector
    +
    +  /**
    +   * Computes the maximum of each column in RDD[Vector].
    +   */
    +  def max: Vector
    +
    +  /**
    +   * Computes the minimum of each column in RDD[Vector].
    +   */
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function. Online variance solution used in add() function, while
    + * parallel variance solution used in merge() function. Reference here:
    + * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]. Solution here
    + * ignoring the zero elements when calling add() and merge(), for decreasing the O(n) algorithm to
    + * O(nnz). Real variance is computed here after we get other statistics, simply by another parallel
    + * combination process.
    + */
    +private class VectorRDDStatisticsAggregator(
    +    val currMean: BDV[Double],
    +    val currM2n: BDV[Double],
    +    var totalCnt: Double,
    +    val nnz: BDV[Double],
    +    val currMax: BDV[Double],
    +    val currMin: BDV[Double])
    +  extends VectorRDDStatisticalSummary with Serializable {
    +
    +  override def mean = {
    +    val realMean = BDV.zeros[Double](currMean.length)
    +    var i = 0
    +    while (i < currMean.length) {
    +      realMean(i) = currMean(i) * nnz(i) / totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(realMean)
    +  }
    +
    +  override def variance = {
    +    val realVariance = BDV.zeros[Double](currM2n.length)
    +    val deltaMean = currMean
    +    var i = 0
    +    while (i < currM2n.size) {
    +      realVariance(i) =
    +        currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
    +      realVariance(i) /= (totalCnt - 1.0)
    --- End diff --
    
    What if `totalCnt == 1`? R's `var` returns `NA` for a sample of size 1.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11236156
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    +      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
    +      currM2n(i) /= totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currM2n)
    +  }
    +
    +  override lazy val totalCount: Long = totalCnt.toLong
    +
    +  override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override lazy val max: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMax(id) < 0.0)))  currMax(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMax)
    +  }
    +
    +  override lazy val min: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMin(id) > 0.0))) currMin(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMin)
    +  }
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  def add(currData: BV[Double]): this.type = {
    +    currData.activeIterator.foreach {
    +      // this case is used for filtering the zero elements if the vector is a dense one.
    +      case (id, 0.0) =>
    +      case (id, value) =>
    +        if (currMax(id) < value) currMax(id) = value
    +        if (currMin(id) > value) currMin(id) = value
    +
    +        val tmpPrevMean = currMean(id)
    +        currMean(id) = (currMean(id) * nnz(id) + value) / (nnz(id) + 1.0)
    +        currM2n(id) += (value - currMean(id)) * (value - tmpPrevMean)
    +
    +        nnz(id) += 1.0
    +    }
    +
    +    totalCnt += 1.0
    +    this
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  def merge(other: Aggregator): this.type = {
    +
    +    totalCnt += other.totalCnt
    +
    +    val deltaMean = currMean - other.currMean
    +
    +    other.currMean.activeIterator.foreach {
    --- End diff --
    
    `currMean` is a dense vector. So use a while loop.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39432280
  
    Build finished. All automated tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40004876
  
    @yinxusen I sent a PR to your repo with updated interface names and tests. Please merge it if it looks good to you. I moved `MultivariateStatisticalSummary` to `mllib.stat` and changed the method name to `computeColumnSummaryStatistics`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-38985979
  
     Merged build triggered. Build is starting -or- tests failed to complete.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11193678
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.mllib.util.MLUtils._
    +
    +/**
    + * Test suite for the summary statistics of RDD[Vector]. Both the accuracy and the time consuming
    + * between dense and sparse vector are tested.
    + */
    +class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
    +  import VectorRDDFunctionsSuite._
    +
    +  val localData = Array(
    +    Vectors.dense(1.0, 2.0, 3.0),
    +    Vectors.dense(4.0, 5.0, 6.0),
    +    Vectors.dense(7.0, 8.0, 9.0)
    +  )
    +
    +  val sparseData = ArrayBuffer(Vectors.sparse(20, Seq((0, 1.0), (9, 2.0), (10, 7.0))))
    +  for (i <- 0 until 10000) sparseData += Vectors.sparse(20, Seq((9, 0.0)))
    +  sparseData += Vectors.sparse(20, Seq((0, 5.0), (9, 13.0), (16, 2.0)))
    +  sparseData += Vectors.sparse(20, Seq((3, 5.0), (9, 13.0), (18, 2.0)))
    +
    +  test("full-statistics") {
    +    val data = sc.parallelize(localData, 2)
    +    val (VectorRDDStatisticalAggregator(mean, variance, cnt, nnz, max, min), denseTime) =
    +      time(data.summarizeStatistics())
    +
    +    assert(equivVector(Vectors.fromBreeze(mean), Vectors.dense(4.0, 5.0, 6.0)),
    +      "Column mean do not match.")
    +    assert(equivVector(Vectors.fromBreeze(variance), Vectors.dense(6.0, 6.0, 6.0)),
    +      "Column variance do not match.")
    +    assert(cnt === 3.0, "Column cnt do not match.")
    +    assert(equivVector(Vectors.fromBreeze(nnz), Vectors.dense(3.0, 3.0, 3.0)),
    +      "Column nnz do not match.")
    +    assert(equivVector(Vectors.fromBreeze(max), Vectors.dense(7.0, 8.0, 9.0)),
    +      "Column max do not match.")
    +    assert(equivVector(Vectors.fromBreeze(min), Vectors.dense(1.0, 2.0, 3.0)),
    +      "Column min do not match.")
    +
    +    val dataForSparse = sc.parallelize(sparseData.toSeq, 2)
    +    val (_, sparseTime) = time(dataForSparse.summarizeStatistics())
    +
    +    println(s"dense time is $denseTime, sparse time is $sparseTime.")
    +    assert(relativeTime(denseTime, sparseTime),
    --- End diff --
    
    Do not assert on running times, which are subject to env.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11236003
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    +      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
    +      currM2n(i) /= totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currM2n)
    +  }
    +
    +  override lazy val totalCount: Long = totalCnt.toLong
    +
    +  override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override lazy val max: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMax(id) < 0.0)))  currMax(id) = 0.0
    --- End diff --
    
    You don't need `count == 0.0`. `currMax(id)` is `Double.MinValue` if `count == 0.0`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11193525
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalAggregator(
    +    mean: BV[Double],
    +    statCounter: BV[Double],
    +    totalCount: Double,
    +    numNonZeros: BV[Double],
    +    max: BV[Double],
    +    min: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    --- End diff --
    
    Define seqOp and combOp inside Aggregator to have a better separation of code.
    ~~~
    class Aggregator {
      def add(BV[Double]): this.type
      def merge(Aggregator): this.type
    }
    ~~~


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-38986780
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13565/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40050338
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13986/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40028845
  
    Merged build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39206346
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39954694
  
    Well, the `git rebase` is very tricky... @mengxr You can have a look.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39502081
  
    And please take a look at https://github.com/apache/spark/pull/296 . I'm not sure where we should put this method. To make life simpler for Java users, we better put it under `RowRDDMatrix`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11099393
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala ---
    @@ -0,0 +1,51 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.scalatest.FunSuite
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.mllib.util.MLUtils._
    +
    +class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
    +  import VectorRDDFunctionsSuite._
    +
    +  val localData = Array(
    +    Vectors.dense(1.0, 2.0, 3.0),
    +    Vectors.dense(4.0, 5.0, 6.0),
    +    Vectors.dense(7.0, 8.0, 9.0)
    +  )
    +
    +  test("full-statistics") {
    --- End diff --
    
    Please also add a test for sparse vector rdd. Make the dimension big like 10000 but only the first and the last column contains values. The running time for computing statistical summary should be `O(nnz)` but not `O(n d)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40155151
  
    Merged build finished. All automated tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11239093
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    --- End diff --
    
    If the user call this method twice, just generate the vectors twice. We need to keep a small footprint within the aggregator.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39822920
  
    @mengxr Yep, I have substituted the population variance with sample variance. See line 97. in VectorRDDStatistics.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39432281
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13723/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11271293
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala ---
    @@ -0,0 +1,95 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.rdd.VectorRDDFunctionsSuite._
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.mllib.util.MLUtils._
    +
    +/**
    + * Test suite for the summary statistics of RDD[Vector]. Both the accuracy and the time consuming
    + * between dense and sparse vector are tested.
    + */
    +class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
    +
    +  val localData = Array(
    +    Vectors.dense(1.0, 2.0, 3.0),
    --- End diff --
    
    Put some explicit zeros in the data to see whether nnz is correct.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40145491
  
    Hey, unfortunately this no longer merges cleanly. Do you mind rebasing it? I think some conflicting changes happened in MLUtils.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39529029
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13752/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11470173
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---
    @@ -273,6 +411,27 @@ class RowMatrix(
         }
         mat
       }
    +
    +  /** Updates or verifies the number of columns. */
    +  private def updateNumCols(n: Int) {
    --- End diff --
    
    No, we don't. Thanks for catching it!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11468547
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---
    @@ -19,13 +19,144 @@ package org.apache.spark.mllib.linalg.distributed
     
     import java.util
     
    -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
    +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
     import breeze.numerics.{sqrt => brzSqrt}
     import com.github.fommil.netlib.BLAS.{getInstance => blas}
     
     import org.apache.spark.mllib.linalg._
     import org.apache.spark.rdd.RDD
     import org.apache.spark.Logging
    +import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
    +
    +/**
    + * Column statistics aggregator implementing
    + * [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]]
    + * together with add() and merge() function.
    + * A numerically stable algorithm is implemented to compute sample mean and variance:
    +  *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
    + * Zero elements (including explicit zero values) are skipped when calling add() and merge(),
    + * to have time complexity O(nnz) instead of O(n) for each column.
    + */
    +private class ColumnStatisticsAggregator(private val n: Int)
    +    extends MultivariateStatisticalSummary with Serializable {
    +
    +  private val currMean: BDV[Double] = BDV.zeros[Double](n)
    +  private val currM2n: BDV[Double] = BDV.zeros[Double](n)
    +  private var totalCnt: Double = 0.0
    +  private val nnz: BDV[Double] = BDV.zeros[Double](n)
    +  private val currMax: BDV[Double] = BDV.fill(n)(Double.MinValue)
    +  private val currMin: BDV[Double] = BDV.fill(n)(Double.MaxValue)
    +
    +  override def mean = {
    --- End diff --
    
    add return type to method declaration


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11193583
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalAggregator(
    +    mean: BV[Double],
    +    statCounter: BV[Double],
    +    totalCount: Double,
    +    numNonZeros: BV[Double],
    +    max: BV[Double],
    +    min: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    +      aggregator: VectorRDDStatisticalAggregator,
    +      currData: BV[Double]): VectorRDDStatisticalAggregator = {
    +    aggregator match {
    +      case VectorRDDStatisticalAggregator(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
    +        currData.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            if (maxVec(id) < value) maxVec(id) = value
    +            if (minVec(id) > value) minVec(id) = value
    +
    +            val tmpPrevMean = prevMean(id)
    +            prevMean(id) = (prevMean(id) * cnt + value) / (cnt + 1.0)
    +            prevM2n(id) += (value - prevMean(id)) * (value - tmpPrevMean)
    +
    +            nnzVec(id) += 1.0
    +        }
    +
    +        VectorRDDStatisticalAggregator(
    +          prevMean,
    +          prevM2n,
    +          cnt + 1.0,
    +          nnzVec,
    +          maxVec,
    +          minVec)
    +    }
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  private def combOp(
    +      statistics1: VectorRDDStatisticalAggregator,
    +      statistics2: VectorRDDStatisticalAggregator): VectorRDDStatisticalAggregator = {
    +    (statistics1, statistics2) match {
    +      case (VectorRDDStatisticalAggregator(mean1, m2n1, cnt1, nnz1, max1, min1),
    +            VectorRDDStatisticalAggregator(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
    +        val totalCnt = cnt1 + cnt2
    +        val deltaMean = mean2 - mean1
    +
    +        mean2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            mean1(id) = (mean1(id) * nnz1(id) + mean2(id) * nnz2(id)) / (nnz1(id) + nnz2(id))
    +        }
    +
    +        m2n2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            m2n1(id) +=
    +              value + deltaMean(id) * deltaMean(id) * nnz1(id) * nnz2(id) / (nnz1(id)+nnz2(id))
    +        }
    +
    +        max2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (max1(id) < value) max1(id) = value
    +        }
    +
    +        min2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (min1(id) > value) min1(id) = value
    +        }
    +
    +        axpy(1.0, nnz2, nnz1)
    +        VectorRDDStatisticalAggregator(mean1, m2n1, totalCnt, nnz1, max1, min1)
    +    }
    +  }
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
    +   */
    +  def summarizeStatistics(): VectorRDDStatisticalAggregator = {
    +    val size = self.take(1).head.size
    +    val zeroValue = VectorRDDStatisticalAggregator(
    +      BV.zeros[Double](size),
    +      BV.zeros[Double](size),
    +      0.0,
    +      BV.zeros[Double](size),
    +      BV.fill(size)(Double.MinValue),
    +      BV.fill(size)(Double.MaxValue))
    +
    +    val VectorRDDStatisticalAggregator(currMean, currM2n, totalCnt, nnz, currMax, currMin) =
    +      self.map(_.toBreeze).aggregate(zeroValue)(seqOp, combOp)
    +
    +    // solve real mean
    +    val realMean = currMean :* nnz :/ totalCnt
    +
    +    // solve real m2n
    +    val deltaMean = currMean
    +    val realM2n = currM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)
    --- End diff --
    
    I understand that this is convenient, but it creates seven temporary vectors. A while loop might be better. Also, you need to put a comment on what this line does and explain the formula.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11170791
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalSummary(
    +    mean: Vector,
    +    variance: Vector,
    +    count: Long,
    +    max: Vector,
    +    min: Vector,
    +    nonZeroCnt: Vector) extends Serializable
    --- End diff --
    
    Use `numNonzeros`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11235949
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    --- End diff --
    
    put a space after `while`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-38988446
  
    @yinxusen Thanks for working on this! I don't think row statistics are important because they represent values for different features. For column statistics, instead of implementing each statistic separately, we can compute all common statistics like (n, nnz, mean, variance, max, min) in a single job. This adds little overhead to the computation.
    
    Btw, `Vector.toArray` is an expensive operation for `SparseVector`. You should use breeze's axpy to aggregate vectors.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39206557
  
    Build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11193421
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalAggregator(
    --- End diff --
    
    Instead of a case class, use a normal class and hide breeze vector members.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11099367
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD, including
    +   * {{{
    +   *   Mean:              Vector,
    +   *   Variance:          Vector,
    +   *   Count:             Double,
    +   *   Non-zero count:    Vector,
    +   *   Maximum elements:  Vector,
    +   *   Minimum elements:  Vector.
    +   * }}},
    +   * with the size of Vector as input parameter.
    +   */
    +  def statistics(size: Int): (Vector, Vector, Double, Vector, Vector, Vector) = {
    +    val results = self.map(_.toBreeze).aggregate((
    +      BV.zeros[Double](size),
    +      BV.zeros[Double](size),
    +      0.0,
    +      BV.zeros[Double](size),
    +      BV.fill(size){Double.MinValue},
    +      BV.fill(size){Double.MaxValue}))(
    +      seqOp = (c, v) => (c, v) match {
    +        case ((prevMean, prevM2n, cnt, nnzVec, maxVec, minVec), currData) =>
    +          val currMean = ((prevMean :* cnt) + currData) :/ (cnt + 1.0)
    +          val nonZeroCnt = Vectors
    +            .sparse(size, currData.activeKeysIterator.toSeq.map(x => (x, 1.0))).toBreeze
    +          currData.activeIterator.foreach { case (id, value) =>
    +            if (maxVec(id) < value) maxVec(id) = value
    +            if (minVec(id) > value) minVec(id) = value
    +          }
    +          (currMean,
    +            prevM2n + ((currData - prevMean) :* (currData - currMean)),
    +            cnt + 1.0,
    +            nnzVec + nonZeroCnt,
    +            maxVec,
    +            minVec)
    +      },
    +      combOp = (lhs, rhs) => (lhs, rhs) match {
    +        case (
    +          (lhsMean, lhsM2n, lhsCnt, lhsNNZ, lhsMax, lhsMin),
    --- End diff --
    
    `lhs` and `rhs` are used in a context where there is an equation. Let's update the names to `mean1`, `mean2`, etc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11169437
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalSummary(
    +    mean: Vector,
    +    variance: Vector,
    +    count: Long,
    +    max: Vector,
    +    min: Vector,
    +    nonZeroCnt: Vector) extends Serializable
    +
    +/**
    + * Case class of the aggregate value for collecting summary statistics from RDD[Vector]. These
    + * values are relatively with
    + * [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]], the
    + * latter is computed from the former.
    + */
    +private case class VectorRDDStatisticalRing(
    +    fakeMean: BV[Double],
    +    fakeM2n: BV[Double],
    +    totalCnt: Double,
    +    nnz: BV[Double],
    +    fakeMax: BV[Double],
    +    fakeMin: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    +      aggregator: VectorRDDStatisticalRing,
    +      currData: BV[Double]): VectorRDDStatisticalRing = {
    +    aggregator match {
    +      case VectorRDDStatisticalRing(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
    +        currData.activeIterator.foreach {
    +          case (id, value) =>
    +            if (maxVec(id) < value) maxVec(id) = value
    +            if (minVec(id) > value) minVec(id) = value
    +
    +            val tmpPrevMean = prevMean(id)
    +            prevMean(id) = (prevMean(id) * cnt + value) / (cnt + 1.0)
    +            prevM2n(id) += (value - prevMean(id)) * (value - tmpPrevMean)
    +
    +            nnzVec(id) += 1.0
    +        }
    +
    +        VectorRDDStatisticalRing(
    +          prevMean,
    +          prevM2n,
    +          cnt + 1.0,
    +          nnzVec,
    +          maxVec,
    +          minVec)
    +    }
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  private def combOp(
    +      statistics1: VectorRDDStatisticalRing,
    +      statistics2: VectorRDDStatisticalRing): VectorRDDStatisticalRing = {
    +    (statistics1, statistics2) match {
    +      case (VectorRDDStatisticalRing(mean1, m2n1, cnt1, nnz1, max1, min1),
    +            VectorRDDStatisticalRing(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
    +        val totalCnt = cnt1 + cnt2
    +        val deltaMean = mean2 - mean1
    +
    +        mean2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            mean1(id) = (mean1(id) * nnz1(id) + mean2(id) * nnz2(id)) / (nnz1(id) + nnz2(id))
    +        }
    +
    +        m2n2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            m2n1(id) +=
    +              value + deltaMean(id) * deltaMean(id) * nnz1(id) * nnz2(id) / (nnz1(id)+nnz2(id))
    +        }
    +
    +        max2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (max1(id) < value) max1(id) = value
    +        }
    +
    +        min2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (min1(id) > value) min1(id) = value
    +        }
    +
    +        axpy(1.0, nnz2, nnz1)
    +        VectorRDDStatisticalRing(mean1, m2n1, totalCnt, nnz1, max1, min1)
    +    }
    +  }
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
    +   */
    +  def summarizeStatistics(size: Int): VectorRDDStatisticalSummary = {
    +    val zeroValue = VectorRDDStatisticalRing(
    +      BV.zeros[Double](size),
    +      BV.zeros[Double](size),
    +      0.0,
    +      BV.zeros[Double](size),
    +      BV.fill(size)(Double.MinValue),
    +      BV.fill(size)(Double.MaxValue))
    +
    +    val VectorRDDStatisticalRing(fakeMean, fakeM2n, totalCnt, nnz, fakeMax, fakeMin) =
    +      self.map(_.toBreeze).aggregate(zeroValue)(seqOp, combOp)
    +
    +    // solve real mean
    +    val realMean = fakeMean :* nnz :/ totalCnt
    +
    +    // solve real m2n
    +    val deltaMean = fakeMean
    +    val realM2n = fakeM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)
    +
    +    // remove the initial value in max and min, i.e. the Double.MaxValue or Double.MinValue.
    --- End diff --
    
    You should check for every index where `nnz(i) != n`. Don't need to handle Double.MinValue / Double.MaxValue explicitly.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39415215
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13718/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11169339
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalSummary(
    +    mean: Vector,
    +    variance: Vector,
    +    count: Long,
    +    max: Vector,
    +    min: Vector,
    +    nonZeroCnt: Vector) extends Serializable
    +
    +/**
    + * Case class of the aggregate value for collecting summary statistics from RDD[Vector]. These
    + * values are relatively with
    + * [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]], the
    + * latter is computed from the former.
    + */
    +private case class VectorRDDStatisticalRing(
    +    fakeMean: BV[Double],
    +    fakeM2n: BV[Double],
    +    totalCnt: Double,
    +    nnz: BV[Double],
    +    fakeMax: BV[Double],
    +    fakeMin: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    +      aggregator: VectorRDDStatisticalRing,
    +      currData: BV[Double]): VectorRDDStatisticalRing = {
    +    aggregator match {
    +      case VectorRDDStatisticalRing(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
    +        currData.activeIterator.foreach {
    +          case (id, value) =>
    +            if (maxVec(id) < value) maxVec(id) = value
    +            if (minVec(id) > value) minVec(id) = value
    +
    +            val tmpPrevMean = prevMean(id)
    +            prevMean(id) = (prevMean(id) * cnt + value) / (cnt + 1.0)
    +            prevM2n(id) += (value - prevMean(id)) * (value - tmpPrevMean)
    +
    +            nnzVec(id) += 1.0
    +        }
    +
    +        VectorRDDStatisticalRing(
    +          prevMean,
    +          prevM2n,
    +          cnt + 1.0,
    +          nnzVec,
    +          maxVec,
    +          minVec)
    +    }
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  private def combOp(
    +      statistics1: VectorRDDStatisticalRing,
    +      statistics2: VectorRDDStatisticalRing): VectorRDDStatisticalRing = {
    +    (statistics1, statistics2) match {
    +      case (VectorRDDStatisticalRing(mean1, m2n1, cnt1, nnz1, max1, min1),
    +            VectorRDDStatisticalRing(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
    +        val totalCnt = cnt1 + cnt2
    +        val deltaMean = mean2 - mean1
    +
    +        mean2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            mean1(id) = (mean1(id) * nnz1(id) + mean2(id) * nnz2(id)) / (nnz1(id) + nnz2(id))
    +        }
    +
    +        m2n2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            m2n1(id) +=
    +              value + deltaMean(id) * deltaMean(id) * nnz1(id) * nnz2(id) / (nnz1(id)+nnz2(id))
    +        }
    +
    +        max2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (max1(id) < value) max1(id) = value
    +        }
    +
    +        min2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (min1(id) > value) min1(id) = value
    +        }
    +
    +        axpy(1.0, nnz2, nnz1)
    +        VectorRDDStatisticalRing(mean1, m2n1, totalCnt, nnz1, max1, min1)
    +    }
    +  }
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
    +   */
    +  def summarizeStatistics(size: Int): VectorRDDStatisticalSummary = {
    --- End diff --
    
    `size` is not necessary. You can get the size from the first element.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11235956
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    +      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
    --- End diff --
    
    `totalCnt-nnz(i)` -> `totalCnt - nnz(i)` (spaces around `-`)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11099292
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD, including
    +   * {{{
    +   *   Mean:              Vector,
    +   *   Variance:          Vector,
    +   *   Count:             Double,
    +   *   Non-zero count:    Vector,
    +   *   Maximum elements:  Vector,
    +   *   Minimum elements:  Vector.
    +   * }}},
    +   * with the size of Vector as input parameter.
    +   */
    +  def statistics(size: Int): (Vector, Vector, Double, Vector, Vector, Vector) = {
    --- End diff --
    
    Please wrap the return type to a class called `VectorRDDStatisticalSummary` that provides `mean`, 'variance`, `count: Long`, `min`, `max`, and `std`. So later we can add more summary statistics to it. I would also recommend changing the method name to `summarize` or `summarizeStatistics`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11235926
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    --- End diff --
    
    I don't think `lazy val` is useful because user should call this method only once. If the dimension is beyond 1e6, cache this extra mean vector cost several MBs. Same for other lazy values. Also, use while loop instead of chaining vector ops.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39206558
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13630/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11099320
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD, including
    +   * {{{
    +   *   Mean:              Vector,
    +   *   Variance:          Vector,
    +   *   Count:             Double,
    +   *   Non-zero count:    Vector,
    +   *   Maximum elements:  Vector,
    +   *   Minimum elements:  Vector.
    +   * }}},
    +   * with the size of Vector as input parameter.
    +   */
    +  def statistics(size: Int): (Vector, Vector, Double, Vector, Vector, Vector) = {
    +    val results = self.map(_.toBreeze).aggregate((
    +      BV.zeros[Double](size),
    +      BV.zeros[Double](size),
    +      0.0,
    +      BV.zeros[Double](size),
    +      BV.fill(size){Double.MinValue},
    +      BV.fill(size){Double.MaxValue}))(
    +      seqOp = (c, v) => (c, v) match {
    +        case ((prevMean, prevM2n, cnt, nnzVec, maxVec, minVec), currData) =>
    +          val currMean = ((prevMean :* cnt) + currData) :/ (cnt + 1.0)
    --- End diff --
    
    Use in-place ops and axpy whenever possible for performance. For this line, it is equivalent to 
    ~~~
    currMean = prevMean :* (cnt / (cnt + 1.0))
    axpy(1.0/(cnt+1.0), currData, currMean)
    ~~~
    So you only create one temp array. You can also put this temp storage into `aggregate`'s initial value set to avoid object creation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11193650
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.mllib.util.MLUtils._
    +
    +/**
    + * Test suite for the summary statistics of RDD[Vector]. Both the accuracy and the time consuming
    + * between dense and sparse vector are tested.
    + */
    +class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
    +  import VectorRDDFunctionsSuite._
    +
    +  val localData = Array(
    +    Vectors.dense(1.0, 2.0, 3.0),
    +    Vectors.dense(4.0, 5.0, 6.0),
    +    Vectors.dense(7.0, 8.0, 9.0)
    +  )
    +
    +  val sparseData = ArrayBuffer(Vectors.sparse(20, Seq((0, 1.0), (9, 2.0), (10, 7.0))))
    +  for (i <- 0 until 10000) sparseData += Vectors.sparse(20, Seq((9, 0.0)))
    +  sparseData += Vectors.sparse(20, Seq((0, 5.0), (9, 13.0), (16, 2.0)))
    +  sparseData += Vectors.sparse(20, Seq((3, 5.0), (9, 13.0), (18, 2.0)))
    +
    +  test("full-statistics") {
    +    val data = sc.parallelize(localData, 2)
    +    val (VectorRDDStatisticalAggregator(mean, variance, cnt, nnz, max, min), denseTime) =
    +      time(data.summarizeStatistics())
    +
    +    assert(equivVector(Vectors.fromBreeze(mean), Vectors.dense(4.0, 5.0, 6.0)),
    +      "Column mean do not match.")
    +    assert(equivVector(Vectors.fromBreeze(variance), Vectors.dense(6.0, 6.0, 6.0)),
    +      "Column variance do not match.")
    +    assert(cnt === 3.0, "Column cnt do not match.")
    +    assert(equivVector(Vectors.fromBreeze(nnz), Vectors.dense(3.0, 3.0, 3.0)),
    +      "Column nnz do not match.")
    +    assert(equivVector(Vectors.fromBreeze(max), Vectors.dense(7.0, 8.0, 9.0)),
    +      "Column max do not match.")
    +    assert(equivVector(Vectors.fromBreeze(min), Vectors.dense(1.0, 2.0, 3.0)),
    +      "Column min do not match.")
    +
    +    val dataForSparse = sc.parallelize(sparseData.toSeq, 2)
    +    val (_, sparseTime) = time(dataForSparse.summarizeStatistics())
    +
    +    println(s"dense time is $denseTime, sparse time is $sparseTime.")
    --- End diff --
    
    Remove println messages.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11197597
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalAggregator(
    +    mean: BV[Double],
    +    statCounter: BV[Double],
    +    totalCount: Double,
    +    numNonZeros: BV[Double],
    +    max: BV[Double],
    +    min: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    --- End diff --
    
    Hmm... that's great! Code could be simplified a lot.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-38985980
  
    Merged build started. Build is starting -or- tests failed to complete.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11193468
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalAggregator(
    --- End diff --
    
    For example, you can have a trait:
    ~~~
    class VectorRDDStatisticalSummary {
      def mean(): Vector
      def variance(): Vector
    }
    ~~~
    Set `computeSummaryStatistics`'s return type to this trait. Let aggregator implement this trait but mark it private.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11470053
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---
    @@ -19,13 +19,144 @@ package org.apache.spark.mllib.linalg.distributed
     
     import java.util
     
    -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
    +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
     import breeze.numerics.{sqrt => brzSqrt}
     import com.github.fommil.netlib.BLAS.{getInstance => blas}
     
     import org.apache.spark.mllib.linalg._
     import org.apache.spark.rdd.RDD
     import org.apache.spark.Logging
    +import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
    +
    +/**
    + * Column statistics aggregator implementing
    + * [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]]
    + * together with add() and merge() function.
    + * A numerically stable algorithm is implemented to compute sample mean and variance:
    +  *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
    + * Zero elements (including explicit zero values) are skipped when calling add() and merge(),
    + * to have time complexity O(nnz) instead of O(n) for each column.
    + */
    +private class ColumnStatisticsAggregator(private val n: Int)
    +    extends MultivariateStatisticalSummary with Serializable {
    +
    +  private val currMean: BDV[Double] = BDV.zeros[Double](n)
    +  private val currM2n: BDV[Double] = BDV.zeros[Double](n)
    +  private var totalCnt = 0.0
    +  private val nnz: BDV[Double] = BDV.zeros[Double](n)
    +  private val currMax: BDV[Double] = BDV.fill(n)(Double.MinValue)
    +  private val currMin: BDV[Double] = BDV.fill(n)(Double.MaxValue)
    +
    +  override def mean: Vector = {
    +    val realMean = BDV.zeros[Double](n)
    +    var i = 0
    +    while (i < n) {
    +      realMean(i) = currMean(i) * nnz(i) / totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(realMean)
    +  }
    +
    +  override def variance: Vector = {
    +    val realVariance = BDV.zeros[Double](n)
    +
    +    val denominator = totalCnt - 1.0
    +
    +    // Sample variance is computed, if the denominator is 0, the variance is just 0.
    +    if (denominator != 0.0) {
    --- End diff --
    
    Yep, you are right, I prefer the second choice.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40041606
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11169105
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalSummary(
    +    mean: Vector,
    +    variance: Vector,
    +    count: Long,
    +    max: Vector,
    +    min: Vector,
    +    nonZeroCnt: Vector) extends Serializable
    +
    +/**
    + * Case class of the aggregate value for collecting summary statistics from RDD[Vector]. These
    + * values are relatively with
    + * [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]], the
    + * latter is computed from the former.
    + */
    +private case class VectorRDDStatisticalRing(
    --- End diff --
    
    What do you mean by `Ring`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40031012
  
    Merged build finished. All automated tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39347102
  
    Build finished. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39527124
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11235979
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    +      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
    +      currM2n(i) /= totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currM2n)
    +  }
    +
    +  override lazy val totalCount: Long = totalCnt.toLong
    +
    +  override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override lazy val max: Vector = {
    +    nnz.iterator.foreach {
    --- End diff --
    
    Use while loop instead of iterator.foreach.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39284932
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40041612
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-38986779
  
    Merged build finished. All automated tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39822204
  
    @yinxusen Yes, let's wait until #296 gets merged. Thanks for being patient! 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11470006
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---
    @@ -19,13 +19,144 @@ package org.apache.spark.mllib.linalg.distributed
     
     import java.util
     
    -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
    +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
     import breeze.numerics.{sqrt => brzSqrt}
     import com.github.fommil.netlib.BLAS.{getInstance => blas}
     
     import org.apache.spark.mllib.linalg._
     import org.apache.spark.rdd.RDD
     import org.apache.spark.Logging
    +import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
    +
    +/**
    + * Column statistics aggregator implementing
    + * [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]]
    + * together with add() and merge() function.
    + * A numerically stable algorithm is implemented to compute sample mean and variance:
    +  *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
    + * Zero elements (including explicit zero values) are skipped when calling add() and merge(),
    + * to have time complexity O(nnz) instead of O(n) for each column.
    + */
    +private class ColumnStatisticsAggregator(private val n: Int)
    +    extends MultivariateStatisticalSummary with Serializable {
    +
    +  private val currMean: BDV[Double] = BDV.zeros[Double](n)
    +  private val currM2n: BDV[Double] = BDV.zeros[Double](n)
    +  private var totalCnt = 0.0
    +  private val nnz: BDV[Double] = BDV.zeros[Double](n)
    +  private val currMax: BDV[Double] = BDV.fill(n)(Double.MinValue)
    +  private val currMin: BDV[Double] = BDV.fill(n)(Double.MaxValue)
    +
    +  override def mean: Vector = {
    +    val realMean = BDV.zeros[Double](n)
    +    var i = 0
    +    while (i < n) {
    +      realMean(i) = currMean(i) * nnz(i) / totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(realMean)
    +  }
    +
    +  override def variance: Vector = {
    +    val realVariance = BDV.zeros[Double](n)
    +
    +    val denominator = totalCnt - 1.0
    +
    +    // Sample variance is computed, if the denominator is 0, the variance is just 0.
    +    if (denominator != 0.0) {
    --- End diff --
    
    Possibly a stupid question, but what happens if totalCnt is 0? I guess the stuff below is skipped since m2n.size is 0? It would be good to add a test for this and maybe just change this test to if(denominator > 0) for clarity.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40044966
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13981/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39957521
  
    Build finished. All automated tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11271141
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,185 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def count: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function. Online variance solution used in add() function, while
    + * parallel variance solution used in merge() function. Reference here:
    + * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]. Solution here
    + * ignoring the zero elements when calling add() and merge(), for decreasing the O(n) algorithm to
    + * O(nnz). Real variance is computed here after we get other statistics, simply by another parallel
    + * combination process.
    + */
    +private class VectorRDDStatisticsAggregator(
    +    val currMean: BDV[Double],
    +    val currM2n: BDV[Double],
    +    var totalCnt: Double,
    +    val nnz: BDV[Double],
    +    val currMax: BDV[Double],
    +    val currMin: BDV[Double])
    +  extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    --- End diff --
    
    remove comment


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11236019
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    +      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
    +      currM2n(i) /= totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currM2n)
    +  }
    +
    +  override lazy val totalCount: Long = totalCnt.toLong
    +
    +  override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override lazy val max: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMax(id) < 0.0)))  currMax(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMax)
    +  }
    +
    +  override lazy val min: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMin(id) > 0.0))) currMin(id) = 0.0
    --- End diff --
    
    Same issues here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11468552
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---
    @@ -19,13 +19,144 @@ package org.apache.spark.mllib.linalg.distributed
     
     import java.util
     
    -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
    +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
     import breeze.numerics.{sqrt => brzSqrt}
     import com.github.fommil.netlib.BLAS.{getInstance => blas}
     
     import org.apache.spark.mllib.linalg._
     import org.apache.spark.rdd.RDD
     import org.apache.spark.Logging
    +import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
    +
    +/**
    + * Column statistics aggregator implementing
    + * [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]]
    + * together with add() and merge() function.
    + * A numerically stable algorithm is implemented to compute sample mean and variance:
    +  *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
    + * Zero elements (including explicit zero values) are skipped when calling add() and merge(),
    + * to have time complexity O(nnz) instead of O(n) for each column.
    + */
    +private class ColumnStatisticsAggregator(private val n: Int)
    +    extends MultivariateStatisticalSummary with Serializable {
    +
    +  private val currMean: BDV[Double] = BDV.zeros[Double](n)
    +  private val currM2n: BDV[Double] = BDV.zeros[Double](n)
    +  private var totalCnt: Double = 0.0
    +  private val nnz: BDV[Double] = BDV.zeros[Double](n)
    +  private val currMax: BDV[Double] = BDV.fill(n)(Double.MinValue)
    +  private val currMin: BDV[Double] = BDV.fill(n)(Double.MaxValue)
    +
    +  override def mean = {
    +    val realMean = BDV.zeros[Double](n)
    +    var i = 0
    +    while (i < n) {
    +      realMean(i) = currMean(i) * nnz(i) / totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(realMean)
    +  }
    +
    +  override def variance = {
    --- End diff --
    
    Ditto.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11470086
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---
    @@ -273,6 +411,27 @@ class RowMatrix(
         }
         mat
       }
    +
    +  /** Updates or verifies the number of columns. */
    +  private def updateNumCols(n: Int) {
    --- End diff --
    
    This never seems to be called, do we need it? It seems the number of columns has to be fixed to n anyway.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11236050
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    --- End diff --
    
    Move this comment block to a better please. Either to `add` and `merge` or the class doc.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11470041
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/RowMatrix.scala ---
    @@ -19,13 +19,144 @@ package org.apache.spark.mllib.linalg.distributed
     
     import java.util
     
    -import breeze.linalg.{DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
    +import breeze.linalg.{Vector => BV, DenseMatrix => BDM, DenseVector => BDV, svd => brzSvd}
     import breeze.numerics.{sqrt => brzSqrt}
     import com.github.fommil.netlib.BLAS.{getInstance => blas}
     
     import org.apache.spark.mllib.linalg._
     import org.apache.spark.rdd.RDD
     import org.apache.spark.Logging
    +import org.apache.spark.mllib.stat.MultivariateStatisticalSummary
    +
    +/**
    + * Column statistics aggregator implementing
    + * [[org.apache.spark.mllib.stat.MultivariateStatisticalSummary]]
    + * together with add() and merge() function.
    + * A numerically stable algorithm is implemented to compute sample mean and variance:
    +  *[[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]].
    + * Zero elements (including explicit zero values) are skipped when calling add() and merge(),
    + * to have time complexity O(nnz) instead of O(n) for each column.
    + */
    +private class ColumnStatisticsAggregator(private val n: Int)
    +    extends MultivariateStatisticalSummary with Serializable {
    +
    +  private val currMean: BDV[Double] = BDV.zeros[Double](n)
    +  private val currM2n: BDV[Double] = BDV.zeros[Double](n)
    +  private var totalCnt = 0.0
    +  private val nnz: BDV[Double] = BDV.zeros[Double](n)
    +  private val currMax: BDV[Double] = BDV.fill(n)(Double.MinValue)
    +  private val currMin: BDV[Double] = BDV.fill(n)(Double.MaxValue)
    +
    +  override def mean: Vector = {
    +    val realMean = BDV.zeros[Double](n)
    +    var i = 0
    +    while (i < n) {
    +      realMean(i) = currMean(i) * nnz(i) / totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(realMean)
    +  }
    +
    +  override def variance: Vector = {
    +    val realVariance = BDV.zeros[Double](n)
    +
    +    val denominator = totalCnt - 1.0
    +
    +    // Sample variance is computed, if the denominator is 0, the variance is just 0.
    +    if (denominator != 0.0) {
    +      val deltaMean = currMean
    +      var i = 0
    +      while (i < currM2n.size) {
    +        realVariance(i) =
    +          currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
    +        realVariance(i) /= denominator
    +        i += 1
    +      }
    +    }
    +
    +    Vectors.fromBreeze(realVariance)
    +  }
    +
    +  override def count: Long = totalCnt.toLong
    +
    +  override def numNonzeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override def max: Vector = {
    +    var i = 0
    +    while (i < n) {
    +      if ((nnz(i) < totalCnt) && (currMax(i) < 0.0)) currMax(i) = 0.0
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currMax)
    +  }
    +
    +  override def min: Vector = {
    +    var i = 0
    +    while (i < n) {
    +      if ((nnz(i) < totalCnt) && (currMin(i) > 0.0)) currMin(i) = 0.0
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currMin)
    +  }
    +
    +  /**
    +   * Aggregates a row.
    +   */
    +  def add(currData: BV[Double]): this.type = {
    +    currData.activeIterator.foreach {
    +      case (_, 0.0) => // Skip explicit zero elements.
    +      case (i, value) =>
    +        if (currMax(i) < value) currMax(i) = value
    +        if (currMin(i) > value) currMin(i) = value
    +
    +        val tmpPrevMean = currMean(i)
    +        currMean(i) = (currMean(i) * nnz(i) + value) / (nnz(i) + 1.0)
    +        currM2n(i) += (value - currMean(i)) * (value - tmpPrevMean)
    +
    +        nnz(i) += 1.0
    +    }
    +
    +    totalCnt += 1.0
    +    this
    +  }
    +
    +  /**
    +   * Merges another aggregator.
    +   */
    +  def merge(other: ColumnStatisticsAggregator): this.type = {
    --- End diff --
    
    This method has a few too many blank lines, e.g. there's no need to have one at the beginning. Probably fine if we merge this as is but if you make another pass fix these.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39941843
  
    Sure, I'll do it now.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11099351
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD, including
    +   * {{{
    +   *   Mean:              Vector,
    +   *   Variance:          Vector,
    +   *   Count:             Double,
    +   *   Non-zero count:    Vector,
    +   *   Maximum elements:  Vector,
    +   *   Minimum elements:  Vector.
    +   * }}},
    +   * with the size of Vector as input parameter.
    +   */
    +  def statistics(size: Int): (Vector, Vector, Double, Vector, Vector, Vector) = {
    +    val results = self.map(_.toBreeze).aggregate((
    +      BV.zeros[Double](size),
    +      BV.zeros[Double](size),
    +      0.0,
    +      BV.zeros[Double](size),
    +      BV.fill(size){Double.MinValue},
    +      BV.fill(size){Double.MaxValue}))(
    +      seqOp = (c, v) => (c, v) match {
    +        case ((prevMean, prevM2n, cnt, nnzVec, maxVec, minVec), currData) =>
    +          val currMean = ((prevMean :* cnt) + currData) :/ (cnt + 1.0)
    +          val nonZeroCnt = Vectors
    +            .sparse(size, currData.activeKeysIterator.toSeq.map(x => (x, 1.0))).toBreeze
    --- End diff --
    
    If we check explicit zero values, this line can be merged to the next foreach block.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39954675
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11193390
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.mllib.util.MLUtils._
    +
    +/**
    + * Test suite for the summary statistics of RDD[Vector]. Both the accuracy and the time consuming
    + * between dense and sparse vector are tested.
    + */
    +class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
    +  import VectorRDDFunctionsSuite._
    +
    +  val localData = Array(
    +    Vectors.dense(1.0, 2.0, 3.0),
    +    Vectors.dense(4.0, 5.0, 6.0),
    +    Vectors.dense(7.0, 8.0, 9.0)
    +  )
    +
    +  val sparseData = ArrayBuffer(Vectors.sparse(20, Seq((0, 1.0), (9, 2.0), (10, 7.0))))
    +  for (i <- 0 until 10000) sparseData += Vectors.sparse(20, Seq((9, 0.0)))
    --- End diff --
    
    Do not need so many rows, which slows down the test. Try to use minimal amount of data for tests.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11099343
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,93 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD, including
    +   * {{{
    +   *   Mean:              Vector,
    +   *   Variance:          Vector,
    +   *   Count:             Double,
    +   *   Non-zero count:    Vector,
    +   *   Maximum elements:  Vector,
    +   *   Minimum elements:  Vector.
    +   * }}},
    +   * with the size of Vector as input parameter.
    +   */
    +  def statistics(size: Int): (Vector, Vector, Double, Vector, Vector, Vector) = {
    +    val results = self.map(_.toBreeze).aggregate((
    +      BV.zeros[Double](size),
    +      BV.zeros[Double](size),
    +      0.0,
    +      BV.zeros[Double](size),
    +      BV.fill(size){Double.MinValue},
    +      BV.fill(size){Double.MaxValue}))(
    +      seqOp = (c, v) => (c, v) match {
    +        case ((prevMean, prevM2n, cnt, nnzVec, maxVec, minVec), currData) =>
    +          val currMean = ((prevMean :* cnt) + currData) :/ (cnt + 1.0)
    +          val nonZeroCnt = Vectors
    +            .sparse(size, currData.activeKeysIterator.toSeq.map(x => (x, 1.0))).toBreeze
    --- End diff --
    
    Adding non-zeros directly to `nnzVec` instead of creating another. We need to think about whether we should remove explicit zero values. I suggest we do not count explicit zeros.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11193497
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalAggregator(
    +    mean: BV[Double],
    +    statCounter: BV[Double],
    +    totalCount: Double,
    +    numNonZeros: BV[Double],
    +    max: BV[Double],
    +    min: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    +      aggregator: VectorRDDStatisticalAggregator,
    +      currData: BV[Double]): VectorRDDStatisticalAggregator = {
    +    aggregator match {
    +      case VectorRDDStatisticalAggregator(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
    +        currData.activeIterator.foreach {
    +          case (id, 0.0) =>
    --- End diff --
    
    Put a comment for this filter.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39415127
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11236033
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    +      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
    +      currM2n(i) /= totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currM2n)
    +  }
    +
    +  override lazy val totalCount: Long = totalCnt.toLong
    +
    +  override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override lazy val max: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMax(id) < 0.0)))  currMax(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMax)
    +  }
    +
    +  override lazy val min: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMin(id) > 0.0))) currMin(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMin)
    +  }
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  def add(currData: BV[Double]): this.type = {
    +    currData.activeIterator.foreach {
    +      // this case is used for filtering the zero elements if the vector is a dense one.
    --- End diff --
    
    Sparse vector can also have explicit zero values. So please update the comment.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39954685
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40042693
  
    @yinxusen Thanks for your work! LGTM and let's wait for Jenkins.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40155152
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14025/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11236148
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    +      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
    +      currM2n(i) /= totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currM2n)
    +  }
    +
    +  override lazy val totalCount: Long = totalCnt.toLong
    +
    +  override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override lazy val max: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMax(id) < 0.0)))  currMax(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMax)
    +  }
    +
    +  override lazy val min: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMin(id) > 0.0))) currMin(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMin)
    +  }
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  def add(currData: BV[Double]): this.type = {
    +    currData.activeIterator.foreach {
    +      // this case is used for filtering the zero elements if the vector is a dense one.
    +      case (id, 0.0) =>
    +      case (id, value) =>
    +        if (currMax(id) < value) currMax(id) = value
    +        if (currMin(id) > value) currMin(id) = value
    +
    +        val tmpPrevMean = currMean(id)
    +        currMean(id) = (currMean(id) * nnz(id) + value) / (nnz(id) + 1.0)
    +        currM2n(id) += (value - currMean(id)) * (value - tmpPrevMean)
    +
    +        nnz(id) += 1.0
    +    }
    +
    +    totalCnt += 1.0
    +    this
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  def merge(other: Aggregator): this.type = {
    +
    +    totalCnt += other.totalCnt
    +
    +    val deltaMean = currMean - other.currMean
    +
    +    other.currMean.activeIterator.foreach {
    +      case (id, 0.0) =>
    +      case (id, value) =>
    +        currMean(id) =
    +          (currMean(id) * nnz(id) + other.currMean(id) * other.nnz(id)) / (nnz(id) + other.nnz(id))
    +    }
    +
    +    var i = 0
    +    while(i < currM2n.size) {
    +      (nnz(i), other.nnz(i)) match {
    +        case (0.0, 0.0) =>
    +        case _ => currM2n(i) +=
    +          other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / (nnz(i)+other.nnz(i))
    +      }
    +      i += 1
    +    }
    +
    +    other.currMax.activeIterator.foreach {
    --- End diff --
    
    `currMax` is a dense vector. So use a while loop.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11191344
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalAggregator(
    +    mean: BV[Double],
    +    statCounter: BV[Double],
    +    totalCount: Double,
    +    numNonZeros: BV[Double],
    +    max: BV[Double],
    +    min: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    +      aggregator: VectorRDDStatisticalAggregator,
    +      currData: BV[Double]): VectorRDDStatisticalAggregator = {
    +    aggregator match {
    +      case VectorRDDStatisticalAggregator(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
    +        currData.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            if (maxVec(id) < value) maxVec(id) = value
    +            if (minVec(id) > value) minVec(id) = value
    +
    +            val tmpPrevMean = prevMean(id)
    +            prevMean(id) = (prevMean(id) * cnt + value) / (cnt + 1.0)
    +            prevM2n(id) += (value - prevMean(id)) * (value - tmpPrevMean)
    +
    +            nnzVec(id) += 1.0
    +        }
    +
    +        VectorRDDStatisticalAggregator(
    +          prevMean,
    +          prevM2n,
    +          cnt + 1.0,
    +          nnzVec,
    +          maxVec,
    +          minVec)
    +    }
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  private def combOp(
    +      statistics1: VectorRDDStatisticalAggregator,
    +      statistics2: VectorRDDStatisticalAggregator): VectorRDDStatisticalAggregator = {
    +    (statistics1, statistics2) match {
    +      case (VectorRDDStatisticalAggregator(mean1, m2n1, cnt1, nnz1, max1, min1),
    +            VectorRDDStatisticalAggregator(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
    +        val totalCnt = cnt1 + cnt2
    +        val deltaMean = mean2 - mean1
    +
    +        mean2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            mean1(id) = (mean1(id) * nnz1(id) + mean2(id) * nnz2(id)) / (nnz1(id) + nnz2(id))
    +        }
    +
    +        m2n2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            m2n1(id) +=
    +              value + deltaMean(id) * deltaMean(id) * nnz1(id) * nnz2(id) / (nnz1(id)+nnz2(id))
    +        }
    +
    +        max2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (max1(id) < value) max1(id) = value
    +        }
    +
    +        min2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (min1(id) > value) min1(id) = value
    +        }
    +
    +        axpy(1.0, nnz2, nnz1)
    +        VectorRDDStatisticalAggregator(mean1, m2n1, totalCnt, nnz1, max1, min1)
    +    }
    +  }
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
    +   */
    +  def summarizeStatistics(): VectorRDDStatisticalAggregator = {
    +    val size = self.take(1).head.size
    +    val zeroValue = VectorRDDStatisticalAggregator(
    +      BV.zeros[Double](size),
    +      BV.zeros[Double](size),
    +      0.0,
    +      BV.zeros[Double](size),
    +      BV.fill(size)(Double.MinValue),
    +      BV.fill(size)(Double.MaxValue))
    +
    +    val VectorRDDStatisticalAggregator(currMean, currM2n, totalCnt, nnz, currMax, currMin) =
    +      self.map(_.toBreeze).aggregate(zeroValue)(seqOp, combOp)
    +
    +    // solve real mean
    +    val realMean = currMean :* nnz :/ totalCnt
    +
    +    // solve real m2n
    +    val deltaMean = currMean
    +    val realM2n = currM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)
    +
    +    // remove the initial value in max and min, i.e. the Double.MaxValue or Double.MinValue.
    +    nnz.activeIterator.foreach {
    +      case (id, 0.0) =>
    +        currMax(id) = 0.0
    +        currMin(id) = 0.0
    +      case _ =>
    +    }
    +
    +    // get variance
    +    realM2n :/= totalCnt
    +
    +    VectorRDDStatisticalAggregator(
    --- End diff --
    
    @mengxr I am not sure of the return type. Indeed, I prefer to use `Vector` in Spark as the return type instead of `BV` in Breeze, but if so, I have to use lots of `toBreeze` in my code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39527499
  
    @mengxr Yes, I think `RowRDDMatrix` is a good position. Just put this method together with SVD and PCA. Indeed, `RDD[Vector]` is a kind of matrix.
    
    What should I do now? Wait for #296 merged, or something else?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11170823
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalSummary(
    +    mean: Vector,
    +    variance: Vector,
    +    count: Long,
    +    max: Vector,
    +    min: Vector,
    +    nonZeroCnt: Vector) extends Serializable
    +
    +/**
    + * Case class of the aggregate value for collecting summary statistics from RDD[Vector]. These
    + * values are relatively with
    + * [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]], the
    + * latter is computed from the former.
    + */
    +private case class VectorRDDStatisticalRing(
    +    fakeMean: BV[Double],
    +    fakeM2n: BV[Double],
    +    totalCnt: Double,
    +    nnz: BV[Double],
    +    fakeMax: BV[Double],
    +    fakeMin: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    +      aggregator: VectorRDDStatisticalRing,
    +      currData: BV[Double]): VectorRDDStatisticalRing = {
    +    aggregator match {
    +      case VectorRDDStatisticalRing(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
    +        currData.activeIterator.foreach {
    +          case (id, value) =>
    --- End diff --
    
    Should skip `value == 0.0`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40039782
  
    @yinxusen This looks good to me except the minor stuff I commented inline. Thanks for cleaning up `MLUtils#computeStats`!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40158443
  
    Conflict in MLUtils and RowMatrix. I think it is OK now.
    2014-4-11 AM5:31于 "Matei Zaharia" <no...@github.com>写道:
    
    > Hey, unfortunately this no longer merges cleanly. Do you mind rebasing it?
    > I think some conflicting changes happened in MLUtils.
    >
    > —
    > Reply to this email directly or view it on GitHub<https://github.com/apache/spark/pull/268#issuecomment-40145491>
    > .
    >


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11382104
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +
    +  /**
    +   * Computes the mean of columns in RDD[Vector].
    +   */
    +  def mean: Vector
    +
    +  /**
    +   * Computes the sample variance of columns in RDD[Vector].
    +   */
    +  def variance: Vector
    +
    +  /**
    +   * Computes number of vectors in RDD[Vector].
    +   */
    +  def count: Long
    +
    +  /**
    +   * Computes the number of non-zero elements in each column of RDD[Vector].
    +   */
    +  def numNonZeros: Vector
    +
    +  /**
    +   * Computes the maximum of each column in RDD[Vector].
    +   */
    +  def max: Vector
    +
    +  /**
    +   * Computes the minimum of each column in RDD[Vector].
    +   */
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function. Online variance solution used in add() function, while
    + * parallel variance solution used in merge() function. Reference here:
    + * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]. Solution here
    + * ignoring the zero elements when calling add() and merge(), for decreasing the O(n) algorithm to
    + * O(nnz). Real variance is computed here after we get other statistics, simply by another parallel
    + * combination process.
    + */
    +private class VectorRDDStatisticsAggregator(
    +    val currMean: BDV[Double],
    +    val currM2n: BDV[Double],
    +    var totalCnt: Double,
    +    val nnz: BDV[Double],
    +    val currMax: BDV[Double],
    +    val currMin: BDV[Double])
    +  extends VectorRDDStatisticalSummary with Serializable {
    +
    +  override def mean = {
    +    val realMean = BDV.zeros[Double](currMean.length)
    +    var i = 0
    +    while (i < currMean.length) {
    +      realMean(i) = currMean(i) * nnz(i) / totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(realMean)
    +  }
    +
    +  override def variance = {
    +    val realVariance = BDV.zeros[Double](currM2n.length)
    +    val deltaMean = currMean
    +    var i = 0
    +    while (i < currM2n.size) {
    +      realVariance(i) =
    +        currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
    +      realVariance(i) /= (totalCnt - 1.0)
    --- End diff --
    
    @mengxr I use `totalCnt - 1.0` here to compute sample variance.
    
    Any other questions, feel free to inform me. :)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39427646
  
     Build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11427699
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,208 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{Vector => BV, DenseVector => BDV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +
    +  /**
    +   * Computes the mean of columns in RDD[Vector].
    +   */
    +  def mean: Vector
    +
    +  /**
    +   * Computes the sample variance of columns in RDD[Vector].
    +   */
    +  def variance: Vector
    +
    +  /**
    +   * Computes number of vectors in RDD[Vector].
    +   */
    +  def count: Long
    +
    +  /**
    +   * Computes the number of non-zero elements in each column of RDD[Vector].
    +   */
    +  def numNonZeros: Vector
    +
    +  /**
    +   * Computes the maximum of each column in RDD[Vector].
    +   */
    +  def max: Vector
    +
    +  /**
    +   * Computes the minimum of each column in RDD[Vector].
    +   */
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function. Online variance solution used in add() function, while
    + * parallel variance solution used in merge() function. Reference here:
    + * [[http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance variance-wiki]]. Solution here
    + * ignoring the zero elements when calling add() and merge(), for decreasing the O(n) algorithm to
    + * O(nnz). Real variance is computed here after we get other statistics, simply by another parallel
    + * combination process.
    + */
    +private class VectorRDDStatisticsAggregator(
    +    val currMean: BDV[Double],
    +    val currM2n: BDV[Double],
    +    var totalCnt: Double,
    +    val nnz: BDV[Double],
    +    val currMax: BDV[Double],
    +    val currMin: BDV[Double])
    +  extends VectorRDDStatisticalSummary with Serializable {
    +
    +  override def mean = {
    +    val realMean = BDV.zeros[Double](currMean.length)
    +    var i = 0
    +    while (i < currMean.length) {
    +      realMean(i) = currMean(i) * nnz(i) / totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(realMean)
    +  }
    +
    +  override def variance = {
    +    val realVariance = BDV.zeros[Double](currM2n.length)
    +    val deltaMean = currMean
    +    var i = 0
    +    while (i < currM2n.size) {
    +      realVariance(i) =
    +        currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt - nnz(i)) / totalCnt
    +      realVariance(i) /= (totalCnt - 1.0)
    --- End diff --
    
    Ah.. yes. if `totalCnt == 1`, then R's VAR should be `0`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40033687
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13971/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11190187
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,170 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalSummary(
    +    mean: Vector,
    +    variance: Vector,
    +    count: Long,
    +    max: Vector,
    +    min: Vector,
    +    nonZeroCnt: Vector) extends Serializable
    +
    +/**
    + * Case class of the aggregate value for collecting summary statistics from RDD[Vector]. These
    + * values are relatively with
    + * [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]], the
    + * latter is computed from the former.
    + */
    +private case class VectorRDDStatisticalRing(
    +    fakeMean: BV[Double],
    +    fakeM2n: BV[Double],
    +    totalCnt: Double,
    +    nnz: BV[Double],
    +    fakeMax: BV[Double],
    +    fakeMin: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    +      aggregator: VectorRDDStatisticalRing,
    +      currData: BV[Double]): VectorRDDStatisticalRing = {
    +    aggregator match {
    +      case VectorRDDStatisticalRing(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
    +        currData.activeIterator.foreach {
    +          case (id, value) =>
    +            if (maxVec(id) < value) maxVec(id) = value
    +            if (minVec(id) > value) minVec(id) = value
    +
    +            val tmpPrevMean = prevMean(id)
    +            prevMean(id) = (prevMean(id) * cnt + value) / (cnt + 1.0)
    +            prevM2n(id) += (value - prevMean(id)) * (value - tmpPrevMean)
    +
    +            nnzVec(id) += 1.0
    +        }
    +
    +        VectorRDDStatisticalRing(
    +          prevMean,
    +          prevM2n,
    +          cnt + 1.0,
    +          nnzVec,
    +          maxVec,
    +          minVec)
    +    }
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  private def combOp(
    +      statistics1: VectorRDDStatisticalRing,
    +      statistics2: VectorRDDStatisticalRing): VectorRDDStatisticalRing = {
    +    (statistics1, statistics2) match {
    +      case (VectorRDDStatisticalRing(mean1, m2n1, cnt1, nnz1, max1, min1),
    +            VectorRDDStatisticalRing(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
    +        val totalCnt = cnt1 + cnt2
    +        val deltaMean = mean2 - mean1
    +
    +        mean2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            mean1(id) = (mean1(id) * nnz1(id) + mean2(id) * nnz2(id)) / (nnz1(id) + nnz2(id))
    +        }
    +
    +        m2n2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            m2n1(id) +=
    +              value + deltaMean(id) * deltaMean(id) * nnz1(id) * nnz2(id) / (nnz1(id)+nnz2(id))
    +        }
    +
    +        max2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (max1(id) < value) max1(id) = value
    +        }
    +
    +        min2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (min1(id) > value) min1(id) = value
    +        }
    +
    +        axpy(1.0, nnz2, nnz1)
    +        VectorRDDStatisticalRing(mean1, m2n1, totalCnt, nnz1, max1, min1)
    +    }
    +  }
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
    +   */
    +  def summarizeStatistics(size: Int): VectorRDDStatisticalSummary = {
    +    val zeroValue = VectorRDDStatisticalRing(
    +      BV.zeros[Double](size),
    +      BV.zeros[Double](size),
    +      0.0,
    +      BV.zeros[Double](size),
    +      BV.fill(size)(Double.MinValue),
    +      BV.fill(size)(Double.MaxValue))
    +
    +    val VectorRDDStatisticalRing(fakeMean, fakeM2n, totalCnt, nnz, fakeMax, fakeMin) =
    +      self.map(_.toBreeze).aggregate(zeroValue)(seqOp, combOp)
    +
    +    // solve real mean
    +    val realMean = fakeMean :* nnz :/ totalCnt
    +
    +    // solve real m2n
    +    val deltaMean = fakeMean
    +    val realM2n = fakeM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)
    +
    +    // remove the initial value in max and min, i.e. the Double.MaxValue or Double.MinValue.
    --- End diff --
    
    I think checking for every index where  `nnz(i) == 0` is better. I just set those max/min value to 0.0.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39501883
  
    @yinxusen Thanks for updating the implementation! One minor question: should we return sample variance instead of population variance. There is no big difference if we have many rows. But just to match the result from other packages like R. Maybe we should use sample variance here.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11235724
  
    --- Diff: .gitignore ---
    @@ -47,3 +47,4 @@ spark-*-bin.tar.gz
     unit-tests.log
     /lib/
     rat-results.txt
    +sbt/sbt-launch-0.13.1.jar.part
    --- End diff --
    
    I think this is because you had a partial file. Let's remove this change from the PR.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39957522
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/13948/


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40050336
  
    Build finished. All automated tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40033686
  
    Build finished. All automated tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11210688
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/rdd/VectorRDDFunctionsSuite.scala ---
    @@ -0,0 +1,87 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.mllib.util.MLUtils._
    +
    +/**
    + * Test suite for the summary statistics of RDD[Vector]. Both the accuracy and the time consuming
    + * between dense and sparse vector are tested.
    + */
    +class VectorRDDFunctionsSuite extends FunSuite with LocalSparkContext {
    +  import VectorRDDFunctionsSuite._
    +
    +  val localData = Array(
    +    Vectors.dense(1.0, 2.0, 3.0),
    +    Vectors.dense(4.0, 5.0, 6.0),
    +    Vectors.dense(7.0, 8.0, 9.0)
    +  )
    +
    +  val sparseData = ArrayBuffer(Vectors.sparse(20, Seq((0, 1.0), (9, 2.0), (10, 7.0))))
    +  for (i <- 0 until 10000) sparseData += Vectors.sparse(20, Seq((9, 0.0)))
    +  sparseData += Vectors.sparse(20, Seq((0, 5.0), (9, 13.0), (16, 2.0)))
    +  sparseData += Vectors.sparse(20, Seq((3, 5.0), (9, 13.0), (18, 2.0)))
    +
    +  test("full-statistics") {
    +    val data = sc.parallelize(localData, 2)
    +    val (VectorRDDStatisticalAggregator(mean, variance, cnt, nnz, max, min), denseTime) =
    +      time(data.summarizeStatistics())
    +
    +    assert(equivVector(Vectors.fromBreeze(mean), Vectors.dense(4.0, 5.0, 6.0)),
    +      "Column mean do not match.")
    +    assert(equivVector(Vectors.fromBreeze(variance), Vectors.dense(6.0, 6.0, 6.0)),
    +      "Column variance do not match.")
    +    assert(cnt === 3.0, "Column cnt do not match.")
    +    assert(equivVector(Vectors.fromBreeze(nnz), Vectors.dense(3.0, 3.0, 3.0)),
    +      "Column nnz do not match.")
    +    assert(equivVector(Vectors.fromBreeze(max), Vectors.dense(7.0, 8.0, 9.0)),
    +      "Column max do not match.")
    +    assert(equivVector(Vectors.fromBreeze(min), Vectors.dense(1.0, 2.0, 3.0)),
    +      "Column min do not match.")
    +
    +    val dataForSparse = sc.parallelize(sparseData.toSeq, 2)
    +    val (_, sparseTime) = time(dataForSparse.summarizeStatistics())
    +
    +    println(s"dense time is $denseTime, sparse time is $sparseTime.")
    +    assert(relativeTime(denseTime, sparseTime),
    --- End diff --
    
    OK, I know your mean.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11193626
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Case class of the summary statistics, including mean, variance, count, max, min, and non-zero
    + * elements count.
    + */
    +case class VectorRDDStatisticalAggregator(
    +    mean: BV[Double],
    +    statCounter: BV[Double],
    +    totalCount: Double,
    +    numNonZeros: BV[Double],
    +    max: BV[Double],
    +    min: BV[Double])
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  private def seqOp(
    +      aggregator: VectorRDDStatisticalAggregator,
    +      currData: BV[Double]): VectorRDDStatisticalAggregator = {
    +    aggregator match {
    +      case VectorRDDStatisticalAggregator(prevMean, prevM2n, cnt, nnzVec, maxVec, minVec) =>
    +        currData.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            if (maxVec(id) < value) maxVec(id) = value
    +            if (minVec(id) > value) minVec(id) = value
    +
    +            val tmpPrevMean = prevMean(id)
    +            prevMean(id) = (prevMean(id) * cnt + value) / (cnt + 1.0)
    +            prevM2n(id) += (value - prevMean(id)) * (value - tmpPrevMean)
    +
    +            nnzVec(id) += 1.0
    +        }
    +
    +        VectorRDDStatisticalAggregator(
    +          prevMean,
    +          prevM2n,
    +          cnt + 1.0,
    +          nnzVec,
    +          maxVec,
    +          minVec)
    +    }
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  private def combOp(
    +      statistics1: VectorRDDStatisticalAggregator,
    +      statistics2: VectorRDDStatisticalAggregator): VectorRDDStatisticalAggregator = {
    +    (statistics1, statistics2) match {
    +      case (VectorRDDStatisticalAggregator(mean1, m2n1, cnt1, nnz1, max1, min1),
    +            VectorRDDStatisticalAggregator(mean2, m2n2, cnt2, nnz2, max2, min2)) =>
    +        val totalCnt = cnt1 + cnt2
    +        val deltaMean = mean2 - mean1
    +
    +        mean2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            mean1(id) = (mean1(id) * nnz1(id) + mean2(id) * nnz2(id)) / (nnz1(id) + nnz2(id))
    +        }
    +
    +        m2n2.activeIterator.foreach {
    +          case (id, 0.0) =>
    +          case (id, value) =>
    +            m2n1(id) +=
    +              value + deltaMean(id) * deltaMean(id) * nnz1(id) * nnz2(id) / (nnz1(id)+nnz2(id))
    +        }
    +
    +        max2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (max1(id) < value) max1(id) = value
    +        }
    +
    +        min2.activeIterator.foreach {
    +          case (id, value) =>
    +            if (min1(id) > value) min1(id) = value
    +        }
    +
    +        axpy(1.0, nnz2, nnz1)
    +        VectorRDDStatisticalAggregator(mean1, m2n1, totalCnt, nnz1, max1, min1)
    +    }
    +  }
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
    +   */
    +  def summarizeStatistics(): VectorRDDStatisticalAggregator = {
    +    val size = self.take(1).head.size
    +    val zeroValue = VectorRDDStatisticalAggregator(
    +      BV.zeros[Double](size),
    +      BV.zeros[Double](size),
    +      0.0,
    +      BV.zeros[Double](size),
    +      BV.fill(size)(Double.MinValue),
    +      BV.fill(size)(Double.MaxValue))
    +
    +    val VectorRDDStatisticalAggregator(currMean, currM2n, totalCnt, nnz, currMax, currMin) =
    +      self.map(_.toBreeze).aggregate(zeroValue)(seqOp, combOp)
    +
    +    // solve real mean
    +    val realMean = currMean :* nnz :/ totalCnt
    +
    +    // solve real m2n
    +    val deltaMean = currMean
    +    val realM2n = currM2n - ((deltaMean :* deltaMean) :* (nnz :* (nnz :- totalCnt)) :/ totalCnt)
    +
    +    // remove the initial value in max and min, i.e. the Double.MaxValue or Double.MinValue.
    +    nnz.activeIterator.foreach {
    +      case (id, 0.0) =>
    --- End diff --
    
    There are corner cases you don't cover, e.g.,
    
    ~~~
    1
    2
    3
    0
    ~~~
    
    Ignoring `0`, the min value is `1`. Basically, if nnz is smaller than count, you need to update min and max by comparing them to `0`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by yinxusen <gi...@git.apache.org>.
Github user yinxusen commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39110100
  
    @mengxr I am not very sure of the concept of sparse vector. In your example, do you mean the column is `Vector(1.0, 0.0, 2.0, 0.0, 3.0, 0.0, 0.0)` or 
    `RDD(
    Vector(1.0), 
    Vector(0.0), 
    Vector(2.0), 
    Vector(0.0), 
    Vector(3.0), 
    Vector(0.0), 
    Vector(0.0)
    )`?
    
    If it is the case 1, then it is easy to rewrite it in O(nnz), otherwise, it will be difficult, because we cannot judge whether a column is sparse or not before we count the nnz. If the case 1 is your mean, then I think I should treat sparse vector different with the dense one with the following code:
    
    `RDD.take(1).head.type match {
      case DenseVector[Double] => xxx
      case SparseVector[Double] => xxx
    }`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39529028
  
    Build finished. All automated tests passed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11236109
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    --- End diff --
    
    Use a more specific name. I used the name `Aggregator` in my previous comment just for demonstration. If a user calls `getClass`, he/she will see `org.apache.spark.mllib.rdd.Aggregator`, which is confusing.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-39415136
  
    Build started. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40152052
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11236175
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    +      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
    +      currM2n(i) /= totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currM2n)
    +  }
    +
    +  override lazy val totalCount: Long = totalCnt.toLong
    +
    +  override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override lazy val max: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMax(id) < 0.0)))  currMax(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMax)
    +  }
    +
    +  override lazy val min: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMin(id) > 0.0))) currMin(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMin)
    +  }
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  def add(currData: BV[Double]): this.type = {
    +    currData.activeIterator.foreach {
    +      // this case is used for filtering the zero elements if the vector is a dense one.
    +      case (id, 0.0) =>
    +      case (id, value) =>
    +        if (currMax(id) < value) currMax(id) = value
    +        if (currMin(id) > value) currMin(id) = value
    +
    +        val tmpPrevMean = currMean(id)
    +        currMean(id) = (currMean(id) * nnz(id) + value) / (nnz(id) + 1.0)
    +        currM2n(id) += (value - currMean(id)) * (value - tmpPrevMean)
    +
    +        nnz(id) += 1.0
    +    }
    +
    +    totalCnt += 1.0
    +    this
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  def merge(other: Aggregator): this.type = {
    +
    +    totalCnt += other.totalCnt
    +
    +    val deltaMean = currMean - other.currMean
    +
    +    other.currMean.activeIterator.foreach {
    +      case (id, 0.0) =>
    +      case (id, value) =>
    +        currMean(id) =
    +          (currMean(id) * nnz(id) + other.currMean(id) * other.nnz(id)) / (nnz(id) + other.nnz(id))
    +    }
    +
    +    var i = 0
    +    while(i < currM2n.size) {
    +      (nnz(i), other.nnz(i)) match {
    +        case (0.0, 0.0) =>
    +        case _ => currM2n(i) +=
    +          other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / (nnz(i)+other.nnz(i))
    +      }
    +      i += 1
    +    }
    +
    +    other.currMax.activeIterator.foreach {
    +      case (id, value) =>
    +        if (currMax(id) < value) currMax(id) = value
    +    }
    +
    +    other.currMin.activeIterator.foreach {
    +      case (id, value) =>
    +        if (currMin(id) > value) currMin(id) = value
    +    }
    +
    +    axpy(1.0, other.nnz, nnz)
    +    this
    +  }
    +}
    +
    +/**
    + * Extra functions available on RDDs of [[org.apache.spark.mllib.linalg.Vector Vector]] through an
    + * implicit conversion. Import `org.apache.spark.MLContext._` at the top of your program to use
    + * these functions.
    + */
    +class VectorRDDFunctions(self: RDD[Vector]) extends Serializable {
    +
    +  /**
    +   * Compute full column-wise statistics for the RDD with the size of Vector as input parameter.
    +   */
    +  def summarizeStatistics(): VectorRDDStatisticalSummary = {
    +    val size = self.take(1).head.size
    --- End diff --
    
    `take(1).head` = `first()`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/268#discussion_r11235747
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/VectorRDDFunctions.scala ---
    @@ -0,0 +1,179 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import breeze.linalg.{axpy, Vector => BV}
    +
    +import org.apache.spark.mllib.linalg.{Vectors, Vector}
    +import org.apache.spark.rdd.RDD
    +
    +/**
    + * Trait of the summary statistics, including mean, variance, count, max, min, and non-zero elements
    + * count.
    + */
    +trait VectorRDDStatisticalSummary {
    +  def mean: Vector
    +  def variance: Vector
    +  def totalCount: Long
    +  def numNonZeros: Vector
    +  def max: Vector
    +  def min: Vector
    +}
    +
    +/**
    + * Aggregates [[org.apache.spark.mllib.rdd.VectorRDDStatisticalSummary VectorRDDStatisticalSummary]]
    + * together with add() and merge() function.
    + */
    +private class Aggregator(
    +    val currMean: BV[Double],
    +    val currM2n: BV[Double],
    +    var totalCnt: Double,
    +    val nnz: BV[Double],
    +    val currMax: BV[Double],
    +    val currMin: BV[Double]) extends VectorRDDStatisticalSummary with Serializable {
    +
    +  // lazy val is used for computing only once time. Same below.
    +  override lazy val mean = Vectors.fromBreeze(currMean :* nnz :/ totalCnt)
    +
    +  // Online variance solution used in add() function, while parallel variance solution used in
    +  // merge() function. Reference here:
    +  // http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance
    +  // Solution here ignoring the zero elements when calling add() and merge(), for decreasing the
    +  // O(n) algorithm to O(nnz). Real variance is computed here after we get other statistics, simply
    +  // by another parallel combination process.
    +  override lazy val variance = {
    +    val deltaMean = currMean
    +    var i = 0
    +    while(i < currM2n.size) {
    +      currM2n(i) += deltaMean(i) * deltaMean(i) * nnz(i) * (totalCnt-nnz(i)) / totalCnt
    +      currM2n(i) /= totalCnt
    +      i += 1
    +    }
    +    Vectors.fromBreeze(currM2n)
    +  }
    +
    +  override lazy val totalCount: Long = totalCnt.toLong
    +
    +  override lazy val numNonZeros: Vector = Vectors.fromBreeze(nnz)
    +
    +  override lazy val max: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMax(id) < 0.0)))  currMax(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMax)
    +  }
    +
    +  override lazy val min: Vector = {
    +    nnz.iterator.foreach {
    +      case (id, count) =>
    +        if ((count == 0.0) || ((count < totalCnt) && (currMin(id) > 0.0))) currMin(id) = 0.0
    +    }
    +    Vectors.fromBreeze(currMin)
    +  }
    +
    +  /**
    +   * Aggregate function used for aggregating elements in a worker together.
    +   */
    +  def add(currData: BV[Double]): this.type = {
    +    currData.activeIterator.foreach {
    +      // this case is used for filtering the zero elements if the vector is a dense one.
    +      case (id, 0.0) =>
    +      case (id, value) =>
    +        if (currMax(id) < value) currMax(id) = value
    +        if (currMin(id) > value) currMin(id) = value
    +
    +        val tmpPrevMean = currMean(id)
    +        currMean(id) = (currMean(id) * nnz(id) + value) / (nnz(id) + 1.0)
    +        currM2n(id) += (value - currMean(id)) * (value - tmpPrevMean)
    +
    +        nnz(id) += 1.0
    +    }
    +
    +    totalCnt += 1.0
    +    this
    +  }
    +
    +  /**
    +   * Combine function used for combining intermediate results together from every worker.
    +   */
    +  def merge(other: Aggregator): this.type = {
    +
    +    totalCnt += other.totalCnt
    +
    +    val deltaMean = currMean - other.currMean
    +
    +    other.currMean.activeIterator.foreach {
    +      case (id, 0.0) =>
    +      case (id, value) =>
    +        currMean(id) =
    +          (currMean(id) * nnz(id) + other.currMean(id) * other.nnz(id)) / (nnz(id) + other.nnz(id))
    +    }
    +
    +    var i = 0
    +    while(i < currM2n.size) {
    +      (nnz(i), other.nnz(i)) match {
    +        case (0.0, 0.0) =>
    +        case _ => currM2n(i) +=
    +          other.currM2n(i) + deltaMean(i) * deltaMean(i) * nnz(i) * other.nnz(i) / (nnz(i)+other.nnz(i))
    +      }
    +      i += 1
    +    }
    +
    +    other.currMax.activeIterator.foreach {
    +      case (id, value) =>
    +        if (currMax(id) < value) currMax(id) = value
    +    }
    +
    +    other.currMin.activeIterator.foreach {
    +      case (id, value) =>
    +        if (currMin(id) > value) currMin(id) = value
    +    }
    +
    +    axpy(1.0, other.nnz, nnz)
    --- End diff --
    
    This is "nnz += other.nnz". Remove `axpy` from imports.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [WIP] [SPARK-1328] Add vector statistics

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/268#issuecomment-40028835
  
     Merged build triggered. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---