Posted to commits@flink.apache.org by tr...@apache.org on 2015/05/28 16:37:54 UTC

flink git commit: [FLINK-2047] [ml] Renaming CoCoA to SVM

Repository: flink
Updated Branches:
  refs/heads/master 41181603d -> 995f8f969


[FLINK-2047] [ml] Renaming CoCoA to SVM

This closes #733.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/995f8f96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/995f8f96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/995f8f96

Branch: refs/heads/master
Commit: 995f8f9693c4fe2e40efcf6a82c10ccab11c37ba
Parents: 4118160
Author: Theodore Vasiloudis <tv...@sics.se>
Authored: Wed May 27 12:00:17 2015 +0200
Committer: Till Rohrmann <tr...@apache.org>
Committed: Thu May 28 16:37:27 2015 +0200

----------------------------------------------------------------------
 docs/libs/ml/cocoa.md                           | 182 -------
 docs/libs/ml/index.md                           |   2 +-
 docs/libs/ml/svm.md                             | 182 +++++++
 .../apache/flink/ml/classification/CoCoA.scala  | 515 -------------------
 .../apache/flink/ml/classification/SVM.scala    | 515 +++++++++++++++++++
 .../flink/ml/classification/CoCoAITSuite.scala  |  52 --
 .../flink/ml/classification/SVMITSuite.scala    |  52 ++
 7 files changed, 750 insertions(+), 750 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/docs/libs/ml/cocoa.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/cocoa.md b/docs/libs/ml/cocoa.md
deleted file mode 100644
index 9297ce3..0000000
--- a/docs/libs/ml/cocoa.md
+++ /dev/null
@@ -1,182 +0,0 @@
----
-mathjax: include
-htmlTitle: FlinkML - Communication efficient distributed dual coordinate ascent (CoCoA)
-title: <a href="../ml">FlinkML</a> - Communication efficient distributed dual coordinate ascent (CoCoA)
----
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-
-* This will be replaced by the TOC
-{:toc}
-
-## Description
-
-Implements the communication-efficient distributed dual coordinate ascent algorithm with hinge-loss function. 
-The algorithm can be used to train a SVM with soft-margin.
-The algorithm solves the following minimization problem:
-  
-$$\min_{\mathbf{w} \in \mathbb{R}^d} \frac{\lambda}{2} \left\lVert \mathbf{w} \right\rVert^2 + \frac{1}{n} \sum_{i=1}^n l_{i}\left(\mathbf{w}^T\mathbf{x}_i\right)$$
- 
-with $\mathbf{w}$ being the weight vector, $\lambda$ being the regularization constant, 
-$$\mathbf{x}_i \in \mathbb{R}^d$$ being the data points and $$l_{i}$$ being the convex loss 
-functions, which can also depend on the labels $$y_{i} \in \mathbb{R}$$.
-In the current implementation the regularizer is the $\ell_2$-norm and the loss functions are the hinge-loss functions:
-  
-  $$l_{i} = \max\left(0, 1 - y_{i} \mathbf{w}^T\mathbf{x}_i \right)$$
-
-With these choices, the problem definition is equivalent to a SVM with soft-margin.
-Thus, the algorithm allows us to train a SVM with soft-margin.
-
-The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA).
-In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm calculates 
-several iterations of SDCA locally on a data block before merging the local updates into a
-valid global state.
-This state is redistributed to the different data partitions where the next round of local SDCA 
-iterations is then executed.
-The number of outer iterations and local SDCA iterations control the overall network costs, because 
-there is only network communication required for each outer iteration.
-The local SDCA iterations are embarrassingly parallel once the individual data partitions have been 
-distributed across the cluster.
-
-The implementation of this algorithm is based on the work of 
-[Jaggi et al.](http://arxiv.org/abs/1409.1458)
-
-## Operations
-
-`CoCoA` is a `Predictor`.
-As such, it supports the `fit` and `predict` operation.
-
-### Fit
-
-CoCoA is trained given a set of `LabeledVector`: 
-
-* `fit: DataSet[LabeledVector] => Unit`
-
-### Predict
-
-CoCoA predicts for all subtypes of `Vector` the corresponding class label: 
-
-* `predict[T <: Vector]: DataSet[T] => DataSet[LabeledVector]`
-
-## Parameters
-
-The CoCoA implementation can be controlled by the following parameters:
-
-   <table class="table table-bordered">
-    <thead>
-      <tr>
-        <th class="text-left" style="width: 20%">Parameters</th>
-        <th class="text-center">Description</th>
-      </tr>
-    </thead>
-
-    <tbody>
-      <tr>
-        <td><strong>Blocks</strong></td>
-        <td>
-          <p>
-            Sets the number of blocks into which the input data will be split. 
-            On each block the local stochastic dual coordinate ascent method is executed. 
-            This number should be set at least to the degree of parallelism. 
-            If no value is specified, then the parallelism of the input DataSet is used as the number of blocks. 
-            (Default value: <strong>None</strong>)
-          </p>
-        </td>
-      </tr>
-      <tr>
-        <td><strong>Iterations</strong></td>
-        <td>
-          <p>
-            Defines the maximum number of iterations of the outer loop method. 
-            In other words, it defines how often the SDCA method is applied to the blocked data. 
-            After each iteration, the locally computed weight vector updates have to be reduced to update the global weight vector value.
-            The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
-            (Default value: <strong>10</strong>)
-          </p>
-        </td>
-      </tr>
-      <tr>
-        <td><strong>LocalIterations</strong></td>
-        <td>
-          <p>
-            Defines the maximum number of SDCA iterations. 
-            In other words, it defines how many data points are drawn from each local data block to calculate the stochastic dual coordinate ascent.
-            (Default value: <strong>10</strong>)
-          </p>
-        </td>
-      </tr>
-      <tr>
-        <td><strong>Regularization</strong></td>
-        <td>
-          <p>
-            Defines the regularization constant of the CoCoA algorithm. 
-            The higher the value, the smaller will the 2-norm of the weight vector be. 
-            In case of a SVM with hinge loss this means that the SVM margin will be wider even though it might contain some false classifications.
-            (Default value: <strong>1.0</strong>)
-          </p>
-        </td>
-      </tr>
-      <tr>
-        <td><strong>Stepsize</strong></td>
-        <td>
-          <p>
-            Defines the initial step size for the updates of the weight vector. 
-            The larger the step size is, the larger will be the contribution of the weight vector updates to the next weight vector value. 
-            The effective scaling of the updates is $\frac{stepsize}{blocks}$.
-            This value has to be tuned in case that the algorithm becomes instable. 
-            (Default value: <strong>1.0</strong>)
-          </p>
-        </td>
-      </tr>
-      <tr>
-        <td><strong>Seed</strong></td>
-        <td>
-          <p>
-            Defines the seed to initialize the random number generator. 
-            The seed directly controls which data points are chosen for the SDCA method. 
-            (Default value: <strong>0</strong>)
-          </p>
-        </td>
-      </tr>
-    </tbody>
-  </table>
-
-## Examples
-
-{% highlight scala %}
-// Read the training data set
-val trainingDS: DataSet[LabeledVector] = env.readSVMFile(pathToTrainingFile)
-
-// Create the CoCoA learner
-val svm = CoCoA()
-.setBlocks(10)
-.setIterations(10)
-.setLocalIterations(10)
-.setRegularization(0.5)
-.setStepsize(0.5)
-
-// Learn the SVM model
-svm.fit(trainingDS)
-
-// Read the testing data set
-val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
-
-// Calculate the predictions for the testing data set
-val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
-{% endhighlight %}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/docs/libs/ml/index.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/index.md b/docs/libs/ml/index.md
index d172bda..f61480f 100644
--- a/docs/libs/ml/index.md
+++ b/docs/libs/ml/index.md
@@ -35,7 +35,7 @@ FlinkML currently supports the following algorithms:
 
 ### Supervised Learning
 
-* [Communication efficient distributed dual coordinate ascent (CoCoA)](cocoa.html)
+* [SVM using communication-efficient distributed dual coordinate ascent (CoCoA)](svm.html)
 * [Multiple linear regression](multiple_linear_regression.html)
 * [Optimization Framework](optimization.html)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/docs/libs/ml/svm.md
----------------------------------------------------------------------
diff --git a/docs/libs/ml/svm.md b/docs/libs/ml/svm.md
new file mode 100644
index 0000000..ec5cc6a
--- /dev/null
+++ b/docs/libs/ml/svm.md
@@ -0,0 +1,182 @@
+---
+mathjax: include
+htmlTitle: FlinkML - SVM using CoCoA
+title: <a href="../ml">FlinkML</a> - SVM using CoCoA
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+* This will be replaced by the TOC
+{:toc}
+
+## Description
+
+Implements a soft-margin SVM using the communication-efficient distributed dual coordinate
+ascent (CoCoA) algorithm with a hinge loss function.
+The algorithm solves the following minimization problem:
+  
+$$\min_{\mathbf{w} \in \mathbb{R}^d} \frac{\lambda}{2} \left\lVert \mathbf{w} \right\rVert^2 + \frac{1}{n} \sum_{i=1}^n l_{i}\left(\mathbf{w}^T\mathbf{x}_i\right)$$
+ 
+with $\mathbf{w}$ being the weight vector, $\lambda$ being the regularization constant, 
+$$\mathbf{x}_i \in \mathbb{R}^d$$ being the data points and $$l_{i}$$ being the convex loss 
+functions, which can also depend on the labels $$y_{i} \in \mathbb{R}$$.
+In the current implementation the regularizer is the $\ell_2$-norm and the loss functions are the hinge-loss functions:
+  
+  $$l_{i} = \max\left(0, 1 - y_{i} \mathbf{w}^T\mathbf{x}_i \right)$$
+
+With these choices, the problem definition is equivalent to an SVM with soft-margin, which is
+exactly the model this algorithm trains.
+
+The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA).
+In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm calculates 
+several iterations of SDCA locally on a data block before merging the local updates into a
+valid global state.
+This state is redistributed to the different data partitions where the next round of local SDCA 
+iterations is then executed.
+The number of outer iterations and local SDCA iterations control the overall network costs,
+because network communication is only required once per outer iteration.
+The local SDCA iterations are embarrassingly parallel once the individual data partitions have been 
+distributed across the cluster.
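+
+For the hinge loss with labels $$y_{i} \in \{-1, +1\}$$, the dual problem that SDCA ascends is
+
+$$\max_{\boldsymbol{\alpha} \in [0, 1]^n} \frac{1}{n} \sum_{i=1}^n \alpha_i - \frac{1}{2 \lambda n^2} \left\lVert \sum_{i=1}^n \alpha_i y_i \mathbf{x}_i \right\rVert^2$$
+
+with the primal weight vector recovered as $$\mathbf{w}(\boldsymbol{\alpha}) = \frac{1}{\lambda n} \sum_{i=1}^n \alpha_i y_i \mathbf{x}_i$$.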
+
+The implementation of this algorithm is based on the work of 
+[Jaggi et al.](http://arxiv.org/abs/1409.1458)
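+
+To make the local update concrete, here is a minimal, self-contained sketch of a single SDCA step
+under hinge loss, mirroring the update rule used by the implementation (the function `sdcaStep`
+and its plain-array types are illustrative, not part of the FlinkML API):
+
+{% highlight scala %}
+/** One coordinate maximization of the dual problem for a single data point.
+  * Returns the change of the dual variable alpha and of the primal weights w.
+  */
+def sdcaStep(
+    x: Array[Double],   // feature vector of the sampled data point
+    y: Double,          // label, assumed to be in {-1, +1}
+    alpha: Double,      // current dual variable of the point, kept in [0, 1]
+    w: Array[Double],   // current primal weight vector
+    lambda: Double,     // regularization constant
+    n: Int              // total number of training points
+  ): (Double, Array[Double]) = {
+  val dot = x.zip(w).map { case (a, b) => a * b }.sum
+  // gradient of the dual objective with respect to alpha
+  val grad = (y * dot - 1.0) * (lambda * n)
+  val qii = x.map(v => v * v).sum
+  // closed-form maximization along this coordinate, clipped to the box [0, 1]
+  val newAlpha =
+    if (qii != 0.0) math.min(math.max(alpha - grad / qii, 0.0), 1.0) else 1.0
+  // the primal update is scaled by 1/(lambda * n)
+  val deltaW = x.map(_ * y * (newAlpha - alpha) / (lambda * n))
+  (newAlpha - alpha, deltaW)
+}
+{% endhighlight %}
+
+CoCoA executes many such steps per block in every outer iteration and then sums the scaled
+`deltaW` contributions of all blocks to obtain the next global weight vector.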
+
+## Operations
+
+`SVM` is a `Predictor`.
+As such, it supports the `fit` and `predict` operations.
+
+### Fit
+
+`SVM` is trained given a set of `LabeledVector`:
+
+* `fit: DataSet[LabeledVector] => Unit`
+
+### Predict
+
+`SVM` predicts the corresponding class label for all subtypes of `Vector`:
+
+* `predict[T <: Vector]: DataSet[T] => DataSet[LabeledVector]`
+
+## Parameters
+
+The SVM implementation can be controlled by the following parameters:
+
+   <table class="table table-bordered">
+    <thead>
+      <tr>
+        <th class="text-left" style="width: 20%">Parameters</th>
+        <th class="text-center">Description</th>
+      </tr>
+    </thead>
+
+    <tbody>
+      <tr>
+        <td><strong>Blocks</strong></td>
+        <td>
+          <p>
+            Sets the number of blocks into which the input data will be split. 
+            On each block the local stochastic dual coordinate ascent method is executed. 
+            This number should be set to at least the degree of parallelism. 
+            If no value is specified, then the parallelism of the input DataSet is used as the number of blocks. 
+            (Default value: <strong>None</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>Iterations</strong></td>
+        <td>
+          <p>
+            Defines the maximum number of iterations of the outer loop method. 
+            In other words, it defines how often the SDCA method is applied to the blocked data. 
+            After each iteration, the locally computed weight vector updates have to be reduced to update the global weight vector value.
+            The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
+            (Default value: <strong>10</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>LocalIterations</strong></td>
+        <td>
+          <p>
+            Defines the maximum number of SDCA iterations. 
+            In other words, it defines how many data points are drawn from each local data block to calculate the stochastic dual coordinate ascent.
+            (Default value: <strong>10</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>Regularization</strong></td>
+        <td>
+          <p>
+            Defines the regularization constant of the CoCoA algorithm. 
+            The higher the value, the smaller the 2-norm of the weight vector will be. 
+            For an SVM with hinge loss this means that the margin will be wider, even though it might contain some misclassified points.
+            (Default value: <strong>1.0</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>Stepsize</strong></td>
+        <td>
+          <p>
+            Defines the initial step size for the updates of the weight vector. 
+            The larger the step size, the larger the contribution of the weight vector updates to the next weight vector value. 
+            The effective scaling of the updates is $\frac{stepsize}{blocks}$.
+            This value has to be tuned if the algorithm becomes unstable. 
+            (Default value: <strong>1.0</strong>)
+          </p>
+        </td>
+      </tr>
+      <tr>
+        <td><strong>Seed</strong></td>
+        <td>
+          <p>
+            Defines the seed to initialize the random number generator. 
+            The seed directly controls which data points are chosen for the SDCA method. 
+            (Default value: <strong>0</strong>)
+          </p>
+        </td>
+      </tr>
+    </tbody>
+  </table>
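+
+All parameters can also be overridden at call time. A short sketch, assuming that `fit` accepts
+an optional `ParameterMap` which is merged with the instance parameters, as the `FitOperation`
+in the implementation suggests:
+
+{% highlight scala %}
+import org.apache.flink.ml.common.ParameterMap
+
+val svm = SVM()
+
+// Parameters supplied here take precedence over the values set on the svm instance
+val fitParameters = new ParameterMap()
+  .add(SVM.Iterations, 20)
+  .add(SVM.Regularization, 0.5)
+
+// trainingDS: DataSet[LabeledVector], as in the example below
+svm.fit(trainingDS, fitParameters)
+{% endhighlight %}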
+
+## Examples
+
+{% highlight scala %}
+// Read the training data set from a LibSVM-formatted file
+val trainingDS: DataSet[LabeledVector] = env.readLibSVM(pathToTrainingFile)
+
+// Create the SVM learner
+val svm = SVM()
+  .setBlocks(10)
+  .setIterations(10)
+  .setLocalIterations(10)
+  .setRegularization(0.5)
+  .setStepsize(0.5)
+
+// Learn the SVM model
+svm.fit(trainingDS)
+
+// Read the testing data set
+val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
+
+// Calculate the predictions for the testing data set
+val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
+{% endhighlight %}
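+
+Note that `predict` emits the raw decision value, i.e. the scaled distance from the separating
+hyperplane, as the label of each resulting `LabeledVector`. To obtain binary class labels one
+can, for example, threshold this value at zero:
+
+{% highlight scala %}
+// Map the raw decision values to the class labels -1.0 and +1.0
+val classifiedDS: DataSet[LabeledVector] = predictionDS.map {
+  prediction => LabeledVector(math.signum(prediction.label), prediction.vector)
+}
+{% endhighlight %}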

http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
deleted file mode 100644
index fea6be5..0000000
--- a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/CoCoA.scala
+++ /dev/null
@@ -1,515 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.classification
-
-import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
-
-import scala.collection.mutable.ArrayBuffer
-import scala.util.Random
-
-import org.apache.flink.api.common.functions.RichMapFunction
-import org.apache.flink.api.scala._
-import org.apache.flink.configuration.Configuration
-import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
-import org.apache.flink.ml.common._
-import org.apache.flink.ml.math.Vector
-import org.apache.flink.ml.math.Breeze._
-
-import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector}
-
-/** Implements the communication-efficient distributed dual coordinate ascent algorithm with
-  * hinge-loss function. The algorithm can be used to train a SVM with soft-margin.
-  *
-  * The algorithm solves the following minimization problem:
-  *
-  * `min_{w in bbb"R"^d} lambda/2 ||w||^2 + 1/n sum_(i=1)^n l_{i}(w^Tx_i)`
-  *
-  * with `w` being the weight vector, `lambda` being the regularization constant,
-  * `x_{i} in bbb"R"^d` being the data points and `l_{i}` being the convex loss functions, which
-  * can also depend on the labels `y_{i} in bbb"R"`.
-  * In the current implementation the regularizer is the 2-norm and the loss functions are the
-  * hinge-loss functions:
-  *
-  * `l_{i} = max(0, 1 - y_{i} * w^Tx_i`
-  *
-  * With these choices, the problem definition is equivalent to a SVM with soft-margin.
-  * Thus, the algorithm allows us to train a SVM with soft-margin.
-  *
-  * The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA).
-  * In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm
-  * calculates several iterations of SDCA locally on a data block before merging the local
-  * updates into a valid global state.
-  * This state is redistributed to the different data partitions where the next round of local
-  * SDCA iterations is then executed.
-  * The number of outer iterations and local SDCA iterations control the overall network costs,
-  * because there is only network communication required for each outer iteration.
-  * The local SDCA iterations are embarrassingly parallel once the individual data partitions have
-  * been distributed across the cluster.
-  *
-  * Further details of the algorithm can be found [[http://arxiv.org/abs/1409.1458 here]].
-  *
-  * @example
-  *          {{{
-  *             val trainingDS: DataSet[LabeledVector] = env.readSVMFile(pathToTrainingFile)
-  *
-  *             val svm = CoCoA()
-  *               .setBlocks(10)
-  *               .setIterations(10)
-  *               .setLocalIterations(10)
-  *               .setRegularization(0.5)
-  *               .setStepsize(0.5)
-  *
-  *             svm.fit(trainingDS)
-  *
-  *             val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
-  *
-  *             val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
-  *          }}}
-  *
-  * =Parameters=
-  *
-  *  - [[org.apache.flink.ml.classification.CoCoA.Blocks]]:
-  *  Sets the number of blocks into which the input data will be split. On each block the local
-  *  stochastic dual coordinate ascent method is executed. This number should be set at least to
-  *  the degree of parallelism. If no value is specified, then the parallelism of the input
-  *  [[DataSet]] is used as the number of blocks. (Default value: '''None''')
-  *
-  *  - [[org.apache.flink.ml.classification.CoCoA.Iterations]]:
-  *  Defines the maximum number of iterations of the outer loop method. In other words, it defines
-  *  how often the SDCA method is applied to the blocked data. After each iteration, the locally
-  *  computed weight vector updates have to be reduced to update the global weight vector value.
-  *  The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
-  *  (Default value: '''10''')
-  *
-  *  - [[org.apache.flink.ml.classification.CoCoA.LocalIterations]]:
-  *  Defines the maximum number of SDCA iterations. In other words, it defines how many data points
-  *  are drawn from each local data block to calculate the stochastic dual coordinate ascent.
-  *  (Default value: '''10''')
-  *
-  *  - [[org.apache.flink.ml.classification.CoCoA.Regularization]]:
-  *  Defines the regularization constant of the CoCoA algorithm. The higher the value, the smaller
-  *  will the 2-norm of the weight vector be. In case of a SVM with hinge loss this means that the
-  *  SVM margin will be wider even though it might contain some false classifications.
-  *  (Default value: '''1.0''')
-  *
-  *  - [[org.apache.flink.ml.classification.CoCoA.Stepsize]]:
-  *  Defines the initial step size for the updates of the weight vector. The larger the step size
-  *  is, the larger will be the contribution of the weight vector updates to the next weight vector
-  *  value. The effective scaling of the updates is `stepsize/blocks`. This value has to be tuned
-  *  in case that the algorithm becomes instable. (Default value: '''1.0''')
-  *
-  *  - [[org.apache.flink.ml.classification.CoCoA.Seed]]:
-  *  Defines the seed to initialize the random number generator. The seed directly controls which
-  *  data points are chosen for the SDCA method. (Default value: '''0''')
-  */
-class CoCoA extends Predictor[CoCoA] {
-
-  import CoCoA._
-
-  /** Stores the learned weight vector after the fit operation */
-  var weightsOption: Option[DataSet[BreezeDenseVector[Double]]] = None
-
-  /** Sets the number of data blocks/partitions
-    *
-    * @param blocks
-    * @return itself
-    */
-  def setBlocks(blocks: Int): CoCoA = {
-    parameters.add(Blocks, blocks)
-    this
-  }
-
-  /** Sets the number of outer iterations
-    *
-    * @param iterations
-    * @return itself
-    */
-  def setIterations(iterations: Int): CoCoA = {
-    parameters.add(Iterations, iterations)
-    this
-  }
-
-  /** Sets the number of local SDCA iterations
-    *
-    * @param localIterations
-    * @return itselft
-    */
-  def setLocalIterations(localIterations: Int): CoCoA =  {
-    parameters.add(LocalIterations, localIterations)
-    this
-  }
-
-  /** Sets the regularization constant
-    *
-    * @param regularization
-    * @return itself
-    */
-  def setRegularization(regularization: Double): CoCoA = {
-    parameters.add(Regularization, regularization)
-    this
-  }
-
-  /** Sets the stepsize for the weight vector updates
-    *
-    * @param stepsize
-    * @return itself
-    */
-  def setStepsize(stepsize: Double): CoCoA = {
-    parameters.add(Stepsize, stepsize)
-    this
-  }
-
-  /** Sets the seed value for the random number generator
-    *
-    * @param seed
-    * @return itself
-    */
-  def setSeed(seed: Long): CoCoA = {
-    parameters.add(Seed, seed)
-    this
-  }
-}
-
-/** Companion object of CoCoA. Contains convenience functions and the parameter type definitions
-  * of the algorithm.
-  */
-object CoCoA{
-  val WEIGHT_VECTOR ="weightVector"
-
-  // ========================================== Parameters =========================================
-
-  case object Blocks extends Parameter[Int] {
-    val defaultValue: Option[Int] = None
-  }
-
-  case object Iterations extends Parameter[Int] {
-    val defaultValue = Some(10)
-  }
-
-  case object LocalIterations extends Parameter[Int] {
-    val defaultValue = Some(10)
-  }
-
-  case object Regularization extends Parameter[Double] {
-    val defaultValue = Some(1.0)
-  }
-
-  case object Stepsize extends Parameter[Double] {
-    val defaultValue = Some(1.0)
-  }
-
-  case object Seed extends Parameter[Long] {
-    val defaultValue = Some(0L)
-  }
-
-  // ========================================== Factory methods ====================================
-
-  def apply(): CoCoA = {
-    new CoCoA()
-  }
-
-  // ========================================== Operations =========================================
-
-  /** [[org.apache.flink.ml.pipeline.PredictOperation]] for vector types. The result type is a
-    * [[LabeledVector]]
-    *
-    * @tparam T Subtype of [[Vector]]
-    * @return
-    */
-  implicit def predictValues[T <: Vector] = {
-    new PredictOperation[CoCoA, T, LabeledVector]{
-      override def predict(
-          instance: CoCoA,
-          predictParameters: ParameterMap,
-          input: DataSet[T])
-        : DataSet[LabeledVector] = {
-
-        instance.weightsOption match {
-          case Some(weights) => {
-            input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR)
-          }
-
-          case None => {
-            throw new RuntimeException("The CoCoA model has not been trained. Call first fit" +
-              "before calling the predict operation.")
-          }
-        }
-      }
-    }
-  }
-
-  /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
-    * we broadcast the weight vector to all mappers.
-    */
-  class PredictionMapper[T <: Vector] extends RichMapFunction[T, LabeledVector] {
-
-    var weights: BreezeDenseVector[Double] = _
-
-    @throws(classOf[Exception])
-    override def open(configuration: Configuration): Unit = {
-      // get current weights
-      weights = getRuntimeContext.
-        getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
-    }
-
-    override def map(vector: T): LabeledVector = {
-      // calculate the prediction value (scaled distance from the separating hyperplane)
-      val dotProduct = weights dot vector.asBreeze
-
-      LabeledVector(dotProduct, vector)
-    }
-  }
-
-  /** [[FitOperation]] which trains a SVM with soft-margin based on the given training data set.
-    *
-    */
-  implicit val fitCoCoA = {
-    new FitOperation[CoCoA, LabeledVector] {
-      override def fit(
-          instance: CoCoA,
-          fitParameters: ParameterMap,
-          input: DataSet[LabeledVector])
-        : Unit = {
-        val resultingParameters = instance.parameters ++ fitParameters
-
-        // Check if the number of blocks/partitions has been specified
-        val blocks = resultingParameters.get(Blocks) match {
-          case Some(value) => value
-          case None => input.getParallelism
-        }
-
-        val scaling = resultingParameters(Stepsize)/blocks
-        val iterations = resultingParameters(Iterations)
-        val localIterations = resultingParameters(LocalIterations)
-        val regularization = resultingParameters(Regularization)
-        val seed = resultingParameters(Seed)
-
-        // Obtain DataSet with the dimension of the data points
-        val dimension = input.map{_.vector.size}.reduce{
-          (a, b) => {
-            require(a == b, "Dimensions of feature vectors have to be equal.")
-            a
-          }
-        }
-
-        val initialWeights = createInitialWeights(dimension)
-
-        // Count the number of vectors, but keep the value in a DataSet to broadcast it later
-        // TODO: Once efficient count and intermediate result partitions are implemented, use count
-        val numberVectors = input map { x => 1 } reduce { _ + _ }
-
-        // Group the input data into blocks in round robin fashion
-        val blockedInputNumberElements = FlinkMLTools.block(
-          input,
-          blocks,
-          Some(ModuloKeyPartitioner)).
-          cross(numberVectors).
-          map { x => x }
-
-        val resultingWeights = initialWeights.iterate(iterations) {
-          weights => {
-            // compute the local SDCA to obtain the weight vector updates
-            val deltaWs = localDualMethod(
-              weights,
-              blockedInputNumberElements,
-              localIterations,
-              regularization,
-              scaling,
-              seed
-            )
-
-            // scale the weight vectors
-            val weightedDeltaWs = deltaWs map {
-              deltaW => {
-                deltaW :*= scaling
-              }
-            }
-
-            // calculate the new weight vector by adding the weight vector updates to the weight
-            // vector value
-            weights.union(weightedDeltaWs).reduce { _ + _ }
-          }
-        }
-
-        // Store the learned weight vector in hte given instance
-        instance.weightsOption = Some(resultingWeights)
-      }
-    }
-  }
-
-  /** Creates a zero vector of length dimension
-    *
-    * @param dimension [[DataSet]] containing the dimension of the initial weight vector
-    * @return Zero vector of length dimension
-    */
-  private def createInitialWeights(dimension: DataSet[Int]): DataSet[BreezeDenseVector[Double]] = {
-    dimension.map {
-      d => BreezeDenseVector.zeros[Double](d)
-    }
-  }
-
-  /** Computes the local SDCA on the individual data blocks/partitions
-    *
-    * @param w Current weight vector
-    * @param blockedInputNumberElements Blocked/Partitioned input data
-    * @param localIterations Number of local SDCA iterations
-    * @param regularization Regularization constant
-    * @param scaling Scaling value for new weight vector updates
-    * @param seed Random number generator seed
-    * @return [[DataSet]] of weight vector updates. The weight vector updates are double arrays
-    */
-  private def localDualMethod(
-    w: DataSet[BreezeDenseVector[Double]],
-    blockedInputNumberElements: DataSet[(Block[LabeledVector], Int)],
-    localIterations: Int,
-    regularization: Double,
-    scaling: Double,
-    seed: Long)
-  : DataSet[BreezeDenseVector[Double]] = {
-    /*
-    Rich mapper calculating for each data block the local SDCA. We use a RichMapFunction here,
-    because we broadcast the current value of the weight vector to all mappers.
-     */
-    val localSDCA = new RichMapFunction[(Block[LabeledVector], Int), BreezeDenseVector[Double]] {
-      var originalW: BreezeDenseVector[Double] = _
-      // we keep the alphas across the outer loop iterations
-      val alphasArray = ArrayBuffer[BreezeDenseVector[Double]]()
-      // there might be several data blocks in one Flink partition, therefore store mapping
-      val idMapping = scala.collection.mutable.HashMap[Int, Int]()
-      var counter = 0
-
-      var r: Random = _
-
-      override def open(parameters: Configuration): Unit = {
-        originalW = getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR).get(0)
-
-        if(r == null){
-          r = new Random(seed ^ getRuntimeContext.getIndexOfThisSubtask)
-        }
-      }
-
-      override def map(blockNumberElements: (Block[LabeledVector], Int))
-      : BreezeDenseVector[Double] = {
-        val (block, numberElements) = blockNumberElements
-
-        // check if we already processed a data block with the corresponding block index
-        val localIndex = idMapping.get(block.index) match {
-          case Some(idx) => idx
-          case None =>
-            idMapping += (block.index -> counter)
-            counter += 1
-
-            alphasArray += BreezeDenseVector.zeros[Double](block.values.length)
-
-            counter - 1
-        }
-
-        // create temporary alpha array for the local SDCA iterations
-        val tempAlphas = alphasArray(localIndex).copy
-
-        val numLocalDatapoints = tempAlphas.length
-        val deltaAlphas = BreezeDenseVector.zeros[Double](numLocalDatapoints)
-
-        val w = originalW.copy
-
-        val deltaW = BreezeDenseVector.zeros[Double](originalW.length)
-
-        for(i <- 1 to localIterations) {
-          // pick random data point for SDCA
-          val idx = r.nextInt(numLocalDatapoints)
-
-          val LabeledVector(label, vector) = block.values(idx)
-          val alpha = tempAlphas(idx)
-
-          // maximize the dual problem and retrieve alpha and weight vector updates
-          val (deltaAlpha, deltaWUpdate) = maximize(
-            vector.asBreeze,
-            label,
-            regularization,
-            alpha,
-            w,
-            numberElements)
-
-          // update alpha values
-          tempAlphas(idx) += deltaAlpha
-          deltaAlphas(idx) += deltaAlpha
-
-          // deltaWUpdate is already scaled with 1/lambda/n
-          w += deltaWUpdate
-          deltaW += deltaWUpdate
-        }
-
-        // update local alpha values
-        alphasArray(localIndex) += deltaAlphas * scaling
-
-        deltaW
-      }
-    }
-
-    blockedInputNumberElements.map(localSDCA).withBroadcastSet(w, WEIGHT_VECTOR)
-  }
-
-  /** Maximizes the dual problem using hinge loss functions. It returns the alpha and weight
-    * vector updates.
-    *
-    * @param x Selected data point
-    * @param y Label of selected data point
-    * @param regularization Regularization constant
-    * @param alpha Alpha value of selected data point
-    * @param w Current weight vector value
-    * @param numberElements Number of elements in the training data set
-    * @return Alpha and weight vector updates
-    */
-  private def maximize(
-    x: BreezeVector[Double],
-    y: Double, regularization: Double,
-    alpha: Double,
-    w: BreezeVector[Double],
-    numberElements: Int)
-  : (Double, BreezeVector[Double]) = {
-    // compute hinge loss gradient
-    val dotProduct = x dot w
-    val grad = (y * dotProduct - 1.0) * (regularization * numberElements)
-
-    // compute projected gradient
-    var proj_grad = if(alpha  <= 0.0){
-      Math.min(grad, 0)
-    } else if(alpha >= 1.0) {
-      Math.max(grad, 0)
-    } else {
-      grad
-    }
-
-    if(Math.abs(grad) != 0.0){
-      val qii = x dot x
-      val newAlpha = if(qii != 0.0){
-        Math.min(Math.max((alpha - (grad / qii)), 0.0), 1.0)
-      } else {
-        1.0
-      }
-
-      val deltaW = x * y * (newAlpha - alpha) / (regularization * numberElements)
-
-      (newAlpha - alpha, deltaW)
-    } else {
-      (0.0 , BreezeVector.zeros(w.length))
-    }
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
new file mode 100644
index 0000000..a08fef2
--- /dev/null
+++ b/flink-staging/flink-ml/src/main/scala/org/apache/flink/ml/classification/SVM.scala
@@ -0,0 +1,515 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import org.apache.flink.ml.pipeline.{FitOperation, PredictOperation, Predictor}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
+
+import org.apache.flink.api.common.functions.RichMapFunction
+import org.apache.flink.api.scala._
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.ml.common.FlinkMLTools.ModuloKeyPartitioner
+import org.apache.flink.ml.common._
+import org.apache.flink.ml.math.Vector
+import org.apache.flink.ml.math.Breeze._
+
+import breeze.linalg.{Vector => BreezeVector, DenseVector => BreezeDenseVector}
+
+/** Implements a soft-margin SVM using the communication-efficient distributed dual coordinate
+  * ascent algorithm (CoCoA) with a hinge loss function.
+  *
+  * The algorithm solves the following minimization problem:
+  *
+  * `min_{w in bbb"R"^d} lambda/2 ||w||^2 + 1/n sum_(i=1)^n l_{i}(w^Tx_i)`
+  *
+  * with `w` being the weight vector, `lambda` being the regularization constant,
+  * `x_{i} in bbb"R"^d` being the data points and `l_{i}` being the convex loss functions, which
+  * can also depend on the labels `y_{i} in bbb"R"`.
+  * In the current implementation the regularizer is the 2-norm and the loss functions are the
+  * hinge-loss functions:
+  *
+  * `l_{i} = max(0, 1 - y_{i} * w^Tx_i)`
+  *
+  * With these choices, the problem definition is equivalent to an SVM with soft-margin, which is
+  * exactly the model this algorithm trains.
+  *
+  * The minimization problem is solved by applying stochastic dual coordinate ascent (SDCA).
+  * In order to make the algorithm efficient in a distributed setting, the CoCoA algorithm
+  * calculates several iterations of SDCA locally on a data block before merging the local
+  * updates into a valid global state.
+  * This state is redistributed to the different data partitions where the next round of local
+  * SDCA iterations is then executed.
+  * The number of outer iterations and local SDCA iterations control the overall network costs,
+  * because network communication is only required once per outer iteration.
+  * The local SDCA iterations are embarrassingly parallel once the individual data partitions have
+  * been distributed across the cluster.
+  *
+  * Further details of the algorithm can be found [[http://arxiv.org/abs/1409.1458 here]].
+  *
+  * @example
+  *          {{{
+  *             val trainingDS: DataSet[LabeledVector] = env.readLibSVM(pathToTrainingFile)
+  *
+  *             val svm = SVM()
+  *               .setBlocks(10)
+  *               .setIterations(10)
+  *               .setLocalIterations(10)
+  *               .setRegularization(0.5)
+  *               .setStepsize(0.5)
+  *
+  *             svm.fit(trainingDS)
+  *
+  *             val testingDS: DataSet[Vector] = env.readVectorFile(pathToTestingFile)
+  *
+  *             val predictionDS: DataSet[LabeledVector] = svm.predict(testingDS)
+  *          }}}
+  *
+  * =Parameters=
+  *
+  *  - [[org.apache.flink.ml.classification.SVM.Blocks]]:
+  *  Sets the number of blocks into which the input data will be split. On each block the local
+  *  stochastic dual coordinate ascent method is executed. This number should be set to at least
+  *  the degree of parallelism. If no value is specified, then the parallelism of the input
+  *  [[DataSet]] is used as the number of blocks. (Default value: '''None''')
+  *
+  *  - [[org.apache.flink.ml.classification.SVM.Iterations]]:
+  *  Defines the maximum number of iterations of the outer loop method. In other words, it defines
+  *  how often the SDCA method is applied to the blocked data. After each iteration, the locally
+  *  computed weight vector updates have to be reduced to update the global weight vector value.
+  *  The new weight vector is broadcast to all SDCA tasks at the beginning of each iteration.
+  *  (Default value: '''10''')
+  *
+  *  - [[org.apache.flink.ml.classification.SVM.LocalIterations]]:
+  *  Defines the maximum number of SDCA iterations. In other words, it defines how many data points
+  *  are drawn from each local data block to calculate the stochastic dual coordinate ascent.
+  *  (Default value: '''10''')
+  *
+  *  - [[org.apache.flink.ml.classification.SVM.Regularization]]:
+  *  Defines the regularization constant of the CoCoA algorithm. The higher the value, the
+  *  smaller the 2-norm of the weight vector will be. For an SVM with hinge loss this means that
+  *  the margin will be wider, even though it might contain some misclassified points.
+  *  (Default value: '''1.0''')
+  *
+  *  - [[org.apache.flink.ml.classification.SVM.Stepsize]]:
+  *  Defines the initial step size for the updates of the weight vector. The larger the step
+  *  size, the larger the contribution of the weight vector updates to the next weight vector
+  *  value. The effective scaling of the updates is `stepsize/blocks`. This value has to be tuned
+  *  if the algorithm becomes unstable. (Default value: '''1.0''')
+  *
+  *  - [[org.apache.flink.ml.classification.SVM.Seed]]:
+  *  Defines the seed to initialize the random number generator. The seed directly controls which
+  *  data points are chosen for the SDCA method. (Default value: '''0''')
+  */
+class SVM extends Predictor[SVM] {
+
+  import SVM._
+
+  /** Stores the learned weight vector after the fit operation */
+  var weightsOption: Option[DataSet[BreezeDenseVector[Double]]] = None
+
+  /** Sets the number of data blocks/partitions
+    *
+    * @param blocks Number of blocks to split the input data into
+    * @return itself
+    */
+  def setBlocks(blocks: Int): SVM = {
+    parameters.add(Blocks, blocks)
+    this
+  }
+
+  /** Sets the number of outer iterations
+    *
+    * @param iterations Maximum number of outer iterations
+    * @return itself
+    */
+  def setIterations(iterations: Int): SVM = {
+    parameters.add(Iterations, iterations)
+    this
+  }
+
+  /** Sets the number of local SDCA iterations
+    *
+    * @param localIterations Maximum number of SDCA iterations per block and outer iteration
+    * @return itself
+    */
+  def setLocalIterations(localIterations: Int): SVM =  {
+    parameters.add(LocalIterations, localIterations)
+    this
+  }
+
+  /** Sets the regularization constant
+    *
+    * @param regularization Regularization constant
+    * @return itself
+    */
+  def setRegularization(regularization: Double): SVM = {
+    parameters.add(Regularization, regularization)
+    this
+  }
+
+  /** Sets the stepsize for the weight vector updates
+    *
+    * @param stepsize Initial step size for the weight vector updates
+    * @return itself
+    */
+  def setStepsize(stepsize: Double): SVM = {
+    parameters.add(Stepsize, stepsize)
+    this
+  }
+
+  /** Sets the seed value for the random number generator
+    *
+    * @param seed Seed for the random number generator
+    * @return itself
+    */
+  def setSeed(seed: Long): SVM = {
+    parameters.add(Seed, seed)
+    this
+  }
+}
+
+/** Companion object of SVM. Contains convenience functions and the parameter type definitions
+  * of the algorithm.
+  */
+object SVM {
+  val WEIGHT_VECTOR = "weightVector"
+
+  // ========================================== Parameters =========================================
+
+  case object Blocks extends Parameter[Int] {
+    val defaultValue: Option[Int] = None
+  }
+
+  case object Iterations extends Parameter[Int] {
+    val defaultValue = Some(10)
+  }
+
+  case object LocalIterations extends Parameter[Int] {
+    val defaultValue = Some(10)
+  }
+
+  case object Regularization extends Parameter[Double] {
+    val defaultValue = Some(1.0)
+  }
+
+  case object Stepsize extends Parameter[Double] {
+    val defaultValue = Some(1.0)
+  }
+
+  case object Seed extends Parameter[Long] {
+    val defaultValue = Some(0L)
+  }
+
+  // ========================================== Factory methods ====================================
+
+  def apply(): SVM = {
+    new SVM()
+  }
+
+  // ========================================== Operations =========================================
+
+  /** [[org.apache.flink.ml.pipeline.PredictOperation]] for vector types. The result type is a
+    * [[LabeledVector]]
+    *
+    * @tparam T Subtype of [[Vector]]
+    * @return A [[PredictOperation]] which calculates the predictions for the input vectors
+    */
+  implicit def predictValues[T <: Vector] = {
+    new PredictOperation[SVM, T, LabeledVector]{
+      override def predict(
+          instance: SVM,
+          predictParameters: ParameterMap,
+          input: DataSet[T])
+        : DataSet[LabeledVector] = {
+
+        instance.weightsOption match {
+          case Some(weights) => {
+            input.map(new PredictionMapper[T]).withBroadcastSet(weights, WEIGHT_VECTOR)
+          }
+
+          case None => {
+            throw new RuntimeException("The SVM model has not been trained. Call fit first " +
+              "before calling the predict operation.")
+          }
+        }
+      }
+    }
+  }
+
+  /** Mapper to calculate the value of the prediction function. This is a RichMapFunction, because
+    * we broadcast the weight vector to all mappers.
+    */
+  class PredictionMapper[T <: Vector] extends RichMapFunction[T, LabeledVector] {
+
+    var weights: BreezeDenseVector[Double] = _
+
+    @throws(classOf[Exception])
+    override def open(configuration: Configuration): Unit = {
+      // get current weights
+      weights = getRuntimeContext.
+        getBroadcastVariable[BreezeDenseVector[Double]](WEIGHT_VECTOR).get(0)
+    }
+
+    override def map(vector: T): LabeledVector = {
+      // calculate the prediction value (scaled distance from the separating hyperplane)
+      val dotProduct = weights dot vector.asBreeze
+
+      LabeledVector(dotProduct, vector)
+    }
+  }
+
+  /** [[FitOperation]] which trains a SVM with soft-margin based on the given training data set.
+    *
+    */
+  implicit val fitSVM = {
+    new FitOperation[SVM, LabeledVector] {
+      override def fit(
+          instance: SVM,
+          fitParameters: ParameterMap,
+          input: DataSet[LabeledVector])
+        : Unit = {
+        val resultingParameters = instance.parameters ++ fitParameters
+
+        // Check if the number of blocks/partitions has been specified
+        val blocks = resultingParameters.get(Blocks) match {
+          case Some(value) => value
+          case None => input.getParallelism
+        }
+
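+        // The weight vector updates of all blocks are summed up in each outer
+        // iteration, so the step size is divided by the number of blocks to
+        // obtain the effective scaling stepsize/blocks of the aggregated update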
+        val scaling = resultingParameters(Stepsize)/blocks
+        val iterations = resultingParameters(Iterations)
+        val localIterations = resultingParameters(LocalIterations)
+        val regularization = resultingParameters(Regularization)
+        val seed = resultingParameters(Seed)
+
+        // Obtain DataSet with the dimension of the data points
+        val dimension = input.map{_.vector.size}.reduce{
+          (a, b) => {
+            require(a == b, "Dimensions of feature vectors have to be equal.")
+            a
+          }
+        }
+
+        val initialWeights = createInitialWeights(dimension)
+
+        // Count the number of vectors, but keep the value in a DataSet to broadcast it later
+        // TODO: Once efficient count and intermediate result partitions are implemented, use count
+        val numberVectors = input map { x => 1 } reduce { _ + _ }
+
+        // Group the input data into blocks in round robin fashion
+        val blockedInputNumberElements = FlinkMLTools.block(
+          input,
+          blocks,
+          Some(ModuloKeyPartitioner)).
+          cross(numberVectors).
+          map { x => x }
+
+        val resultingWeights = initialWeights.iterate(iterations) {
+          weights => {
+            // compute the local SDCA to obtain the weight vector updates
+            val deltaWs = localDualMethod(
+              weights,
+              blockedInputNumberElements,
+              localIterations,
+              regularization,
+              scaling,
+              seed
+            )
+
+            // scale the weight vectors
+            val weightedDeltaWs = deltaWs map {
+              deltaW => {
+                deltaW :*= scaling
+              }
+            }
+
+            // calculate the new weight vector by adding the weight vector updates to the weight
+            // vector value
+            weights.union(weightedDeltaWs).reduce { _ + _ }
+          }
+        }
+
+        // Store the learned weight vector in the given instance
+        instance.weightsOption = Some(resultingWeights)
+      }
+    }
+  }
+
+  /** Creates a zero vector of length dimension
+    *
+    * @param dimension [[DataSet]] containing the dimension of the initial weight vector
+    * @return Zero vector of length dimension
+    */
+  private def createInitialWeights(dimension: DataSet[Int]): DataSet[BreezeDenseVector[Double]] = {
+    dimension.map {
+      d => BreezeDenseVector.zeros[Double](d)
+    }
+  }
+
+  /** Computes the local SDCA on the individual data blocks/partitions
+    *
+    * @param w Current weight vector
+    * @param blockedInputNumberElements Blocked/Partitioned input data
+    * @param localIterations Number of local SDCA iterations
+    * @param regularization Regularization constant
+    * @param scaling Scaling value for new weight vector updates
+    * @param seed Random number generator seed
+    * @return [[DataSet]] of weight vector updates. The weight vector updates are double arrays
+    */
+  private def localDualMethod(
+    w: DataSet[BreezeDenseVector[Double]],
+    blockedInputNumberElements: DataSet[(Block[LabeledVector], Int)],
+    localIterations: Int,
+    regularization: Double,
+    scaling: Double,
+    seed: Long)
+  : DataSet[BreezeDenseVector[Double]] = {
+    /*
+    Rich mapper calculating for each data block the local SDCA. We use a RichMapFunction here,
+    because we broadcast the current value of the weight vector to all mappers.
+     */
+    val localSDCA = new RichMapFunction[(Block[LabeledVector], Int), BreezeDenseVector[Double]] {
+      var originalW: BreezeDenseVector[Double] = _
+      // we keep the alphas across the outer loop iterations
+      val alphasArray = ArrayBuffer[BreezeDenseVector[Double]]()
+      // there might be several data blocks in one Flink partition, therefore store mapping
+      val idMapping = scala.collection.mutable.HashMap[Int, Int]()
+      var counter = 0
+
+      var r: Random = _
+
+      override def open(parameters: Configuration): Unit = {
+        originalW = getRuntimeContext.getBroadcastVariable(WEIGHT_VECTOR).get(0)
+
+        if(r == null){
+          r = new Random(seed ^ getRuntimeContext.getIndexOfThisSubtask)
+        }
+      }
+
+      override def map(blockNumberElements: (Block[LabeledVector], Int))
+      : BreezeDenseVector[Double] = {
+        val (block, numberElements) = blockNumberElements
+
+        // check if we already processed a data block with the corresponding block index
+        val localIndex = idMapping.get(block.index) match {
+          case Some(idx) => idx
+          case None =>
+            idMapping += (block.index -> counter)
+            counter += 1
+
+            alphasArray += BreezeDenseVector.zeros[Double](block.values.length)
+
+            counter - 1
+        }
+
+        // create temporary alpha array for the local SDCA iterations
+        val tempAlphas = alphasArray(localIndex).copy
+
+        val numLocalDatapoints = tempAlphas.length
+        val deltaAlphas = BreezeDenseVector.zeros[Double](numLocalDatapoints)
+
+        val w = originalW.copy
+
+        val deltaW = BreezeDenseVector.zeros[Double](originalW.length)
+
+        for(i <- 1 to localIterations) {
+          // pick random data point for SDCA
+          val idx = r.nextInt(numLocalDatapoints)
+
+          val LabeledVector(label, vector) = block.values(idx)
+          val alpha = tempAlphas(idx)
+
+          // maximize the dual problem and retrieve alpha and weight vector updates
+          val (deltaAlpha, deltaWUpdate) = maximize(
+            vector.asBreeze,
+            label,
+            regularization,
+            alpha,
+            w,
+            numberElements)
+
+          // update alpha values
+          tempAlphas(idx) += deltaAlpha
+          deltaAlphas(idx) += deltaAlpha
+
+          // deltaWUpdate is already scaled with 1/lambda/n
+          w += deltaWUpdate
+          deltaW += deltaWUpdate
+        }
+
+        // update local alpha values
+        alphasArray(localIndex) += deltaAlphas * scaling
+
+        deltaW
+      }
+    }
+
+    blockedInputNumberElements.map(localSDCA).withBroadcastSet(w, WEIGHT_VECTOR)
+  }
+
+  /** Maximizes the dual problem using hinge loss functions. It returns the alpha and weight
+    * vector updates.
+    *
+    * @param x Selected data point
+    * @param y Label of selected data point
+    * @param regularization Regularization constant
+    * @param alpha Alpha value of selected data point
+    * @param w Current weight vector value
+    * @param numberElements Number of elements in the training data set
+    * @return Alpha and weight vector updates
+    */
+  private def maximize(
+    x: BreezeVector[Double],
+    y: Double,
+    regularization: Double,
+    alpha: Double,
+    w: BreezeVector[Double],
+    numberElements: Int)
+  : (Double, BreezeVector[Double]) = {
+    // compute hinge loss gradient
+    val dotProduct = x dot w
+    val grad = (y * dotProduct - 1.0) * (regularization * numberElements)
+
+    // compute the projected gradient: at the boundaries of the box constraint
+    // (alpha = 0 or alpha = 1) only gradient directions pointing into the box count
+    val projGrad = if (alpha <= 0.0) {
+      Math.min(grad, 0)
+    } else if (alpha >= 1.0) {
+      Math.max(grad, 0)
+    } else {
+      grad
+    }
+
+    if (Math.abs(projGrad) != 0.0) {
+      val qii = x dot x
+      val newAlpha = if(qii != 0.0){
+        Math.min(Math.max((alpha - (grad / qii)), 0.0), 1.0)
+      } else {
+        1.0
+      }
+
+      val deltaW = x * y * (newAlpha - alpha) / (regularization * numberElements)
+
+      (newAlpha - alpha, deltaW)
+    } else {
+      (0.0, BreezeVector.zeros(w.length))
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
deleted file mode 100644
index a5e7496..0000000
--- a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/CoCoAITSuite.scala
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.ml.classification
-
-import org.scalatest.{FlatSpec, Matchers}
-
-import org.apache.flink.api.scala._
-import org.apache.flink.test.util.FlinkTestBase
-
-class CoCoAITSuite extends FlatSpec with Matchers with FlinkTestBase {
-
-  behavior of "The CoCoA implementation"
-
-  it should "train a SVM" in {
-    val env = ExecutionEnvironment.getExecutionEnvironment
-
-    val cocoa = CoCoA().
-    setBlocks(env.getParallelism).
-    setIterations(100).
-    setLocalIterations(100).
-    setRegularization(0.002).
-    setStepsize(0.1).
-    setSeed(0)
-
-    val trainingDS = env.fromCollection(Classification.trainingData)
-
-    cocoa.fit(trainingDS)
-
-    val weightVector = cocoa.weightsOption.get.collect().apply(0)
-
-    weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach {
-      case (weight, expectedWeight) =>
-        weight should be(expectedWeight +- 0.1)
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/995f8f96/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
----------------------------------------------------------------------
diff --git a/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
new file mode 100644
index 0000000..cdb4ffc
--- /dev/null
+++ b/flink-staging/flink-ml/src/test/scala/org/apache/flink/ml/classification/SVMITSuite.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.ml.classification
+
+import org.scalatest.{FlatSpec, Matchers}
+
+import org.apache.flink.api.scala._
+import org.apache.flink.test.util.FlinkTestBase
+
+class SVMITSuite extends FlatSpec with Matchers with FlinkTestBase {
+
+  behavior of "The SVM using CoCoA implementation"
+
+  it should "train a SVM" in {
+    val env = ExecutionEnvironment.getExecutionEnvironment
+
+    val svm = SVM().
+    setBlocks(env.getParallelism).
+    setIterations(100).
+    setLocalIterations(100).
+    setRegularization(0.002).
+    setStepsize(0.1).
+    setSeed(0)
+
+    val trainingDS = env.fromCollection(Classification.trainingData)
+
+    svm.fit(trainingDS)
+
+    val weightVector = svm.weightsOption.get.collect().apply(0)
+
+    weightVector.valuesIterator.zip(Classification.expectedWeightVector.valueIterator).foreach {
+      case (weight, expectedWeight) =>
+        weight should be(expectedWeight +- 0.1)
+    }
+  }
+}