You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Lydia Ickler <ic...@googlemail.com> on 2016/03/26 23:16:36 UTC
for loop slow
Hi,
I have an issue with a for-loop.
If I set the maximum number of loop iterations i to more than 3, the program gets stuck, and I cannot figure out why.
With 1, 2 or 3 it runs smoothly.
I attached the code below and marked the loop with //PROBLEM.
Thanks in advance!
Lydia
package org.apache.flink.contrib.lifescience.examples;
import edu.princeton.cs.algs4.Graph;
import edu.princeton.cs.algs4.SymbolDigraph;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import java.util.*;
import static edu.princeton.cs.algs4.GraphGenerator.simple;
/**
 * Approximates leading eigenvalues of a matrix with the Flink DataSet API: power iteration finds
 * the dominant eigenvector, a Rayleigh quotient estimates its eigenvalue, and the matrix is then
 * deflated before the next round. The differences ("gaps") between successive eigenvalue
 * estimates are printed.
 *
 * <p>Matrices and vectors are both encoded as {@code Tuple3<row, column, value>} records; the
 * input file is read as comma-separated {@code row,column,value} lines by {@link #readMatrix}.
 */
public class PowerIteration {
    //path to input
    static String input = null;
    //path to output (parsed by main, not currently written by run)
    static String output = null;
    //maximum number of power-iteration supersteps per eigenvector (default = 7)
    static int iterations = 7;
    //convergence threshold: a bulk iteration stops early once no eigenvector entry changes by
    //more than this between two supersteps
    static double delta = 0.01;

    /**
     * Builds the dataflow: computes the first eigenpair, deflates the matrix, then repeatedly
     * computes the next eigenpair and prints the gap to the previous eigenvalue estimate.
     *
     * @throws Exception if one of the Flink jobs triggered by {@code print()} fails
     */
    public void run() throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        //read input file
        DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);

        //initial: approximate the dominant eigenvector and eigenvalue, then deflate the matrix
        DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector(matrixA);
        DataSet<Tuple3<Integer, Integer, Double>> eigenValue = PowerIteration_getEigenValue(matrixA, eigenVector);
        matrixA = PowerIteration_getNextMatrix(matrixA, eigenVector, eigenValue);

        MyResult current = new MyResult(eigenVector, eigenValue, matrixA);

        //number of eigenvalue gaps to compute and print
        final int gapCount = 2;
        for (int i = 0; i < gapCount; i++) {
            current = PowerIteration_routine(current);
            // print() is an eager sink: it submits a job for the whole plan built so far.
            // NOTE(review): every pass stacks another power iteration + deflation on top of the
            // previous pass, so the plan that each print() re-executes from the source keeps
            // growing. This is the likely cause of the apparent hang for larger loop counts.
            // Flink's iteration API, or materializing the deflated matrix between passes,
            // would avoid it.
            current.gap.print();
        }
        // No trailing env.execute(): print() already executed the jobs, and execute() with no
        // new sinks defined since the last execution throws a RuntimeException.
    }

    /**
     * Reads a matrix from a comma-separated file with three columns {@code row,column,value}.
     *
     * @param env      execution environment that creates the source
     * @param filePath path of the CSV file
     * @return one {@code (row, column, value)} record per line
     */
    public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
            String filePath) {
        CsvReader csvReader = env.readCsvFile(filePath);
        csvReader.fieldDelimiter(",");
        csvReader.includeFields("ttt");
        return csvReader.types(Integer.class, Integer.class, Double.class);
    }

    /**
     * Multiplies a joined pair of entries: returns {@code (left row, right column, left value *
     * right value)}. Summing these per (row, column) yields a matrix product.
     */
    public static final class ProjectJoinResultMapper implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            Integer row = value.f0.f0;
            Integer column = value.f1.f1;
            Double product = value.f0.f2 * value.f1.f2;
            return new Tuple3<Integer, Integer, Double>(row, column, product);
        }
    }

    /** Divides the left value by the right value, keeping the left record's indices (Rayleigh quotient). */
    public static final class RQ implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0.f0, value.f0.f1, value.f0.f2 / value.f1.f2);
        }
    }

    /**
     * Entry point: {@code PowerIteration <input path> <result path> [<iterations> [<threshold diff>]]}.
     */
    public static void main(String[] args) throws Exception {
        if (args.length < 2 || args.length > 4) {
            System.err.println("Usage: PowerIteration <input path> <result path> optional: <iterations> <threshold diff>");
            // A usage error is a failure: exit non-zero so calling scripts can detect it.
            System.exit(1);
        }
        input = args[0];
        output = args[1];
        // <iterations> is present whenever a third argument is given, including when
        // <threshold diff> follows it.
        if (args.length >= 3) {
            iterations = Integer.parseInt(args[2]);
        }
        if (args.length == 4) {
            delta = Double.parseDouble(args[3]);
        }
        new PowerIteration().run();
    }

    /**
     * Termination criterion for the power-iteration bulk iteration: emits an entry of the new
     * vector whenever it moved by more than the threshold since the previous superstep. The
     * iteration stops once nothing is emitted.
     */
    public static final class deltaFilter implements FlatJoinFunction<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> {
        // Stored as a field so it is shipped with the serialized function; a static field
        // set in main() is only guaranteed to be set in the client JVM.
        private final double threshold;

        /** Uses the class-wide {@code delta}, read when the function object is created. */
        public deltaFilter() {
            this(delta);
        }

        /** @param threshold largest per-entry change still regarded as converged */
        public deltaFilter(double threshold) {
            this.threshold = threshold;
        }

        /**
         * @param previous entry of the vector from the previous superstep (left join input)
         * @param current  entry of the newly computed vector (right join input)
         * @param out      receives {@code current} when it has not yet converged
         */
        @Override
        public void join(Tuple3<Integer, Integer, Double> previous, Tuple3<Integer, Integer, Double> current, Collector<Tuple3<Integer, Integer, Double>> out) {
            // Compare numerically; '==' on boxed Doubles compares references and is
            // effectively never true, which kept the iteration from ever terminating early.
            if (Math.abs(current.f2 - previous.f2) > threshold) {
                out.collect(current);
            }
        }
    }

    /** Divides the left value by the right value (the maximum), keeping the left record's indices. */
    public static final class normalizeByMax implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0.f0, value.f0.f1, value.f0.f2 / (value.f1.f2));
        }
    }

    /** Returns {@code 1 / (left value * right value)} with the left record's indices. */
    public static final class firstX implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0.f0, value.f0.f1, 1 / (value.f0.f2 * value.f1.f2));
        }
    }

    /** Sets the row index to 0, keeping column and value. */
    public static final class resetIndex implements
            MapFunction<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple3<Integer, Integer, Double> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(0, value.f1, value.f2);
        }
    }

    /** Decrements both row and column index by one (used after dropping row/column 0). */
    public static final class decBy1 implements
            MapFunction<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple3<Integer, Integer, Double> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0 - 1, value.f1 - 1, value.f2);
        }
    }

    /** Sets the column index to 0, keeping row and value. */
    public static final class resetIndex2 implements
            MapFunction<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple3<Integer, Integer, Double> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0, 0, value.f2);
        }
    }

    /** Multiplies the left value by the right value, keeping the left record's indices. */
    public static final class MatrixTimesValue implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            return new Tuple3<Integer, Integer, Double>(value.f0.f0, value.f0.f1, value.f0.f2 * (value.f1.f2));
        }
    }

    /** Subtracts the right value from the left value, keeping the left record's indices. */
    public static final class MatrixMinusMatrix implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            Integer row = value.f0.f0;
            Integer column = value.f0.f1;
            Double result = value.f0.f2 - value.f1.f2;
            return new Tuple3<Integer, Integer, Double>(row, column, result);
        }
    }

    /**
     * Returns {@code left value + 2 / |right value|} with the left record's indices.
     * NOTE(review): not referenced anywhere in this file.
     */
    public static final class getGapCenter implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                    Tuple3<Integer, Integer, Double>>,
                    Tuple3<Integer, Integer, Double>> {
        @Override
        public Tuple3<Integer, Integer, Double> map(
                Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
                throws Exception {
            Integer row = value.f0.f0;
            Integer column = value.f0.f1;
            Double result = value.f0.f2 + (2 / (Math.abs(value.f1.f2)));
            return new Tuple3<Integer, Integer, Double>(row, column, result);
        }
    }

    /**
     * Approximates the dominant eigenvector of {@code matrixA} with a Flink bulk iteration of at
     * most {@link #iterations} supersteps. Each superstep multiplies the current vector by the
     * matrix and divides by the largest entry. The iteration ends early when {@link deltaFilter}
     * reports no entry changing by more than {@link #delta}.
     *
     * @param matrixA matrix entries as {@code (row, column, value)}
     * @return vector entries as {@code (row, column, value)}; only the row index and value are
     *         meaningful (the column index comes from an arbitrary aggregated record)
     */
    public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenVector(DataSet<Tuple3<Integer, Integer, Double>> matrixA) throws Exception {
        //get initial vector - which equals matrixA * [1, ... , 1] (row sums)
        DataSet<Tuple3<Integer, Integer, Double>> initial0 = matrixA.groupBy(0).aggregate(Aggregations.SUM, 2);
        //normalize by maximum value
        DataSet<Tuple3<Integer, Integer, Double>> initial = initial0.cross(initial0.maxBy(2)).map(new normalizeByMax());

        //BulkIteration to find dominant eigenvector
        IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = initial.iterate(iterations);

        // A * x, summed per row. Built once and reused for both the values and their maximum,
        // instead of constructing the same join/aggregate pipeline twice per superstep.
        DataSet<Tuple3<Integer, Integer, Double>> product = matrixA.join(iteration).where(1).equalTo(0)
                .map(new ProjectJoinResultMapper())
                .groupBy(0, 1).aggregate(Aggregations.SUM, 2)
                .groupBy(0).aggregate(Aggregations.SUM, 2);
        DataSet<Tuple3<Integer, Integer, Double>> intermediate = product.cross(product.maxBy(2))
                .map(new normalizeByMax());

        // Left input = previous vector, right input = new vector (matches deltaFilter.join).
        DataSet<Tuple3<Integer, Integer, Double>> diffs = iteration.join(intermediate).where(0).equalTo(0).with(new deltaFilter());
        return iteration.closeWith(intermediate, diffs);
    }

    /**
     * Estimates the eigenvalue belonging to {@code eigenVector} via the Rayleigh quotient
     * {@code (x . Ax) / (x . x)}.
     *
     * @return a single record whose value field holds the estimate
     */
    public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenValue(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector) {
        //get Ax
        DataSet<Tuple3<Integer, Integer, Double>> Ax = matrixA.join(eigenVector).where(1).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).groupBy(0).aggregate(Aggregations.SUM, 2);
        //get x . Ax (ungrouped aggregate -> single record)
        DataSet<Tuple3<Integer, Integer, Double>> Axx = eigenVector.join(Ax).where(0).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM, 2);
        //now x . x (ungrouped aggregate -> single record)
        DataSet<Tuple3<Integer, Integer, Double>> xx = eigenVector.join(eigenVector).where(0).equalTo(0)
                .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM, 2);
        return Axx.cross(xx).map(new RQ()).aggregate(Aggregations.SUM, 2);
    }

    /**
     * Deflates {@code matrixA}: subtracts {@code x_i * a_0j / x_0} (with the eigenvalue factor
     * cancelling) from each entry, drops row 0 and column 0, and shifts the remaining indices
     * down by one.
     * NOTE(review): this appears to be Wielandt deflation. Because the subtraction is an inner
     * join on (row, column), entries present in only one of A or the correction are dropped;
     * confirm this is intended for sparse inputs.
     */
    public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getNextMatrix(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue) {
        DataSet<Tuple3<Integer, Integer, Double>> eigenValueReset = eigenValue.map(new resetIndex());
        //x_0: the eigenvector entry in row 0
        DataSet<Tuple3<Integer, Integer, Double>> firstVal = eigenVector.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
            @Override
            public boolean filter(Tuple3<Integer, Integer, Double> value) {
                return value.f0 == 0;
            }
        });
        //1 / (lambda * x_0), keyed on row 0
        firstVal = eigenValueReset.cross(firstVal).map(new firstX());
        //first row of A
        DataSet<Tuple3<Integer, Integer, Double>> firstRow = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
            @Override
            public boolean filter(Tuple3<Integer, Integer, Double> value) {
                return value.f0 == 0;
            }
        });
        //first row scaled by 1 / (lambda * x_0); DataSetUtils.transpose presumably swaps row and column
        DataSet<Tuple3<Integer, Integer, Double>> x = ((firstRow.map(new DataSetUtils.transpose())).join(firstVal).where(1).equalTo(0).map(new MatrixTimesValue())).map(new DataSetUtils.transpose());
        //C = (lambda * eigenVector) outer-product x
        DataSet<Tuple3<Integer, Integer, Double>> C = eigenVector.cross(eigenValueReset).map(new MatrixTimesValue()).map(new resetIndex2()).join(x).where(1).equalTo(0).
                map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2);
        matrixA = matrixA.join(C).where(0, 1).equalTo(0, 1).map(new MatrixMinusMatrix());
        //drop row 0 and column 0
        matrixA = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
            @Override
            public boolean filter(Tuple3<Integer, Integer, Double> value) {
                return (value.f0 != 0) && (value.f1 != 0);
            }
        });
        return matrixA.map(new decBy1());
    }

    /**
     * One deflation round: computes the eigenpair of {@code initial.matrixA}, the gap to
     * {@code initial.eigenValue}, and the next deflated matrix.
     */
    public MyResult PowerIteration_routine(MyResult initial) throws Exception {
        //Approximate EigenVector by PowerIteration
        DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector(initial.matrixA);
        //Approximate EigenValue by Rayleigh quotient
        DataSet<Tuple3<Integer, Integer, Double>> eigenValue = PowerIteration_getEigenValue(initial.matrixA, eigenVector);
        //get gap (previous eigenvalue - current eigenvalue)
        DataSet<Tuple3<Integer, Integer, Double>> gap = initial.eigenValue.cross(eigenValue).map(new MatrixMinusMatrix());
        //Deflate original matrix
        DataSet<Tuple3<Integer, Integer, Double>> matrixA = PowerIteration_getNextMatrix(initial.matrixA, eigenVector, eigenValue);
        return new MyResult(eigenVector, eigenValue, matrixA, gap);
    }

    /** Holder for the datasets produced by one round; {@code gap} is null for the initial round. */
    public class MyResult {
        DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
        DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
        DataSet<Tuple3<Integer, Integer, Double>> gap;
        DataSet<Tuple3<Integer, Integer, Double>> matrixA;

        public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue, DataSet<Tuple3<Integer, Integer, Double>> matrixA) {
            this.eigenVector = eigenVector;
            this.eigenValue = eigenValue;
            this.matrixA = matrixA;
        }

        public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue, DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> gap) {
            this.eigenVector = eigenVector;
            this.eigenValue = eigenValue;
            this.matrixA = matrixA;
            this.gap = gap;
        }
    }
}
Re: for loop slow
Posted by Chiwan Park <ch...@apache.org>.
Hi Lydia,
To build an iterative algorithm on Flink, using the API for iterations [1] would be better than using a for-loop. Your program triggers multiple executions by calling `next.gap.print()` multiple times. In each execution, Flink reads the whole input again and recomputes everything, which decreases performance.
Regards,
Chiwan Park
[1]: https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/batch/iterations.html
> On Mar 27, 2016, at 7:16 AM, Lydia Ickler <ic...@googlemail.com> wrote:
>
> Hi,
>
> I have an issue with a for-loop.
> If I set the maximal iteration number i to more than 3 it gets stuck and I cannot figure out why.
> With 1, 2 or 3 it runs smoothly.
> I attached the code below and marked the loop with //PROBLEM.
>
> Thanks in advance!
> Lydia
>
> package org.apache.flink.contrib.lifescience.examples;
>
> import edu.princeton.cs.algs4.Graph;
> import edu.princeton.cs.algs4.SymbolDigraph;
> import org.apache.flink.api.common.functions.FilterFunction;
> import org.apache.flink.api.common.functions.FlatJoinFunction;
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.DataSet;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.aggregation.Aggregations;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.operators.IterativeDataSet;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.contrib.lifescience.networks.algos.DataSetUtils;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.Network;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkEdge;
> import org.apache.flink.contrib.lifescience.networks.datatypes.networks.NetworkNode;
> import org.apache.flink.core.fs.FileSystem;
> import org.apache.flink.util.Collector;
>
> import java.util.*;
>
> import static edu.princeton.cs.algs4.GraphGenerator.simple;
>
> public class PowerIteration {
>
> //path to input
> static String input = null;
> //path to output
> static String output = null;
> //number of iterations (default = 7)
> static int iterations = 7;
> //threshold
> static double delta = 0.01;
>
> public void run() throws Exception {
> ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
> //read input file
> DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
>
> DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
> DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
>
> //initial:
> //Approximate EigenVector by PowerIteration
> eigenVector = PowerIteration_getEigenVector(matrixA);
> //Approximate EigenValue by PowerIteration
> eigenValue = PowerIteration_getEigenValue(matrixA,eigenVector);
> //Deflate original matrix
> matrixA = PowerIteration_getNextMatrix(matrixA,eigenVector,eigenValue);
>
> MyResult initial = new MyResult(eigenVector,eigenValue,matrixA);
>
> MyResult next = null;
>
> //PROBLEM!!! get i eigenvalue gaps
> for(int i=0;i<2;i++){
> next = PowerIteration_routine(initial);
> initial = next;
> next.gap.print();
> }
>
> env.execute("Power Iteration");
> }
>
> public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
> String filePath) {
> CsvReader csvReader = env.readCsvFile(filePath);
> csvReader.fieldDelimiter(",");
> csvReader.includeFields("ttt");
> return csvReader.types(Integer.class, Integer.class, Double.class);
> }
>
> public static final class ProjectJoinResultMapper implements
> MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
> Tuple3<Integer, Integer, Double>>,
> Tuple3<Integer, Integer, Double>> {
> @Override
> public Tuple3<Integer, Integer, Double> map(
> Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
> throws Exception {
> Integer row = value.f0.f0;
> Integer column = value.f1.f1;
> Double product = value.f0.f2 * value.f1.f2;
> return new Tuple3<Integer, Integer, Double>(row, column, product);
> }
> }
>
> public static final class RQ implements
> MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
> Tuple3<Integer, Integer, Double>> {
>
> @Override
> public Tuple3<Integer, Integer, Double> map(
> Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
> throws Exception {
>
> return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/value.f1.f2);
> }
> }
>
> public static void main(String[] args) throws Exception {
> if(args.length<2 || args.length > 4){
> System.err.println("Usage: PowerIteration <input path> <result path> optional: <iterations> <threshold diff>");
> System.exit(0);
> }
>
> input = args[0];
> output = args[1];
>
> if(args.length==3) {
> iterations = Integer.parseInt(args[2]);
> }
> if(args.length==4){
> delta = Double.parseDouble(args[3]);
> }
>
> new PowerIteration2().run();
> }
>
> public static final class deltaFilter implements FlatJoinFunction<Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>,Tuple3<Integer, Integer, Double>> {
>
> public void join(Tuple3<Integer, Integer, Double> candidate, Tuple3<Integer, Integer, Double> old, Collector<Tuple3<Integer, Integer, Double>> out) {
>
> if(!(candidate.f2 == old.f2)){
> out.collect(candidate);
> }
>
> //if(Math.abs(candidate.f2-old.f2) > delta){
> // out.collect(candidate);
> //}
>
> }
> }
>
> public static final class normalizeByMax implements
> MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
> Tuple3<Integer, Integer, Double>> {
>
> public Tuple3<Integer, Integer, Double> map(
> Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
> throws Exception {
> return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2/(value.f1.f2));
> }
> }
>
> public static final class firstX implements
> MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
> Tuple3<Integer, Integer, Double>> {
>
> public Tuple3<Integer, Integer, Double> map(
> Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
> throws Exception {
> return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,1/(value.f0.f2*value.f1.f2));
> }
> }
>
> public static final class resetIndex implements
> MapFunction<Tuple3<Integer, Integer, Double>,
> Tuple3<Integer, Integer, Double>> {
>
> public Tuple3<Integer, Integer, Double> map(
> Tuple3<Integer, Integer, Double>value)
> throws Exception {
> return new Tuple3<Integer, Integer, Double>(0,value.f1,value.f2);
> }
> }
>
>
> public static final class decBy1 implements
> MapFunction<Tuple3<Integer, Integer, Double>,
> Tuple3<Integer, Integer, Double>> {
>
> public Tuple3<Integer, Integer, Double> map(
> Tuple3<Integer, Integer, Double>value)
> throws Exception {
> return new Tuple3<Integer, Integer, Double>(value.f0-1,value.f1-1,value.f2);
> }
> }
>
> public static final class resetIndex2 implements
> MapFunction<Tuple3<Integer, Integer, Double>,
> Tuple3<Integer, Integer, Double>> {
>
> public Tuple3<Integer, Integer, Double> map(
> Tuple3<Integer, Integer, Double>value)
> throws Exception {
> return new Tuple3<Integer, Integer, Double>(value.f0,0,value.f2);
> }
> }
>
> public static final class MatrixTimesValue implements
> MapFunction<Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>>,
> Tuple3<Integer, Integer, Double>> {
>
> @Override
> public Tuple3<Integer, Integer, Double> map(
> Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
> throws Exception {
>
> return new Tuple3<Integer, Integer, Double>(value.f0.f0,value.f0.f1,value.f0.f2*(value.f1.f2));
> }
> }
>
> public static final class MatrixMinusMatrix implements
> MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
> Tuple3<Integer, Integer, Double>>,
> Tuple3<Integer, Integer, Double>> {
> @Override
> public Tuple3<Integer, Integer, Double> map(
> Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
> throws Exception {
> Integer row = value.f0.f0;
> Integer column = value.f0.f1;
> Double result = value.f0.f2 - value.f1.f2;
> return new Tuple3<Integer, Integer, Double>(row, column, result);
> }
> }
>
> public static final class getGapCenter implements
> MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
> Tuple3<Integer, Integer, Double>>,
> Tuple3<Integer, Integer, Double>> {
> @Override
> public Tuple3<Integer, Integer, Double> map(
> Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
> throws Exception {
> Integer row = value.f0.f0;
> Integer column = value.f0.f1;
> Double result = value.f0.f2 + (2/(Math.abs(value.f1.f2)));
> return new Tuple3<Integer, Integer, Double>(row, column, result);
> }
> }
>
>
> public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenVector(DataSet<Tuple3<Integer, Integer, Double>> matrixA) throws Exception {
>
> //get initial vector - which equals matrixA * [1, ... , 1]
> DataSet<Tuple3<Integer, Integer, Double>> initial0 = matrixA.groupBy(0).aggregate(Aggregations.SUM,2);
>
> //normalize by maximum value
> DataSet<Tuple3<Integer, Integer, Double>> initial= initial0.cross(initial0.maxBy(2)).map(new normalizeByMax());
>
> //BulkIteration to find dominant eigenvector
> IterativeDataSet<Tuple3<Integer, Integer, Double>> iteration = initial.iterate(iterations);
>
> DataSet<Tuple3<Integer, Integer, Double>> intermediate = (matrixA.join(iteration).where(1).equalTo(0)
> .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).
> cross((matrixA.join(iteration).where(1).equalTo(0)
> .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2)).groupBy(0).aggregate(Aggregations.SUM, 2).maxBy(2))
> .map(new normalizeByMax());
>
> DataSet<Tuple3<Integer, Integer, Double>> diffs = iteration.join(intermediate).where(0).equalTo(0).with(new deltaFilter());
> DataSet<Tuple3<Integer, Integer, Double>> eigenVector = iteration.closeWith(intermediate,diffs);
>
> return eigenVector;
> }
>
> public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getEigenValue(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector) {
>
> //determine now EigenValue by approximating the Rayleigh Quotient:
> //get Ax
> DataSet<Tuple3<Integer, Integer, Double>> Ax = matrixA.join(eigenVector).where(1).equalTo(0)
> .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).groupBy(0).aggregate(Aggregations.SUM, 2);
> //get Ax * x
> DataSet<Tuple3<Integer, Integer, Double>> Axx = eigenVector.join(Ax).where(0).equalTo(0)
> .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);
>
> //now x * x
> DataSet<Tuple3<Integer, Integer, Double>> xx = eigenVector.join(eigenVector).where(0).equalTo(0)
> .map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2).aggregate(Aggregations.SUM,2);
>
> return Axx.cross(xx).map(new RQ()).aggregate(Aggregations.SUM, 2);
> }
>
> public static DataSet<Tuple3<Integer, Integer, Double>> PowerIteration_getNextMatrix(DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue) {
>
> DataSet<Tuple3<Integer, Integer, Double>> eigenValueReset = eigenValue.map(new resetIndex());
> DataSet<Tuple3<Integer, Integer, Double>> firstVal = eigenVector.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
> public boolean filter(Tuple3<Integer, Integer, Double> value) {
> return value.f0 == 0;
> }
> });
> firstVal = eigenValueReset.cross(firstVal).map(new firstX());
> DataSet<Tuple3<Integer, Integer, Double>> firstRow = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
> public boolean filter(Tuple3<Integer, Integer, Double> value) {
> return value.f0 == 0;
> }
> });
> DataSet<Tuple3<Integer, Integer, Double>> x = ((firstRow.map(new DataSetUtils.transpose())).join(firstVal).where(1).equalTo(0).map(new MatrixTimesValue())).map(new DataSetUtils.transpose());
> DataSet<Tuple3<Integer, Integer, Double>> C = eigenVector.cross(eigenValueReset).map(new MatrixTimesValue()).map(new resetIndex2()).join(x).where(1).equalTo(0).
> map(new ProjectJoinResultMapper()).groupBy(0, 1).aggregate(Aggregations.SUM, 2);
> matrixA = matrixA.join(C).where(0,1).equalTo(0,1).map(new MatrixMinusMatrix());
> matrixA = matrixA.filter(new FilterFunction<Tuple3<Integer, Integer, Double>>() {
> public boolean filter(Tuple3<Integer, Integer, Double> value) {
> return (value.f0 != 0) && (value.f1 != 0);
> }
> });
>
> return matrixA.map(new decBy1());
> }
>
> public MyResult PowerIteration_routine(MyResult initial) throws Exception {
>
> //Approximate EigenVector by PowerIteration
> DataSet<Tuple3<Integer, Integer, Double>> eigenVector = PowerIteration_getEigenVector(initial.matrixA);
> //Approximate EigenValue by PowerIteration
> DataSet<Tuple3<Integer, Integer, Double>> eigenValue = PowerIteration_getEigenValue(initial.matrixA, eigenVector);
> //get gap
> DataSet<Tuple3<Integer, Integer, Double>> gap = initial.eigenValue.cross(eigenValue).map(new MatrixMinusMatrix());
> //Deflate original matrix
> DataSet<Tuple3<Integer, Integer, Double>> matrixA = PowerIteration_getNextMatrix(initial.matrixA,eigenVector,eigenValue);
>
> return new MyResult(eigenVector,eigenValue,matrixA,gap);
> }
>
> public class MyResult {
> DataSet<Tuple3<Integer, Integer, Double>> eigenVector;
> DataSet<Tuple3<Integer, Integer, Double>> eigenValue;
> DataSet<Tuple3<Integer, Integer, Double>> gap;
> DataSet<Tuple3<Integer, Integer, Double>> matrixA;
>
> public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA){
> this.eigenVector = eigenVector;
> this.eigenValue =eigenValue;
> this.matrixA = matrixA;
> }
>
> public MyResult(DataSet<Tuple3<Integer, Integer, Double>> eigenVector, DataSet<Tuple3<Integer, Integer, Double>> eigenValue,DataSet<Tuple3<Integer, Integer, Double>> matrixA, DataSet<Tuple3<Integer, Integer, Double>> gap){
> this.eigenVector = eigenVector;
> this.eigenValue =eigenValue;
> this.matrixA = matrixA;
> this.gap = gap;
> }
> }
>
> }
>