You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Lydia Ickler <ic...@googlemail.com> on 2016/03/26 23:16:36 UTC

for loop slow

Hi,

I have an issue with a for-loop.
If I set the maximal iteration number i to more than 3 it gets stuck and I cannot figure out why.
With 1, 2 or 3 it runs smoothly.
I attached the code below and marked the loop with //PROBLEM.

Thanks in advance!
Lydia

package org.apache.flink.contrib.lifescience.examples;

import edu.princeton.cs.algs4.Graph;
import edu.princeton.cs.algs4.SymbolDigraph;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;

import java.util.*;

import static edu.princeton.cs.algs4.GraphGenerator.simple;

public class PowerIteration {

    //path to input
    static String input = null;
    //path to output
    static String output = null;
    //number of iterations (default = 7)
    static int iterations = 7;
    //threshold
    static double delta = 0.01;

    public void run() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //read input file
        DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);

        DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
        DataSet<Tuple3<Integer, Integer, Double>> eigenValue;

        //initial:
        //Approximate EigenVector by PowerIteration
        eigenVector = PowerIteration_getEigenVector(matrixA);
        //Approximate EigenValue by PowerIteration
        eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
        //Deflate original matrix
        matrixA = PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue);

        MyResult initial = new MyResult(eigenVector,eigenValue,matrixA);

        MyResult next = null;

        //PROBLEM!!! get i eigenvalue gaps
        for(int i=0;i<2;i++){
            next = PowerIteration_routine(initial);
            initial = next;
            next.gap.print();
        }

        env.execute("Power Iteration");
    }

    public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
                                                                          String filePath) {
        CsvReader csvReader = env.readCsvFile(filePath);
        csvReader.fieldDelimiter(",");
        csvReader.includeFields("ttt");
        return csvReader.types(Integer.class, Integer.class, Double.class);
    }

    public static final class ProjectJoinResultMapper implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            Integer row = value.f0.f0;
            Integer column = value.f1.f1;
            Double product = value.f0.f2 * value.f1.f2;
            return new Tuple3<Integer, Integer, Double>(row, column, product);
        }
    }

    public static final class RQ implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {

        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {

            return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2);
        }
    }

    public static void main(String[] args) throws Exception {
        if(args.length<2 || args.length > 4){
            System.err.println("Usage: PowerIteration <input path> <result path> optional: <iterations> <threshold diff>");
            System.exit(0);
        }

        input = args[0];
        output = args[1];

        if(args.length==3) {
            iterations = Integer.parseInt(args[2]);
        }
        if(args.length==4){
            delta = Double.parseDouble(args[3]);
        }

        new PowerIteration2().run();
    }

    public static final class deltaFilter implements FlatJoinFunction<Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>> {

        public void join(Tuple3<Integer, Integer, Double> candidate, Tuple3<Integer, Integer, Double> old, Collector<Tuple3<Integer, Integer, Double>> out) {

            if(!(candidate.f2 == old.f2)){
                out.collect(candidate);
            }

            //if(Math.abs(candidate.f2-old.f2) > delta){
            //    out.collect(candidate);
            //}

        }
    }

    public static final class normalizeByMax implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {

        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/(value.f1.f2));
        }
    }

    public static final class firstX implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {

        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,1/(value.f0.f2*value.f1.f2));
        }
    }

    public static final class resetIndex implements
            MapFunction<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>> {

        public Tuple3<Integer, Integer, Double> map(
                Tuple3<Integer, Integer, Double>value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(0,value.f1,value.f2);
        }
    }


    public static final class decBy1 implements
            MapFunction<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>> {

        public Tuple3<Integer, Integer, Double> map(
                Tuple3<Integer, Integer, Double>value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0-1,value.f1-1,value.f2);
        }
    }

    public static final class resetIndex2 implements
            MapFunction<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>> {

        public Tuple3<Integer, Integer, Double> map(
                Tuple3<Integer, Integer, Double>value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0,0,value.f2);
        }
    }

    public static final class MatrixTimesValue implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {

        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {

            return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2*(value.f1.f2));
        }
    }

    public static final class MatrixMinusMatrix implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            Integer row = value.f0.f0;
            Integer column = value.f0.f1;
            Double result = value.f0.f2 - value.f1.f2;
            return new Tuple3<Integer, Integer, Double>(row, column, result);
        }
    }

    public static final class getGapCenter implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            Integer row = value.f0.f0;
            Integer column = value.f0.f1;
            Double result = value.f0.f2 + (2/(Math.abs(value.f1.f2)));
            return new Tuple3<Integer, Integer, Double>(row, column, result);
        }
    }


    public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenVector(DataSet<Tuple3<Integer, Integer, Double>> matrixA) throws Exception {

        //get initial vector - which equals matrixA * [1, ... , 1]
        DataSet<Tuple3<Integer, Integer, Double>> initial0 = matrixA.groupBy(0).aggregate(Aggregations.SUM,2);

        //normalize by maximum value
        DataSet<Tuple3<Integer, Integer, Double>> initial= initial0.cross(initial0.maxBy(2)).map(new normalizeByMax());

        //BulkIteration to find dominant eigenvector
        IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = initial.iterate(iterations);

        DataSet<Tuple3<Integer, Integer, Double>> intermediate = (matrixA.join(iteration).where(1).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).
                cross((matrixA.join(iteration).where(1).equalTo(0)
                        .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).maxBy(2))
                .map(new normalizeByMax());

        DataSet<Tuple3<Integer, Integer, Double>> diffs = iteration.join(intermediate).where(0).equalTo(0).with(new deltaFilter());
        DataSet<Tuple3<Integer, Integer, Double>> eigenVector  = iteration.closeWith(intermediate,diffs);

        return eigenVector;
    }

    public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenValue(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector) {

        //determine now EigenValue by approximating the Rayleigh Quotient:
        //get Ax
        DataSet<Tuple3<Integer, Integer, Double>> Ax = matrixA.join(eigenVector).where(1).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).groupBy(0).aggregate(Aggregations.SUM, 2);
        //get Ax * x
        DataSet<Tuple3<Integer, Integer, Double>> Axx = eigenVector.join(Ax).where(0).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);

        //now x * x
        DataSet<Tuple3<Integer, Integer, Double>> xx = eigenVector.join(eigenVector).where(0).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);

        return Axx.cross(xx).map(new RQ()).aggregate(Aggregations.SUM, 2);
    }

    public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getNextMatrix(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue) {

        DataSet<Tuple3<Integer, Integer, Double>> eigenValueReset = eigenValue.map(new resetIndex());
        DataSet<Tuple3<Integer, Integer, Double>> firstVal = eigenVector.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
            public boolean filter(Tuple3<Integer, Integer, Double> value) {
                return value.f0 == 0;
            }
        });
        firstVal = eigenValueReset.cross(firstVal).map(new firstX());
        DataSet<Tuple3<Integer, Integer, Double>> firstRow = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
            public boolean filter(Tuple3<Integer, Integer, Double> value) {
                return value.f0 == 0;
            }
        });
        DataSet<Tuple3<Integer, Integer, Double>> x = ((firstRow.map(new DataSetUtils.transpose())).join(firstVal).where(1).equalTo(0).map(new MatrixTimesValue())).map(new DataSetUtils.transpose());
        DataSet<Tuple3<Integer, Integer, Double>> C = eigenVector.cross(eigenValueReset).map(new MatrixTimesValue()).map(new resetIndex2()).join(x).where(1).equalTo(0).
                map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2);
        matrixA = matrixA.join(C).where(0,1).equalTo(0,1).map(new MatrixMinusMatrix());
        matrixA = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
            public boolean filter(Tuple3<Integer, Integer, Double> value) {
                return (value.f0 != 0) && (value.f1 != 0);
            }
        });

        return matrixA.map(new decBy1());
    }

    public MyResult PowerIteration_routine(MyResult initial) throws Exception {

        //Approximate EigenVector by PowerIteration
        DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector(initial.matrixA);
        //Approximate EigenValue by PowerIteration
        DataSet<Tuple3<Integer, Integer, Double>> eigenValue = PowerIteration_getEigenValue(initial.matrixA, eigenVector);
        //get gap
        DataSet<Tuple3<Integer, Integer, Double>> gap = initial.eigenValue.cross(eigenValue).map(new MatrixMinusMatrix());
        //Deflate original matrix
        DataSet<Tuple3<Integer, Integer, Double>> matrixA = PowerIteration_getNextMatrix(initial.matrixA,eigenVector,eigenValue);

        return new MyResult(eigenVector,eigenValue,matrixA,gap);
    }

    public class MyResult {
        DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
        DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
        DataSet<Tuple3<Integer, Integer, Double>> gap;
        DataSet<Tuple3<Integer, Integer, Double>> matrixA;

        public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA){
            this.eigenVector = eigenVector;
            this.eigenValue =eigenValue;
            this.matrixA = matrixA;
        }

        public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> gap){
            this.eigenVector = eigenVector;
            this.eigenValue =eigenValue;
            this.matrixA = matrixA;
            this.gap = gap;
        }
    }

}


Re: for loop slow

Posted by Chiwan Park <ch...@apache.org>.
Hi Lydia,

To build iterative algorithm on Flink, using API for iterations [1] would be better than using for-loop. Your program triggers multiple executions by multiple calling `next.gap.print()`. In each execution, Flink reads whole data redundantly and it cause performance to decrease.

Regards,
Chiwan Park

[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html

> On Mar 27, 2016, at 7:16 AM, Lydia Ickler <ic...@googlemail.com> wrote:
> 
> Hi,
> 
> I have an issue with a for-loop.
> If I set the maximal iteration number i to more than 3 it gets stuck and I cannot figure out why.
> With 1, 2 or 3 it runs smoothly.
> I attached the code below and marked the loop with //PROBLEM.
> 
> Thanks in advance!
> Lydia
> 
> package org.apache.flink.contrib.lifescience.examples;
> 
> import edu.princeton.cs.algs4.Graph;
> import edu.princeton.cs.algs4.SymbolDigraph;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.FlatJoinFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.aggregation.Aggregations;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.operators.IterativeDataSet;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.util.Collector;
> 
> import java.util.*;
> 
> import static edu.princeton.cs.algs4.GraphGenerator.simple;
> 
> public class PowerIteration {
> 
>     //path to input
>     static String input = null;
>     //path to output
>     static String output = null;
>     //number of iterations (default = 7)
>     static int iterations = 7;
>     //threshold
>     static double delta = 0.01;
> 
>     public void run() throws Exception {
>         ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
>         //read input file
>         DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
> 
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
> 
>         //initial:
>         //Approximate EigenVector by PowerIteration
>         eigenVector = PowerIteration_getEigenVector(matrixA);
>         //Approximate EigenValue by PowerIteration
>         eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
>         //Deflate original matrix
>         matrixA = PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue);
> 
>         MyResult initial = new MyResult(eigenVector,eigenValue,matrixA);
> 
>         MyResult next = null;
> 
>         //PROBLEM!!! get i eigenvalue gaps
>         for(int i=0;i<2;i++){
>             next = PowerIteration_routine(initial);
>             initial = next;
>             next.gap.print();
>         }
> 
>         env.execute("Power Iteration");
>     }
> 
>     public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
>                                                                           String filePath) {
>         CsvReader csvReader = env.readCsvFile(filePath);
>         csvReader.fieldDelimiter(",");
>         csvReader.includeFields("ttt");
>         return csvReader.types(Integer.class, Integer.class, Double.class);
>     }
> 
>     public static final class ProjectJoinResultMapper implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             Integer row = value.f0.f0;
>             Integer column = value.f1.f1;
>             Double product = value.f0.f2 * value.f1.f2;
>             return new Tuple3<Integer, Integer, Double>(row, column, product);
>         }
>     }
> 
>     public static final class RQ implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
> 
>             return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2);
>         }
>     }
> 
>     public static void main(String[] args) throws Exception {
>         if(args.length<2 || args.length > 4){
>             System.err.println("Usage: PowerIteration <input path> <result path> optional: <iterations> <threshold diff>");
>             System.exit(0);
>         }
> 
>         input = args[0];
>         output = args[1];
> 
>         if(args.length==3) {
>             iterations = Integer.parseInt(args[2]);
>         }
>         if(args.length==4){
>             delta = Double.parseDouble(args[3]);
>         }
> 
>         new PowerIteration2().run();
>     }
> 
>     public static final class deltaFilter implements FlatJoinFunction<Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>> {
> 
>         public void join(Tuple3<Integer, Integer, Double> candidate, Tuple3<Integer, Integer, Double> old, Collector<Tuple3<Integer, Integer, Double>> out) {
> 
>             if(!(candidate.f2 == old.f2)){
>                 out.collect(candidate);
>             }
> 
>             //if(Math.abs(candidate.f2-old.f2) > delta){
>             //    out.collect(candidate);
>             //}
> 
>         }
>     }
> 
>     public static final class normalizeByMax implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/(value.f1.f2));
>         }
>     }
> 
>     public static final class firstX implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,1/(value.f0.f2*value.f1.f2));
>         }
>     }
> 
>     public static final class resetIndex implements
>             MapFunction<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple3<Integer, Integer, Double>value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(0,value.f1,value.f2);
>         }
>     }
> 
> 
>     public static final class decBy1 implements
>             MapFunction<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple3<Integer, Integer, Double>value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(value.f0-1,value.f1-1,value.f2);
>         }
>     }
> 
>     public static final class resetIndex2 implements
>             MapFunction<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple3<Integer, Integer, Double>value)
>                 throws Exception {
>             return new Tuple3<Integer, Integer, Double>(value.f0,0,value.f2);
>         }
>     }
> 
>     public static final class MatrixTimesValue implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
> 
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
> 
>             return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2*(value.f1.f2));
>         }
>     }
> 
>     public static final class MatrixMinusMatrix implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             Integer row = value.f0.f0;
>             Integer column = value.f0.f1;
>             Double result = value.f0.f2 - value.f1.f2;
>             return new Tuple3<Integer, Integer, Double>(row, column, result);
>         }
>     }
> 
>     public static final class getGapCenter implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                     Tuple3<Integer, Integer, Double>>,
>                     Tuple3<Integer, Integer, Double>> {
>         @Override
>         public Tuple3<Integer, Integer, Double> map(
>                 Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>                 throws Exception {
>             Integer row = value.f0.f0;
>             Integer column = value.f0.f1;
>             Double result = value.f0.f2 + (2/(Math.abs(value.f1.f2)));
>             return new Tuple3<Integer, Integer, Double>(row, column, result);
>         }
>     }
> 
> 
>     public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenVector(DataSet<Tuple3<Integer, Integer, Double>> matrixA) throws Exception {
> 
>         //get initial vector - which equals matrixA * [1, ... , 1]
>         DataSet<Tuple3<Integer, Integer, Double>> initial0 = matrixA.groupBy(0).aggregate(Aggregations.SUM,2);
> 
>         //normalize by maximum value
>         DataSet<Tuple3<Integer, Integer, Double>> initial= initial0.cross(initial0.maxBy(2)).map(new normalizeByMax());
> 
>         //BulkIteration to find dominant eigenvector
>         IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = initial.iterate(iterations);
> 
>         DataSet<Tuple3<Integer, Integer, Double>> intermediate = (matrixA.join(iteration).where(1).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).
>                 cross((matrixA.join(iteration).where(1).equalTo(0)
>                         .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).maxBy(2))
>                 .map(new normalizeByMax());
> 
>         DataSet<Tuple3<Integer, Integer, Double>> diffs = iteration.join(intermediate).where(0).equalTo(0).with(new deltaFilter());
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector  = iteration.closeWith(intermediate,diffs);
> 
>         return eigenVector;
>     }
> 
>     public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenValue(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector) {
> 
>         //determine now EigenValue by approximating the Rayleigh Quotient:
>         //get Ax
>         DataSet<Tuple3<Integer, Integer, Double>> Ax = matrixA.join(eigenVector).where(1).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).groupBy(0).aggregate(Aggregations.SUM, 2);
>         //get Ax * x
>         DataSet<Tuple3<Integer, Integer, Double>> Axx = eigenVector.join(Ax).where(0).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);
> 
>         //now x * x
>         DataSet<Tuple3<Integer, Integer, Double>> xx = eigenVector.join(eigenVector).where(0).equalTo(0)
>                 .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);
> 
>         return Axx.cross(xx).map(new RQ()).aggregate(Aggregations.SUM, 2);
>     }
> 
>     public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getNextMatrix(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue) {
> 
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValueReset = eigenValue.map(new resetIndex());
>         DataSet<Tuple3<Integer, Integer, Double>> firstVal = eigenVector.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
>             public boolean filter(Tuple3<Integer, Integer, Double> value) {
>                 return value.f0 == 0;
>             }
>         });
>         firstVal = eigenValueReset.cross(firstVal).map(new firstX());
>         DataSet<Tuple3<Integer, Integer, Double>> firstRow = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
>             public boolean filter(Tuple3<Integer, Integer, Double> value) {
>                 return value.f0 == 0;
>             }
>         });
>         DataSet<Tuple3<Integer, Integer, Double>> x = ((firstRow.map(new DataSetUtils.transpose())).join(firstVal).where(1).equalTo(0).map(new MatrixTimesValue())).map(new DataSetUtils.transpose());
>         DataSet<Tuple3<Integer, Integer, Double>> C = eigenVector.cross(eigenValueReset).map(new MatrixTimesValue()).map(new resetIndex2()).join(x).where(1).equalTo(0).
>                 map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2);
>         matrixA = matrixA.join(C).where(0,1).equalTo(0,1).map(new MatrixMinusMatrix());
>         matrixA = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
>             public boolean filter(Tuple3<Integer, Integer, Double> value) {
>                 return (value.f0 != 0) && (value.f1 != 0);
>             }
>         });
> 
>         return matrixA.map(new decBy1());
>     }
> 
>     public MyResult PowerIteration_routine(MyResult initial) throws Exception {
> 
>         //Approximate EigenVector by PowerIteration
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector(initial.matrixA);
>         //Approximate EigenValue by PowerIteration
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValue = PowerIteration_getEigenValue(initial.matrixA, eigenVector);
>         //get gap
>         DataSet<Tuple3<Integer, Integer, Double>> gap = initial.eigenValue.cross(eigenValue).map(new MatrixMinusMatrix());
>         //Deflate original matrix
>         DataSet<Tuple3<Integer, Integer, Double>> matrixA = PowerIteration_getNextMatrix(initial.matrixA,eigenVector,eigenValue);
> 
>         return new MyResult(eigenVector,eigenValue,matrixA,gap);
>     }
> 
>     public class MyResult {
>         DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
>         DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
>         DataSet<Tuple3<Integer, Integer, Double>> gap;
>         DataSet<Tuple3<Integer, Integer, Double>> matrixA;
> 
>         public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA){
>             this.eigenVector = eigenVector;
>             this.eigenValue =eigenValue;
>             this.matrixA = matrixA;
>         }
> 
>         public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> gap){
>             this.eigenVector = eigenVector;
>             this.eigenValue =eigenValue;
>             this.matrixA = matrixA;
>             this.gap = gap;
>         }
>     }
> 
> }
>