You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2016/02/08 20:47:27 UTC
incubator-mrql git commit: [MRQL-81] Fix the coercion of persistent
collections to in-memory bags
Repository: incubator-mrql
Updated Branches:
refs/heads/master 93d95acde -> aceab3159
[MRQL-81] Fix the coercion of persistent collections to in-memory bags
Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/aceab315
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/aceab315
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/aceab315
Branch: refs/heads/master
Commit: aceab31597bd33d7d22c457a053084976b39ac91
Parents: 93d95ac
Author: fegaras <fe...@cse.uta.edu>
Authored: Sat Feb 6 11:38:28 2016 -0600
Committer: fegaras <fe...@cse.uta.edu>
Committed: Sat Feb 6 11:38:28 2016 -0600
----------------------------------------------------------------------
.../main/java/org/apache/mrql/BSPEvaluator.gen | 16 +++++
.../main/java/org/apache/mrql/Evaluator.java | 3 +
core/src/main/java/org/apache/mrql/Printer.gen | 4 +-
.../src/main/java/org/apache/mrql/QueryPlan.gen | 4 +-
.../java/org/apache/mrql/FlinkEvaluator.gen | 38 ++++++++---
.../java/org/apache/mrql/MapReduceEvaluator.gen | 16 +++++
queries/factorization.mrql | 3 +-
queries/factorization3.mrql | 70 ++++++++++++++++++++
.../java/org/apache/mrql/SparkEvaluator.gen | 29 ++++++--
9 files changed, 164 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/bsp/src/main/java/org/apache/mrql/BSPEvaluator.gen
----------------------------------------------------------------------
diff --git a/bsp/src/main/java/org/apache/mrql/BSPEvaluator.gen b/bsp/src/main/java/org/apache/mrql/BSPEvaluator.gen
index 1b5901a..6f6a4bf 100644
--- a/bsp/src/main/java/org/apache/mrql/BSPEvaluator.gen
+++ b/bsp/src/main/java/org/apache/mrql/BSPEvaluator.gen
@@ -115,6 +115,17 @@ final public class BSPEvaluator extends Evaluator {
return BSPGeneratorInputFormat.class;
}
+ /** Coerce a persistent collection to a Bag */
+ public Bag toBag ( MRData data ) {
+ try {
+ if (data instanceof MR_dataset)
+ return Plan.collect(((MR_dataset)data).dataset());
+ } catch (Exception ex) {
+ throw new Error("Cannot coerce "+data+" to a Bag");
+ };
+ return (Bag)data;
+ }
+
/** The Aggregate physical operator
* @param acc_fnc the accumulator function from (T,T) to T
* @param zero the zero element of type T
@@ -237,6 +248,11 @@ final public class BSPEvaluator extends Evaluator {
if (!(t instanceof Tuple))
throw new Error("Expected a tuple in function application: "+t);
return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
+ case nth(`u,`n):
+ MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
+ if (x instanceof MR_dataset)
+ return ((MR_dataset)x).dataset();
+ throw new Error("Evaluation error in: "+print_query(e));
case `v:
if (!v.is_variable())
fail;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/core/src/main/java/org/apache/mrql/Evaluator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Evaluator.java b/core/src/main/java/org/apache/mrql/Evaluator.java
index 76152d4..ac89f4e 100644
--- a/core/src/main/java/org/apache/mrql/Evaluator.java
+++ b/core/src/main/java/org/apache/mrql/Evaluator.java
@@ -67,6 +67,9 @@ abstract public class Evaluator extends Interpreter {
/** return the FileInputFormat for data generator files */
abstract public Class<? extends MRQLFileInputFormat> generatorInputFormat ();
+ /** Coerce a persistent collection to a Bag */
+ abstract public Bag toBag ( MRData data );
+
/** The Aggregate physical operator
* @param acc_fnc the accumulator function from (T,T) to T
* @param zero the zero element of type T
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/core/src/main/java/org/apache/mrql/Printer.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Printer.gen b/core/src/main/java/org/apache/mrql/Printer.gen
index 6942a2f..b75de46 100644
--- a/core/src/main/java/org/apache/mrql/Printer.gen
+++ b/core/src/main/java/org/apache/mrql/Printer.gen
@@ -213,7 +213,7 @@ public class Printer {
if (Config.max_bag_size_print > 0 && vals.size() >= Config.max_bag_size_print)
return s+", ... }";
else return s+" }";
- } else return print(x,#<bag(`tp)>);
+ } else return print(Evaluator.evaluator.toBag(x),#<bag(`tp)>);
case List(`tp):
if (x instanceof MR_dataset) {
DataSet ds = ((MR_dataset)x).dataset();
@@ -227,7 +227,7 @@ public class Printer {
if (Config.max_bag_size_print > 0 && vals.size() >= Config.max_bag_size_print)
return s+", ... ]";
else return s+" ]";
- } else return print(x,#<list(`tp)>);
+ } else return print(Evaluator.evaluator.toBag(x),#<list(`tp)>);
case bag(`tp):
Bag b = (Bag)x;
Iterator<MRData> bi = b.iterator();
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/core/src/main/java/org/apache/mrql/QueryPlan.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/QueryPlan.gen b/core/src/main/java/org/apache/mrql/QueryPlan.gen
index d9b907f..e8adebe 100644
--- a/core/src/main/java/org/apache/mrql/QueryPlan.gen
+++ b/core/src/main/java/org/apache/mrql/QueryPlan.gen
@@ -813,7 +813,9 @@ private final static class SingleQueryPlan {
/** plan cost */
static double cost ( int i, int j ) {
- return size[i]*size[j]*selectivity[i][j];
+ if (predicate[i][j].equals(#<true>) && depends[i].isEmpty() && depends[j].isEmpty())
+ return 1.0E30; // make cross products the last option
+ else return size[i]*size[j]*selectivity[i][j];
}
public static Tree best_plan ( Tree e ) {
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index a51b29c..83e80c2 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -21,6 +21,7 @@ import java.util.List;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Arrays;
+import java.util.Iterator;
import java.io.*;
import java.net.URI;
import java.net.URL;
@@ -252,6 +253,26 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
return flink_env.fromCollection(a);
}
+ /** Coerce a persistent collection to a Bag */
+ public Bag toBag ( MRData data ) {
+ try {
+ if (data instanceof MR_flink) {
+ final Iterator<FData> i = ((MR_flink)data).flink().collect().iterator();
+ return new Bag(new BagIterator() {
+ public MRData next () {
+ return i.next().data();
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ });
+ };
+ return (Bag)data;
+ } catch (Exception ex) {
+ throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
+ }
+ }
+
/** The Aggregate physical operator
* @param merge the accumulator function from (T,T) to T
* @param zero the zero element of type T
@@ -332,7 +353,6 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
final int limit = ((MR_int)evalE(num,env)).get();
MR_flink[] s = new MR_flink[vs.length()];
- String[] path = new String[vs.length()];
for ( int i = 0; i < vs.length(); i++ )
s[i] = new MR_flink(eval(ss.nth(i),env));
for ( int n = 0; n < limit; n++ ) {
@@ -342,16 +362,7 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
for ( int i = 0; i < vs.length(); i ++ )
s[i] = new MR_flink(eval(bs.nth(i),nenv));
};
- for ( int i = 0; i < vs.length(); i++ ) {
- path[i] = absolute_path(Plan.new_path(Plan.conf));
- s[i].flink().write(new FlinkOutputFormat(),path[i]);
- };
- flink_env.execute("MRQL loop");
- final FlinkBinaryInputFormat bif = new FlinkBinaryInputFormat();
- Tuple t = new Tuple(vs.length());
- for ( int i = 0; i < vs.length(); i++ )
- t.set(i,bif.materialize(new org.apache.hadoop.fs.Path(path[i])));
- return t;
+ return new Tuple(s);
};
throw new Error("Wrong Loop format: "+e);
}
@@ -851,6 +862,11 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
if (((MR_bool)evalE(c,env)).get())
return eval(x,env);
else return eval(y,env);
+ case nth(`u,`n):
+ MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
+ if (x instanceof MR_flink)
+ return ((MR_flink)x).flink();
+ throw new Error("Evaluation error in: "+print_query(e));
case `v:
if (!v.is_variable())
fail;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/mapreduce/src/main/java/org/apache/mrql/MapReduceEvaluator.gen
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/mrql/MapReduceEvaluator.gen b/mapreduce/src/main/java/org/apache/mrql/MapReduceEvaluator.gen
index 2974bd9..1fdc5cb 100644
--- a/mapreduce/src/main/java/org/apache/mrql/MapReduceEvaluator.gen
+++ b/mapreduce/src/main/java/org/apache/mrql/MapReduceEvaluator.gen
@@ -84,6 +84,17 @@ final public class MapReduceEvaluator extends Evaluator {
return MapReduceGeneratorInputFormat.class;
}
+ /** Coerce a persistent collection to a Bag */
+ public Bag toBag ( MRData data ) {
+ try {
+ if (data instanceof MR_dataset)
+ return Plan.collect(((MR_dataset)data).dataset());
+ return (Bag)data;
+ } catch (Exception ex) {
+ throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
+ }
+ }
+
/** The Aggregate physical operator
* @param acc_fnc the accumulator function from (T,T) to T
* @param zero the zero element of type T
@@ -292,6 +303,11 @@ final public class MapReduceEvaluator extends Evaluator {
if (!(t instanceof Tuple))
throw new Error("Expected a tuple in function application: "+t);
return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
+ case nth(`u,`n):
+ MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
+ if (x instanceof MR_dataset)
+ return ((MR_dataset)x).dataset();
+ throw new Error("Evaluation error in: "+print_query(e));
case `v:
if (!v.is_variable())
fail;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/queries/factorization.mrql
----------------------------------------------------------------------
diff --git a/queries/factorization.mrql b/queries/factorization.mrql
index 2b98063..45f36de 100644
--- a/queries/factorization.mrql
+++ b/queries/factorization.mrql
@@ -56,4 +56,5 @@ macro factorize ( V, Hinit, Winit ) {
limit 4
};
-factorize(Mmatrix,Hmatrix,Wmatrix);
+let (X1,X2) = factorize(Mmatrix,Hmatrix,Wmatrix)
+ in multiply(X1,transpose(X2));
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/queries/factorization3.mrql
----------------------------------------------------------------------
diff --git a/queries/factorization3.mrql b/queries/factorization3.mrql
new file mode 100644
index 0000000..de61b5d
--- /dev/null
+++ b/queries/factorization3.mrql
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Matrix factorization using Gradient Descent.
+// Build the matrices first using the build_matrix.mrql query
+
+Mmatrix = source(binary,"tmp/Xmatrix.bin");
+Hmatrix = source(binary,"tmp/Ymatrix.bin");
+Wmatrix = source(binary,"tmp/Zmatrix.bin");
+
+
+macro transpose ( X ) {
+ select (x,j,i)
+ from (x,i,j) in X
+};
+
+// matrix multiplication:
+macro multiply ( X, Y ) {
+ select (sum(z),i,j)
+ from (x,i,k) in X, (y,k,j) in Y, z = x*y
+ group by (i,j)
+};
+
+// multiplication by a number:
+macro mult ( a, X ) {
+ select ( a*x, i, j )
+ from (x,i,j) in X
+};
+
+// cell-wise addition: pairs the cells of X and Y that share the same (i,j) and sums them
+macro Cadd ( X, Y ) {
+ select ( x+y, i, j )
+ from (x,i,j) in X, (y,i,j) in Y
+};
+
+// cell-wise subtraction:
+macro Csub ( X, Y ) {
+ select ( x-y, i, j )
+ from (x,i,j) in X, (y,i,j) in Y
+};
+
+a = 0.002;
+b = 0.02;
+
+// Matrix Factorization using Gradient Descent
+macro factorize ( R, Pinit, Qinit ) {
+ repeat (E,P,Q) = (R,Pinit,Qinit)
+ step ( Csub(R,multiply(P,transpose(Q))),
+ Cadd(P,mult(a,Csub(mult(2,multiply(E,transpose(Q))),mult(b,P)))),
+ Cadd(Q,mult(a,Csub(mult(2,multiply(E,transpose(P))),mult(b,Q)))) )
+ limit 4
+};
+
+let (E,X1,X2) = factorize(Mmatrix,Hmatrix,Wmatrix)
+ in multiply(X1,transpose(X2));
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/aceab315/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
index d43c8f9..5e81fe3 100644
--- a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
@@ -284,10 +284,7 @@ public class SparkEvaluator extends Evaluator implements Serializable {
for ( int i = 0; i < vs.length(); i ++ )
s[i] = new MR_rdd(eval(bs.nth(i),nenv).cache());
};
- Tuple t = new Tuple(vs.length());
- for ( int i = 0; i < vs.length(); i++ )
- t.set(i,bag(s[i].rdd()));
- return t;
+ return new Tuple(s);
};
throw new Error("Wrong Loop format");
}
@@ -359,6 +356,17 @@ public class SparkEvaluator extends Evaluator implements Serializable {
});
}
+ /** Coerce a persistent collection to a Bag */
+ public Bag toBag ( MRData data ) {
+ try {
+ if (data instanceof MR_rdd)
+ return bag(((MR_rdd)data).rdd());
+ return (Bag)data;
+ } catch (Exception ex) {
+ throw new Error("Cannot coerce "+data+" to a Bag: "+ex);
+ }
+ }
+
private static JavaRDD<MRData> materialize ( JavaRDD<MRData> rdd ) {
return rdd.map(new Function<MRData,MRData>() {
public MRData call ( MRData value ) {
@@ -817,6 +825,19 @@ public class SparkEvaluator extends Evaluator implements Serializable {
if (x instanceof MR_dataset)
return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
throw new Error("Expected an RDD dataset: "+x);
+ case nth(`u,`n):
+ MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
+ if (x instanceof MR_dataset)
+ return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
+ else if (x instanceof MR_rdd)
+ return ((MR_rdd)x).rdd();
+ else if (x instanceof Bag) {
+ ArrayList<MRData> l = new ArrayList<MRData>();
+ for ( MRData a: (Bag)x )
+ l.add(a);
+ return spark_context.parallelize(l);
+ };
+ throw new Error("Evaluation error in: "+print_query(e));
case `v:
if (!v.is_variable())
fail;