Posted to user@flink.apache.org by Aljoscha Krettek <al...@apache.org> on 2016/04/11 10:09:15 UTC

Re: BulkIteration and BroadcastVariables

Hi,
it is not possible to change broadcast variables; they are read-only.
Internally, they are just another DataSet that gets streamed into an
operator through an additional input.
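
Since a broadcast variable can only be read, one way to keep intermediate
results is to carry them in the data that flows through the loop itself.
The following is a minimal plain-Java sketch of that idea for power
iteration with deflation. It does not use the Flink API, and every name in
it (DeflationSketch, State, step, topEigenValues) is hypothetical. It
assumes a small, dense, symmetric matrix with distinct eigenvalues; how
well the pattern maps onto a bulk iteration in an actual job would still
need to be checked.

```java
import java.util.ArrayList;
import java.util.List;

public class DeflationSketch {

    // Loop state (hypothetical): the current deflated matrix plus all eigenvalues found so far.
    static final class State {
        final double[][] matrix;
        final List<Double> eigenValues;

        State(double[][] matrix, List<Double> eigenValues) {
            this.matrix = matrix;
            this.eigenValues = eigenValues;
        }
    }

    // Dense matrix-vector product.
    static double[] multiply(double[][] a, double[] v) {
        double[] result = new double[a.length];
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < v.length; j++) {
                result[i] += a[i][j] * v[j];
            }
        }
        return result;
    }

    // Scales v to unit length; assumes v is not the zero vector.
    static double[] normalize(double[] v) {
        double sumOfSquares = 0.0;
        for (double x : v) {
            sumOfSquares += x * x;
        }
        double norm = Math.sqrt(sumOfSquares);
        double[] result = new double[v.length];
        for (int i = 0; i < v.length; i++) {
            result[i] = v[i] / norm;
        }
        return result;
    }

    // Power iteration; assumes a symmetric matrix with a unique dominant eigenvalue.
    static double[] dominantEigenVector(double[][] a, int steps) {
        double[] v = new double[a.length];
        for (int i = 0; i < v.length; i++) {
            v[i] = i + 1; // arbitrary non-zero start vector
        }
        v = normalize(v);
        for (int s = 0; s < steps; s++) {
            v = normalize(multiply(a, v));
        }
        return v;
    }

    // One outer iteration: find the dominant eigenpair, deflate, and append the eigenvalue.
    static State step(State state) {
        double[][] a = state.matrix;
        double[] v = dominantEigenVector(a, 100);

        // Rayleigh quotient v^T A v (v has unit length).
        double[] av = multiply(a, v);
        double lambda = 0.0;
        for (int i = 0; i < v.length; i++) {
            lambda += v[i] * av[i];
        }

        // Deflation: A' = A - lambda * v * v^T.
        double[][] deflated = new double[a.length][a.length];
        for (int i = 0; i < a.length; i++) {
            for (int j = 0; j < a.length; j++) {
                deflated[i][j] = a[i][j] - lambda * v[i] * v[j];
            }
        }

        // The intermediate result travels with the loop state instead of a side variable.
        List<Double> collected = new ArrayList<>(state.eigenValues);
        collected.add(lambda);
        return new State(deflated, collected);
    }

    // Runs k outer iterations and returns every eigenvalue collected along the way.
    static List<Double> topEigenValues(double[][] matrix, int k) {
        State state = new State(matrix, new ArrayList<>());
        for (int i = 0; i < k; i++) {
            state = step(state);
        }
        return state.eigenValues;
    }

    public static void main(String[] args) {
        double[][] a = {{2.0, 1.0}, {1.0, 2.0}};
        System.out.println(topEigenValues(a, 2));
    }
}
```

The point of the sketch is only the shape of the loop: each step takes the
previous state and returns a new one that already contains the results
collected so far, so nothing outside the loop has to be modified.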

--
aljoscha

On Wed, 30 Mar 2016 at 17:34 Lydia Ickler <ic...@googlemail.com> wrote:

> Hi all,
> I have a question regarding the BulkIteration and BroadcastVariables:
> The BulkIteration by default has one input variable and sends one variable
> into the next iteration, right?
> What if I need to collect some intermediate results in each iteration? How
> would I do that?
>
> For example in my code below I would like to store all newEigenValue. Unfortunately
> I didn’t find a way to do so.
> Is it possible to set/change BroadcastVariables? Or is it only possible to
> „get“ them?
>
> Thanks in advance!
> Lydia
>
>
> //read input file
> DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
>
>
> //initial:
> //Approximate EigenVector by PowerIteration
> DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector2(matrixA);
> //Approximate EigenValue by PowerIteration
> DataSet<Tuple3<Integer, Integer, Double>> oldEigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
> //Deflate original matrix
> matrixA = PowerIteration_getNextMatrix(matrixA,eigenVector,oldEigenValue);
>
> DataSet<Tuple3<Integer, Integer, Double>> newEigenVector = null;
> DataSet<Tuple3<Integer, Integer, Double>> newEigenValue = null;
> DataSet<Tuple3<Integer, Integer, Double>> newMatrixA = null;
>
>
> //BulkIteration to find k dominant eigenvalues
> IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = matrixA.iterate(outer_iterations);
>
> newEigenVector = PowerIteration_getEigenVector2(iteration);
> newEigenValue = PowerIteration_getEigenValue(iteration,newEigenVector);
> newMatrixA = PowerIteration_getNextMatrix(iteration,newEigenVector,newEigenValue);
>
> //get gap
> DataSet<Tuple3<Integer, Integer, Double>> gap = newEigenValue.map(new getGap()).withBroadcastSet(oldEigenValue, "oldEigenValue");
> DataSet<Tuple3<Integer, Integer, Double>> filtered = gap.filter(new gapFilter());
> oldEigenValue = newEigenValue;
>
> DataSet<Tuple3<Integer, Integer, Double>> neue = iteration.closeWith(newMatrixA, filtered);