Posted to dev@flink.apache.org by Florian Gößler <ma...@floriangoessler.de> on 2015/06/02 13:10:58 UTC

[FLINK-1731] [ML] Issues with vector to breeze converter while implementing KMeans

Hi Flink Community,

We are implementing a KMeans algorithm in the ML part of Flink, but after recent updates we ran into an issue with the vector to Breeze converter.

We are getting the following compile error:

Error:(200, 75) ambiguous implicit values:
 both value denseVectorConverter in object BreezeVectorConverter of type => org.apache.flink.ml.math.BreezeVectorConverter[org.apache.flink.ml.math.DenseVector]
 and value sparseVectorConverter in object BreezeVectorConverter of type => org.apache.flink.ml.math.BreezeVectorConverter[org.apache.flink.ml.math.SparseVector]
 match expected type org.apache.flink.ml.math.BreezeVectorConverter[T]
            .reduce((p1, p2) => (p1._1, (p1._2.asBreeze + p2._2.asBreeze).fromBreeze, p1._3 + p2._3))
                                                                          ^

The code looks like this:

new FitOperation[KMeans, Vector] {
      override def fit(
        instance: KMeans,
        fitParameters: ParameterMap,
        input: DataSet[Vector])
      : Unit = {
        val resultingParameters = instance.parameters ++ fitParameters

        val centroids: DataSet[LabeledVector] = resultingParameters.get(InitialCentroids).get
        val numIterations: Int = resultingParameters.get(NumIterations).get

        val finalCentroids = centroids.iterate(numIterations) { currentCentroids =>
          val newCentroids: DataSet[LabeledVector] = input
            .map(new SelectNearestCenterMapper).withBroadcastSet(currentCentroids, CENTROIDS)
            .map(x => (x.label, x.vector, 1.0)).withForwardedFields("label->_1; vector->_2")
            .groupBy(x => x._1)
            .reduce((p1, p2) => (p1._1, (p1._2.asBreeze + p2._2.asBreeze).fromBreeze, p1._3 + p2._3))
            .withForwardedFields("_1")
            .map(x => LabeledVector(x._1, (x._2.asBreeze :/ x._3).fromBreeze))
            .withForwardedFields("_1->label")

          newCentroids
        }

        instance.centroids = Some(finalCentroids)
      }
}

We have been getting this error since this commit: https://github.com/apache/flink/commit/ae446388b91ecc0f08887da19400395b96b32f6c
It looks like the change to flink-ml/src/main/scala/org/apache/flink/ml/math/BreezeVectorConverter.scala introduced the problem.

We are not that experienced with Scala and don’t know how to resolve this issue.

You can have a look at the code in the pull request: https://github.com/apache/flink/pull/700


Thanks in advance,

Florian & Co.



Re: [FLINK-1731] [ML] Issues with vector to breeze converter while implementing KMeans

Posted by Till Rohrmann <ti...@gmail.com>.
Hi Florian,

I just wrote a patch for this problem. I’ll wait until all tests pass and
then merge the fix, so it should be included in the current master by late
afternoon.

If you don’t want to wait that long, you can also work around the issue by
giving the type parameter explicitly: .fromBreeze[org.apache.flink.ml.math.Vector].
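
To illustrate why the explicit type parameter helps, here is a minimal, self-contained sketch. It does not use the real FlinkML classes: Vec, Dense, Sparse, Converter, the three implicit values and fromRaw are hypothetical stand-ins for Vector, DenseVector, SparseVector, BreezeVectorConverter and fromBreeze, and the existence of a converter for the general vector type is an assumption. When T is left open, more than one implicit converter qualifies and the compiler cannot choose; naming T leaves a single candidate. Applied to the reduce step in the question, the call would presumably read (p1._2.asBreeze + p2._2.asBreeze).fromBreeze[org.apache.flink.ml.math.Vector].

```scala
object ImplicitAmbiguitySketch {
  // Hypothetical stand-ins for the FlinkML vector types (not the real classes).
  sealed trait Vec { def data: Seq[Double] }
  final case class Dense(data: Seq[Double]) extends Vec
  final case class Sparse(data: Seq[Double]) extends Vec

  // Hypothetical stand-in for BreezeVectorConverter[T].
  trait Converter[T <: Vec] { def convert(raw: Seq[Double]): T }

  object Converter {
    implicit val denseConverter: Converter[Dense] =
      new Converter[Dense] { def convert(raw: Seq[Double]): Dense = Dense(raw) }
    implicit val sparseConverter: Converter[Sparse] =
      new Converter[Sparse] { def convert(raw: Seq[Double]): Sparse = Sparse(raw) }
    // Assumed converter for the general vector type.
    implicit val vecConverter: Converter[Vec] =
      new Converter[Vec] { def convert(raw: Seq[Double]): Vec = Dense(raw) }
  }

  // Stand-in for the `fromBreeze` extension method.
  implicit class RawOps(raw: Seq[Double]) {
    def fromRaw[T <: Vec](implicit c: Converter[T]): T = c.convert(raw)
  }

  // Element-wise sum, mirroring the reduce step in the question.
  def sum(a: Vec, b: Vec): Vec = {
    val raw = a.data.zip(b.data).map { case (x, y) => x + y }
    // raw.fromRaw     // left commented out: T is not fixed, so several
    //                 // converters qualify and this is expected not to compile
    raw.fromRaw[Vec]   // naming T leaves exactly one matching converter
  }
}
```

Because Converter is invariant in T, only the converter declared for exactly the requested type matches once T is written out.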

Cheers,
Till
