You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@flink.apache.org by Lydia Ickler <ic...@googlemail.com> on 2016/01/25 14:13:42 UTC

MatrixMultiplication

Hi, 

I want to do a simple matrix multiplication and use the following code (see below).
For 50x50 or 100x100 matrices there is no problem. But already with 1000x1000 matrices it no longer works and gets stuck in the join step.
What am I doing wrong?

Best regards, 
Lydia

package de.tuberlin.dima.aim3.assignment3;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.CsvReader;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.DataSet;


/**
 * Flink batch job that multiplies a matrix, stored as a list of entries, by itself ({@code A * A}).
 *
 * <p>Each input CSV line is split on {@code ','}; the first field is skipped and the next three
 * are read as {@code (row, column, value)}. The job writes the entries of {@code A * A} as
 * {@code (row, column, value)} CSV records, where each value is the sum over {@code k} of
 * {@code A[row][k] * A[k][column]}. Only cells that receive at least one partial product are
 * emitted.
 *
 * <p>Cost note: the join pairs every entry {@code (i, k)} with every entry {@code (k, j)}, so a
 * dense {@code n x n} input yields {@code n^3} intermediate records before aggregation. Growing
 * {@code n} by 10x (e.g. 100x100 to 1000x1000) therefore multiplies the work by roughly 1000x;
 * long run times on large dense inputs are expected from the algorithm itself.
 */
public class MatrixMultiplication {

   /** Input path read by {@link #run()}; assigned by {@link #main(String[])}. */
   static String input = null;

   /** Output path written by {@link #run()}; assigned by {@link #main(String[])}. */
   static String output = null;

   /**
    * Runs the job using the static {@link #input} and {@link #output} paths.
    *
    * @throws IllegalArgumentException if either static path has not been set
    * @throws Exception if building or executing the Flink job fails
    */
   public void run() throws Exception {
      run(input, output);
   }

   /**
    * Builds and executes the job computing {@code A * A} for the matrix stored at
    * {@code inputPath}, writing the result entries to {@code outputPath}.
    *
    * @param inputPath path of the CSV file holding the matrix entries
    * @param outputPath path the result entries are written to as CSV
    * @throws IllegalArgumentException if either path is {@code null}
    * @throws Exception if building or executing the Flink job fails
    */
   public void run(String inputPath, String outputPath) throws Exception {
      // Fail fast with a clear message instead of an obscure error deep inside the Flink sources/sinks.
      if (inputPath == null || outputPath == null) {
         throw new IllegalArgumentException("input and output paths must be set (input="
               + inputPath + ", output=" + outputPath + ")");
      }

      ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

      DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, inputPath);

      // Match A[i][k] (left key: field 1 = column k) with A[k][j] (right key: field 0 = row k),
      // turn each pair into the partial product (i, j, a * b), then sum the partial products
      // that land on the same (i, j) cell.
      matrixA.join(matrixA).where(1).equalTo(0)
            .map(new ProjectJoinResultMapper())
            .groupBy(0, 1).sum(2)
            .writeAsCsv(outputPath);

      env.execute();
   }

   /**
    * Creates a data source of matrix entries from a comma-separated file.
    *
    * <p>The field mask {@code "fttt"} skips the first field of every line; the following three
    * fields are parsed as row index, column index and value. NOTE(review): the skipped first
    * column is presumably an id/label in the input format -- confirm against the data files.
    *
    * @param env execution environment the source is created in
    * @param filePath path of the CSV file to read
    * @return a source of {@code (row, column, value)} tuples
    */
   public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
         String filePath) {
      CsvReader csvReader = env.readCsvFile(filePath);
      csvReader.fieldDelimiter(',');
      csvReader.includeFields("fttt");
      return csvReader.types(Integer.class, Integer.class, Double.class);
   }

   /**
    * Turns a joined pair {@code ((i, k, a), (k, j, b))} into the partial product
    * {@code (i, j, a * b)}.
    */
   public static final class ProjectJoinResultMapper implements
            MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
                           Tuple3<Integer, Integer, Double>>,
                      Tuple3<Integer, Integer, Double>> {
      @Override
      public Tuple3<Integer, Integer, Double> map(
            Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
            throws Exception {
         Integer row = value.f0.f0;      // i from the left entry (i, k, a)
         Integer column = value.f1.f1;   // j from the right entry (k, j, b)
         Double product = value.f0.f2 * value.f1.f2;
         return new Tuple3<Integer, Integer, Double>(row, column, product);
      }
   }

   /**
    * Command-line entry point.
    *
    * @param args {@code <input path> <result path>}
    * @throws Exception if building or executing the Flink job fails
    */
   public static void main(String[] args) throws Exception {
      if (args.length < 2) {
         System.err.println("Usage: MatrixMultiplication <input path> <result path>");
         // Non-zero status so shells and schedulers see a bad invocation as a failure.
         System.exit(1);
      }
      input = args[0];
      output = args[1];
      new MatrixMultiplication().run();
   }

}


Re: MatrixMultiplication

Posted by Lydia Ickler <ic...@googlemail.com>.
Hi Till,

thanks for your reply :)
Yes, it finished after ~27 minutes…

Best regards, 
Lydia

> Am 25.01.2016 um 14:27 schrieb Till Rohrmann <tr...@apache.org>:
> 
> Hi Lydia,
> 
> Since matrix multiplication is O(n^3), I would assume that it would simply take 1000 times longer than the multiplication of the 100 x 100 matrix. Have you waited so long to see whether it completes or is there another problem?
> 
> Cheers,
> Till
> 
> On Mon, Jan 25, 2016 at 2:13 PM, Lydia Ickler <icklerly@googlemail.com <ma...@googlemail.com>> wrote:
> Hi, 
> 
> I want to do a simple matrix multiplication and use the following code (see below).
> For 50x50 or 100x100 matrices there is no problem. But already with 1000x1000 matrices it no longer works and gets stuck in the join step.
> What am I doing wrong?
> 
> Best regards, 
> Lydia
> 
> package de.tuberlin.dima.aim3.assignment3;
> 
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.operators.GroupReduceOperator;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.api.java.DataSet;
> 
> 
> public class MatrixMultiplication {
> 
>    static String input = null;
>    static String output = null;
> 
>    public void run() throws Exception {
>       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
> 
>       DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
> 
>       matrixA.join(matrixA).where(1).equalTo(0)
>             .map(new ProjectJoinResultMapper()).groupBy(0, 1).sum(2).writeAsCsv(output);
> 
>      
>       env.execute();
>    }
> 
> 
> 
>    public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
>          String filePath) {
>       CsvReader csvReader = env.readCsvFile(filePath);
>       csvReader.fieldDelimiter(',');
>       csvReader.includeFields("fttt");
>       return csvReader.types(Integer.class, Integer.class, Double.class);
>    }
> 
>    public static final class ProjectJoinResultMapper implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                            Tuple3<Integer, Integer, Double>>,
>                       Tuple3<Integer, Integer, Double>> {
>       @Override
>       public Tuple3<Integer, Integer, Double> map(
>             Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>             throws Exception {
>          Integer row = value.f0.f0;
>          Integer column = value.f1.f1;
>          Double product = value.f0.f2 * value.f1.f2;
>          return new Tuple3<Integer, Integer, Double>(row, column, product);
>       }
>    }
> 
>   
>    public static void main(String[] args) throws Exception {
>       if(args.length<2){
>          System.err.println("Usage: MatrixMultiplication <input path> <result path>");
>          System.exit(0);
>       }
>       input = args[0];
>       output = args[1];
>       new MatrixMultiplication().run();
>    }
> 
> }
> 
> 


Re: MatrixMultiplication

Posted by Till Rohrmann <tr...@apache.org>.
Hi Lydia,

Since matrix multiplication is O(n^3), I would assume that it would simply
take 1000 times longer than the multiplication of the 100 x 100 matrix.
Have you waited so long to see whether it completes or is there another
problem?

Cheers,
Till

On Mon, Jan 25, 2016 at 2:13 PM, Lydia Ickler <ic...@googlemail.com>
wrote:

> Hi,
>
> I want to do a simple matrix multiplication and use the following code (see
> below).
> For 50x50 or 100x100 matrices there is no problem. But already with 1000x1000
> matrices it no longer works and gets stuck in the join step.
> What am I doing wrong?
>
> Best regards,
> Lydia
>
> package de.tuberlin.dima.aim3.assignment3;
>
> import org.apache.flink.api.common.functions.MapFunction;
> import org.apache.flink.api.java.ExecutionEnvironment;
> import org.apache.flink.api.java.io.CsvReader;
> import org.apache.flink.api.java.operators.DataSource;
> import org.apache.flink.api.java.operators.GroupReduceOperator;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.tuple.Tuple3;
> import org.apache.flink.api.java.DataSet;
>
>
> public class MatrixMultiplication {
>
>    static String input = null;
>    static String output = null;
>
>    public void run() throws Exception {
>       ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>
>       DataSet<Tuple3<Integer, Integer, Double>> matrixA = readMatrix(env, input);
>
>       matrixA.join(matrixA).where(1).equalTo(0)
>             .map(new ProjectJoinResultMapper()).groupBy(0, 1).sum(2).writeAsCsv(output);
>
>
>       env.execute();
>    }
>
>
>
>    public static DataSource<Tuple3<Integer, Integer, Double>> readMatrix(ExecutionEnvironment env,
>          String filePath) {
>       CsvReader csvReader = env.readCsvFile(filePath);
>       csvReader.fieldDelimiter(',');
>       csvReader.includeFields("fttt");
>       return csvReader.types(Integer.class, Integer.class, Double.class);
>    }
>
>    public static final class ProjectJoinResultMapper implements
>             MapFunction<Tuple2<Tuple3<Integer, Integer, Double>,
>                            Tuple3<Integer, Integer, Double>>,
>                       Tuple3<Integer, Integer, Double>> {
>       @Override
>       public Tuple3<Integer, Integer, Double> map(
>             Tuple2<Tuple3<Integer, Integer, Double>, Tuple3<Integer, Integer, Double>> value)
>             throws Exception {
>          Integer row = value.f0.f0;
>          Integer column = value.f1.f1;
>          Double product = value.f0.f2 * value.f1.f2;
>          return new Tuple3<Integer, Integer, Double>(row, column, product);
>       }
>    }
>
>
>    public static void main(String[] args) throws Exception {
>       if(args.length<2){
>          System.err.println("Usage: MatrixMultiplication <input path> <result path>");
>          System.exit(0);
>       }
>       input = args[0];
>       output = args[1];
>       new MatrixMultiplication().run();
>    }
>
> }
>
>
>