Posted to reviews@spark.apache.org by freeman-lab <gi...@git.apache.org> on 2014/07/10 21:53:52 UTC

[GitHub] spark pull request: Streaming mllib

GitHub user freeman-lab opened a pull request:

    https://github.com/apache/spark/pull/1361

    Streaming mllib

    This PR implements a streaming linear regression analysis, in which a linear regression model is trained online as new data arrive. The design is based on discussions with @tdas and @mengxr, in which we determined how to add this functionality in a general way, with minimal changes to existing libraries.
    
    __Summary of additions:__
    
    _StreamingRegression_
    - An abstract class for fitting regression analyses online on streaming data, including training on (and updating) a model, and making predictions
    
    _StreamingLinearRegressionWithSGD_
    - Class and companion object for running streaming linear regression
    
    _MLStreamingUtils_
    - Utility for loading and parsing streaming data from a text file stream; it could be extended with functions for loading data from Kafka, the network, etc.
    
    _StreamingLinearRegression_
    - Example use case: fitting a model online to data from one stream, and making predictions on other data
    
    __Notes__
    - I will definitely add tests but I wasn't sure where it makes sense to put them: mllib or streaming?
    - If this looks good, I can use the StreamingRegression class to do all other regression analyses (Ridge, Lasso, etc.), and a similar StreamingClassification class would give us logistic and SVM classification.
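
To make the idea of online training concrete, here is a minimal sketch in plain Scala, without Spark, of a model whose weights are carried over from one batch to the next. The names `Point`, `OnlineLinearModel`, and `update` are hypothetical and are not part of this PR. The sketch also assumes a constant step size and no intercept, which may differ from what MLlib's optimizer does.

```scala
// Hypothetical stand-in for a labeled example; not MLlib's LabeledPoint.
final case class Point(label: Double, features: Array[Double])

// Minimal online linear model: each call to update() runs a few
// gradient-descent steps on one batch, starting from the current weights.
final class OnlineLinearModel(initialWeights: Array[Double],
                              stepSize: Double,
                              numIterations: Int) {

  private var current: Array[Double] = initialWeights.clone()

  // Defensive copy so callers cannot mutate the model's state.
  def weights: Array[Double] = current.clone()

  def predict(features: Array[Double]): Double =
    features.zip(current).map { case (x, w) => x * w }.sum

  def update(batch: Seq[Point]): Unit = {
    if (batch.nonEmpty) {
      for (_ <- 1 to numIterations) {
        // Gradient of the mean squared error (up to a constant factor),
        // accumulated over the batch before any weight is changed.
        val gradient = new Array[Double](current.length)
        for (p <- batch) {
          val error = predict(p.features) - p.label
          for (j <- gradient.indices) gradient(j) += error * p.features(j)
        }
        for (j <- current.indices) {
          current(j) -= stepSize * gradient(j) / batch.size
        }
      }
    }
  }
}
```

Feeding the same model successive batches drawn from Y = 10*X1 + 10*X2 should move the weights toward (10, 10), because each batch starts from the weights left by the previous one.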

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/freeman-lab/spark streaming-mllib

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1361.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1361
    
----
commit 0898add2e1dd2f1faac9e8d08c758994af03ee6e
Author: freeman <th...@gmail.com>
Date:   2014-07-10T14:39:31Z

    Added dependency on streaming

commit d99aa85d8f275ca605aacb2804f0c55fff10ff2b
Author: freeman <th...@gmail.com>
Date:   2014-07-10T14:40:55Z

    Helper methods for streaming MLlib apps

commit 604f4d738357adccc0168f8449614e8e09d9f70e
Author: freeman <th...@gmail.com>
Date:   2014-07-10T14:41:25Z

    Expanded private class to include mllib

commit c4b1143dc2ab39506aeefb2f7a89485196308d08
Author: freeman <th...@gmail.com>
Date:   2014-07-10T14:43:16Z

    Streaming linear regression
    
    - Abstract class to support a variety of streaming regression analyses
    - Example concrete class for streaming linear regression
    - Example usage: continually train on one data stream and test on
    another

commit 453974e75afbebfc605e80efaa32e8f45dc0e258
Author: freeman <th...@gmail.com>
Date:   2014-07-10T19:36:14Z

    Fixed indentation

commit fd31e036afe537b86d49487527eca83ac62c7630
Author: freeman <th...@gmail.com>
Date:   2014-07-10T19:49:32Z

    Changed logging behavior

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50952222
  
    QA results for PR 1361:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
      class StreamingLinearRegressionWithSGD (

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17732/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15723317
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,133 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +import java.nio.charset.Charset
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import com.google.common.io.Files
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.mllib.util.{LinearDataGenerator, LocalSparkContext, MLUtils}
    +import org.apache.spark.streaming.{Milliseconds, StreamingContext}
    +import org.apache.spark.util.Utils
    +
    +class StreamingLinearRegressionSuite extends FunSuite with LocalSparkContext {
    +
    +  // Assert that two values are equal within tolerance epsilon
    +  def assertEqual(v1: Double, v2: Double, epsilon: Double) {
    +    def errorMessage = v1.toString + " did not equal " + v2.toString
    +    assert(math.abs(v1-v2) <= epsilon, errorMessage)
    +  }
    +
    +  // Assert that model predictions are correct
    +  def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
    +    val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
    +      // A prediction is off if the prediction is more than 0.5 away from expected value.
    +      math.abs(prediction - expected.label) > 0.5
    +    }
    +    // At least 80% of the predictions should be on.
    +    assert(numOffPredictions < input.length / 5)
    +  }
    +
    +  // Test if we can accurately learn Y = 10*X1 + 10*X2 on streaming data
    +  test("streaming linear regression parameter accuracy") {
    +
    +    val testDir = Files.createTempDir()
    +    val numBatches = 10
    +    val batchDuration = Milliseconds(1000)
    +    val ssc = new StreamingContext(sc, batchDuration)
    +    val data = MLUtils.loadStreamingLabeledPoints(ssc, testDir.toString)
    +    val model = new StreamingLinearRegressionWithSGD()
    +      .setInitialWeights(Vectors.dense(0.0, 0.0))
    +      .setStepSize(0.1)
    +      .setNumIterations(50)
    +
    +    model.trainOn(data)
    +
    +    ssc.start()
    +
    +    // write data to a file stream
    +    for (i <- 0 until numBatches) {
    +      val samples = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 42 * (i + 1))
    +      val file = new File(testDir, i.toString)
    +      Files.write(samples.map(x => x.toString).mkString("\n"), file, Charset.forName("UTF-8"))
    +      Thread.sleep(batchDuration.milliseconds)
    +    }
    +
    +    ssc.stop(stopSparkContext=false)
    +
    +    System.clearProperty("spark.driver.port")
    +    Utils.deleteRecursively(testDir)
    +
    +    // check accuracy of final parameter estimates
    +    assertEqual(model.latestModel().intercept, 0.0, 0.1)
    +    assertEqual(model.latestModel().weights(0), 10.0, 0.1)
    +    assertEqual(model.latestModel().weights(1), 10.0, 0.1)
    +
    +    // check accuracy of predictions
    +    val validationData = LinearDataGenerator.generateLinearInput(0.0, Array(10.0, 10.0), 100, 17)
    +    validatePrediction(validationData.map(row => model.latestModel().predict(row.features)), validationData)
    --- End diff --
    
    This line is too wide.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15725602
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala ---
    @@ -174,17 +182,18 @@ object GradientDescent extends Logging {
           weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
     
         for (i <- 1 to numIterations) {
    -      val bcWeights = data.context.broadcast(weights)
    --- End diff --
    
    This was my mistake, should be fixed now.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-49389728
  
    QA results for PR 1361:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16799/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15724913
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see `LinearRegressionWithSGD` for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant. An initial weight
    + * vector must be provided.
    + *
    + * Use a builder pattern to construct a streaming linear regression
    + * analysis in an application, like:
    + *
    + *  val model = new StreamingLinearRegressionWithSGD()
    + *    .setStepSize(0.5)
    + *    .setNumIterations(10)
    + *    .setInitialWeights(Vectors.dense(...))
    + *    .trainOn(DStream)
    + *
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var initialWeights: Vector)
    +  extends StreamingLinearAlgorithm[
    +    LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
    +
    +  /**
    +   * Construct a StreamingLinearRegression object with default parameters:
    +   * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0, initialWeights: [0.0, 0.0]}.
    +   */
    +  def this() = this(0.1, 50, 1.0, Vectors.dense(0.0, 0.0))
    --- End diff --
    
    The former solution looks better to me. In the future, we can determine the number of features from data.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1361



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15725513
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see `LinearRegressionWithSGD` for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant. An initial weight
    + * vector must be provided.
    + *
    + * Use a builder pattern to construct a streaming linear regression
    + * analysis in an application, like:
    + *
    + *  val model = new StreamingLinearRegressionWithSGD()
    + *    .setStepSize(0.5)
    + *    .setNumIterations(10)
    + *    .setInitialWeights(Vectors.dense(...))
    + *    .trainOn(DStream)
    + *
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var initialWeights: Vector)
    +  extends StreamingLinearAlgorithm[
    +    LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
    +
    +  /**
    +   * Construct a StreamingLinearRegression object with default parameters:
    +   * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0, initialWeights: [0.0, 0.0]}.
    +   */
    +  def this() = this(0.1, 50, 1.0, Vectors.dense(0.0, 0.0))
    --- End diff --
    
    OK, we should probably throw the same error when calling ``predictOn`` too, right? Otherwise we'll get errors inside ``model.predict``.
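
One way to get the same error from both entry points is to route them through a single check. The sketch below is plain Scala with hypothetical names (`GuardedModel`, `requireInitialized`) and uses `Seq` in place of a `DStream`. It only illustrates the guard and is not the PR's actual implementation.

```scala
// Hypothetical model wrapper; weights stay unset until the caller provides them.
final class GuardedModel {

  private var weights: Option[Array[Double]] = None

  def setInitialWeights(w: Array[Double]): this.type = {
    weights = Some(w.clone())
    this
  }

  // Single check shared by training and prediction, so both fail the same way.
  private def requireInitialized(): Array[Double] =
    weights.getOrElse(
      throw new IllegalArgumentException(
        "Initial weights must be set before starting training or prediction"))

  def trainOn(batches: Seq[Seq[(Double, Array[Double])]]): Unit = {
    val w = requireInitialized()
    // A real implementation would update w from each batch; this sketch
    // only checks that every example has the expected dimension.
    batches.foreach { batch =>
      batch.foreach { case (_, x) => require(x.length == w.length) }
    }
  }

  def predictOn(batch: Seq[Array[Double]]): Seq[Double] = {
    val w = requireInitialized()
    batch.map(x => x.zip(w).map { case (a, b) => a * b }.sum)
  }
}
```

With this shape, forgetting `setInitialWeights` surfaces as one clear message at the call site rather than as a failure deeper inside prediction.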



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15708464
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    --- End diff --
    
    The ordering relationship of trainOn() and predictOn() needs to be documented somewhere. Should one be called before the other? Can one be called after the other? Can either of them be called multiple times? What are the implications? If one is to be called only after the other, then we will have to think about enforcing that in the API itself (throw an error if the ordering is wrong).



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50934956
  
    @freeman-lab I think the static methods `StreamingLinearRegressionWithSGD.start` are not necessary, and these methods actually do not start anything. Do you mind removing the static methods and advising users to use the builder pattern?
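
For readers unfamiliar with the builder pattern mentioned here, a minimal plain-Scala sketch follows. `RegressionConfig` and its `describe` method are hypothetical and not part of MLlib; the setter names mirror those shown in the PR's Scaladoc, and the defaults are the ones listed there.

```scala
// Hypothetical configuration holder illustrating chained setters.
final class RegressionConfig {

  private var stepSize: Double = 0.1
  private var numIterations: Int = 50
  private var miniBatchFraction: Double = 1.0

  // Each setter returns this.type so that calls can be chained.
  def setStepSize(value: Double): this.type = {
    stepSize = value
    this
  }

  def setNumIterations(value: Int): this.type = {
    numIterations = value
    this
  }

  def setMiniBatchFraction(value: Double): this.type = {
    miniBatchFraction = value
    this
  }

  // Human-readable summary of the current settings.
  def describe: String =
    s"stepSize=$stepSize, numIterations=$numIterations, miniBatchFraction=$miniBatchFraction"
}
```

A caller would then write `new RegressionConfig().setStepSize(0.5).setNumIterations(10)`, with no static factory method involved.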



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565956
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.scalatest.FunSuite
    --- End diff --
    
    add an empty line to separate 3rd party imports from spark imports



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50908639
  
    QA tests have started for PR 1361. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17681/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15566050
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see LinearRegressionWithSGD for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant.
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD private (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var numFeatures: Int)
    --- End diff --
    
    If we ask the users to provide the initial weight, we don't need this argument.
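
The point is that the length of the initial weight vector already fixes the feature count. A small sketch in plain Scala, with a hypothetical `WeightedModel` class and `Array[Double]` standing in for MLlib's `Vector`:

```scala
// Hypothetical class: the feature count is derived, not passed separately.
final class WeightedModel(initialWeights: Array[Double]) {

  // The number of features follows from the weight vector's length.
  val numFeatures: Int = initialWeights.length

  // True only when the input dimension matches the weights.
  def accepts(features: Array[Double]): Boolean =
    features.length == numFeatures
}
```

Dropping the separate argument also removes the possibility of the two values disagreeing.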


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565941
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    --- End diff --
    
    We try to avoid using `commons-io` for compatibility. Please check https://github.com/apache/spark/pull/226 and use `Utils.deleteRecursively` defined there to replace `FileUtils.delete*`. 
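
For context, a recursive delete needs nothing beyond `java.io.File`. The sketch below is a simplified illustration with a hypothetical `FileCleanup.deleteTree` helper. It is not the implementation of Spark's `Utils.deleteRecursively`, and it does not treat symbolic links specially.

```scala
import java.io.{File, IOException}

object FileCleanup {
  // Delete a file, or a directory together with everything under it.
  def deleteTree(file: File): Unit = {
    if (file.isDirectory()) {
      // listFiles() returns null on an I/O error, so wrap it in Option.
      Option(file.listFiles()).getOrElse(Array.empty[File]).foreach(deleteTree)
    }
    if (file.exists() && !file.delete()) {
      throw new IOException("Failed to delete: " + file.getAbsolutePath())
    }
  }
}
```

In a test, this kind of helper would be called on the temporary directory once the streaming context has been stopped.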


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15671309
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.scalatest.FunSuite
    +import org.apache.spark.SparkConf
    +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
    +import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext}
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +class StreamingLinearRegressionSuite extends FunSuite {
    --- End diff --
    
    I'm using the one from mllib. I just got it working with a tip from TD: the call to ``ssc.stop`` after each test needs ``stopSparkContext=false``.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-49384594
  
    @mengxr mind retesting? I tried to make the convergence test more robust in a couple of ways. If we still have issues, we might need to rethink that test further. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15684457
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.examples.mllib
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.mllib.util.MLStreamingUtils
    +import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
    +import org.apache.spark.streaming.{Seconds, StreamingContext}
    +
    +/**
    + * Continually update a model on one stream of data using streaming linear regression,
    + * while making predictions on another stream of data
    + *
    + */
    +object StreamingLinearRegression {
    +
    +  def main(args: Array[String]) {
    +
    +    if (args.length != 4) {
    +      System.err.println(
    +        "Usage: StreamingLinearRegression <trainingData> <testData> <batchDuration> <numFeatures>")
    +      System.exit(1)
    +    }
    +
    +    val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
    +    val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
    +
    +    val trainingData = MLStreamingUtils.loadLabeledPointsFromText(ssc, args(0))
    --- End diff --
    
    I like this idea, but I can actually trivially remove ``MLStreamingUtils`` by just moving its ``loadStreamingLabeledPoints`` into ``MLUtils``. Maybe we can rename LabeledPoint in another PR. 



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565768
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    +          model = algorithm.run(rdd, model.weights)
    +          logInfo("Model updated")
    +        }
    +        logInfo("Current model: weights, %s".format(model.weights.toString))
    +        logInfo("Current model: intercept, %s".format(model.intercept.toString))
    --- End diff --
    
    The intercept won't be updated because we didn't pass the intercept in line 65. This is due to our implementation of GLM. I think we can ignore the intercept and ask users to append the bias to the original data if they need an intercept.
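
A minimal sketch of what "append the bias to the original data" could look like, using plain arrays rather than MLlib vectors. The object and method names here are hypothetical and are not part of this PR; the idea is only that a constant 1.0 feature lets the last weight play the role of the intercept.

```scala
object BiasTermSketch {
  // Hypothetical helper: append a constant 1.0 so the last weight acts as the intercept.
  def appendBias(features: Array[Double]): Array[Double] = features :+ 1.0

  // Prediction with the intercept folded into the weight vector.
  def predict(weights: Array[Double], features: Array[Double]): Double = {
    val augmented = appendBias(features)
    require(weights.length == augmented.length,
      "weights must have one extra entry for the bias")
    weights.zip(augmented).map { case (w, x) => w * x }.sum
  }
}
```

With this layout the streaming update only ever touches weights, so the intercept is learned as the final weight without any change to the GLM code.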



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15628387
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    --- End diff --
    
    Ok cool, without this we get NaNs when updating empty RDDs. I'm tracking it down, but pretty sure we'd just need to fix ``runMiniBatchSGD`` from ``GradientDescent``. A count already happens there (line 161), maybe just have it return the initial weights if that count is 0?
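
To illustrate the proposed guard, here is a small local sketch of a single least-squares gradient step that returns the initial weights when the batch is empty. It is an assumption-laden stand-in written against plain Scala collections, not the actual ``runMiniBatchSGD`` code; the names and signature are hypothetical.

```scala
object EmptyBatchSketch {
  // One gradient-descent step for least squares on a local batch of (label, features).
  // Averaging over an empty batch would compute 0.0 / 0, which is NaN,
  // so an empty batch returns the weights unchanged.
  def update(
      weights: Array[Double],
      batch: Seq[(Double, Array[Double])],
      stepSize: Double): Array[Double] = {
    if (batch.isEmpty) {
      weights
    } else {
      val gradientSum = new Array[Double](weights.length)
      for ((label, features) <- batch) {
        val error = weights.zip(features).map { case (w, x) => w * x }.sum - label
        for (i <- weights.indices) gradientSum(i) += error * features(i)
      }
      weights.indices.map(i => weights(i) - stepSize * gradientSum(i) / batch.size).toArray
    }
  }
}
```

Placing the check inside the optimizer, as suggested above, would avoid a second pass over the data in ``trainOn``.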



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15684474
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see LinearRegressionWithSGD for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant.
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD private (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var numFeatures: Int)
    +  extends StreamingRegression[LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
    +
    +  val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
    +
    +  var model = algorithm.createModel(Vectors.dense(new Array[Double](numFeatures)), 0.0)
    +
    +}
    +
    +/**
    + * Top-level methods for calling StreamingLinearRegressionWithSGD.
    + */
    +@Experimental
    +object StreamingLinearRegressionWithSGD {
    +
    +  /**
    +   * Start a streaming Linear Regression model by setting optimization parameters.
    +   *
    +   * @param numIterations Number of iterations of gradient descent to run.
    +   * @param stepSize Step size to be used for each iteration of gradient descent.
    +   * @param miniBatchFraction Fraction of data to be used per iteration.
    +   * @param numFeatures Number of features per record, must be constant for all batches of data.
    +   */
    +  def start(
    --- End diff --
    
    Ok, I added setters and use them in the example, but kept the static ``start`` method for consistency with the others. We can always drop it later.
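
For readers unfamiliar with the two styles being discussed, the sketch below shows chained setters next to a static-style factory in a companion object. The class name, fields, and default values are hypothetical and chosen only for illustration; they are not the PR's actual code.

```scala
// Hypothetical stand-in for a streaming algorithm's configuration.
class StreamingConfigSketch {
  // Illustrative defaults, not taken from the PR.
  private var stepSize: Double = 0.1
  private var numIterations: Int = 50

  // Each setter returns this.type so that calls can be chained.
  def setStepSize(value: Double): this.type = { stepSize = value; this }
  def setNumIterations(value: Int): this.type = { numIterations = value; this }

  def describe: String = s"stepSize=$stepSize, numIterations=$numIterations"
}

object StreamingConfigSketch {
  // Static-style factory kept alongside the setters.
  def start(stepSize: Double, numIterations: Int): StreamingConfigSketch =
    new StreamingConfigSketch().setStepSize(stepSize).setNumIterations(numIterations)
}
```

Keeping both costs little: the factory simply delegates to the setters, so it can be dropped later without touching the class itself.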



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50948711
  
    QA results for PR 1361:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
      class StreamingLinearRegressionWithSGD (

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17716/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50898186
  
    QA results for PR 1361:
    - This patch FAILED unit tests.

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17675/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565824
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.scalatest.FunSuite
    +import org.apache.spark.SparkConf
    +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
    +import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext}
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +class StreamingLinearRegressionSuite extends FunSuite {
    +
    +  // Assert that two values are equal within tolerance epsilon
    +  def assertEqual(v1: Double, v2: Double, epsilon: Double) {
    --- End diff --
    
    We now have some utils defined in `mllib.TestingUtils` to make this easier: `assert(v1 ~== v2 relTol 1e-15)`. It is okay to keep them in this PR. We can update it after this is merged.
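
As a rough illustration of the difference between a relative and an absolute tolerance check, here is a self-contained sketch. It does not reproduce how `mllib.TestingUtils` implements `~==`, and it assumes the suite's `assertEqual(v1, v2, epsilon)` is an absolute comparison, since its body is not shown in the diff; the object and method names are hypothetical.

```scala
object ToleranceSketch {
  // Relative comparison: the difference is measured against the larger magnitude.
  def approxEqualRel(v1: Double, v2: Double, relTol: Double): Boolean = {
    if (v1 == v2) true
    else math.abs(v1 - v2) <= relTol * math.max(math.abs(v1), math.abs(v2))
  }

  // Absolute comparison: the difference is measured against a fixed epsilon.
  def approxEqualAbs(v1: Double, v2: Double, epsilon: Double): Boolean =
    math.abs(v1 - v2) <= epsilon
}
```

A relative check scales with the values being compared, which matters when weights differ by orders of magnitude: two tiny values that differ by a factor of two pass an absolute check with a loose epsilon but fail a relative one.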



[GitHub] spark pull request: Streaming mllib

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48661164
  
    Awesome, time to have some fun :D 
    Roping in @pwendell




[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15724803
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingLinearAlgorithm implements methods for continuously
    + * training a generalized linear model on streaming data,
    + * and using it for prediction on (possibly different) streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of any analyses using GLMs. Only weights will be updated,
    + * not an intercept. If the model needs an intercept, it should be manually appended
    + * to the input data.
    + *
    + * For example usage, see `StreamingLinearRegressionWithSGD`.
    + *
    + * NOTE(Freeman): In some use cases, the order in which trainOn and predictOn
    + * are called in an application will affect the results. When called on
    + * the same DStream, if trainOn is called before predictOn, when new data
    + * arrive the model will update and the prediction will be based on the new
    + * model. Whereas if predictOn is called first, the prediction will use the model
    + * from the previous update.
    + *
    + * NOTE(Freeman): It is ok to call predictOn repeatedly on multiple streams; this
    + * will generate predictions for each one all using the current model.
    + * It is also ok to call trainOn on different streams; this will update
    + * the model using each of the different sources, in sequence.
    + *
    + */
    +@DeveloperApi
    +abstract class StreamingLinearAlgorithm[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  protected var model: M
    +
    +  /** The algorithm to use for updating. */
    +  protected val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latestModel(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD { (rdd, time) =>
    +        model = algorithm.run(rdd, model.weights)
    +        logInfo("Model updated at time %s".format(time.toString))
    +        logInfo("Current model: weights, %s".format(
    +          model.weights.toArray.take(100).mkString("[", ",", "]")))
    --- End diff --
    
    If the number of features is greater than 100, we should output `, ...` instead of `]`
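
One possible way to do this, sketched as a standalone helper. The name `formatWeights` and the `limit` parameter are hypothetical; the quoted diff hard-codes 100 inline.

```scala
object WeightLogSketch {
  // Format at most `limit` weights. When the vector is longer than `limit`,
  // end with ", ..." instead of a closing bracket to signal truncation.
  def formatWeights(weights: Array[Double], limit: Int = 100): String = {
    val shown = weights.take(limit)
    val end = if (weights.length > limit) ", ..." else "]"
    shown.mkString("[", ",", end)
  }
}
```

For example, `WeightLogSketch.formatWeights(Array(1.0, 2.0, 3.0), limit = 2)` produces `[1.0,2.0, ...`, while a vector that fits within the limit keeps its closing `]`.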



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15628426
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see LinearRegressionWithSGD for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant.
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD private (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var numFeatures: Int)
    +  extends StreamingRegression[LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
    +
    +  val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
    +
    +  var model = algorithm.createModel(Vectors.dense(new Array[Double](numFeatures)), 0.0)
    +
    +}
    +
    +/**
    + * Top-level methods for calling StreamingLinearRegressionWithSGD.
    + */
    +@Experimental
    +object StreamingLinearRegressionWithSGD {
    +
    +  /**
    +   * Start a streaming Linear Regression model by setting optimization parameters.
    +   *
    +   * @param numIterations Number of iterations of gradient descent to run.
    +   * @param stepSize Step size to be used for each iteration of gradient descent.
    +   * @param miniBatchFraction Fraction of data to be used per iteration.
    +   * @param numFeatures Number of features per record, must be constant for all batches of data.
    +   */
    +  def start(
    --- End diff --
    
    Nice, I'll add the setter methods. Should we then drop the companion object and static methods entirely? Some others, like ``LinearRegressionWithSGD``, have both.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50946889
  
    QA tests have started for PR 1361. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17718/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565686
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    --- End diff --
    
    Add `protected`?



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565706
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    --- End diff --
    
    `count` requires one pass through the rdd. I think it should be safe to call `algorithm.run` directly. If there are errors with empty RDDs, we need to fix them in mllib.
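
One way to read this suggestion is that the optimizer itself should tolerate an empty batch, so `trainOn` never needs its own `count()`. The `GradientDescent` diff quoted later in this thread takes that route by returning the initial weights when no data is found. Below is a minimal plain-Scala sketch of the idea with no Spark dependency. `EmptyBatchSketch`, `Point`, and `runGradientDescent` are hypothetical names, and `Seq` and `Array[Double]` stand in for RDDs and MLlib vectors; this is not MLlib's API.

```scala
object EmptyBatchSketch {

  /** One labeled example (hypothetical stand-in for LabeledPoint). */
  final case class Point(label: Double, features: Array[Double])

  /**
   * Full-batch gradient descent for least squares.
   * Returns the initial weights unchanged when the batch is empty,
   * which avoids dividing by zero and producing NaN weights.
   */
  def runGradientDescent(
      batch: Seq[Point],
      initialWeights: Array[Double],
      stepSize: Double,
      numIterations: Int): Array[Double] = {
    if (batch.isEmpty) {
      initialWeights
    } else {
      var weights = initialWeights.clone()
      for (_ <- 1 to numIterations) {
        // Sum the per-example gradients of the squared error.
        val gradient = new Array[Double](weights.length)
        for (p <- batch) {
          val prediction = weights.zip(p.features).map { case (w, x) => w * x }.sum
          val error = prediction - p.label
          for (j <- weights.indices) gradient(j) += error * p.features(j)
        }
        // Step against the average gradient.
        weights = weights.indices.map(j => weights(j) - stepSize * gradient(j) / batch.size).toArray
      }
      weights
    }
  }

  def main(args: Array[String]): Unit = {
    val initial = Array(0.0, 0.0)
    // An empty batch leaves the weights untouched.
    val afterEmpty = runGradientDescent(Seq.empty, initial, stepSize = 0.1, numIterations = 10)
    println(afterEmpty.mkString(", "))
    // A non-empty batch moves the weights toward the labels.
    val batch = Seq(Point(1.0, Array(1.0, 0.0)), Point(2.0, Array(0.0, 1.0)))
    val afterData = runGradientDescent(batch, initial, stepSize = 0.1, numIterations = 10)
    println(afterData.mkString(", "))
  }
}
```

With the guard inside the optimizer, the caller can pass every batch through unconditionally.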


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50952474
  
    Yay!!!
    
    
    On Fri, Aug 1, 2014 at 8:12 PM, Xiangrui Meng <no...@github.com>
    wrote:
    
    > LGTM. Merged into master. Thanks a lot for putting Streaming and MLlib
    > together!
    >
    > —
    > Reply to this email directly or view it on GitHub
    > <https://github.com/apache/spark/pull/1361#issuecomment-50952454>.
    >



[GitHub] spark pull request: Streaming mllib

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48660791
  
    @freeman-lab This is great! Could you create a JIRA and add `[SPARK-####][MLLIB]` to the title of this PR? Thanks!



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15628448
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    --- End diff --
    
    I guess I was going for something more general to avoid confusion: we can actually get ``StreamingLinearRegression``, ``StreamingLasso``, ``StreamingLogisticRegression``, etc. all by extending this one class. Maybe ``StreamingLinearAlgorithm``?
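
The design described here (one abstract class holding the streaming logic, with concrete algorithms supplying only a model and an algorithm) can be sketched in plain Scala without Spark. In the sketch below, `LinearModel`, `LinearAlgorithm`, `SimpleModel`, `OneStepLeastSquares`, and `StreamingOneStepLeastSquares` are hypothetical stand-ins invented for illustration, and an `Iterator` of batches stands in for a DStream; only the overall shape mirrors the class in the diff.

```scala
object StreamingAbstractionSketch {

  final case class Point(label: Double, features: Array[Double])

  /** Stand-in for GeneralizedLinearModel: anything that exposes weights. */
  trait LinearModel { def weights: Array[Double] }

  /** Stand-in for GeneralizedLinearAlgorithm: fits a model from data and initial weights. */
  trait LinearAlgorithm[M <: LinearModel] {
    def run(data: Seq[Point], initialWeights: Array[Double]): M
  }

  /** Shared streaming logic, written once and reused by every concrete algorithm. */
  abstract class StreamingLinearAlgorithm[M <: LinearModel, A <: LinearAlgorithm[M]] {
    var model: M
    val algorithm: A

    def latest(): M = model

    /** Update the model once per batch, warm-starting from the current weights. */
    def trainOn(batches: Iterator[Seq[Point]]): Unit =
      batches.foreach { batch => model = algorithm.run(batch, model.weights) }
  }

  final case class SimpleModel(weights: Array[Double]) extends LinearModel

  /** Toy algorithm: a single least-squares gradient step per call. */
  class OneStepLeastSquares(stepSize: Double) extends LinearAlgorithm[SimpleModel] {
    def run(data: Seq[Point], initialWeights: Array[Double]): SimpleModel = {
      if (data.isEmpty) {
        SimpleModel(initialWeights)
      } else {
        val gradient = new Array[Double](initialWeights.length)
        for (p <- data) {
          val error = initialWeights.zip(p.features).map { case (w, x) => w * x }.sum - p.label
          for (j <- gradient.indices) gradient(j) += error * p.features(j) / data.size
        }
        SimpleModel(initialWeights.zip(gradient).map { case (w, g) => w - stepSize * g })
      }
    }
  }

  /** A concrete streaming algorithm only has to supply the two members. */
  class StreamingOneStepLeastSquares(stepSize: Double, numFeatures: Int)
    extends StreamingLinearAlgorithm[SimpleModel, OneStepLeastSquares] {
    val algorithm = new OneStepLeastSquares(stepSize)
    var model = SimpleModel(new Array[Double](numFeatures))
  }

  def main(args: Array[String]): Unit = {
    val streaming = new StreamingOneStepLeastSquares(stepSize = 0.2, numFeatures = 1)
    // Three identical batches drawn from y = 2x.
    val batches = Iterator.fill(3)(Seq(Point(2.0, Array(1.0)), Point(4.0, Array(2.0))))
    streaming.trainOn(batches)
    println(streaming.latest().weights.mkString(", "))
  }
}
```

Adding another algorithm in this sketch means writing one more `LinearAlgorithm` and a subclass that supplies the `algorithm` and `model` members; `trainOn` is inherited unchanged.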



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15436152
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    --- End diff --
    
    I don't think this needs to have `@Experimental` if it's a developer API



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-49272156
  
    Jenkins, add to whitelist.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565983
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.scalatest.FunSuite
    +import org.apache.spark.SparkConf
    +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
    +import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext}
    +
    +import scala.collection.mutable.ArrayBuffer
    --- End diff --
    
    Move scala imports before 3rd-party imports. Usually the imports are organized into 4 groups in the following order:
    
    1. java imports (java.*)
    2. scala imports (scala.*)
    3. 3rd-party imports
    4. spark imports (org.apache.spark.*)
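
Applied to the imports in the diff above, the four groups might be laid out as in the sketch below. The third-party and Spark groups are commented out so the snippet compiles on its own; in the real test file they would be live imports. The blank line between groups and the `ImportOrderSketch` object are assumptions for illustration and are not taken from this comment.

```scala
// 1. java imports
import java.io.File

// 2. scala imports
import scala.collection.mutable.ArrayBuffer

// 3. third-party imports (commented out here: they need Guava, Commons IO, and ScalaTest)
// import com.google.common.io.Files
// import org.apache.commons.io.FileUtils
// import org.scalatest.FunSuite

// 4. spark imports (commented out here: they need the Spark modules from this PR)
// import org.apache.spark.SparkConf
// import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
// import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext}

object ImportOrderSketch {
  def main(args: Array[String]): Unit = {
    // Uses one name from each live import group so neither import is unused.
    val files = ArrayBuffer(new File("a.txt"), new File("b.txt"))
    println(files.map(_.getName).mkString(", "))
  }
}
```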



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50934313
  
    LGTM!!! 
    
    My only concern is the documentation, but we can try fixing that independently (and we have to). Also, the unit test uses files, which is not the best way to test. Integrating with the streaming unit tests is going to be a bit involved, so let's leave it for the future, 1.2, as more streaming + mllib stuff gets added.
    
    This is a very cool start to a hopefully new path. :D @mengxr Go ahead and merge it. 



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50897333
  
    QA tests have started for PR 1361. This patch DID NOT merge cleanly!
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17675/consoleFull



[GitHub] spark pull request: Streaming mllib

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48655690
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48935929
  
    @mengxr I added two tests; they check that parameter estimates are accurate and that they improve over time. The tests use temporary file writing / file streams, which is clunky, but @tdas will help add dependencies on the streaming test suite so we can use its utilities instead.
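
The two properties mentioned (estimates are accurate, and they improve over time) can be illustrated without Spark or file streams. The sketch below is a plain-Scala analogue and not the test suite from this PR. `StreamingEstimateSketch`, `update`, and `errorsOverTime` are hypothetical names, and the data is noise-free and identical in every batch so the behavior is deterministic.

```scala
object StreamingEstimateSketch {

  /** One gradient step of 1-D least squares (no intercept) on a batch of (x, y) pairs. */
  def update(weight: Double, batch: Seq[(Double, Double)], stepSize: Double): Double = {
    val gradient = batch.map { case (x, y) => (weight * x - y) * x }.sum / batch.size
    weight - stepSize * gradient
  }

  /** Absolute error of the estimate after each batch, starting from weight 0. */
  def errorsOverTime(trueWeight: Double, numBatches: Int, stepSize: Double): List[Double] = {
    // Noise-free data from y = trueWeight * x, reused for every batch.
    val batch = Seq(0.5, 1.0, 1.5).map(x => (x, trueWeight * x))
    Iterator.iterate(0.0)(w => update(w, batch, stepSize))
      .drop(1) // skip the initial weight, keep one estimate per batch
      .take(numBatches)
      .map(w => math.abs(w - trueWeight))
      .toList
  }

  def main(args: Array[String]): Unit = {
    val errors = errorsOverTime(trueWeight = 3.0, numBatches = 20, stepSize = 0.5)
    // Check 1: the estimate improves with every batch.
    assert(errors.zip(errors.tail).forall { case (earlier, later) => later < earlier })
    // Check 2: the final estimate is close to the true weight.
    assert(errors.last < 0.1)
    println(s"final error: ${errors.last}")
  }
}
```

With noisy data the error would not shrink on every batch, so a real test would more plausibly compare early and late errors with a tolerance.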



[GitHub] spark pull request: Streaming mllib

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48661142
  
     Merged build triggered. 



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15719184
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    --- End diff --
    
    @freeman-lab It would be really nice to put your findings inside the implementation. That can help us to understand the behavior.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15707941
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.examples.mllib
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.mllib.util.MLStreamingUtils
    +import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
    +import org.apache.spark.streaming.{Seconds, StreamingContext}
    +
    +/**
    + * Continually update a model on one stream of data using streaming linear regression,
    + * while making predictions on another stream of data
    + *
    --- End diff --
    
    That's correct. I'll make it much clearer how the example works.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565590
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see LinearRegressionWithSGD for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant.
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD private (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var numFeatures: Int)
    +  extends StreamingRegression[LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
    +
    +  val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
    +
    +  var model = algorithm.createModel(Vectors.dense(new Array[Double](numFeatures)), 0.0)
    +
    +}
    +
    +/**
    + * Top-level methods for calling StreamingLinearRegressionWithSGD.
    + */
    +@Experimental
    +object StreamingLinearRegressionWithSGD {
    +
    +  /**
    +   * Start a streaming Linear Regression model by setting optimization parameters.
    +   *
    +   * @param numIterations Number of iterations of gradient descent to run.
    +   * @param stepSize Step size to be used for each iteration of gradient descent.
    +   * @param miniBatchFraction Fraction of data to be used per iteration.
    +   * @param numFeatures Number of features per record, must be constant for all batches of data.
    +   */
    +  def start(
    --- End diff --
    
    Instead of having static methods, we can define setter methods inside the class and provide a default constructor with no arguments. Then the user can use a builder pattern to construct the streaming update algorithm:
    
    ~~~
    new StreamingLinearRegressionWithSGD()
      .setStepSize(0.5)
      .setNumIterations(10)
      .setInitialWeights(Vectors.dense(...))
      .trainOn(DStream)
    ~~~
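
A minimal sketch of how such chained setters could be written, in plain Scala with no Spark dependency. `BuilderSketch`, `StreamingRegressionConfig`, and `describe` are hypothetical names, the default values are arbitrary placeholders, and `Array[Double]` stands in for an MLlib vector; none of this is the class from the PR.

```scala
object BuilderSketch {

  /** Hypothetical stand-in for a streaming algorithm configured through chained setters. */
  class StreamingRegressionConfig {
    // Placeholder defaults, chosen only for this sketch.
    private var stepSize: Double = 0.1
    private var numIterations: Int = 50
    private var initialWeights: Array[Double] = Array.empty

    // Each setter returns this.type so that calls can be chained.
    def setStepSize(value: Double): this.type = { stepSize = value; this }
    def setNumIterations(value: Int): this.type = { numIterations = value; this }
    def setInitialWeights(value: Array[Double]): this.type = {
      initialWeights = value.clone()
      this
    }

    /** Human-readable summary of the current settings. */
    def describe: String =
      s"stepSize=$stepSize, numIterations=$numIterations, " +
        s"initialWeights=[${initialWeights.mkString(", ")}]"
  }

  def main(args: Array[String]): Unit = {
    val config = new StreamingRegressionConfig()
      .setStepSize(0.5)
      .setNumIterations(10)
      .setInitialWeights(Array(0.0, 0.0))
    println(config.describe)
  }
}
```

Returning `this.type` rather than the class name keeps the chain usable from subclasses, since the static type of each call stays that of the receiver.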



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50949034
  
    QA results for PR 1361:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
      class StreamingLinearRegressionWithSGD (
    
    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17718/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15628281
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    +          model = algorithm.run(rdd, model.weights)
    +          logInfo("Model updated")
    --- End diff --
    
    great idea!



[GitHub] spark pull request: Streaming mllib

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48660846
  
    Jenkins, test this please.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-49272423
  
    QA tests have started for PR 1361. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16774/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50928131
  
    QA results for PR 1361:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes
    
    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17690/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15630714
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    --- End diff --
    
    Sounds good.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15724418
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala ---
    @@ -162,45 +162,55 @@ object GradientDescent extends Logging {
         val numExamples = data.count()
         val miniBatchSize = numExamples * miniBatchFraction
     
    -    // Initialize weights as a column vector
    -    var weights = Vectors.dense(initialWeights.toArray)
    -    val n = weights.size
    -
    -    /**
    -     * For the first iteration, the regVal will be initialized as sum of weight squares
    -     * if it's L2 updater; for L1 updater, the same logic is followed.
    -     */
    -    var regVal = updater.compute(
    -      weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
    -
    -    for (i <- 1 to numIterations) {
    -      val bcWeights = data.context.broadcast(weights)
    -      // Sample a subset (fraction miniBatchFraction) of the total data
    -      // compute and sum up the subgradients on this subset (this is one map-reduce)
    -      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
    -        .treeAggregate((BDV.zeros[Double](n), 0.0))(
    -          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
    -            val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
    -            (grad, loss + l)
    -          },
    -          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
    -            (grad1 += grad2, loss1 + loss2)
    -          })
    +    // if no data, return initial weights to avoid NaNs
    +    if (numExamples == 0) {
    +
    +      logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
    +      (initialWeights, stochasticLossHistory.toArray)
    +
    +    } else {
    +
    +      // Initialize weights as a column vector
    +      var weights = Vectors.dense(initialWeights.toArray)
    +      val n = weights.size
     
           /**
    -       * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
    -       * and regVal is the regularization value computed in the previous iteration as well.
    +       * For the first iteration, the regVal will be initialized as sum of weight squares
    +       * if it's L2 updater; for L1 updater, the same logic is followed.
            */
    -      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
    -      val update = updater.compute(
    -        weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
    -      weights = update._1
    -      regVal = update._2
    +      var regVal = updater.compute(
    +        weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
    +
    +      for (i <- 1 to numIterations) {
    +        // Sample a subset (fraction miniBatchFraction) of the total data
    +        // compute and sum up the subgradients on this subset (this is one map-reduce)
    +        val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
    +          .aggregate((BDV.zeros[Double](weights.size), 0.0))(
    --- End diff --
    
    Same for broadcasting, sorry, fixing...
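
For readers following the `numExamples == 0` branch in the diff above: with an empty batch, `miniBatchSize` is zero, so dividing the gradient and loss sums by it produces `0.0 / 0.0`, which is NaN. The following is a minimal, Spark-free sketch of that guard, not the code from this PR. The object name `EmptyBatchGuard`, the plain `Seq` input, the least-squares gradient, and the `stepSize / sqrt(i)` decay are all assumptions made for illustration.

```scala
// Toy stand-in for mini-batch gradient descent on least squares (no Spark).
// All names here are hypothetical and only illustrate the empty-batch guard.
object EmptyBatchGuard {

  /** Averaged-gradient descent; returns the initial weights if there is no data. */
  def runMiniBatchSGD(
      data: Seq[(Double, Array[Double])], // (label, features)
      initialWeights: Array[Double],
      stepSize: Double,
      numIterations: Int): Array[Double] = {
    val numExamples = data.size
    if (numExamples == 0) {
      // Without this branch, the division by numExamples below would be 0.0 / 0.0.
      initialWeights.clone()
    } else {
      var weights = initialWeights.clone()
      for (i <- 1 to numIterations) {
        val gradientSum = new Array[Double](weights.length)
        for ((label, features) <- data) {
          val prediction = features.zip(weights).map { case (x, w) => x * w }.sum
          val diff = prediction - label
          for (j <- weights.indices) gradientSum(j) += diff * features(j)
        }
        // Assumed step-size decay for this sketch.
        val thisStep = stepSize / math.sqrt(i.toDouble)
        weights = weights.zip(gradientSum).map { case (w, g) => w - thisStep * g / numExamples }
      }
      weights
    }
  }

  def main(args: Array[String]): Unit = {
    val w0 = Array(0.0, 0.0)
    // An empty batch leaves the weights untouched.
    println(runMiniBatchSGD(Seq.empty, w0, 0.1, 10).mkString("[", ",", "]")) // [0.0,0.0]
    // The value the guard avoids:
    println((0.0 / 0.0).isNaN) // true
  }
}
```

In a streaming setting this matters because a batch interval with no incoming records is routine, and a single NaN would otherwise propagate into every later update through the warm-started weights.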


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50950926
  
    QA tests have started for PR 1361. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17732/consoleFull


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15725743
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala ---
    @@ -0,0 +1,94 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingLinearAlgorithm implements methods for continuously
    + * training a generalized linear model on streaming data,
    + * and using it for prediction on (possibly different) streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of any analyses using GLMs. Only weights will be updated,
    + * not an intercept. If the model needs an intercept, it should be manually appended
    + * to the input data.
    + *
    + * For example usage, see `StreamingLinearRegressionWithSGD`.
    + *
    + * NOTE(Freeman): In some use cases, the order in which trainOn and predictOn
    + * are called in an application will affect the results. When called on
    + * the same DStream, if trainOn is called before predictOn, when new data
    + * arrive the model will update and the prediction will be based on the new
    + * model. Whereas if predictOn is called first, the prediction will use the model
    + * from the previous update.
    + *
    + * NOTE(Freeman): It is ok to call predictOn repeatedly on multiple streams; this
    + * will generate predictions for each one all using the current model.
    + * It is also ok to call trainOn on different streams; this will update
    + * the model using each of the different sources, in sequence.
    + *
    + */
    +@DeveloperApi
    +abstract class StreamingLinearAlgorithm[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  protected var model: M
    +
    +  /** The algorithm to use for updating. */
    +  protected val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latestModel(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD { (rdd, time) =>
    +        model = algorithm.run(rdd, model.weights)
    +        logInfo("Model updated at time %s".format(time.toString))
    +        logInfo("Current model: weights, %s".format(
    +          model.weights.toArray.take(100).mkString("[", ",", "]")))
    --- End diff --
    
    Good catch!
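
The ordering note in the scaladoc above (`trainOn` before `predictOn` versus the reverse) can be illustrated without Spark. The sketch below is a toy under stated assumptions: `ToyStream`, `ToyStreamingMean`, and `OrderingDemo` are hypothetical names, the "model" is just the mean of the latest batch, and per-batch callbacks are assumed to run in the order they were registered, which is the behavior the NOTE describes.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy stand-in for a DStream: registered callbacks run on each batch,
// in registration order (an assumption of this sketch).
class ToyStream[T] {
  private val outputs = ArrayBuffer.empty[Seq[T] => Unit]
  def foreachBatch(f: Seq[T] => Unit): Unit = outputs += f
  def push(batch: Seq[T]): Unit = outputs.foreach(f => f(batch))
}

// Toy "model": a single weight, replaced by the mean of each non-empty batch.
class ToyStreamingMean(initial: Double) {
  private var weight = initial
  val predictions = ArrayBuffer.empty[Double]

  def trainOn(s: ToyStream[Double]): Unit =
    s.foreachBatch(b => if (b.nonEmpty) weight = b.sum / b.size)

  // Records the weight that was current when the batch arrived.
  def predictOn(s: ToyStream[Double]): Unit =
    s.foreachBatch(_ => predictions += weight)
}

object OrderingDemo {
  def run(trainFirst: Boolean): List[Double] = {
    val stream = new ToyStream[Double]
    val model = new ToyStreamingMean(0.0)
    if (trainFirst) { model.trainOn(stream); model.predictOn(stream) }
    else { model.predictOn(stream); model.trainOn(stream) }
    stream.push(Seq(1.0, 3.0)) // batch mean 2.0
    stream.push(Seq(10.0))     // batch mean 10.0
    model.predictions.toList
  }

  def main(args: Array[String]): Unit = {
    println(run(trainFirst = true))  // List(2.0, 10.0): predictions see the updated model
    println(run(trainFirst = false)) // List(0.0, 2.0): predictions lag by one update
  }
}
```

With training registered first, each prediction reflects the batch that just arrived; with prediction registered first, it reflects the model from the previous update.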


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15725538
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see `LinearRegressionWithSGD` for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant. An initial weight
    + * vector must be provided.
    + *
    + * Use a builder pattern to construct a streaming linear regression
    + * analysis in an application, like:
    + *
    + *  val model = new StreamingLinearRegressionWithSGD()
    + *    .setStepSize(0.5)
    + *    .setNumIterations(10)
    + *    .setInitialWeights(Vectors.dense(...))
    + *    .trainOn(DStream)
    + *
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var initialWeights: Vector)
    +  extends StreamingLinearAlgorithm[
    +    LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
    +
    +  /**
    +   * Construct a StreamingLinearRegression object with default parameters:
    +   * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0, initialWeights: [0.0, 0.0]}.
    +   */
    +  def this() = this(0.1, 50, 1.0, Vectors.dense(0.0, 0.0))
    --- End diff --
    
    Yes.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50922127
  
    QA tests have started for PR 1361. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17690/consoleFull


---

[GitHub] spark pull request: Streaming mllib

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48661282
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16516/


---

[GitHub] spark pull request: Streaming mllib

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48661281
  
    Merged build finished. 


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50946141
  
    QA tests have started for PR 1361. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17716/consoleFull


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50943077
  
    @mengxr done! removed the static methods (and made the class public), and added those usage notes to ``StreamingLinearAlgorithm``


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565687
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    --- End diff --
    
    Also `protected`?


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565489
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    --- End diff --
    
    Since it only applies to linear regression, maybe `StreamingLinearRegression` is a better name.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15724885
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see `LinearRegressionWithSGD` for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant. An initial weight
    + * vector must be provided.
    + *
    + * Use a builder pattern to construct a streaming linear regression
    + * analysis in an application, like:
    + *
    + *  val model = new StreamingLinearRegressionWithSGD()
    + *    .setStepSize(0.5)
    + *    .setNumIterations(10)
    + *    .setInitialWeights(Vectors.dense(...))
    + *    .trainOn(DStream)
    + *
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var initialWeights: Vector)
    +  extends StreamingLinearAlgorithm[
    +    LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
    +
    +  /**
    +   * Construct a StreamingLinearRegression object with default parameters:
    +   * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0, initialWeights: [0.0, 0.0]}.
    +   */
    +  def this() = this(0.1, 50, 1.0, Vectors.dense(0.0, 0.0))
    --- End diff --
    
    Another option is to make it a required argument:
    
    ~~~
    def this(w0: Vector) = this(0.1, 50, 1.0, w0)
    ~~~


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15724866
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see `LinearRegressionWithSGD` for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant. An initial weight
    + * vector must be provided.
    + *
    + * Use a builder pattern to construct a streaming linear regression
    + * analysis in an application, like:
    + *
    + *  val model = new StreamingLinearRegressionWithSGD()
    + *    .setStepSize(0.5)
    + *    .setNumIterations(10)
    + *    .setInitialWeights(Vectors.dense(...))
    + *    .trainOn(DStream)
    + *
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var initialWeights: Vector)
    +  extends StreamingLinearAlgorithm[
    +    LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
    +
    +  /**
    +   * Construct a StreamingLinearRegression object with default parameters:
    +   * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0, initialWeights: [0.0, 0.0]}.
    +   */
    +  def this() = this(0.1, 50, 1.0, Vectors.dense(0.0, 0.0))
    --- End diff --
    
    We shouldn't assume the number of features is 2. We can default to `null` and throw an error if this is not set when `trainOn` is called.
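
A rough sketch of the suggestion above, assuming nothing about the final API: the default constructor leaves the weights unset, a builder-style setter supplies them, and `trainOn` fails fast if they are still missing. `ToyStreamingRegression` is a hypothetical stand-in that uses a plain `Array[Double]` and a `Seq` of batches instead of MLlib's `Vector` and a `DStream`, and the actual training is elided.

```scala
// Hypothetical stand-in for the streaming class under review; only the
// "unset weights" check is illustrated, training itself is elided.
class ToyStreamingRegression(
    val stepSize: Double,
    val numIterations: Int,
    private var initialWeights: Array[Double]) {

  /** Default parameters, with no assumption about the number of features. */
  def this() = this(0.1, 50, null)

  /** Builder-style setter, returning this instance for chaining. */
  def setInitialWeights(w: Array[Double]): this.type = {
    initialWeights = w
    this
  }

  /** Returns the number of batches it would train on; throws if weights are unset. */
  def trainOn(batches: Seq[Seq[(Double, Array[Double])]]): Int = {
    if (initialWeights == null) {
      throw new IllegalArgumentException(
        "Initial weights must be set before calling trainOn")
    }
    batches.size
  }
}

object UnsetWeightsDemo {
  def main(args: Array[String]): Unit = {
    val unset = new ToyStreamingRegression()
    // Calling trainOn without weights fails with a descriptive error.
    println(scala.util.Try(unset.trainOn(Seq.empty)).isFailure) // true

    val ready = new ToyStreamingRegression().setInitialWeights(Array(0.0, 0.0, 0.0))
    println(ready.trainOn(Seq.empty)) // 0
  }
}
```

The trade-off against a required constructor argument is where the mistake surfaces: the `null` default keeps the no-argument builder style but defers the error until `trainOn` is called, whereas a required argument rules it out at construction.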


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15671040
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    --- End diff --
    
    For sure!


---

[GitHub] spark pull request: Streaming mllib

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48660827
  
    Jenkins, add to whitelist.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565612
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    --- End diff --
    
    Call it `latestModel` to be more specific?


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565864
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.scalatest.FunSuite
    +import org.apache.spark.SparkConf
    +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
    +import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext}
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +class StreamingLinearRegressionSuite extends FunSuite {
    --- End diff --
    
    You can use `with LocalSparkContext` and construct `StreamingContext` by `new StreamingContext(sc, duration)`.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565789
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/util/MLStreamingUtils.scala ---
    @@ -0,0 +1,42 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.util
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.streaming.dstream.DStream
    +import org.apache.spark.streaming.StreamingContext
    +import org.apache.spark.mllib.regression.{LabeledPointParser, LabeledPoint}
    --- End diff --
    
    Please order the imports alphabetically. There is an IntelliJ plugin that automates this task: http://plugins.jetbrains.com/plugin/7350?pr=


---

[GitHub] spark pull request: Streaming mllib

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48661153
  
    Merged build started. 


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565834
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.scalatest.FunSuite
    +import org.apache.spark.SparkConf
    +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
    +import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext}
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +class StreamingLinearRegressionSuite extends FunSuite {
    +
    +  // Assert that two values are equal within tolerance epsilon
    +  def assertEqual(v1: Double, v2: Double, epsilon: Double) {
    +    def errorMessage = v1.toString + " did not equal " + v2.toString
    +    assert(math.abs(v1-v2) <= epsilon, errorMessage)
    +  }
    +
    +  // Assert that model predictions are correct
    +  def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
    +    val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
    +      // A prediction is off if the prediction is more than 0.5 away from expected value.
    +      math.abs(prediction - expected.label) > 0.5
    +    }
    +    // At least 80% of the predictions should be on.
    +    assert(numOffPredictions < input.length / 5)
    +  }
    +
    +  // Test if we can accurately learn Y = 10*X1 + 10*X2 on streaming data
    +  test("streaming linear regression parameter accuracy") {
    +
    +    val conf = new SparkConf().setMaster("local").setAppName("streaming test")
    +    val testDir = Files.createTempDir()
    +    val numBatches = 10
    +    val ssc = new StreamingContext(conf, Seconds(1))
    +    val data = MLStreamingUtils.loadLabeledPointsFromText(ssc, testDir.toString)
    +    val model = StreamingLinearRegressionWithSGD.start(numFeatures=2, numIterations=50)
    +
    +    model.trainOn(data)
    +
    +    ssc.start()
    +
    +    // write data to a file stream
    +    Thread.sleep(5000)
    --- End diff --
    
    Could you explain why this is needed?


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15565728
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    +          model = algorithm.run(rdd, model.weights)
    +          logInfo("Model updated")
    +        }
    +        logInfo("Current model: weights, %s".format(model.weights.toString))
    --- End diff --
    
    `weights` may contain millions of values. We can output the first 100, for example.
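
A minimal sketch of the truncation suggested here, assuming the weights are available as an `Array[Double]` (for an MLlib vector, `toArray` would supply one). The names `WeightLogging`, `summarizeWeights`, and `maxShown` are hypothetical and chosen only for illustration; they are not part of the PR.

```scala
object WeightLogging {
  /** Render at most `maxShown` leading weights and note how many were omitted. */
  def summarizeWeights(weights: Array[Double], maxShown: Int = 100): String = {
    require(maxShown >= 0, "maxShown must be non-negative")
    // Only the first `maxShown` values are turned into text.
    val shown = weights.take(maxShown).mkString("[", ", ", "")
    val omitted = weights.length - math.min(weights.length, maxShown)
    if (omitted > 0) s"$shown, ... ($omitted more)]" else s"$shown]"
  }
}
```

Inside `trainOn`, the log call could then pass something like `summarizeWeights(model.weights.toArray)` instead of `model.weights.toString`, so the log line stays bounded however large the model is.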


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-49272169
  
    Jenkins, test this please.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50898744
  
    @freeman-lab Could you try to merge the latest master and resolve conflicts? It may be because of the change to constructors.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15708929
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala ---
    @@ -193,6 +195,17 @@ object MLUtils {
         loadLabeledPoints(sc, dir, sc.defaultMinPartitions)
     
       /**
    +   * Loads streaming labeled points from a stream of text files
    +   * where points are in the same format as used in `RDD[LabeledPoint].saveAsTextFile`.
    +   *
    --- End diff --
    
    Can you add: see `StreamingContext.textFileStream` for more details about how to generate a stream from files.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15724226
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala ---
    @@ -162,45 +162,55 @@ object GradientDescent extends Logging {
         val numExamples = data.count()
         val miniBatchSize = numExamples * miniBatchFraction
     
    -    // Initialize weights as a column vector
    -    var weights = Vectors.dense(initialWeights.toArray)
    -    val n = weights.size
    -
    -    /**
    -     * For the first iteration, the regVal will be initialized as sum of weight squares
    -     * if it's L2 updater; for L1 updater, the same logic is followed.
    -     */
    -    var regVal = updater.compute(
    -      weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
    -
    -    for (i <- 1 to numIterations) {
    -      val bcWeights = data.context.broadcast(weights)
    -      // Sample a subset (fraction miniBatchFraction) of the total data
    -      // compute and sum up the subgradients on this subset (this is one map-reduce)
    -      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
    -        .treeAggregate((BDV.zeros[Double](n), 0.0))(
    -          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
    -            val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
    -            (grad, loss + l)
    -          },
    -          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
    -            (grad1 += grad2, loss1 + loss2)
    -          })
    +    // if no data, return initial weights to avoid NaNs
    +    if (numExamples == 0) {
    +
    +      logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
    +      (initialWeights, stochasticLossHistory.toArray)
    +
    +    } else {
    +
    +      // Initialize weights as a column vector
    +      var weights = Vectors.dense(initialWeights.toArray)
    +      val n = weights.size
     
           /**
    -       * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
    -       * and regVal is the regularization value computed in the previous iteration as well.
    +       * For the first iteration, the regVal will be initialized as sum of weight squares
    +       * if it's L2 updater; for L1 updater, the same logic is followed.
            */
    -      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
    -      val update = updater.compute(
    -        weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
    -      weights = update._1
    -      regVal = update._2
    +      var regVal = updater.compute(
    +        weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
    +
    +      for (i <- 1 to numIterations) {
    +        // Sample a subset (fraction miniBatchFraction) of the total data
    +        // compute and sum up the subgradients on this subset (this is one map-reduce)
    +        val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
    +          .aggregate((BDV.zeros[Double](weights.size), 0.0))(
    --- End diff --
    
    It's totally fine, I might have lost it in the merge, put it back.
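
For context on the `numExamples == 0` guard in the diff quoted above, here is a small sketch of why an empty batch would otherwise produce NaNs: the averaged gradient divides by the mini-batch size, and `0.0 / 0.0` is NaN in floating point. The object and both function names are hypothetical, and the update rule is reduced to a single scalar weight purely for illustration.

```scala
object EmptyBatchSketch {
  /** One averaged-gradient step; divides by the mini-batch size, like the loop in the diff. */
  def step(weight: Double, gradientSum: Double, miniBatchSize: Double, stepSize: Double): Double =
    weight - stepSize * (gradientSum / miniBatchSize)

  /** Same step, but an empty batch leaves the weight untouched. */
  def guardedStep(
      weight: Double,
      gradientSum: Double,
      numExamples: Long,
      miniBatchFraction: Double,
      stepSize: Double): Double =
    if (numExamples == 0) weight
    else step(weight, gradientSum, numExamples * miniBatchFraction, stepSize)
}
```

With no examples, `step` sees a zero gradient sum and a zero batch size, so the weight becomes NaN, whereas `guardedStep` returns the initial weight unchanged.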


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15723222
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala ---
    @@ -162,45 +162,55 @@ object GradientDescent extends Logging {
         val numExamples = data.count()
         val miniBatchSize = numExamples * miniBatchFraction
     
    -    // Initialize weights as a column vector
    -    var weights = Vectors.dense(initialWeights.toArray)
    -    val n = weights.size
    -
    -    /**
    -     * For the first iteration, the regVal will be initialized as sum of weight squares
    -     * if it's L2 updater; for L1 updater, the same logic is followed.
    -     */
    -    var regVal = updater.compute(
    -      weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
    -
    -    for (i <- 1 to numIterations) {
    -      val bcWeights = data.context.broadcast(weights)
    -      // Sample a subset (fraction miniBatchFraction) of the total data
    -      // compute and sum up the subgradients on this subset (this is one map-reduce)
    -      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
    -        .treeAggregate((BDV.zeros[Double](n), 0.0))(
    -          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
    -            val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
    -            (grad, loss + l)
    -          },
    -          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
    -            (grad1 += grad2, loss1 + loss2)
    -          })
    +    // if no data, return initial weights to avoid NaNs
    +    if (numExamples == 0) {
    +
    +      logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
    +      (initialWeights, stochasticLossHistory.toArray)
    +
    +    } else {
    +
    +      // Initialize weights as a column vector
    +      var weights = Vectors.dense(initialWeights.toArray)
    +      val n = weights.size
     
           /**
    -       * NOTE(Xinghao): lossSum is computed using the weights from the previous iteration
    -       * and regVal is the regularization value computed in the previous iteration as well.
    +       * For the first iteration, the regVal will be initialized as sum of weight squares
    +       * if it's L2 updater; for L1 updater, the same logic is followed.
            */
    -      stochasticLossHistory.append(lossSum / miniBatchSize + regVal)
    -      val update = updater.compute(
    -        weights, Vectors.fromBreeze(gradientSum / miniBatchSize), stepSize, i, regParam)
    -      weights = update._1
    -      regVal = update._2
    +      var regVal = updater.compute(
    +        weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
    +
    +      for (i <- 1 to numIterations) {
    +        // Sample a subset (fraction miniBatchFraction) of the total data
    +        // compute and sum up the subgradients on this subset (this is one map-reduce)
    +        val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
    +          .aggregate((BDV.zeros[Double](weights.size), 0.0))(
    --- End diff --
    
    `.aggregate` -> `.treeAggregate`. We use a tree pattern to avoid sending too much data to the driver. Does it hurt streaming update performance?
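
The tree pattern mentioned here can be illustrated with a plain Scala sketch that does not use Spark at all. It assumes each partition has already produced a partial `(gradient sum, loss sum)` pair; `TreeAggregateSketch`, `flatAggregate`, and `fanIn` are hypothetical names for this illustration, and the code is not Spark's implementation.

```scala
object TreeAggregateSketch {
  type Acc = (Array[Double], Double) // (gradient sum, loss sum)

  /** Merge two partial results element-wise. */
  def combine(a: Acc, b: Acc): Acc = {
    val grad = a._1.zip(b._1).map { case (x, y) => x + y }
    (grad, a._2 + b._2)
  }

  /** Flat pattern: every partial result is merged at one place (the "driver"). */
  def flatAggregate(partials: Seq[Acc]): Acc = partials.reduce(combine)

  /** Tree pattern: merge in rounds of `fanIn`, so the final merge sees only a few inputs. */
  @annotation.tailrec
  def treeAggregate(partials: Seq[Acc], fanIn: Int = 2): Acc = {
    require(partials.nonEmpty && fanIn >= 2)
    if (partials.size == 1) partials.head
    else treeAggregate(partials.grouped(fanIn).map(_.reduce(combine)).toVector, fanIn)
  }
}
```

Both functions return the same sums. The tree version trades extra merge rounds for a smaller final merge, and those extra rounds are the latency cost behind the question about streaming update performance.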


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15630602
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    --- End diff --
    
    That sounds good! Do you mind including that change in this PR?


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15718670
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    --- End diff --
    
    Whoa, this is quite a lot to grasp.
    I am not sure how, in the second case, the order doesn't matter. Gotta think about it.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15718348
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearAlgorithm.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.DeveloperApi
    +import org.apache.spark.Logging
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    --- End diff --
    
    I've been testing this and it seems fairly robust, agreed we should clarify in the documentation. 
    
    What I've tried:
    
    - If train and predict are called on the same stream (or on two streams with data arriving simultaneously), order matters. If trainOn is first, the prediction will always use the subsequently updated model. If predictOn is first, it will use the model from the previous update. In practice, over multiple updates, either behavior seems reasonable, but maybe there should be a helpful warning if the user calls predictOn before trainOn?
    
    - If they are called on different streams and the data arrive sequentially, order doesn't matter. For example, if data arrive in the predictOn stream before the trainOn stream, the prediction uses the initial weights (as it should) to predict, regardless of the order of the calls.
    
    - It's ok, and maybe useful, to call predictOn repeatedly on different streams. For example, training on one stream, and predicting on it and another, behaves correctly (modulo the ordering issues described above).
    
    - If you call trainOn repeatedly on different streams, it will do an update when data arrive in either stream, which seems fine. Could be used to update using multiple input sources.
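
The same-stream ordering behavior described in the first bullet can be modeled with a toy simulation in plain Scala. It assumes that per-batch actions run in the order they were registered, which matches the behavior reported above but is not taken from Spark's scheduler. `ToyStreamingModel`, its single scalar weight, and the dummy "add 1.0" update are all hypothetical stand-ins.

```scala
import scala.collection.mutable.ArrayBuffer

object OrderingSketch {
  /** Toy stand-in for the streaming model: one weight, prediction = weight * x. */
  final class ToyStreamingModel(initialWeight: Double) {
    private var weight = initialWeight
    // Per-batch actions, run in the order they were registered.
    private val actions = ArrayBuffer.empty[Seq[Double] => Unit]
    val predictions = ArrayBuffer.empty[Seq[Double]]

    /** Register a dummy update that bumps the weight on every non-empty batch. */
    def trainOn(): Unit = {
      val update: Seq[Double] => Unit = batch => if (batch.nonEmpty) weight += 1.0
      actions += update
    }

    /** Register a prediction that uses whatever the weight is when the action runs. */
    def predictOn(): Unit = {
      val predict: Seq[Double] => Unit = batch => predictions += batch.map(_ * weight)
      actions += predict
    }

    /** Deliver one batch to every registered action, in registration order. */
    def processBatch(batch: Seq[Double]): Unit = actions.foreach(action => action(batch))
  }
}
```

Registering `trainOn()` before `predictOn()` makes each batch's predictions use the freshly updated weight. Registering them the other way round makes the predictions lag one update behind, which is the difference the proposed warning would flag.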


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50647207
  
    @mengxr and @mateiz thanks for the feedback! I've started working on fixing all this. I'll leave a couple of notes / questions above; otherwise the changes are great and I'll incorporate them as is.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15566077
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    +          model = algorithm.run(rdd, model.weights)
    +          logInfo("Model updated")
    --- End diff --
    
    Maybe we can add more information to it, for example, the current time.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48803931
  
    @freeman-lab Could you add some unit tests? There should be some examples under streaming and mllib.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50914178
  
    QA results for PR 1361:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17681/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15561528
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.annotation.Experimental
    --- End diff --
    
    The imports should be ordered alphabetically.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50908914
  
    @mengxr think I fixed it, issue was in ``GradientDescent`` (where I added the check for an empty RDD)
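
    To illustrate why an empty batch is a problem, here is a minimal sketch in plain Scala, not the actual `GradientDescent` code. It assumes the NaNs come from averaging the gradient over zero examples; `EmptyBatchDemo`, `stepUnchecked`, and `stepChecked` are hypothetical names.

    ```scala
    object EmptyBatchDemo {
      // One mini-batch step for y ~ w * x: average the squared-loss gradient, then step.
      def stepUnchecked(w: Double, data: Seq[(Double, Double)], stepSize: Double): Double = {
        val gradSum = data.map { case (x, y) => (w * x - y) * x }.sum
        w - stepSize * gradSum / data.size // 0.0 / 0 is NaN when the batch is empty
      }

      // Same step, but an empty batch leaves the weight untouched.
      def stepChecked(w: Double, data: Seq[(Double, Double)], stepSize: Double): Double =
        if (data.isEmpty) w else stepUnchecked(w, data, stepSize)
    }
    ```

    Once a weight becomes NaN, every later update stays NaN, so returning the initial weights for an empty batch keeps the streaming model usable.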



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15724230
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala ---
    @@ -162,45 +162,55 @@ object GradientDescent extends Logging {
         val numExamples = data.count()
         val miniBatchSize = numExamples * miniBatchFraction
     
    -    // Initialize weights as a column vector
    -    var weights = Vectors.dense(initialWeights.toArray)
    -    val n = weights.size
    -
    -    /**
    -     * For the first iteration, the regVal will be initialized as sum of weight squares
    -     * if it's L2 updater; for L1 updater, the same logic is followed.
    -     */
    -    var regVal = updater.compute(
    -      weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
    -
    -    for (i <- 1 to numIterations) {
    -      val bcWeights = data.context.broadcast(weights)
    -      // Sample a subset (fraction miniBatchFraction) of the total data
    -      // compute and sum up the subgradients on this subset (this is one map-reduce)
    -      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
    -        .treeAggregate((BDV.zeros[Double](n), 0.0))(
    -          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
    -            val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
    -            (grad, loss + l)
    -          },
    -          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
    -            (grad1 += grad2, loss1 + loss2)
    -          })
    +    // if no data, return initial weights to avoid NaNs
    +    if (numExamples == 0) {
    +
    +      logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
    +      (initialWeights, stochasticLossHistory.toArray)
    --- End diff --
    
    Nice idea, made the change.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-49344174
  
    Looks like the basic test for correct final params passes, but not the stricter test for improvement on every update. Both pass locally. My guess is that it's running a bit slower on Jenkins, so the updates don't complete fast enough (I can create a failure locally by making the test data rate too high). I'll play with this, might work to just slow down the data rate.
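
    The difference between the two kinds of test can be sketched as follows. This is an illustration only, not the code in the test suite: `ConvergenceChecks` and its methods are hypothetical, and `errors` stands for the parameter error recorded after each update.

    ```scala
    object ConvergenceChecks {
      // Basic check: the error after the last update is below a tolerance.
      def finalErrorOk(errors: Seq[Double], tol: Double): Boolean =
        errors.nonEmpty && errors.last < tol

      // Stricter check: every update reduced the error relative to the previous one.
      def improvesEveryUpdate(errors: Seq[Double]): Boolean =
        errors.zip(errors.drop(1)).forall { case (prev, next) => next < prev }
    }
    ```

    A run can end close to the right parameters and still fail the stricter check if a single update lags behind, which is consistent with a timing-sensitive failure.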



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-49384924
  
    QA tests have started for PR 1361. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16799/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15628342
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.scalatest.FunSuite
    +import org.apache.spark.SparkConf
    +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
    +import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext}
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +class StreamingLinearRegressionSuite extends FunSuite {
    --- End diff --
    
    I wanted to do this but it doesn't seem to work. The first test always passes, but the second test immediately hits this error. Maybe because we're effectively starting multiple StreamingContexts from the same SparkContext? I'm trying to debug. @tdas ?
    
        java.lang.NullPointerException:
          at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:159)
          at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:66)
          at org.apache.spark.mllib.regression.StreamingLinearRegressionSuite$$anonfun$2.apply$mcV$sp(StreamingLinearRegressionSuite.scala:92)
          at org.apache.spark.mllib.regression.StreamingLinearRegressionSuite$$anonfun$2.apply(StreamingLinearRegressionSuite.scala:89)
          at org.apache.spark.mllib.regression.StreamingLinearRegressionSuite$$anonfun$2.apply(StreamingLinearRegressionSuite.scala:89)
          at org.scalatest.Transformer$$anonfun$apply$1.apply(Transformer.scala:22)
          at org.scalatest.Transformer$$anonfun$apply$1.apply(Transformer.scala:22)
          at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
          at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
          at org.scalatest.Transformer.apply(Transformer.scala:22)



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by tdas <gi...@git.apache.org>.
Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15707657
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.examples.mllib
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.mllib.util.MLStreamingUtils
    +import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
    +import org.apache.spark.streaming.{Seconds, StreamingContext}
    +
    +/**
    + * Continually update a model on one stream of data using streaming linear regression,
    + * while making predictions on another stream of data
    + *
    --- End diff --
    
    The example needs more documentation on how to run it. Is my assumption correct that the data needs to be loaded through the text file stream by writing new files to a folder? If that is the case, then all the usage details need to be mentioned.
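
    If the data is indeed fed by dropping files into a monitored folder, one way to produce input could look like the sketch below. It uses only `java.nio` and makes two assumptions: that each line has the form `(label,[f1,f2,...])`, and that files should appear in the watched directory atomically, so each file is written to a staging directory first and then moved. `StreamFileWriter`, `formatPoint`, and `writeBatch` are hypothetical names.

    ```scala
    import java.nio.charset.StandardCharsets
    import java.nio.file.{Files, Path, StandardCopyOption}

    object StreamFileWriter {
      // Format one labeled point as "(label,[f1,f2,...])" (assumed line format).
      def formatPoint(label: Double, features: Seq[Double]): String =
        "(" + label + ",[" + features.mkString(",") + "])"

      // Write the batch to a staging file, then move it into the watched directory
      // so a directory monitor never sees a half-written file.
      // Both directories must be on the same file system for the atomic move to work.
      def writeBatch(watchedDir: Path, stagingDir: Path, name: String, lines: Seq[String]): Path = {
        val staged = stagingDir.resolve(name)
        Files.write(staged, lines.mkString("\n").getBytes(StandardCharsets.UTF_8))
        Files.move(staged, watchedDir.resolve(name), StandardCopyOption.ATOMIC_MOVE)
      }
    }
    ```

    Each call with a new file name would then correspond to one new file picked up by the stream.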
    




[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15566035
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see LinearRegressionWithSGD for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant.
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD private (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    --- End diff --
    
    For streaming updates, the RDDs are usually small. Maybe it is not necessary to use `miniBatchFraction`. But it is fine to keep this option.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15630905
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    +          model = algorithm.run(rdd, model.weights)
    +          logInfo("Model updated")
    +        }
    +        logInfo("Current model: weights, %s".format(model.weights.toString))
    +        logInfo("Current model: intercept, %s".format(model.intercept.toString))
    --- End diff --
    
    I'm a little worried about the accuracy if we don't provide the previous weight for the intercept. Ignoring the intercept may be safer. Otherwise, if the correct weight for the intercept is large but the step size is small (common for online updates), it may need many steps to recover the weight for the intercept.
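
    A small numeric sketch of this concern, in plain Scala and independent of MLlib: it fits only an intercept by gradient descent on 0.5 * (b - target)^2 and compares carrying the intercept across batches with restarting it at 0 on every batch. The restart-at-0 behavior and the names `InterceptDemo`, `descend`, and `finalIntercept` are assumptions for illustration.

    ```scala
    object InterceptDemo {
      // Gradient descent on 0.5 * (b - target)^2, starting from `start`.
      def descend(start: Double, target: Double, stepSize: Double, iterations: Int): Double =
        (1 to iterations).foldLeft(start)((b, _) => b - stepSize * (b - target))

      // Run `batches` updates; either carry the intercept forward or reset it to 0 each time.
      def finalIntercept(carry: Boolean, target: Double, stepSize: Double,
          iterations: Int, batches: Int): Double =
        (1 to batches).foldLeft(0.0) { (b, _) =>
          descend(if (carry) b else 0.0, target, stepSize, iterations)
        }
    }
    ```

    With a target of 10, a step size of 0.1, and 5 iterations per batch, the carried intercept approaches 10 over 20 batches, while the reset intercept ends every batch at the same value, well short of the target.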



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15628405
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    --- End diff --
    
    Great



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15682567
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    +          model = algorithm.run(rdd, model.weights)
    +          logInfo("Model updated")
    +        }
    +        logInfo("Current model: weights, %s".format(model.weights.toString))
    +        logInfo("Current model: intercept, %s".format(model.intercept.toString))
    --- End diff --
    
    Ok, good points, agreed it's safer. I'll make sure there's a note about this.



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-49287245
  
    QA results for PR 1361:
    - This patch FAILED unit tests.
    - This patch merges cleanly
    - This patch adds no public classes

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16774/consoleFull



[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15561486
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.examples.mllib
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.mllib.util.MLStreamingUtils
    +import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
    +import org.apache.spark.streaming.{Seconds, StreamingContext}
    +
    +/**
    + * Continually update a model on one stream of data using streaming linear regression,
    + * while making predictions on another stream of data
    + *
    + */
    +object StreamingLinearRegression {
    +
    +  def main(args: Array[String]) {
    +
    +    if (args.length != 4) {
    +      System.err.println(
    +        "Usage: StreamingLinearRegression <trainingData> <testData> <batchDuration> <numFeatures>")
    +      System.exit(1)
    +    }
    +
    +    val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
    +    val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
    +
    +    val trainingData = MLStreamingUtils.loadLabeledPointsFromText(ssc, args(0))
    --- End diff --
    
    It might be cleaner to rename `LabeledPointParser.parse` to `LabeledPoint.parse` and make it public. The parser was originally named `LabeledPoint.parse`, but `LabeledPoint` is a case class and the type signature is a little complicated to match. If you can figure it out, we can remove `MLStreamingUtils`.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15628373
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    +        if (rdd.count() > 0) {
    +          model = algorithm.run(rdd, model.weights)
    +          logInfo("Model updated")
    +        }
    +        logInfo("Current model: weights, %s".format(model.weights.toString))
    +        logInfo("Current model: intercept, %s".format(model.intercept.toString))
    --- End diff --
    
    Yup, I noticed this. It could also work to call ``setIntercept(addIntercept=true)`` where the algorithm is defined (e.g. within ``StreamingLinearRegressionWithSGD``), and have a setter to control this. Estimating an intercept from scratch on each update should be well constrained because we'll be starting from the current weights.
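
The warm-start behaviour discussed here can be illustrated with a minimal, Spark-free sketch. Plain Scala collections stand in for the RDDs of a DStream, and `WarmStartSketch`, `Model`, `run`, and the one-feature model y = w * x are all hypothetical names chosen for illustration, not part of MLlib. The intercept is left out to keep the example short; the point is only that each non-empty batch continues from the current weights rather than from scratch.

```scala
// Spark-free sketch: plain Scala collections stand in for the RDDs in a DStream.
object WarmStartSketch {

  /** One-feature least-squares model y = w * x; `w` plays the role of model.weights. */
  final case class Model(w: Double)

  /** Run a fixed number of full-batch gradient steps, starting from `initial`. */
  def run(batch: Seq[(Double, Double)], initial: Model,
          stepSize: Double = 0.1, numIterations: Int = 10): Model = {
    var w = initial.w
    for (_ <- 1 to numIterations) {
      // Mean gradient of 0.5 * (w * x - y)^2 over the batch
      val grad = batch.map { case (x, y) => (w * x - y) * x }.sum / batch.size
      w -= stepSize * grad
    }
    Model(w)
  }

  /** Mirror of the trainOn logic: skip empty batches, otherwise warm-start from the current model. */
  def trainOn(batches: Seq[Seq[(Double, Double)]], start: Model): Model =
    batches.foldLeft(start) { (model, batch) =>
      if (batch.nonEmpty) run(batch, model) else model
    }
}
```

Because every update starts from the previous estimate, a few iterations per batch are enough for the estimate to keep improving across batches, and an empty batch leaves the model untouched.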


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15630544
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.scalatest.FunSuite
    +import org.apache.spark.SparkConf
    +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
    +import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext}
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +class StreamingLinearRegressionSuite extends FunSuite {
    --- End diff --
    
    Did you use the `LocalSparkContext` under `mllib` or core? Try to use the one from mllib.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15630645
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegression.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.annotation.Experimental
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see LinearRegressionWithSGD for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant.
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD private (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var numFeatures: Int)
    +  extends StreamingRegression[LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
    +
    +  val algorithm = new LinearRegressionWithSGD(stepSize, numIterations, miniBatchFraction)
    +
    +  var model = algorithm.createModel(Vectors.dense(new Array[Double](numFeatures)), 0.0)
    +
    +}
    +
    +/**
    + * Top-level methods for calling StreamingLinearRegressionWithSGD.
    + */
    +@Experimental
    +object StreamingLinearRegressionWithSGD {
    +
    +  /**
    +   * Start a streaming Linear Regression model by setting optimization parameters.
    +   *
    +   * @param numIterations Number of iterations of gradient descent to run.
    +   * @param stepSize Step size to be used for each iteration of gradient descent.
    +   * @param miniBatchFraction Fraction of data to be used per iteration.
    +   * @param numFeatures Number of features per record, must be constant for all batches of data.
    +   */
    +  def start(
    --- End diff --
    
    I like the builder pattern better, since every time I tried to use the static train method I forgot the order of arguments ...
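
For readers unfamiliar with the builder pattern mentioned above, here is a minimal sketch in plain Scala. `BuilderSketch` and `StreamingSgdConfig` are hypothetical names used only for illustration; the default values (0.1, 50, 1.0) are borrowed from the defaults quoted elsewhere in this thread. It is not the class from the PR, just a small instance of the idea that named, chainable setters remove the need to remember argument order.

```scala
object BuilderSketch {

  /** Hypothetical stand-in for a streaming regression configuration. */
  class StreamingSgdConfig {
    private var stepSize: Double = 0.1
    private var numIterations: Int = 50
    private var miniBatchFraction: Double = 1.0

    // Each setter names its parameter and returns `this`, so calls can be chained in any order.
    def setStepSize(value: Double): this.type = { stepSize = value; this }
    def setNumIterations(value: Int): this.type = { numIterations = value; this }
    def setMiniBatchFraction(value: Double): this.type = { miniBatchFraction = value; this }

    /** Snapshot of the current settings, mainly for inspection. */
    def settings: (Double, Int, Double) = (stepSize, numIterations, miniBatchFraction)
  }
}
```

A call such as `new BuilderSketch.StreamingSgdConfig().setNumIterations(10).setStepSize(0.5)` gives the same result as the setters in the opposite order, which is the property a positional `start(...)` or `train(...)` method lacks.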


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15700516
  
    --- Diff: examples/src/main/scala/org/apache/spark/examples/mllib/StreamingLinearRegression.scala ---
    @@ -0,0 +1,56 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.examples.mllib
    +
    +import org.apache.spark.SparkConf
    +import org.apache.spark.mllib.util.MLStreamingUtils
    +import org.apache.spark.mllib.regression.StreamingLinearRegressionWithSGD
    +import org.apache.spark.streaming.{Seconds, StreamingContext}
    +
    +/**
    + * Continually update a model on one stream of data using streaming linear regression,
    + * while making predictions on another stream of data
    + *
    + */
    +object StreamingLinearRegression {
    +
    +  def main(args: Array[String]) {
    +
    +    if (args.length != 4) {
    +      System.err.println(
    +        "Usage: StreamingLinearRegression <trainingData> <testData> <batchDuration> <numFeatures>")
    +      System.exit(1)
    +    }
    +
    +    val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
    +    val ssc = new StreamingContext(conf, Seconds(args(2).toLong))
    +
    +    val trainingData = MLStreamingUtils.loadLabeledPointsFromText(ssc, args(0))
    --- End diff --
    
    Another solution is to make `LabeledPointParser` public and chain operations `ssc.textFileStream(path).map(LabeledPointParser.parse)`. If we want to load vectors, we can use `ssc.textFileStream(path).map(Vectors.parse)` instead of defining a new method `MLStreamingUtils.loadStreamingVectors`.
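
The chaining idea can be sketched without Spark by letting a `Seq[String]` stand in for the stream of text lines. Everything below is an assumption made for illustration: `ParseChainSketch`, `Point`, `parseVector`, and the line format `label,f1 f2 f3` are hypothetical and are not the MLlib parsers or their input format. The sketch only shows that once the parsers are public, loading reduces to a `map` and needs no dedicated utility method.

```scala
object ParseChainSketch {

  /** Hypothetical stand-in for MLlib's LabeledPoint. */
  final case class Point(label: Double, features: Vector[Double])

  object Point {
    /** Parse a line in the assumed format "label,f1 f2 f3". */
    def parse(line: String): Point = {
      val Array(label, rest) = line.split(",", 2)
      Point(label.trim.toDouble, rest.trim.split("\\s+").map(_.toDouble).toVector)
    }
  }

  /** Hypothetical stand-in for a vector parser: "f1 f2 f3" -> Vector. */
  def parseVector(line: String): Vector[Double] =
    line.trim.split("\\s+").map(_.toDouble).toVector

  // With public parsers, loading is just a map over the lines of text;
  // no dedicated loader utility is needed.
  def loadPoints(lines: Seq[String]): Seq[Point] = lines.map(Point.parse)
  def loadVectors(lines: Seq[String]): Seq[Vector[Double]] = lines.map(parseVector)
}
```

In the streaming setting the same shape would apply to the stream returned by `ssc.textFileStream(path)`, as suggested in the comment above.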


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15723261
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala ---
    @@ -162,45 +162,55 @@ object GradientDescent extends Logging {
         val numExamples = data.count()
         val miniBatchSize = numExamples * miniBatchFraction
     
    -    // Initialize weights as a column vector
    -    var weights = Vectors.dense(initialWeights.toArray)
    -    val n = weights.size
    -
    -    /**
    -     * For the first iteration, the regVal will be initialized as sum of weight squares
    -     * if it's L2 updater; for L1 updater, the same logic is followed.
    -     */
    -    var regVal = updater.compute(
    -      weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
    -
    -    for (i <- 1 to numIterations) {
    -      val bcWeights = data.context.broadcast(weights)
    -      // Sample a subset (fraction miniBatchFraction) of the total data
    -      // compute and sum up the subgradients on this subset (this is one map-reduce)
    -      val (gradientSum, lossSum) = data.sample(false, miniBatchFraction, 42 + i)
    -        .treeAggregate((BDV.zeros[Double](n), 0.0))(
    -          seqOp = (c, v) => (c, v) match { case ((grad, loss), (label, features)) =>
    -            val l = gradient.compute(features, label, bcWeights.value, Vectors.fromBreeze(grad))
    -            (grad, loss + l)
    -          },
    -          combOp = (c1, c2) => (c1, c2) match { case ((grad1, loss1), (grad2, loss2)) =>
    -            (grad1 += grad2, loss1 + loss2)
    -          })
    +    // if no data, return initial weights to avoid NaNs
    +    if (numExamples == 0) {
    +
    +      logInfo("GradientDescent.runMiniBatchSGD returning initial weights, no data found")
    +      (initialWeights, stochasticLossHistory.toArray)
    --- End diff --
    
    It may be better to use `return (initialWeights, stochasticLossHistory.toArray)` here to avoid having extra indentation for the main block.
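
A minimal sketch of the early-return shape suggested here, in plain Scala with no Spark dependency. `EarlyReturnSketch`, `runSgd`, and the toy one-feature model y = w * x are hypothetical and only illustrate the control flow; this is not the `GradientDescent.runMiniBatchSGD` implementation.

```scala
object EarlyReturnSketch {

  /**
   * Toy stand-in for a mini-batch SGD loop on y = w * x.
   * Returns the final weight and the per-iteration loss history.
   */
  def runSgd(data: Seq[(Double, Double)], initialWeight: Double,
             stepSize: Double, numIterations: Int): (Double, Array[Double]) = {
    val lossHistory = scala.collection.mutable.ArrayBuffer.empty[Double]

    // Guard clause: with no data the mean gradient would be 0.0 / 0 = NaN,
    // so return the initial weight right away.
    if (data.isEmpty) {
      return (initialWeight, lossHistory.toArray)
    }

    // Main block stays at the outer indentation level.
    var w = initialWeight
    for (_ <- 1 to numIterations) {
      val n = data.size
      lossHistory += data.map { case (x, y) => 0.5 * math.pow(w * x - y, 2) }.sum / n
      w -= stepSize * data.map { case (x, y) => (w * x - y) * x }.sum / n
    }
    (w, lossHistory.toArray)
  }
}
```

The alternative, wrapping the loop in an `else` branch, behaves the same but pushes the whole main block one level deeper, which also makes the diff much larger than the change warrants.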


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-50952454
  
    LGTM. Merged into master. Thanks a lot for putting Streaming and MLlib together!


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15566064
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingRegression.scala ---
    @@ -0,0 +1,83 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.{Experimental, DeveloperApi}
    +import org.apache.spark.streaming.dstream.DStream
    +
    +/**
    + * :: DeveloperApi ::
    + * StreamingRegression implements methods for training
    + * a linear regression model on streaming data, and using it
    + * for prediction on streaming data.
    + *
    + * This class takes as type parameters a GeneralizedLinearModel,
    + * and a GeneralizedLinearAlgorithm, making it easy to extend to construct
    + * streaming versions of arbitrary regression analyses. For example usage,
    + * see StreamingLinearRegressionWithSGD.
    + *
    + */
    +@DeveloperApi
    +@Experimental
    +abstract class StreamingRegression[
    +    M <: GeneralizedLinearModel,
    +    A <: GeneralizedLinearAlgorithm[M]] extends Logging {
    +
    +  /** The model to be updated and used for prediction. */
    +  var model: M
    +
    +  /** The algorithm to use for updating. */
    +  val algorithm: A
    +
    +  /** Return the latest model. */
    +  def latest(): M = {
    +    model
    +  }
    +
    +  /**
    +   * Update the model by training on batches of data from a DStream.
    +   * This operation registers a DStream for training the model,
    +   * and updates the model based on every subsequent non-empty
    +   * batch of data from the stream.
    +   *
    +   * @param data DStream containing labeled data
    +   */
    +  def trainOn(data: DStream[LabeledPoint]) {
    +    data.foreachRDD{
    +      rdd =>
    --- End diff --
    
    Spark's code style prefers the following:
    
    ~~~
    data.foreachRDD { rdd =>
      ...
    ~~~


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15628354
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionSuite.scala ---
    @@ -0,0 +1,127 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import java.io.File
    +
    +import com.google.common.io.Files
    +import org.apache.commons.io.FileUtils
    +import org.scalatest.FunSuite
    +import org.apache.spark.SparkConf
    +import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext}
    +import org.apache.spark.mllib.util.{MLStreamingUtils, LinearDataGenerator, LocalSparkContext}
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +class StreamingLinearRegressionSuite extends FunSuite {
    +
    +  // Assert that two values are equal within tolerance epsilon
    +  def assertEqual(v1: Double, v2: Double, epsilon: Double) {
    +    def errorMessage = v1.toString + " did not equal " + v2.toString
    +    assert(math.abs(v1-v2) <= epsilon, errorMessage)
    +  }
    +
    +  // Assert that model predictions are correct
    +  def validatePrediction(predictions: Seq[Double], input: Seq[LabeledPoint]) {
    +    val numOffPredictions = predictions.zip(input).count { case (prediction, expected) =>
    +      // A prediction is off if the prediction is more than 0.5 away from expected value.
    +      math.abs(prediction - expected.label) > 0.5
    +    }
    +    // At least 80% of the predictions should be on.
    +    assert(numOffPredictions < input.length / 5)
    +  }
    +
    +  // Test if we can accurately learn Y = 10*X1 + 10*X2 on streaming data
    +  test("streaming linear regression parameter accuracy") {
    +
    +    val conf = new SparkConf().setMaster("local").setAppName("streaming test")
    +    val testDir = Files.createTempDir()
    +    val numBatches = 10
    +    val ssc = new StreamingContext(conf, Seconds(1))
    +    val data = MLStreamingUtils.loadLabeledPointsFromText(ssc, testDir.toString)
    +    val model = StreamingLinearRegressionWithSGD.start(numFeatures=2, numIterations=50)
    +
    +    model.trainOn(data)
    +
    +    ssc.start()
    +
    +    // write data to a file stream
    +    Thread.sleep(5000)
    --- End diff --
    
    Might not be =) I added it because I saw it in the streaming test suite for file writing, but without it both tests still pass fine.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15726365
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/regression/StreamingLinearRegressionWithSGD.scala ---
    @@ -0,0 +1,86 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.regression
    +
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +
    +/**
    + * Train or predict a linear regression model on streaming data. Training uses
    + * Stochastic Gradient Descent to update the model based on each new batch of
    + * incoming data from a DStream (see `LinearRegressionWithSGD` for model equation)
    + *
    + * Each batch of data is assumed to be an RDD of LabeledPoints.
    + * The number of data points per batch can vary, but the number
    + * of features must be constant. An initial weight
    + * vector must be provided.
    + *
    + * Use a builder pattern to construct a streaming linear regression
    + * analysis in an application, like:
    + *
    + *  val model = new StreamingLinearRegressionWithSGD()
    + *    .setStepSize(0.5)
    + *    .setNumIterations(10)
    + *    .setInitialWeights(Vectors.dense(...))
    + *    .trainOn(DStream)
    + *
    + */
    +@Experimental
    +class StreamingLinearRegressionWithSGD (
    +    private var stepSize: Double,
    +    private var numIterations: Int,
    +    private var miniBatchFraction: Double,
    +    private var initialWeights: Vector)
    +  extends StreamingLinearAlgorithm[
    +    LinearRegressionModel, LinearRegressionWithSGD] with Serializable {
    +
    +  /**
    +   * Construct a StreamingLinearRegression object with default parameters:
    +   * {stepSize: 0.1, numIterations: 50, miniBatchFraction: 1.0, initialWeights: [0.0, 0.0]}.
    +   */
    +  def this() = this(0.1, 50, 1.0, Vectors.dense(0.0, 0.0))
    --- End diff --
    
    K, this should work for now. Will be fun to figure out the automatic setting =)


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by freeman-lab <gi...@git.apache.org>.
Github user freeman-lab commented on the pull request:

    https://github.com/apache/spark/pull/1361#issuecomment-48664477
  
    @mengxr great! Just created a JIRA (https://issues.apache.org/jira/browse/SPARK-2438) and added to the title.


---

[GitHub] spark pull request: Streaming mllib [SPARK-2438][MLLIB]

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1361#discussion_r15724782
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/optimization/GradientDescent.scala ---
    @@ -174,17 +182,18 @@ object GradientDescent extends Logging {
           weights, Vectors.dense(new Array[Double](weights.size)), 0, 1, regParam)._2
     
         for (i <- 1 to numIterations) {
    -      val bcWeights = data.context.broadcast(weights)
    --- End diff --
    
    Broadcasting the weights is actually important for performance. Did you experience any problem with it? It may be an orthogonal issue. Maybe we should keep this code block unchanged.


---