Posted to commits@flink.apache.org by tr...@apache.org on 2015/05/28 16:37:54 UTC
flink git commit: [FLINK-2047] [ml] Renaming CoCoA to SVM
Repository: flink
Updated Branches:
refs/heads/master 41181603d -> 995f8f969
[FLINK-2047] [ml] Renaming CoCoA to SVM
This closes #733.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/995f8f96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/995f8f96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/995f8f96
Branch: refs/heads/master
Commit: 995f8f9693c4fe2e40efcf6a82c10ccab11c37ba
Parents: 4118160
Author: Theodore Vasiloudis <tv...@sics.se>
Authored: Wed May 27 12:00:17 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 28 16:37:27 2015 +0200
----------------------------------------------------------------------
docs/libs/ml/cocoa.md | 182 -------
docs/libs/ml/index.md | 2 +-
docs/libs/ml/svm.md | 182 +++++++
.../apache/flink/ml/classification/CoCoA.scala | 515 -------------------
.../apache/flink/ml/classification/SVM.scala | 515 +++++++++++++++++++
.../flink/ml/classification/CoCoAITSuite.scala | 52 --
.../flink/ml/classification/SVMITSuite.scala | 52 ++
7 files changed, 750 insertions(+), 750 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/docs/libs/ml/cocoa.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/cocoa.md b/docs/libs/ml/cocoa.md
deleted file mode 100644
index 9297ce3..0000000
--- a/docs/libs/ml/cocoa.md
+++ /dev/null
@@ -1,182 +0,0 @@
----
-mathjax: include
-htmlTitle: FlinkML - Communication efficient distributed dual coordinate ascent (CoCoA)
-title: <a href="../ml">FlinkML</a> - Communication efficient distributed dual coordinate ascent (CoCoA)
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-## Description
-
-Implements the communication-efficient distributed dual coordinate ascent algorithm with hinge-loss function.
-The algorithm can be used to train a SVM with soft-margin.
-The algorithm solves the following minimization problem:
-
-$$\min_{\mathbf{w} \in \mathbb{R}^d} \frac{\lambda}{2} \left\lVert \mathbf{w} \right\rVert^2 + \frac{1}{n} \sum_{i=1}^n l_{i}\left(\mathbf{w}^T\mathbf{x}_i\right)$$
-
-with $\mathbf{w}$ being the weight vector, $\lambda$ being the regularization constant,
-$$\mathbf{x}_i \in \mathbb{R}^d$$ being the data points and $$l_{i}$$ being the convex loss
-functions, which can also depend on the labels $$y_{i} \in \mathbb{R}$$.
-In the current implementation the regularizer is the $\ell_2$-norm and the loss functions are the hinge-loss functions:
-
- $$l_{i} = \max\left(0, 1 - y_{i} \mathbf{w}^T\mathbf{x}_i \right)$$
-
-With these choices, the problem definition is equivalent to a SVM with soft-margin.
-Thus, the algorithm allows us to train a SVM with soft-margin.
-
-The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA).
-In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm calculates
-several iterations of SDCA locally on a data block before merging the local updates into a
-valid global state.
-This state is redistributed to the different data partitions where the next round of local SDCA
-iterations is then executed.
-The number of outer iterations and local SDCA iterations control the overall network costs, because
-there is only network communication required for each outer iteration.
-The local SDCA iterations are embarrassingly parallel once the individual data partitions have been
-distributed across the cluster.
-
-The implementation of this algorithm is based on the work of
-[Jaggi et al.](http://arxiv.org/abs/1409.1458)
-
-## Operations
-
-`CoCoA` is a `Predictor`.
-As such, it supports the `fit` and `predict` operation.
-
-### Fit
-
-CoCoA is trained given a set of `LabeledVector`:
-
-* `fit: DataSet[LabeledVector] => Unit`
-
-### Predict
-
-CoCoA predicts for all subtypes of `Vector` the corresponding class label:
-
-* `predict[T <: Vector]: DataSet[T] => DataSet[LabeledVector]`
-
-## Parameters
-
-The CoCoA implementation can be controlled by the following parameters:
-
- <table class="table table-bordered">
- <thead>
- <tr>
- <th class="text-left" style="width: 20%">Parameters</th>
- <th class="text-center">Description</th>
- </tr>
- </thead>
-
- <tbody>
- <tr>
- <td><strong>Blocks</strong></td>
- <td>
- <p>
- Sets the number of blocks into which the input data will be split.
- On each block the local stochastic dual coordinate ascent method is executed.
- This number should be set at least to the degree of parallelism.
- If no value is specified, then the parallelism of the input DataSet is used as the number of blocks.
- (Default value: <strong>None</strong>)
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Iterations</strong></td>
- <td>
- <p>
- Defines the maximum number of iterations of the outer loop method.
- In other words, it defines how often the SDCA method is applied to the blocked data.
- After each iteration, the locally computed weight vector updates have to be reduced to update the global weight vector value.
- The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
- (Default value: <strong>10</strong>)
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>LocalIterations</strong></td>
- <td>
- <p>
- Defines the maximum number of SDCA iterations.
- In other words, it defines how many data points are drawn from each local data block to calculate the stochastic dual coordinate ascent.
- (Default value: <strong>10</strong>)
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Regularization</strong></td>
- <td>
- <p>
- Defines the regularization constant of the CoCoA algorithm.
- The higher the value, the smaller will the 2-norm of the weight vector be.
- In case of a SVM with hinge loss this means that the SVM margin will be wider even though it might contain some false classifications.
- (Default value: <strong>1.0</strong>)
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Stepsize</strong></td>
- <td>
- <p>
- Defines the initial step size for the updates of the weight vector.
- The larger the step size is, the larger will be the contribution of the weight vector updates to the next weight vector value.
- The effective scaling of the updates is $\frac{stepsize}{blocks}$.
- This value has to be tuned in case that the algorithm becomes instable.
- (Default value: <strong>1.0</strong>)
- </p>
- </td>
- </tr>
- <tr>
- <td><strong>Seed</strong></td>
- <td>
- <p>
- Defines the seed to initialize the random number generator.
- The seed directly controls which data points are chosen for the SDCA method.
- (Default value: <strong>0</strong>)
- </p>
- </td>
- </tr>
- </tbody>
- </table>
-
-## Examples
-
-{% highlight scala %}
-// Read the training data set
-val trainingDS: DataSet[LabeledVector] = env.readSVMFile(pathToTrainingFile)
-
-// Create the CoCoA learner
-val svm = CoCoA()
-.setBlocks(10)
-.setIterations(10)
-.setLocalIterations(10)
-.setRegularization(0.5)
-.setStepsize(0.5)
-
-// Learn the SVM model
-svm.fit(trainingDS)
-
-// Read the testing data set
-val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
-
-// Calculate the predictions for the testing data set
-val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
-{% endhighlight %}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/docs/libs/ml/index.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/index.md b/docs/libs/ml/index.md
index d172bda..f61480f 100644
--- a/docs/libs/ml/index.md
+++ b/docs/libs/ml/index.md
@@ -35,7 +35,7 @@ FlinkML currently supports the following algorithms:
### Supervised Learning
-* [Communication efficient distributed dual coordinate ascent (CoCoA)](cocoa.html)
+* [SVM using Communication efficient distributed dual coordinate ascent (CoCoA)](svm.html)
* [Multiple linear regression](multiple_linear_regression.html)
* [Optimization Framework](optimization.html)
http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/docs/libs/ml/svm.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/svm.md b/docs/libs/ml/svm.md
new file mode 100644
index 0000000..ec5cc6a
--- /dev/null
+++ b/docs/libs/ml/svm.md
@@ -0,0 +1,182 @@
+---
+mathjax: include
+htmlTitle: FlinkML - SVM using CoCoA
+title: <a href="../ml">FlinkML</a> - SVM using CoCoA
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+Implements an SVM with soft-margin using the communication-efficient distributed dual coordinate
+ascent algorithm with hinge-loss function.
+The algorithm solves the following minimization problem:
+
+$$\min_{\mathbf{w} \in \mathbb{R}^d} \frac{\lambda}{2} \left\lVert \mathbf{w} \right\rVert^2 + \frac{1}{n} \sum_{i=1}^n l_{i}\left(\mathbf{w}^T\mathbf{x}_i\right)$$
+
+with $\mathbf{w}$ being the weight vector, $\lambda$ being the regularization constant,
+$$\mathbf{x}_i \in \mathbb{R}^d$$ being the data points and $$l_{i}$$ being the convex loss
+functions, which can also depend on the labels $$y_{i} \in \mathbb{R}$$.
+In the current implementation the regularizer is the $\ell_2$-norm and the loss functions are the hinge-loss functions:
+
+ $$l_{i} = \max\left(0, 1 - y_{i} \mathbf{w}^T\mathbf{x}_i \right)$$
+
+With these choices, the problem definition is equivalent to an SVM with soft-margin.
+Thus, the algorithm allows us to train an SVM with soft-margin.
+
+The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA).
+In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm calculates
+several iterations of SDCA locally on a data block before merging the local updates into a
+valid global state.
+This state is redistributed to the different data partitions where the next round of local SDCA
+iterations is then executed.
+The numbers of outer iterations and local SDCA iterations control the overall network costs, because
+network communication is required only for each outer iteration, not for the local ones.
+The local SDCA iterations are embarrassingly parallel once the individual data partitions have been
+distributed across the cluster.
+
+The implementation of this algorithm is based on the work of
+[Jaggi et al.](http://arxiv.org/abs/1409.1458)
+
+## Operations
+
+`SVM` is a `Predictor`.
+As such, it supports the `fit` and `predict` operations.
+
+### Fit
+
+`SVM` is trained given a set of `LabeledVector`:
+
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Predict
+
+`SVM` predicts for all subtypes of `Vector` the corresponding class label:
+
+* `predict[T <: Vector]: DataSet[T] => DataSet[LabeledVector]`
+
+## Parameters
+
+The SVM implementation can be controlled by the following parameters:
+
+ <table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Parameters</th>
+ <th class="text-center">Description</th>
+ </tr>
+ </thead>
+
+ <tbody>
+ <tr>
+ <td><strong>Blocks</strong></td>
+ <td>
+ <p>
+ Sets the number of blocks into which the input data will be split.
+ On each block the local stochastic dual coordinate ascent method is executed.
+ This number should be set at least to the degree of parallelism.
+ If no value is specified, then the parallelism of the input DataSet is used as the number of blocks.
+ (Default value: <strong>None</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Iterations</strong></td>
+ <td>
+ <p>
+ Defines the maximum number of iterations of the outer loop method.
+ In other words, it defines how often the SDCA method is applied to the blocked data.
+ After each iteration, the locally computed weight vector updates have to be reduced to update the global weight vector value.
+ The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
+ (Default value: <strong>10</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>LocalIterations</strong></td>
+ <td>
+ <p>
+ Defines the maximum number of SDCA iterations.
+ In other words, it defines how many data points are drawn from each local data block to calculate the stochastic dual coordinate ascent.
+ (Default value: <strong>10</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Regularization</strong></td>
+ <td>
+ <p>
+ Defines the regularization constant of the CoCoA algorithm.
+ The higher the value, the smaller the 2-norm of the weight vector will be.
+ For an SVM with hinge loss, this means that the margin will be wider, even though it might contain some misclassified points.
+ (Default value: <strong>1.0</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Stepsize</strong></td>
+ <td>
+ <p>
+ Defines the initial step size for the updates of the weight vector.
+ The larger the step size, the larger the contribution of the weight vector updates to the next weight vector value.
+ The effective scaling of the updates is $\frac{stepsize}{blocks}$.
+ This value has to be tuned if the algorithm becomes unstable.
+ (Default value: <strong>1.0</strong>)
+ </p>
+ </td>
+ </tr>
+ <tr>
+ <td><strong>Seed</strong></td>
+ <td>
+ <p>
+ Defines the seed to initialize the random number generator.
+ The seed directly controls which data points are chosen for the SDCA method.
+ (Default value: <strong>0</strong>)
+ </p>
+ </td>
+ </tr>
+ </tbody>
+ </table>
+
+## Examples
+
+{% highlight scala %}
+// Read the training data set, from a LibSVM formatted file
+val trainingDS: DataSet[LabeledVector] = env.readLibSVM(pathToTrainingFile)
+
+// Create the SVM learner
+val svm = SVM()
+  .setBlocks(10)
+  .setIterations(10)
+  .setLocalIterations(10)
+  .setRegularization(0.5)
+  .setStepsize(0.5)
+
+// Learn the SVM model
+svm.fit(trainingDS)
+
+// Read the testing data set
+val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
+
+// Calculate the predictions for the testing data set
+val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
+{% endhighlight %}
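The objective described in the svm.md text above can be sketched in a few lines of plain Scala. This is only an illustration and not part of the commit: it uses `Array[Double]` instead of FlinkML vectors so that it runs without Flink, and the names `HingeObjectiveSketch`, `hingeLoss` and `primalObjective` are hypothetical.

```scala
object HingeObjectiveSketch {

  /** Dot product of two dense vectors of equal length. */
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "Dimensions of feature vectors have to be equal.")
    var sum = 0.0
    var i = 0
    while (i < a.length) {
      sum += a(i) * b(i)
      i += 1
    }
    sum
  }

  /** Hinge loss l_i = max(0, 1 - y_i * w^T x_i). */
  def hingeLoss(w: Array[Double], x: Array[Double], y: Double): Double =
    math.max(0.0, 1.0 - y * dot(w, x))

  /** Primal objective: lambda/2 * ||w||^2 + 1/n * sum_i l_i(w^T x_i).
    * Each data element is a (label, features) pair.
    */
  def primalObjective(
      w: Array[Double],
      data: Seq[(Double, Array[Double])],
      lambda: Double): Double = {
    require(data.nonEmpty, "The data set must not be empty.")
    val regularizer = lambda / 2.0 * dot(w, w)
    val averageLoss = data.map { case (y, x) => hingeLoss(w, x, y) }.sum / data.size
    regularizer + averageLoss
  }
}
```

A larger `lambda` puts more weight on the norm term, which matches the description of the Regularization parameter: the minimizer is pushed towards a smaller weight vector at the price of a higher average hinge loss.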
http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
deleted file mode 100644
index fea6be5..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
+++ /dev/null
@@ -1,515 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.classification
-
-import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
-import org.apache.flink.ml.common._
-import org.apache.flink.ml.math.Vector
-import org.apache.flink.ml.math.Breeze._
-
-import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector}
-
-/** Implements the communication-efficient distributed dual coordinate ascent algorithm with
- * hinge-loss function. The algorithm can be used to train a SVM with soft-margin.
- *
- * The algorithm solves the following minimization problem:
- *
- * `min_{w in bbb"R"^d} lambda/2 ||w||^2 + 1/n sum_(i=1)^n l_{i}(w^Tx_i)`
- *
- * with `w` being the weight vector, `lambda` being the regularization constant,
- * `x_{i} in bbb"R"^d` being the data points and `l_{i}` being the convex loss functions, which
- * can also depend on the labels `y_{i} in bbb"R"`.
- * In the current implementation the regularizer is the 2-norm and the loss functions are the
- * hinge-loss functions:
- *
- * `l_{i} = max(0, 1 - y_{i} * w^Tx_i`
- *
- * With these choices, the problem definition is equivalent to a SVM with soft-margin.
- * Thus, the algorithm allows us to train a SVM with soft-margin.
- *
- * The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA).
- * In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm
- * calculates several iterations of SDCA locally on a data block before merging the local
- * updates into a valid global state.
- * This state is redistributed to the different data partitions where the next round of local
- * SDCA iterations is then executed.
- * The number of outer iterations and local SDCA iterations control the overall network costs,
- * because there is only network communication required for each outer iteration.
- * The local SDCA iterations are embarrassingly parallel once the individual data partitions have
- * been distributed across the cluster.
- *
- * Further details of the algorithm can be found [[http://arxiv.org/abs/1409.1458 here]].
- *
- * @example
- * {{{
- * val trainingDS: DataSet[LabeledVector] = env.readSVMFile(pathToTrainingFile)
- *
- * val svm = CoCoA()
- * .setBlocks(10)
- * .setIterations(10)
- * .setLocalIterations(10)
- * .setRegularization(0.5)
- * .setStepsize(0.5)
- *
- * svm.fit(trainingDS)
- *
- * val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
- *
- * val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
- * }}}
- *
- * =Parameters=
- *
- * - [[org.apache.flink.ml.classification.CoCoA.Blocks]]:
- * Sets the number of blocks into which the input data will be split. On each block the local
- * stochastic dual coordinate ascent method is executed. This number should be set at least to
- * the degree of parallelism. If no value is specified, then the parallelism of the input
- * [[DataSet]] is used as the number of blocks. (Default value: '''None''')
- *
- * - [[org.apache.flink.ml.classification.CoCoA.Iterations]]:
- * Defines the maximum number of iterations of the outer loop method. In other words, it defines
- * how often the SDCA method is applied to the blocked data. After each iteration, the locally
- * computed weight vector updates have to be reduced to update the global weight vector value.
- * The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
- * (Default value: '''10''')
- *
- * - [[org.apache.flink.ml.classification.CoCoA.LocalIterations]]:
- * Defines the maximum number of SDCA iterations. In other words, it defines how many data points
- * are drawn from each local data block to calculate the stochastic dual coordinate ascent.
- * (Default value: '''10''')
- *
- * - [[org.apache.flink.ml.classification.CoCoA.Regularization]]:
- * Defines the regularization constant of the CoCoA algorithm. The higher the value, the smaller
- * will the 2-norm of the weight vector be. In case of a SVM with hinge loss this means that the
- * SVM margin will be wider even though it might contain some false classifications.
- * (Default value: '''1.0''')
- *
- * - [[org.apache.flink.ml.classification.CoCoA.Stepsize]]:
- * Defines the initial step size for the updates of the weight vector. The larger the step size
- * is, the larger will be the contribution of the weight vector updates to the next weight vector
- * value. The effective scaling of the updates is `stepsize/blocks`. This value has to be tuned
- * in case that the algorithm becomes instable. (Default value: '''1.0''')
- *
- * - [[org.apache.flink.ml.classification.CoCoA.Seed]]:
- * Defines the seed to initialize the random number generator. The seed directly controls which
- * data points are chosen for the SDCA method. (Default value: '''0''')
- */
-class CoCoA extends Predictor[CoCoA] {
-
- import CoCoA._
-
- /** Stores the learned weight vector after the fit operation */
- var weightsOption: Option[DataSet[BreezeDenseVector[Double]]] = None
-
- /** Sets the number of data blocks/partitions
- *
- * @param blocks
- * @return itself
- */
- def setBlocks(blocks: Int): CoCoA = {
- parameters.add(Blocks, blocks)
- this
- }
-
- /** Sets the number of outer iterations
- *
- * @param iterations
- * @return itself
- */
- def setIterations(iterations: Int): CoCoA = {
- parameters.add(Iterations, iterations)
- this
- }
-
- /** Sets the number of local SDCA iterations
- *
- * @param localIterations
- * @return itselft
- */
- def setLocalIterations(localIterations: Int): CoCoA = {
- parameters.add(LocalIterations, localIterations)
- this
- }
-
- /** Sets the regularization constant
- *
- * @param regularization
- * @return itself
- */
- def setRegularization(regularization: Double): CoCoA = {
- parameters.add(Regularization, regularization)
- this
- }
-
- /** Sets the stepsize for the weight vector updates
- *
- * @param stepsize
- * @return itself
- */
- def setStepsize(stepsize: Double): CoCoA = {
- parameters.add(Stepsize, stepsize)
- this
- }
-
- /** Sets the seed value for the random number generator
- *
- * @param seed
- * @return itself
- */
- def setSeed(seed: Long): CoCoA = {
- parameters.add(Seed, seed)
- this
- }
-}
-
-/** Companion object of CoCoA. Contains convenience functions and the parameter type definitions
- * of the algorithm.
- */
-object CoCoA{
- val WEIGHT_VECTOR ="weightVector"
-
- // ========================================== Parameters =========================================
-
- case object Blocks extends Parameter[Int] {
- val defaultValue: Option[Int] = None
- }
-
- case object Iterations extends Parameter[Int] {
- val defaultValue = Some(10)
- }
-
- case object LocalIterations extends Parameter[Int] {
- val defaultValue = Some(10)
- }
-
- case object Regularization extends Parameter[Double] {
- val defaultValue = Some(1.0)
- }
-
- case object Stepsize extends Parameter[Double] {
- val defaultValue = Some(1.0)
- }
-
- case object Seed extends Parameter[Long] {
- val defaultValue = Some(0L)
- }
-
- // ========================================== Factory methods ====================================
-
- def apply(): CoCoA = {
- new CoCoA()
- }
-
- // ========================================== Operations =========================================
-
- /** [[org.apache.flink.ml.pipeline.PredictOperation]] for vector types. The result type is a
- * [[LabeledVector]]
- *
- * @tparam T Subtype of [[Vector]]
- * @return
- */
- implicit def predictValues[T <: Vector] = {
- new PredictOperation[CoCoA, T, LabeledVector]{
- override def predict(
- instance: CoCoA,
- predictParameters: ParameterMap,
- input: DataSet[T])
- : DataSet[LabeledVector] = {
-
- instance.weightsOption match {
- case Some(weights) => {
- input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR)
- }
-
- case None => {
- throw new RuntimeException("The CoCoA model has not been trained. Call first fit" +
- "before calling the predict operation.")
- }
- }
- }
- }
- }
-
- /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
- * we broadcast the weight vector to all mappers.
- */
- class PredictionMapper[T <: Vector] extends RichMapFunction[T, LabeledVector] {
-
- var weights: BreezeDenseVector[Double] = _
-
- @throws(classOf[Exception])
- override def open(configuration: Configuration): Unit = {
- // get current weights
- weights = getRuntimeContext.
- getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
- }
-
- override def map(vector: T): LabeledVector = {
- // calculate the prediction value (scaled distance from the separating hyperplane)
- val dotProduct = weights dot vector.asBreeze
-
- LabeledVector(dotProduct, vector)
- }
- }
-
- /** [[FitOperation]] which trains a SVM with soft-margin based on the given training data set.
- *
- */
- implicit val fitCoCoA = {
- new FitOperation[CoCoA, LabeledVector] {
- override def fit(
- instance: CoCoA,
- fitParameters: ParameterMap,
- input: DataSet[LabeledVector])
- : Unit = {
- val resultingParameters = instance.parameters ++ fitParameters
-
- // Check if the number of blocks/partitions has been specified
- val blocks = resultingParameters.get(Blocks) match {
- case Some(value) => value
- case None => input.getParallelism
- }
-
- val scaling = resultingParameters(Stepsize)/blocks
- val iterations = resultingParameters(Iterations)
- val localIterations = resultingParameters(LocalIterations)
- val regularization = resultingParameters(Regularization)
- val seed = resultingParameters(Seed)
-
- // Obtain DataSet with the dimension of the data points
- val dimension = input.map{_.vector.size}.reduce{
- (a, b) => {
- require(a == b, "Dimensions of feature vectors have to be equal.")
- a
- }
- }
-
- val initialWeights = createInitialWeights(dimension)
-
- // Count the number of vectors, but keep the value in a DataSet to broadcast it later
- // TODO: Once efficient count and intermediate result partitions are implemented, use count
- val numberVectors = input map { x => 1 } reduce { _ + _ }
-
- // Group the input data into blocks in round robin fashion
- val blockedInputNumberElements = FlinkMLTools.block(
- input,
- blocks,
- Some(ModuloKeyPartitioner)).
- cross(numberVectors).
- map { x => x }
-
- val resultingWeights = initialWeights.iterate(iterations) {
- weights => {
- // compute the local SDCA to obtain the weight vector updates
- val deltaWs = localDualMethod(
- weights,
- blockedInputNumberElements,
- localIterations,
- regularization,
- scaling,
- seed
- )
-
- // scale the weight vectors
- val weightedDeltaWs = deltaWs map {
- deltaW => {
- deltaW :*= scaling
- }
- }
-
- // calculate the new weight vector by adding the weight vector updates to the weight
- // vector value
- weights.union(weightedDeltaWs).reduce { _ + _ }
- }
- }
-
- // Store the learned weight vector in hte given instance
- instance.weightsOption = Some(resultingWeights)
- }
- }
- }
-
- /** Creates a zero vector of length dimension
- *
- * @param dimension [[DataSet]] containing the dimension of the initial weight vector
- * @return Zero vector of length dimension
- */
- private def createInitialWeights(dimension: DataSet[Int]): DataSet[BreezeDenseVector[Double]] = {
- dimension.map {
- d => BreezeDenseVector.zeros[Double](d)
- }
- }
-
- /** Computes the local SDCA on the individual data blocks/partitions
- *
- * @param w Current weight vector
- * @param blockedInputNumberElements Blocked/Partitioned input data
- * @param localIterations Number of local SDCA iterations
- * @param regularization Regularization constant
- * @param scaling Scaling value for new weight vector updates
- * @param seed Random number generator seed
- * @return [[DataSet]] of weight vector updates. The weight vector updates are double arrays
- */
- private def localDualMethod(
- w: DataSet[BreezeDenseVector[Double]],
- blockedInputNumberElements: DataSet[(Block[LabeledVector], Int)],
- localIterations: Int,
- regularization: Double,
- scaling: Double,
- seed: Long)
- : DataSet[BreezeDenseVector[Double]] = {
- /*
- Rich mapper calculating for each data block the local SDCA. We use a RichMapFunction here,
- because we broadcast the current value of the weight vector to all mappers.
- */
- val localSDCA = new RichMapFunction[(Block[LabeledVector], Int), BreezeDenseVector[Double]] {
- var originalW: BreezeDenseVector[Double] = _
- // we keep the alphas across the outer loop iterations
- val alphasArray = ArrayBuffer[BreezeDenseVector[Double]]()
- // there might be several data blocks in one Flink partition, therefore store mapping
- val idMapping = scala.collection.mutable.HashMap[Int, Int]()
- var counter = 0
-
- var r: Random = _
-
- override def open(parameters: Configuration): Unit = {
- originalW = getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR).get(0)
-
- if(r == null){
- r = new Random(seed ^ getRuntimeContext.getIndexOfThisSubtask)
- }
- }
-
- override def map(blockNumberElements: (Block[LabeledVector], Int))
- : BreezeDenseVector[Double] = {
- val (block, numberElements) = blockNumberElements
-
- // check if we already processed a data block with the corresponding block index
- val localIndex = idMapping.get(block.index) match {
- case Some(idx) => idx
- case None =>
- idMapping += (block.index -> counter)
- counter += 1
-
- alphasArray += BreezeDenseVector.zeros[Double](block.values.length)
-
- counter - 1
- }
-
- // create temporary alpha array for the local SDCA iterations
- val tempAlphas = alphasArray(localIndex).copy
-
- val numLocalDatapoints = tempAlphas.length
- val deltaAlphas = BreezeDenseVector.zeros[Double](numLocalDatapoints)
-
- val w = originalW.copy
-
- val deltaW = BreezeDenseVector.zeros[Double](originalW.length)
-
- for(i <- 1 to localIterations) {
- // pick random data point for SDCA
- val idx = r.nextInt(numLocalDatapoints)
-
- val LabeledVector(label, vector) = block.values(idx)
- val alpha = tempAlphas(idx)
-
- // maximize the dual problem and retrieve alpha and weight vector updates
- val (deltaAlpha, deltaWUpdate) = maximize(
- vector.asBreeze,
- label,
- regularization,
- alpha,
- w,
- numberElements)
-
- // update alpha values
- tempAlphas(idx) += deltaAlpha
- deltaAlphas(idx) += deltaAlpha
-
- // deltaWUpdate is already scaled with 1/lambda/n
- w += deltaWUpdate
- deltaW += deltaWUpdate
- }
-
- // update local alpha values
- alphasArray(localIndex) += deltaAlphas * scaling
-
- deltaW
- }
- }
-
- blockedInputNumberElements.map(localSDCA).withBroadcastSet(w, WEIGHT_VECTOR)
- }
-
- /** Maximizes the dual problem using hinge loss functions. It returns the alpha and weight
- * vector updates.
- *
- * @param x Selected data point
- * @param y Label of selected data point
- * @param regularization Regularization constant
- * @param alpha Alpha value of selected data point
- * @param w Current weight vector value
- * @param numberElements Number of elements in the training data set
- * @return Alpha and weight vector updates
- */
- private def maximize(
- x: BreezeVector[Double],
- y: Double, regularization: Double,
- alpha: Double,
- w: BreezeVector[Double],
- numberElements: Int)
- : (Double, BreezeVector[Double]) = {
- // compute hinge loss gradient
- val dotProduct = x dot w
- val grad = (y * dotProduct - 1.0) * (regularization * numberElements)
-
- // compute projected gradient
- var proj_grad = if(alpha <= 0.0){
- Math.min(grad, 0)
- } else if(alpha >= 1.0) {
- Math.max(grad, 0)
- } else {
- grad
- }
-
- if(Math.abs(grad) != 0.0){
- val qii = x dot x
- val newAlpha = if(qii != 0.0){
- Math.min(Math.max((alpha - (grad / qii)), 0.0), 1.0)
- } else {
- 1.0
- }
-
- val deltaW = x * y * (newAlpha - alpha) / (regularization * numberElements)
-
- (newAlpha - alpha, deltaW)
- } else {
- (0.0 , BreezeVector.zeros(w.length))
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
new file mode 100644
index 0000000..a08fef2
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.math.Breeze._
+
+import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector}
+
+/** Implements a soft-margin SVM using the communication-efficient distributed dual coordinate
+ * ascent algorithm (CoCoA) with hinge-loss function.
+ *
+ * The algorithm solves the following minimization problem:
+ *
+ * `min_{w in bbb"R"^d} lambda/2 ||w||^2 + 1/n sum_(i=1)^n l_{i}(w^Tx_i)`
+ *
+ * with `w` being the weight vector, `lambda` being the regularization constant,
+ * `x_{i} in bbb"R"^d` being the data points and `l_{i}` being the convex loss functions, which
+ * can also depend on the labels `y_{i} in bbb"R"`.
+ * In the current implementation the regularizer is the 2-norm and the loss functions are the
+ * hinge-loss functions:
+ *
+ * `l_{i} = max(0, 1 - y_{i} * w^Tx_i)`
+ *
+ * With these choices, the problem definition is equivalent to a SVM with soft-margin.
+ * Thus, the algorithm allows us to train a SVM with soft-margin.
+ *
+ * The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA).
+ * In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm
+ * calculates several iterations of SDCA locally on a data block before merging the local
+ * updates into a valid global state.
+ * This state is redistributed to the different data partitions where the next round of local
+ * SDCA iterations is then executed.
+ * The number of outer iterations and local SDCA iterations control the overall network costs,
+ * because there is only network communication required for each outer iteration.
+ * The local SDCA iterations are embarrassingly parallel once the individual data partitions have
+ * been distributed across the cluster.
+ *
+ * Further details of the algorithm can be found [[http://arxiv.org/abs/1409.1458 here]].
+ *
+ * @example
+ * {{{
+ * val trainingDS: DataSet[LabeledVector] = env.readSVMFile(pathToTrainingFile)
+ *
+ * val svm = SVM()
+ * .setBlocks(10)
+ * .setIterations(10)
+ * .setLocalIterations(10)
+ * .setRegularization(0.5)
+ * .setStepsize(0.5)
+ *
+ * svm.fit(trainingDS)
+ *
+ * val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
+ *
+ * val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
+ * }}}
+ *
+ * =Parameters=
+ *
+ * - [[org.apache.flink.ml.classification.SVM.Blocks]]:
+ * Sets the number of blocks into which the input data will be split. On each block the local
+ * stochastic dual coordinate ascent method is executed. This number should be set at least to
+ * the degree of parallelism. If no value is specified, then the parallelism of the input
+ * [[DataSet]] is used as the number of blocks. (Default value: '''None''')
+ *
+ * - [[org.apache.flink.ml.classification.SVM.Iterations]]:
+ * Defines the maximum number of iterations of the outer loop method. In other words, it defines
+ * how often the SDCA method is applied to the blocked data. After each iteration, the locally
+ * computed weight vector updates have to be reduced to update the global weight vector value.
+ * The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.classification.SVM.LocalIterations]]:
+ * Defines the maximum number of SDCA iterations. In other words, it defines how many data points
+ * are drawn from each local data block to calculate the stochastic dual coordinate ascent.
+ * (Default value: '''10''')
+ *
+ * - [[org.apache.flink.ml.classification.SVM.Regularization]]:
+ * Defines the regularization constant of the CoCoA algorithm. The higher the value, the smaller
+ * the 2-norm of the weight vector will be. For an SVM with hinge loss, this means that the
+ * SVM margin will be wider, even though it might contain some false classifications.
+ * (Default value: '''1.0''')
+ *
+ * - [[org.apache.flink.ml.classification.SVM.Stepsize]]:
+ * Defines the initial step size for the updates of the weight vector. The larger the step size
+ * is, the larger will be the contribution of the weight vector updates to the next weight vector
+ * value. The effective scaling of the updates is `stepsize/blocks`. This value has to be tuned
+ * if the algorithm becomes unstable. (Default value: '''1.0''')
+ *
+ * - [[org.apache.flink.ml.classification.SVM.Seed]]:
+ * Defines the seed to initialize the random number generator. The seed directly controls which
+ * data points are chosen for the SDCA method. (Default value: '''0''')
+ */
+class SVM extends Predictor[SVM] {
+
+ import SVM._
+
+ /** Stores the learned weight vector after the fit operation */
+ var weightsOption: Option[DataSet[BreezeDenseVector[Double]]] = None
+
+ /** Sets the number of data blocks/partitions
+ *
+ * @param blocks
+ * @return itself
+ */
+ def setBlocks(blocks: Int): SVM = {
+ parameters.add(Blocks, blocks)
+ this
+ }
+
+ /** Sets the number of outer iterations
+ *
+ * @param iterations
+ * @return itself
+ */
+ def setIterations(iterations: Int): SVM = {
+ parameters.add(Iterations, iterations)
+ this
+ }
+
+ /** Sets the number of local SDCA iterations
+ *
+ * @param localIterations
+ * @return itself
+ */
+ def setLocalIterations(localIterations: Int): SVM = {
+ parameters.add(LocalIterations, localIterations)
+ this
+ }
+
+ /** Sets the regularization constant
+ *
+ * @param regularization
+ * @return itself
+ */
+ def setRegularization(regularization: Double): SVM = {
+ parameters.add(Regularization, regularization)
+ this
+ }
+
+ /** Sets the stepsize for the weight vector updates
+ *
+ * @param stepsize
+ * @return itself
+ */
+ def setStepsize(stepsize: Double): SVM = {
+ parameters.add(Stepsize, stepsize)
+ this
+ }
+
+ /** Sets the seed value for the random number generator
+ *
+ * @param seed
+ * @return itself
+ */
+ def setSeed(seed: Long): SVM = {
+ parameters.add(Seed, seed)
+ this
+ }
+}
+
+/** Companion object of SVM. Contains convenience functions and the parameter type definitions
+ * of the algorithm.
+ */
+object SVM{
+ val WEIGHT_VECTOR ="weightVector"
+
+ // ========================================== Parameters =========================================
+
+ case object Blocks extends Parameter[Int] {
+ val defaultValue: Option[Int] = None
+ }
+
+ case object Iterations extends Parameter[Int] {
+ val defaultValue = Some(10)
+ }
+
+ case object LocalIterations extends Parameter[Int] {
+ val defaultValue = Some(10)
+ }
+
+ case object Regularization extends Parameter[Double] {
+ val defaultValue = Some(1.0)
+ }
+
+ case object Stepsize extends Parameter[Double] {
+ val defaultValue = Some(1.0)
+ }
+
+ case object Seed extends Parameter[Long] {
+ val defaultValue = Some(0L)
+ }
+
+ // ========================================== Factory methods ====================================
+
+ def apply(): SVM = {
+ new SVM()
+ }
+
+ // ========================================== Operations =========================================
+
+ /** [[org.apache.flink.ml.pipeline.PredictOperation]] for vector types. The result type is a
+ * [[LabeledVector]]
+ *
+ * @tparam T Subtype of [[Vector]]
+ * @return
+ */
+ implicit def predictValues[T <: Vector] = {
+ new PredictOperation[SVM, T, LabeledVector]{
+ override def predict(
+ instance: SVM,
+ predictParameters: ParameterMap,
+ input: DataSet[T])
+ : DataSet[LabeledVector] = {
+
+ instance.weightsOption match {
+ case Some(weights) => {
+ input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR)
+ }
+
+ case None => {
+ throw new RuntimeException("The SVM model has not been trained. Call fit " +
+ "before calling the predict operation.")
+ }
+ }
+ }
+ }
+ }
+
+ /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
+ * we broadcast the weight vector to all mappers.
+ */
+ class PredictionMapper[T <: Vector] extends RichMapFunction[T, LabeledVector] {
+
+ var weights: BreezeDenseVector[Double] = _
+
+ @throws(classOf[Exception])
+ override def open(configuration: Configuration): Unit = {
+ // get current weights
+ weights = getRuntimeContext.
+ getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
+ }
+
+ override def map(vector: T): LabeledVector = {
+ // calculate the prediction value (scaled distance from the separating hyperplane)
+ val dotProduct = weights dot vector.asBreeze
+
+ LabeledVector(dotProduct, vector)
+ }
+ }
+
+ /** [[FitOperation]] which trains a SVM with soft-margin based on the given training data set.
+ *
+ */
+ implicit val fitSVM = {
+ new FitOperation[SVM, LabeledVector] {
+ override def fit(
+ instance: SVM,
+ fitParameters: ParameterMap,
+ input: DataSet[LabeledVector])
+ : Unit = {
+ val resultingParameters = instance.parameters ++ fitParameters
+
+ // Check if the number of blocks/partitions has been specified
+ val blocks = resultingParameters.get(Blocks) match {
+ case Some(value) => value
+ case None => input.getParallelism
+ }
+
+ val scaling = resultingParameters(Stepsize)/blocks
+ val iterations = resultingParameters(Iterations)
+ val localIterations = resultingParameters(LocalIterations)
+ val regularization = resultingParameters(Regularization)
+ val seed = resultingParameters(Seed)
+
+ // Obtain DataSet with the dimension of the data points
+ val dimension = input.map{_.vector.size}.reduce{
+ (a, b) => {
+ require(a == b, "Dimensions of feature vectors have to be equal.")
+ a
+ }
+ }
+
+ val initialWeights = createInitialWeights(dimension)
+
+ // Count the number of vectors, but keep the value in a DataSet to broadcast it later
+ // TODO: Once efficient count and intermediate result partitions are implemented, use count
+ val numberVectors = input map { x => 1 } reduce { _ + _ }
+
+ // Group the input data into blocks in round robin fashion
+ val blockedInputNumberElements = FlinkMLTools.block(
+ input,
+ blocks,
+ Some(ModuloKeyPartitioner)).
+ cross(numberVectors).
+ map { x => x }
+
+ val resultingWeights = initialWeights.iterate(iterations) {
+ weights => {
+ // compute the local SDCA to obtain the weight vector updates
+ val deltaWs = localDualMethod(
+ weights,
+ blockedInputNumberElements,
+ localIterations,
+ regularization,
+ scaling,
+ seed
+ )
+
+ // scale the weight vectors
+ val weightedDeltaWs = deltaWs map {
+ deltaW => {
+ deltaW :*= scaling
+ }
+ }
+
+ // calculate the new weight vector by adding the weight vector updates to the weight
+ // vector value
+ weights.union(weightedDeltaWs).reduce { _ + _ }
+ }
+ }
+
+ // Store the learned weight vector in the given instance
+ instance.weightsOption = Some(resultingWeights)
+ }
+ }
+ }
+
+ /** Creates a zero vector of length dimension
+ *
+ * @param dimension [[DataSet]] containing the dimension of the initial weight vector
+ * @return Zero vector of length dimension
+ */
+ private def createInitialWeights(dimension: DataSet[Int]): DataSet[BreezeDenseVector[Double]] = {
+ dimension.map {
+ d => BreezeDenseVector.zeros[Double](d)
+ }
+ }
+
+ /** Computes the local SDCA on the individual data blocks/partitions
+ *
+ * @param w Current weight vector
+ * @param blockedInputNumberElements Blocked/Partitioned input data
+ * @param localIterations Number of local SDCA iterations
+ * @param regularization Regularization constant
+ * @param scaling Scaling value for new weight vector updates
+ * @param seed Random number generator seed
+ * @return [[DataSet]] of weight vector updates. Each update is a dense vector of doubles
+ */
+ private def localDualMethod(
+ w: DataSet[BreezeDenseVector[Double]],
+ blockedInputNumberElements: DataSet[(Block[LabeledVector], Int)],
+ localIterations: Int,
+ regularization: Double,
+ scaling: Double,
+ seed: Long)
+ : DataSet[BreezeDenseVector[Double]] = {
+ /*
+ Rich mapper calculating for each data block the local SDCA. We use a RichMapFunction here,
+ because we broadcast the current value of the weight vector to all mappers.
+ */
+ val localSDCA = new RichMapFunction[(Block[LabeledVector], Int), BreezeDenseVector[Double]] {
+ var originalW: BreezeDenseVector[Double] = _
+ // we keep the alphas across the outer loop iterations
+ val alphasArray = ArrayBuffer[BreezeDenseVector[Double]]()
+ // there might be several data blocks in one Flink partition, therefore store mapping
+ val idMapping = scala.collection.mutable.HashMap[Int, Int]()
+ var counter = 0
+
+ var r: Random = _
+
+ override def open(parameters: Configuration): Unit = {
+ originalW = getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR).get(0)
+
+ if(r == null){
+ r = new Random(seed ^ getRuntimeContext.getIndexOfThisSubtask)
+ }
+ }
+
+ override def map(blockNumberElements: (Block[LabeledVector], Int))
+ : BreezeDenseVector[Double] = {
+ val (block, numberElements) = blockNumberElements
+
+ // check if we already processed a data block with the corresponding block index
+ val localIndex = idMapping.get(block.index) match {
+ case Some(idx) => idx
+ case None =>
+ idMapping += (block.index -> counter)
+ counter += 1
+
+ alphasArray += BreezeDenseVector.zeros[Double](block.values.length)
+
+ counter - 1
+ }
+
+ // create temporary alpha array for the local SDCA iterations
+ val tempAlphas = alphasArray(localIndex).copy
+
+ val numLocalDatapoints = tempAlphas.length
+ val deltaAlphas = BreezeDenseVector.zeros[Double](numLocalDatapoints)
+
+ val w = originalW.copy
+
+ val deltaW = BreezeDenseVector.zeros[Double](originalW.length)
+
+ for(i <- 1 to localIterations) {
+ // pick random data point for SDCA
+ val idx = r.nextInt(numLocalDatapoints)
+
+ val LabeledVector(label, vector) = block.values(idx)
+ val alpha = tempAlphas(idx)
+
+ // maximize the dual problem and retrieve alpha and weight vector updates
+ val (deltaAlpha, deltaWUpdate) = maximize(
+ vector.asBreeze,
+ label,
+ regularization,
+ alpha,
+ w,
+ numberElements)
+
+ // update alpha values
+ tempAlphas(idx) += deltaAlpha
+ deltaAlphas(idx) += deltaAlpha
+
+ // deltaWUpdate is already scaled with 1/lambda/n
+ w += deltaWUpdate
+ deltaW += deltaWUpdate
+ }
+
+ // update local alpha values
+ alphasArray(localIndex) += deltaAlphas * scaling
+
+ deltaW
+ }
+ }
+
+ blockedInputNumberElements.map(localSDCA).withBroadcastSet(w, WEIGHT_VECTOR)
+ }
+
+ /** Maximizes the dual problem using hinge loss functions. It returns the alpha and weight
+ * vector updates.
+ *
+ * @param x Selected data point
+ * @param y Label of selected data point
+ * @param regularization Regularization constant
+ * @param alpha Alpha value of selected data point
+ * @param w Current weight vector value
+ * @param numberElements Number of elements in the training data set
+ * @return Alpha and weight vector updates
+ */
+ private def maximize(
+ x: BreezeVector[Double],
+ y: Double, regularization: Double,
+ alpha: Double,
+ w: BreezeVector[Double],
+ numberElements: Int)
+ : (Double, BreezeVector[Double]) = {
+ // compute hinge loss gradient
+ val dotProduct = x dot w
+ val grad = (y * dotProduct - 1.0) * (regularization * numberElements)
+
+ // compute projected gradient
+ var proj_grad = if(alpha <= 0.0){
+ Math.min(grad, 0)
+ } else if(alpha >= 1.0) {
+ Math.max(grad, 0)
+ } else {
+ grad
+ }
+
+ if(Math.abs(grad) != 0.0){
+ val qii = x dot x
+ val newAlpha = if(qii != 0.0){
+ Math.min(Math.max((alpha - (grad / qii)), 0.0), 1.0)
+ } else {
+ 1.0
+ }
+
+ val deltaW = x * y * (newAlpha - alpha) / (regularization * numberElements)
+
+ (newAlpha - alpha, deltaW)
+ } else {
+ (0.0 , BreezeVector.zeros(w.length))
+ }
+ }
+
+}
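Editorial note on the `maximize` step in SVM.scala above: the following is a minimal sketch of the same single-coordinate arithmetic, written against plain `Array[Double]` so it runs without Flink or Breeze. The names `SdcaSketch`, `dot` and `coordinateUpdate` are hypothetical and not part of the commit. The sketch leaves out the projected gradient, which the committed code computes but does not use afterwards.

```scala
object SdcaSketch {
  // Dot product of two equally sized dense vectors.
  def dot(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "Vectors must have the same length.")
    var sum = 0.0
    var i = 0
    while (i < a.length) {
      sum += a(i) * b(i)
      i += 1
    }
    sum
  }

  // Follows the arithmetic of `maximize` in the diff:
  // returns (deltaAlpha, deltaW) for one selected data point.
  def coordinateUpdate(
      x: Array[Double],
      y: Double,
      regularization: Double,
      alpha: Double,
      w: Array[Double],
      numberElements: Int): (Double, Array[Double]) = {
    val lambdaN = regularization * numberElements
    val grad = (y * dot(x, w) - 1.0) * lambdaN

    if (grad != 0.0) {
      val qii = dot(x, x)
      // Step on alpha, clipped to the interval [0, 1]
      val newAlpha =
        if (qii != 0.0) math.min(math.max(alpha - grad / qii, 0.0), 1.0) else 1.0
      val deltaAlpha = newAlpha - alpha
      // The weight update is scaled by 1 / (lambda * n), as in the diff
      (deltaAlpha, x.map(_ * y * deltaAlpha / lambdaN))
    } else {
      (0.0, Array.fill(w.length)(0.0))
    }
  }

  def main(args: Array[String]): Unit = {
    val (deltaAlpha, deltaW) =
      coordinateUpdate(Array(1.0, 2.0), 1.0, 0.5, 0.0, Array(0.0, 0.0), 4)
    println(s"deltaAlpha = $deltaAlpha, deltaW = ${deltaW.mkString("[", ", ", "]")}")
  }
}
```

In the committed code, `localSDCA` accumulates these per-point updates into `deltaW` over `localIterations` draws, and the fit operation then scales each block's `deltaW` by `stepsize/blocks` before adding it to the global weight vector.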
http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
deleted file mode 100644
index a5e7496..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.classification
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.flink.api.scala._
-import org.apache.flink.test.util.FlinkTestBase
-
-class CoCoAITSuite extends FlatSpec with Matchers with FlinkTestBase {
-
- behavior of "The CoCoA implementation"
-
- it should "train a SVM" in {
- val env = ExecutionEnvironment.getExecutionEnvironment
-
- val cocoa = CoCoA().
- setBlocks(env.getParallelism).
- setIterations(100).
- setLocalIterations(100).
- setRegularization(0.002).
- setStepsize(0.1).
- setSeed(0)
-
- val trainingDS = env.fromCollection(Classification.trainingData)
-
- cocoa.fit(trainingDS)
-
- val weightVector = cocoa.weightsOption.get.collect().apply(0)
-
- weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach {
- case (weight, expectedWeight) =>
- weight should be(expectedWeight +- 0.1)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
new file mode 100644
index 0000000..cdb4ffc
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+class SVMITSuite extends FlatSpec with Matchers with FlinkTestBase {
+
+ behavior of "The SVM using CoCoA implementation"
+
+ it should "train a SVM" in {
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ val cocoa = SVM().
+ setBlocks(env.getParallelism).
+ setIterations(100).
+ setLocalIterations(100).
+ setRegularization(0.002).
+ setStepsize(0.1).
+ setSeed(0)
+
+ val trainingDS = env.fromCollection(Classification.trainingData)
+
+ cocoa.fit(trainingDS)
+
+ val weightVector = cocoa.weightsOption.get.collect().apply(0)
+
+ weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach {
+ case (weight, expectedWeight) =>
+ weight should be(expectedWeight +- 0.1)
+ }
+ }
+}