Posted to commits@mrql.apache.org by fe...@apache.org on 2014/08/28 19:16:53 UTC

git commit: MRQL-47: Fix various bugs for the Flink evaluation mode

Repository: incubator-mrql
Updated Branches:
  refs/heads/master ea690387e -> ee3428505


MRQL-47: Fix various bugs for the Flink evaluation mode


Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/ee342850
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/ee342850
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/ee342850

Branch: refs/heads/master
Commit: ee3428505c014e8da0d1c8a8395c36406d383329
Parents: ea69038
Author: fegaras <fe...@cse.uta.edu>
Authored: Thu Aug 28 12:16:19 2014 -0500
Committer: fegaras <fe...@cse.uta.edu>
Committed: Thu Aug 28 12:16:19 2014 -0500

----------------------------------------------------------------------
 .../main/java/org/apache/mrql/DataSource.java   |  4 +-
 .../main/java/org/apache/mrql/Evaluator.java    | 18 +++++
 core/src/main/java/org/apache/mrql/Plan.java    | 18 -----
 .../main/java/org/apache/mrql/TypeInference.gen |  2 +-
 .../java/org/apache/mrql/FlinkEvaluator.gen     | 81 ++++++++++----------
 5 files changed, 64 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ee342850/core/src/main/java/org/apache/mrql/DataSource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/DataSource.java b/core/src/main/java/org/apache/mrql/DataSource.java
index d7723a0..cc156ce 100644
--- a/core/src/main/java/org/apache/mrql/DataSource.java
+++ b/core/src/main/java/org/apache/mrql/DataSource.java
@@ -86,7 +86,9 @@ public class DataSource {
         this.path = path;
         this.inputFormat = inputFormat;
         to_be_merged = false;
-        try {
+        if (Config.flink_mode)
+            dataSourceDirectory.put(path,this);
+        else try {
             Path p = new Path(path);
             FileSystem fs = p.getFileSystem(conf);
             String complete_path = fs.getFileStatus(p).getPath().toString();
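
Editor's sketch (not part of the commit): the DataSource change above registers a source created in Flink mode under the raw path it was given, skipping the Hadoop FileSystem resolution that the other modes still perform. A minimal, self-contained illustration of that branch follows; the map, flag, and resolve() placeholder are hypothetical stand-ins for the MRQL fields, and the non-Flink arm is only partly visible in the hunk, so its body here is an assumption.

    // Sketch: register a data source by its raw path in Flink mode,
    // otherwise by the path resolved through the file system first.
    import java.util.HashMap;
    import java.util.Map;

    class DataSourceSketch {
        static final Map<String,DataSourceSketch> dataSourceDirectory = new HashMap<>();
        final String path;

        DataSourceSketch ( String path, boolean flinkMode ) {
            this.path = path;
            if (flinkMode)
                dataSourceDirectory.put(path,this);            // no HDFS lookup in Flink mode
            else
                dataSourceDirectory.put(resolve(path),this);   // resolution step elided
        }

        static String resolve ( String path ) { return path; } // placeholder for fs.getFileStatus(...)
    }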

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ee342850/core/src/main/java/org/apache/mrql/Evaluator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Evaluator.java b/core/src/main/java/org/apache/mrql/Evaluator.java
index 28d356f..0b19599 100644
--- a/core/src/main/java/org/apache/mrql/Evaluator.java
+++ b/core/src/main/java/org/apache/mrql/Evaluator.java
@@ -149,4 +149,22 @@ abstract public class Evaluator extends Interpreter {
 	Config.max_bag_size_print = ps;
 	out.close();
     }
+
+    /** for dumped data to a file, return the MRQL type of the data */
+    public Tree get_type ( String file ) {
+        try {
+            Path path = new Path(file);
+            FileSystem fs = path.getFileSystem(Plan.conf);
+            BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path.suffix(".type"))));
+            String s[] = ftp.readLine().split("@");
+            ftp.close();
+            if (s.length != 2 )
+                return null;
+            if (!s[0].equals("2"))
+                throw new Error("The binary file has been created in java mode and cannot be read in hadoop mode");
+            return Tree.parse(s[1]);
+        } catch (Exception e) {
+            return null;
+        }
+    }
 }
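
Editor's sketch (not part of the commit): the get_type method moved into Evaluator reads the ".type" sidecar written next to a dumped binary file; as the code shows, the first line has the form <mode>@<type>, where mode "2" marks a dump written in hadoop mode. A minimal illustration of that parsing step follows, using plain strings instead of the MRQL Tree parser; the file name is made up for the example.

    // Sketch: parse a ".type" sidecar line of the form "<mode>@<type>".
    // Mode "2" means the dump was written in hadoop mode; anything else is rejected.
    import java.io.BufferedReader;
    import java.io.FileReader;

    class TypeSidecarSketch {
        static String readDumpType ( String typeFile ) throws Exception {
            try ( BufferedReader in = new BufferedReader(new FileReader(typeFile)) ) {
                String[] s = in.readLine().split("@");
                if (s.length != 2)
                    return null;                 // malformed sidecar
                if (!s[0].equals("2"))
                    throw new Error("dump was not written in hadoop mode");
                return s[1];                     // MRQL feeds this string to Tree.parse
            }
        }

        public static void main ( String[] args ) throws Exception {
            System.out.println(readDumpType("results.bin.type"));   // hypothetical file name
        }
    }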

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ee342850/core/src/main/java/org/apache/mrql/Plan.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Plan.java b/core/src/main/java/org/apache/mrql/Plan.java
index 15c3a26..116a822 100644
--- a/core/src/main/java/org/apache/mrql/Plan.java
+++ b/core/src/main/java/org/apache/mrql/Plan.java
@@ -335,24 +335,6 @@ public class Plan {
         return new DataSet(new BinaryDataSource(0,newpath,conf),0,0);
     }
 
-    /** for dumped data to a file, return the MRQL type of the data */
-    public final static Tree get_type ( String file ) {
-        try {
-            Path path = new Path(file);
-            FileSystem fs = path.getFileSystem(conf);
-            BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path.suffix(".type"))));
-            String s[] = ftp.readLine().split("@");
-            ftp.close();
-            if (s.length != 2 )
-                return null;
-            if (!s[0].equals("2"))
-                throw new Error("The binary file has been created in java mode and cannot be read in hadoop mode");
-            return Tree.parse(s[1]);
-        } catch (Exception e) {
-            return null;
-        }
-    }
-
     /** create a new PrintStream from the file */
     final static PrintStream print_stream ( String file )  throws Exception {
         Path path = new Path(file);

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ee342850/core/src/main/java/org/apache/mrql/TypeInference.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 1d2f746..3b9edfe 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -594,7 +594,7 @@ public class TypeInference extends Translator {
                 type_error(e,"The source file must be a constant string: "+print_query(f));
             Tree tp = null;
             if (Config.hadoop_mode)
-                tp = Plan.get_type(f.stringValue());
+                tp = Evaluator.evaluator.get_type(f.stringValue());
             else tp = MapReduceAlgebra.get_type(f.stringValue());
             if (tp == null)
                 type_error(e,"Cannot find the type of file "+f);

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/ee342850/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index 140a7f2..d32a0e0 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -19,7 +19,7 @@ package org.apache.mrql;
 
 import java.util.List;
 import java.util.ArrayList;
-import java.io.PrintStream;
+import java.io.*;
 import java.net.URI;
 import java.net.URL;
 import org.apache.mrql.gen.*;
@@ -60,7 +60,7 @@ final public class FlinkEvaluator extends Evaluator {
         DataSource.parserDirectory.put("line",FlinkLineParser.class);
         if (Config.local_mode) {
             flink_env = ExecutionEnvironment.createLocalEnvironment(Config.nodes);
-            // curently, the compiler desn't work in local mode
+            // currently, the compiler doesn't work in local mode
             Config.compile_functional_arguments = false;
             Config.write(Plan.conf);
         } else flink_env = ExecutionEnvironment.getExecutionEnvironment();
@@ -77,11 +77,10 @@ final public class FlinkEvaluator extends Evaluator {
                 String master_node = System.getenv("FLINK_MASTER");
                 if (Config.compile_functional_arguments)
                     flink_env = ExecutionEnvironment.createRemoteEnvironment(master_node,6123,
-                                        flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
+                                        Config.nodes,flink_jar.toURI().getPath(),Plan.conf.get("mrql.jar.path"));
                 else flink_env = ExecutionEnvironment.createRemoteEnvironment(master_node,6123,
-                                        flink_jar.toURI().getPath());
-                flink_env.setDefaultLocalParallelism(Config.nodes);
-            };
+                                        Config.nodes,flink_jar.toURI().getPath());
+            } else flink_env.setDefaultLocalParallelism(Config.nodes);
         } catch (Exception ex) {
             throw new Error(ex);
         }
@@ -107,11 +106,15 @@ final public class FlinkEvaluator extends Evaluator {
     }
 
     /** returns the absolute path relative to the directory that contains the MRQL executable */
-    private String absolute_path ( String path) {
+    private static String absolute_path ( String path) {
         try {
             URI uri = new URI(path);
             if (uri.getScheme() == null)
-                if (uri.toString().startsWith("/"))
+                if (Config.hadoop_mode && !Config.local_mode)
+                    if (uri.toString().startsWith("/"))
+                        uri = new URI("hdfs:"+uri);
+                    else uri = new URI("hdfs:/user/"+System.getProperty("user.name")+"/"+uri);
+                else if (uri.toString().startsWith("/"))
                     uri = new URI("file:"+uri);
                 else uri = new URI("file:"+System.getProperty("user.dir")+"/"+uri);
             uri = uri.normalize();
@@ -121,6 +124,25 @@ final public class FlinkEvaluator extends Evaluator {
         }
     }
 
+    /** for dumped data to a file, return the MRQL type of the data */
+    @Override
+    public Tree get_type ( String file ) {
+        try {
+            Path path = new Path(absolute_path(file));
+            FileSystem fs = path.getFileSystem();
+            BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path.suffix(".type"))));
+            String s[] = ftp.readLine().split("@");
+            ftp.close();
+            if (s.length != 2 )
+                return null;
+            if (!s[0].equals("2"))
+                throw new Error("The binary file has been created in java mode and cannot be read in hadoop mode");
+            return Tree.parse(s[1]);
+        } catch (Exception e) {
+            return null;
+        }
+    }
+
     /** dump MRQL data into a Flink binary file instead of a Hadoop Sequence file */
     @Override
     public void dump ( String file, Tree type, MRData data ) throws Exception {
@@ -131,39 +153,18 @@ final public class FlinkEvaluator extends Evaluator {
         ftp.close();
         if (data instanceof MR_dataset)
             data = Plan.collect(((MR_dataset)data).dataset());
-        DataSet<Long> d = flink_env.generateSequence(0L,0L); // need just one record
-        DataSet<FData> res = (data instanceof Bag)
-                             ? d.flatMap(new bag_dumper(new FData(data)))
-                             : d.map(new value_dumper(new FData(data)));
-        res.write(new FlinkOutputFormat(),path.toString(),
-                  org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
-        flink_env.execute("MRQL dump");
-    }
-
-    public static final class bag_dumper extends RichFlatMapFunction<Long,FData> {
-        final FData container = new FData(new MR_int(0));
-        final FData value;
-
-        bag_dumper ( FData value ) { this.value = value; }
-
-        @Override
-        public void flatMap ( Long ignore, Collector<FData> out ) throws Exception {
-            for ( MRData e: (Bag)value.data() ) {
+        FlinkOutputFormat of = new FlinkOutputFormat();
+        of.setOutputFilePath(path);
+        of.setWriteMode(org.apache.flink.core.fs.FileSystem.WriteMode.OVERWRITE);
+        of.open(0,2);
+        if (data instanceof Bag) {
+            final FData container = new FData(new MR_int(0));
+            for ( MRData e: (Bag)data ) {
                 container.data = e;
-                out.collect(container);
+                of.writeRecord(container);
             }
-        }
-    }
-
-    public static final class value_dumper extends RichMapFunction<Long,FData> {
-        final FData value;
-
-        value_dumper ( FData value ) { this.value = value; }
-
-        @Override
-        public FData map ( Long ignore ) {
-            return value;
-        }
+        } else of.writeRecord(new FData(data));
+        of.close();
     }
 
     private static MRData aggregate ( DataSet<FData> data_set, MRData zero, Tree acc_fnc, Tree merge_fnc ) {
@@ -267,6 +268,7 @@ final public class FlinkEvaluator extends Evaluator {
             flink_env.execute("MRQL query");
             return new org.apache.mrql.DataSet(new FlinkDataSource(d,path,needs_sorting(e)),0L,0L);
         } catch (Exception ex) {
+            System.err.println(ex.getMessage());
             throw new Error("Cannot evaluate the query: "+e);
         }
     }
@@ -289,6 +291,7 @@ final public class FlinkEvaluator extends Evaluator {
                 System.out.println(tabs(tab_count)+"-> "+out);
                 tab_count -= 3;
             } catch (Exception ex) {
+                System.err.println(ex.getMessage());
                 throw new Error("Cannot collect the operator output: "+e);
             };
         return res;
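
Editor's sketch (not part of the commit): the rewritten dump above no longer builds a one-record DataSet and runs a Flink job; it opens a FlinkOutputFormat on the client, writes each record, and closes it. The illustration below shows that direct-write pattern with a generic writer class, since FlinkOutputFormat is MRQL-internal; RecordWriterSketch and its text output are stand-ins, not the actual binary format.

    // Sketch of the direct-write pattern: open an output format on the client,
    // write each record of the bag, close it -- no cluster job is launched.
    import java.io.PrintWriter;
    import java.util.List;

    class RecordWriterSketch implements AutoCloseable {
        private final PrintWriter out;
        RecordWriterSketch ( String path ) throws Exception { out = new PrintWriter(path); }
        void writeRecord ( Object record ) { out.println(record); }   // one record per call
        public void close () { out.close(); }
    }

    class DirectDumpSketch {
        static void dump ( String path, List<?> data ) throws Exception {
            try ( RecordWriterSketch of = new RecordWriterSketch(path) ) {
                for ( Object e : data )          // mirrors the per-element loop over the Bag
                    of.writeRecord(e);
            }
        }
    }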