Posted to user@flink.apache.org by Lydia Ickler <ic...@googlemail.com> on 2016/02/04 15:13:58 UTC

DistributedMatrix in Flink

Hi all,

as mentioned before I am trying to port the RowMatrix from Spark to Flink…

In the code I already ran into a dead end… In the function multiplyGramianMatrixBy() (see end of mail) there is the line:
rows.context.broadcast(v) (rows is a DataSet[Vector])
What exactly is this line doing? Does it fill the "content" of v into the variable rows?
And another question:
What does the function treeAggregate do? And how would you tackle a "copy" of that in Flink?

Thanks in advance!
Best regards, 
Lydia


import breeze.linalg.{axpy => brzAxpy, DenseVector => BDV, SparseVector => BSV}

private[flink] def multiplyGramianMatrixBy(v: BDV[Double]): BDV[Double] = {
  val n = numCols().toInt

  // Ship v to all workers; each task reads it via vbr.value.
  val vbr = rows.context.broadcast(v)

  // Computes (A^T A) v without materializing the Gramian: every row r
  // contributes (r . v) * r, accumulated per partition (seqOp) and then
  // merged across partitions (combOp).
  rows.treeAggregate(BDV.zeros[Double](n))(
    seqOp = (U, r) => {
      val rBrz = r.toBreeze
      val a = rBrz.dot(vbr.value)
      rBrz match {
        // use specialized axpy (U += a * rBrz) for better performance
        case _: BDV[_] => brzAxpy(a, rBrz.asInstanceOf[BDV[Double]], U)
        case _: BSV[_] => brzAxpy(a, rBrz.asInstanceOf[BSV[Double]], U)
        case _ => throw new UnsupportedOperationException(
          s"Do not support vector operation from type ${rBrz.getClass.getName}.")
      }
      U
    }, combOp = (U1, U2) => U1 += U2)
}


Re: DistributedMatrix in Flink

Posted by Till Rohrmann <ti...@gmail.com>.
Hi Lydia,

Spark and Flink are not identical. Thus, you'll find concepts in each system
which won't have a corresponding counterpart in the other. For example,
rows.context.broadcast(v) broadcasts the value v so that you can use it on
all executors. Flink follows a slightly different concept when you broadcast
values: in Flink you always broadcast the contents of a DataSet. That way
you avoid collecting the result on some central node from which it is then
broadcast.
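
For illustration, here is a minimal sketch of that mechanism (all names such
as BroadcastSketch, "v" and the plain Array[Double] element type are made up
for the example, not taken from your code). You register a DataSet with
withBroadcastSet under a name and read it inside a rich function via
getRuntimeContext.getBroadcastVariable:

import org.apache.flink.api.scala._
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration

object BroadcastSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val rows: DataSet[Array[Double]] =
      env.fromElements(Array(1.0, 2.0), Array(3.0, 4.0))

    // The value to broadcast lives in a DataSet instead of a local variable.
    val v: DataSet[Array[Double]] = env.fromElements(Array(0.5, 0.5))

    val dots = rows
      .map(new RichMapFunction[Array[Double], Double] {
        private var vec: Array[Double] = _

        override def open(parameters: Configuration): Unit = {
          // The broadcast DataSet arrives as a java.util.List on every task.
          vec = getRuntimeContext
            .getBroadcastVariable[Array[Double]]("v")
            .get(0)
        }

        override def map(row: Array[Double]): Double =
          row.zip(vec).map { case (a, b) => a * b }.sum
      })
      .withBroadcastSet(v, "v") // register v under the name "v"

    dots.print()
  }
}

Because the broadcast side is itself a DataSet, it never has to be collected
to the client before being distributed.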

The treeAggregate function is an aggregation operation which is partly
executed on the cluster. It is similar to a combinable reduce operation in
Flink. However, you can choose an arbitrary result type (similar to a fold
operation compared to a reduce operation). You can do the same with Flink if
you first apply a combineGroup function on the DataSet and then a reduce
function, as sketched below.
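
Here is a minimal sketch of that pattern (again with made-up names and a
plain Array[Double] accumulator instead of Breeze vectors). combineGroup
plays the role of treeAggregate's seqOp by pre-aggregating each partition
locally, and reduce plays the role of combOp by merging the partial results:

import org.apache.flink.api.scala._
import org.apache.flink.util.Collector

object TreeAggregateSketch {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    val n = 2 // dimension of the accumulator, chosen for the example
    val rows: DataSet[Array[Double]] =
      env.fromElements(Array(1.0, 2.0), Array(3.0, 4.0))

    val sum = rows
      // seqOp: fold the rows of each partition into a local accumulator
      .combineGroup { (it: Iterator[Array[Double]], out: Collector[Array[Double]]) =>
        val acc = Array.fill(n)(0.0)
        for (r <- it; i <- 0 until n) acc(i) += r(i)
        out.collect(acc)
      }
      // combOp: merge the pre-aggregated partial results
      .reduce { (u1, u2) =>
        val merged = new Array[Double](n)
        for (i <- 0 until n) merged(i) = u1(i) + u2(i)
        merged
      }

    sum.print()
  }
}

Unlike Spark's treeAggregate, there is no configurable tree depth here, but
the per-partition pre-aggregation followed by a final reduce gives you the
same combinable behavior.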

Cheers,
Till