You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2016/02/08 20:47:27 UTC

incubator-mrql git commit: [MRQL-81] Fix the coercion of persistent collections to in-memory bags

Repository: incubator-mrql
Updated Branches:
  refs/heads/master 93d95acde -> aceab3159


[MRQL-81] Fix the coercion of persistent collections to in-memory bags


Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/aceab315
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/aceab315
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/aceab315

Branch: refs/heads/master
Commit: aceab31597bd33d7d22c457a053084976b39ac91
Parents: 93d95ac
Author: fegaras <fe...@cse.uta.edu>
Authored: Sat Feb 6 11:38:28 2016 -0600
Committer: fegaras <fe...@cse.uta.edu>
Committed: Sat Feb 6 11:38:28 2016 -0600

----------------------------------------------------------------------
 .../main/java/org/apache/mrql/BSPEvaluator.gen  | 16 +++++
 .../main/java/org/apache/mrql/Evaluator.java    |  3 +
 core/src/main/java/org/apache/mrql/Printer.gen  |  4 +-
 .../src/main/java/org/apache/mrql/QueryPlan.gen |  4 +-
 .../java/org/apache/mrql/FlinkEvaluator.gen     | 38 ++++++++---
 .../java/org/apache/mrql/MapReduceEvaluator.gen | 16 +++++
 queries/factorization.mrql                      |  3 +-
 queries/factorization3.mrql                     | 70 ++++++++++++++++++++
 .../java/org/apache/mrql/SparkEvaluator.gen     | 29 ++++++--
 9 files changed, 164 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/bsp/src/main/java/org/apache/mrql/BSPEvaluator.gen
----------------------------------------------------------------------
diff --git a/bsp/src/main/java/org/apache/mrql/BSPEvaluator.gen b/bsp/src/main/java/org/apache/mrql/BSPEvaluator.gen
index 1b5901a..6f6a4bf 100644
--- a/bsp/src/main/java/org/apache/mrql/BSPEvaluator.gen
+++ b/bsp/src/main/java/org/apache/mrql/BSPEvaluator.gen
@@ -115,6 +115,17 @@ final public class BSPEvaluator extends Evaluator {
         return BSPGeneratorInputFormat.class;
     }
 
+    /** Coerce a persistent collection to a Bag */
+    public Bag toBag ( MRData data ) {
+        try {
+            if (data instanceof MR_dataset)
+                return Plan.collect(((MR_dataset)data).dataset());
+        } catch (Exception ex) {
+            throw new Error("Cannot coerce "+data+" to a Bag");
+        };
+        return (Bag)data;
+    }
+
     /** The Aggregate physical operator
      * @param acc_fnc  the accumulator function from (T,T) to T
      * @param zero  the zero element of type T
@@ -237,6 +248,11 @@ final public class BSPEvaluator extends Evaluator {
                 if (!(t instanceof Tuple))
                     throw new Error("Expected a tuple in function application: "+t);
                 return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
+           case nth(`u,`n):
+                MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
+                if (x instanceof MR_dataset)
+                    return ((MR_dataset)x).dataset();
+                throw new Error("Evaluation error in: "+print_query(e));
             case `v:
                 if (!v.is_variable())
                     fail;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/core/src/main/java/org/apache/mrql/Evaluator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Evaluator.java b/core/src/main/java/org/apache/mrql/Evaluator.java
index 76152d4..ac89f4e 100644
--- a/core/src/main/java/org/apache/mrql/Evaluator.java
+++ b/core/src/main/java/org/apache/mrql/Evaluator.java
@@ -67,6 +67,9 @@ abstract public class Evaluator extends Interpreter {
     /** return the FileInputFormat for data generator files */
     abstract public Class<? extends MRQLFileInputFormat> generatorInputFormat ();
 
+    /** Coerce a persistent collection to a Bag */
+    abstract public Bag toBag ( MRData data );
+
     /** The Aggregate physical operator
      * @param acc_fnc  the accumulator function from (T,T) to T
      * @param zero  the zero element of type T

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/core/src/main/java/org/apache/mrql/Printer.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Printer.gen b/core/src/main/java/org/apache/mrql/Printer.gen
index 6942a2f..b75de46 100644
--- a/core/src/main/java/org/apache/mrql/Printer.gen
+++ b/core/src/main/java/org/apache/mrql/Printer.gen
@@ -213,7 +213,7 @@ public class Printer {
                     if (Config.max_bag_size_print > 0 && vals.size() >= Config.max_bag_size_print)
                         return s+", ... }";
                     else return s+" }";
-                } else return print(x,#<bag(`tp)>);
+                } else return print(Evaluator.evaluator.toBag(x),#<bag(`tp)>);
             case List(`tp):
                 if (x instanceof MR_dataset) {
                     DataSet ds = ((MR_dataset)x).dataset();
@@ -227,7 +227,7 @@ public class Printer {
                     if (Config.max_bag_size_print > 0 && vals.size() >= Config.max_bag_size_print)
                         return s+", ... ]";
                     else return s+" ]";
-                } else return print(x,#<list(`tp)>);
+                } else return print(Evaluator.evaluator.toBag(x),#<list(`tp)>);
             case bag(`tp):
                 Bag b = (Bag)x;
                 Iterator<MRData> bi = b.iterator();

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/core/src/main/java/org/apache/mrql/QueryPlan.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/QueryPlan.gen b/core/src/main/java/org/apache/mrql/QueryPlan.gen
index d9b907f..e8adebe 100644
--- a/core/src/main/java/org/apache/mrql/QueryPlan.gen
+++ b/core/src/main/java/org/apache/mrql/QueryPlan.gen
@@ -813,7 +813,9 @@ private final static class SingleQueryPlan {
 
     /** plan cost */
     static double cost ( int i, int j ) {
-        return size[i]*size[j]*selectivity[i][j];
+        if (predicate[i][j].equals(#<true>) && depends[i].isEmpty() && depends[j].isEmpty())
+            return 1.0E30;  // make cross products the last option
+        else return size[i]*size[j]*selectivity[i][j];
     }
 
     public static Tree best_plan ( Tree e ) {

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index a51b29c..83e80c2 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -21,6 +21,7 @@ import java.util.List;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Arrays;
+import java.util.Iterator;
 import java.io.*;
 import java.net.URI;
 import java.net.URL;
@@ -252,6 +253,26 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
         return flink_env.fromCollection(a);
     }
 
+    /** Coerce a persistent collection to a Bag: an MR_flink is collected and its FData elements are unwrapped lazily; any other value is cast to Bag */
+    public Bag toBag ( MRData data ) {
+        try {
+            if (data instanceof MR_flink) {
+                final Iterator<FData> i = ((MR_flink)data).flink().collect().iterator();
+                return new Bag(new BagIterator() {
+                        public MRData next () {
+                            return i.next().data();
+                        }
+                        public boolean hasNext () {
+                            return i.hasNext();
+                        }
+                    });
+            };
+            return (Bag)data;
+        } catch (Exception ex) {  // also wraps a failed cast of non-Bag data
+            throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
+        }
+    }
+
     /** The Aggregate physical operator
      * @param merge  the accumulator function from (T,T) to T
      * @param zero  the zero element of type T
@@ -332,7 +353,6 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
         case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
             final int limit = ((MR_int)evalE(num,env)).get();
             MR_flink[] s = new MR_flink[vs.length()];
-            String[] path = new String[vs.length()];
             for ( int i = 0; i < vs.length(); i++ )
                 s[i] = new MR_flink(eval(ss.nth(i),env));
             for ( int n = 0; n < limit; n++ ) {
@@ -342,16 +362,7 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
                 for ( int i = 0; i < vs.length(); i ++ )
                     s[i] = new MR_flink(eval(bs.nth(i),nenv));
             };
-            for ( int i = 0; i < vs.length(); i++ ) {
-                path[i] = absolute_path(Plan.new_path(Plan.conf));
-                s[i].flink().write(new FlinkOutputFormat(),path[i]);
-            };
-            flink_env.execute("MRQL loop");
-            final FlinkBinaryInputFormat bif = new FlinkBinaryInputFormat();
-            Tuple t = new Tuple(vs.length());
-            for ( int i = 0; i < vs.length(); i++ )
-                t.set(i,bif.materialize(new org.apache.hadoop.fs.Path(path[i])));
-            return t;
+            return new Tuple(s);
         };
         throw new Error("Wrong Loop format: "+e);
     }
@@ -851,6 +862,11 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
                 if (((MR_bool)evalE(c,env)).get())
                     return eval(x,env);
                 else return eval(y,env);
+           case nth(`u,`n):
+                MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
+                if (x instanceof MR_flink)
+                    return ((MR_flink)x).flink();
+                throw new Error("Evaluation error in: "+print_query(e));
             case `v:
                 if (!v.is_variable())
                     fail;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/mapreduce/src/main/java/org/apache/mrql/MapReduceEvaluator.gen
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/mrql/MapReduceEvaluator.gen b/mapreduce/src/main/java/org/apache/mrql/MapReduceEvaluator.gen
index 2974bd9..1fdc5cb 100644
--- a/mapreduce/src/main/java/org/apache/mrql/MapReduceEvaluator.gen
+++ b/mapreduce/src/main/java/org/apache/mrql/MapReduceEvaluator.gen
@@ -84,6 +84,17 @@ final public class MapReduceEvaluator extends Evaluator {
         return MapReduceGeneratorInputFormat.class;
     }
 
+    /** Coerce a persistent collection to a Bag: an MR_dataset is collected with Plan.collect; any other value is cast to Bag */
+    public Bag toBag ( MRData data ) {
+        try {
+            if (data instanceof MR_dataset)
+                return Plan.collect(((MR_dataset)data).dataset());
+            return (Bag)data;
+        } catch (Exception ex) {  // also wraps a failed cast of non-Bag data
+            throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
+        }
+    }
+
     /** The Aggregate physical operator
      * @param acc_fnc  the accumulator function from (T,T) to T
      * @param zero  the zero element of type T
@@ -292,6 +303,11 @@ final public class MapReduceEvaluator extends Evaluator {
                 if (!(t instanceof Tuple))
                     throw new Error("Expected a tuple in function application: "+t);
                 return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
+           case nth(`u,`n):
+                MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
+                if (x instanceof MR_dataset)
+                    return ((MR_dataset)x).dataset();
+                throw new Error("Evaluation error in: "+print_query(e));
             case `v:
                 if (!v.is_variable())
                     fail;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/queries/factorization.mrql
----------------------------------------------------------------------
diff --git a/queries/factorization.mrql b/queries/factorization.mrql
index 2b98063..45f36de 100644
--- a/queries/factorization.mrql
+++ b/queries/factorization.mrql
@@ -56,4 +56,5 @@ macro factorize ( V, Hinit, Winit ) {
    limit 4
 };
 
-factorize(Mmatrix,Hmatrix,Wmatrix);
+let (X1,X2) = factorize(Mmatrix,Hmatrix,Wmatrix)
+ in multiply(X1,transpose(X2));

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/queries/factorization3.mrql
----------------------------------------------------------------------
diff --git a/queries/factorization3.mrql b/queries/factorization3.mrql
new file mode 100644
index 0000000..de61b5d
--- /dev/null
+++ b/queries/factorization3.mrql
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Matrix factorization using Gradient Descent.
+// Build the matrices first using the build_matrix.mrql query
+
+Mmatrix = source(binary,"tmp/Xmatrix.bin");
+Hmatrix = source(binary,"tmp/Ymatrix.bin");
+Wmatrix = source(binary,"tmp/Zmatrix.bin");
+
+
+macro transpose ( X ) {
+  select (x,j,i)
+    from (x,i,j) in X
+};
+
+// matrix multiplication:
+macro multiply ( X, Y ) {
+  select (sum(z),i,j)
+    from (x,i,k) in X, (y,k,j) in Y, z = x*y
+   group by (i,j)
+};
+
+// multiplication by a number:
+macro mult ( a, X ) {
+  select ( a*x, i, j )
+    from (x,i,j) in X
+};
+
+// cell-wise addition:
+macro Cadd ( X, Y ) {
+  select ( x+y, i, j )
+    from (x,i,j) in X, (y,i,j) in Y
+};
+
+// cell-wise subtraction:
+macro Csub ( X, Y ) {
+  select ( x-y, i, j )
+    from (x,i,j) in X, (y,i,j) in Y
+};
+
+a = 0.002;
+b = 0.02;
+
+// Matrix Factorization using Gradient Descent
+macro factorize ( R, Pinit, Qinit ) {
+  repeat (E,P,Q) = (R,Pinit,Qinit)
+    step ( Csub(R,multiply(P,transpose(Q))),
+           Cadd(P,mult(a,Csub(mult(2,multiply(E,transpose(Q))),mult(b,P)))),
+           Cadd(Q,mult(a,Csub(mult(2,multiply(E,transpose(P))),mult(b,Q)))) )
+   limit 4
+};
+
+let (E,X1,X2) = factorize(Mmatrix,Hmatrix,Wmatrix)
+ in multiply(X1,transpose(X2));

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
index d43c8f9..5e81fe3 100644
--- a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
@@ -284,10 +284,7 @@ public class SparkEvaluator extends Evaluator implements Serializable {
                 for ( int i = 0; i < vs.length(); i ++ )
                     s[i] = new MR_rdd(eval(bs.nth(i),nenv).cache());
             };
-            Tuple t = new Tuple(vs.length());
-            for ( int i = 0; i < vs.length(); i++ )
-                t.set(i,bag(s[i].rdd()));
-            return t;
+            return new Tuple(s);
         };
         throw new Error("Wrong Loop format");
     }
@@ -359,6 +356,17 @@ public class SparkEvaluator extends Evaluator implements Serializable {
             });
     }
 
+    /** Coerce a persistent collection to a Bag: an MR_rdd is converted with bag(); any other value is cast to Bag */
+    public Bag toBag ( MRData data ) {
+        try {
+            if (data instanceof MR_rdd)
+                return bag(((MR_rdd)data).rdd());
+            return (Bag)data;
+        } catch (Exception ex) {  // also wraps a failed cast of non-Bag data
+            throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
+        }
+    }
+
     private static JavaRDD<MRData> materialize ( JavaRDD<MRData> rdd ) {
         return rdd.map(new Function<MRData,MRData>() {
                 public MRData call ( MRData value ) {
@@ -817,6 +825,19 @@ public class SparkEvaluator extends Evaluator implements Serializable {
                 if (x instanceof MR_dataset)
                     return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
                 throw new Error("Expected an RDD dataset: "+x);
+            case nth(`u,`n):
+                MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
+                if (x instanceof MR_dataset)
+                    return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
+                else if (x instanceof MR_rdd)
+                    return ((MR_rdd)x).rdd();
+                else if (x instanceof Bag) {
+                    ArrayList<MRData> l = new ArrayList<MRData>();
+                    for ( MRData a: (Bag)x )
+                        l.add(a);
+                    return spark_context.parallelize(l);
+                };
+                throw new Error("Evaluation error in: "+print_query(e));
             case `v:
                 if (!v.is_variable())
                     fail;