You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@mrql.apache.org by fe...@apache.org on 2016/02/22 01:26:40 UTC
[1/2] incubator-mrql git commit: [MRQL-84] Improve performance of Incremental MRQL
Repository: incubator-mrql
Updated Branches:
refs/heads/master 807df966e -> a624ffc50
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/queries/incremental-pagerank.mrql
----------------------------------------------------------------------
diff --git a/queries/incremental-pagerank.mrql b/queries/incremental-pagerank.mrql
index 6cb3293..97096be 100644
--- a/queries/incremental-pagerank.mrql
+++ b/queries/incremental-pagerank.mrql
@@ -17,12 +17,12 @@
*/
// preprocessing: for each node, group its outgoing links into a bag
-graph = select (key,n#1)
- from n in stream(binary,"tmp/graph.bin")
- group by key: n#0;
+graph = select (src,dst)
+ from (src,dst) in stream(binary,"tmp/graph.bin")
+ group by src;
-store graph_size := count(select distinct n#0
- from n in source(binary,"tmp/graph.bin"));
+store graph_size := count(select distinct s
+ from (s,d) in source(binary,"tmp/graph.bin"));
// damping factor
factor = 0.85;
@@ -31,9 +31,11 @@ incr repeat nodes = select < id: key, rank: 1.0/graph_size as double, adjacent:
from (key,al) in graph
step select ( < id: m.id, rank: n.rank, adjacent: m.adjacent >, true )
from n in (select < id: key,
- rank: (1-factor)/graph_size+factor*sum(select x.rank from x in c) >
+ rank: (1-factor)/graph_size+factor*sum(rank) >
from c in ( select < id: a, rank: n.rank/count(n.adjacent) >
- from n in nodes, a in n.adjacent )
+ from n in nodes,
+ a in n.adjacent ),
+ rank = c.rank
group by key: c.id),
m in nodes
where n.id = m.id
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
index 5e81fe3..60a9bb3 100644
--- a/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkEvaluator.gen
@@ -284,7 +284,10 @@ public class SparkEvaluator extends Evaluator implements Serializable {
for ( int i = 0; i < vs.length(); i ++ )
s[i] = new MR_rdd(eval(bs.nth(i),nenv).cache());
};
- return new Tuple(s);
+ Tuple t = new Tuple(vs.length());
+ for ( int i = 0; i < vs.length(); i++ )
+ t.set(i,bag(s[i].rdd()));
+ return t;
};
throw new Error("Wrong Loop format");
}
@@ -458,7 +461,7 @@ public class SparkEvaluator extends Evaluator implements Serializable {
}
});
};
- return s.groupBy(get_first,Config.nodes)
+ return s.groupBy(get_first)
.map(new Function<Tuple2<MRData,Iterable<MRData>>,MRData> () {
public MRData call ( Tuple2<MRData,Iterable<MRData>> value ) {
final Iterator<MRData> i = value._2.iterator();
@@ -599,7 +602,7 @@ public class SparkEvaluator extends Evaluator implements Serializable {
return joinIterator(((Bag)fy.eval(value)).iterator());
}
});
- return xs.cogroup(ys,Config.nodes)
+ return xs.cogroup(ys)
.flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>>,MRData>() {
public Iterable<MRData> call ( Tuple2<MRData,Tuple2<Iterable<MRData>,Iterable<MRData>>> value ) {
set_global_env(master_env);
@@ -751,13 +754,13 @@ public class SparkEvaluator extends Evaluator implements Serializable {
int max_num = ((MR_int)evalE(n,env)).get();
JavaRDD<MRData> rd;
JavaRDD<MRData> res = eval(s,env);
- res = materialize(res);
+ //res = materialize(res);
int i = 0;
boolean cont = true;
do {
rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
res = rd.map(get_first);
- res = materialize(res);
+ //res = materialize(res);
long size = res.count();
Integer true_results
= rd.aggregate(new Integer(0),
@@ -805,6 +808,14 @@ public class SparkEvaluator extends Evaluator implements Serializable {
else rd = rd.union(containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class,
MRContainer.class,MRContainer.class,1)));
return rd;
+ case let(`v,DataSetCollect(`s),`body):
+ MRData x = evalE(s,env);
+ if (x instanceof MR_dataset)
+ x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
+ else if (x instanceof Bag)
+ ((Bag)x).materialize();
+ new_distributed_binding(v.toString(),x);
+ return eval(body,new Environment(v.toString(),x,env));
case let(`v,`u,`body):
MRData val = evalE(u,env);
Interpreter.new_global_binding(v.toString(),val);
@@ -825,19 +836,6 @@ public class SparkEvaluator extends Evaluator implements Serializable {
if (x instanceof MR_dataset)
return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
throw new Error("Expected an RDD dataset: "+x);
- case nth(`u,`n):
- MRData x = ((Tuple)evalE(u,env)).get((int)n.longValue());
- if (x instanceof MR_dataset)
- return ((RDDDataSource)((MR_dataset)x).dataset().source.get(0)).rdd;
- else if (x instanceof MR_rdd)
- return ((MR_rdd)x).rdd();
- else if (x instanceof Bag) {
- ArrayList<MRData> l = new ArrayList<MRData>();
- for ( MRData a: (Bag)x )
- l.add(a);
- return spark_context.parallelize(l);
- };
- throw new Error("Evaluation error in: "+print_query(e));
case `v:
if (!v.is_variable())
fail;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java b/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java
index f1e1535..0831484 100644
--- a/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java
+++ b/spark/src/main/java/org/apache/mrql/SparkFileInputStream.java
@@ -115,7 +115,7 @@ public class SparkFileInputStream extends JavaInputDStream<MRData> {
rdd = hadoopFile(file);
else rdd = rdd.union(hadoopFile(file));
if (rdd == null)
- return scala.None$.MODULE$.apply(null);
+ rdd = SparkEvaluator.spark_context.emptyRDD();
return new Some<RDD<MRData>>(rdd.rdd());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
----------------------------------------------------------------------
diff --git a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
index a7bb209..d0ec379 100644
--- a/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
+++ b/spark/src/main/java/org/apache/mrql/SparkStreaming.gen
@@ -74,8 +74,28 @@ public class SparkStreaming extends SparkEvaluator {
return plan;
}
+ private final static Evaluator ef = Evaluator.evaluator;
+
+ /** bind the pattern variables to values */
+ private static Environment bind_list ( Tree pattern, Tree src, Environment env ) {
+ match pattern {
+ case tuple(...ps):
+ int i = 0;
+ match src {
+ case tuple(...es):
+ for ( Tree p: ps )
+ env = bind_list(p,es.nth(i++),env);
+ return env;
+ }
+ };
+ MRData value = ef.evalE(src,env);
+ env.replace(pattern.toString(),value);
+ if (repeat_variables.member(pattern))
+ Interpreter.new_distributed_binding(pattern.toString(),value);
+ return env;
+ }
+
private static void stream_processing ( final Tree plan, final Environment env, final Function f ) {
- final Evaluator ef = Evaluator.evaluator;
if (streams.size() == 0)
throw new Error("No input streams in query");
ArrayList<DStream<?>> rdds = new ArrayList<DStream<?>>();
@@ -91,20 +111,15 @@ public class SparkStreaming extends SparkEvaluator {
for ( RDD<?> rdd: JavaConversions.seqAsJavaList(rdds) ) {
try {
if (rdd.count() == 0)
- rdd = SparkEvaluator.spark_context.emptyRDD().rdd();
+ return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
} catch (Exception ex) {
return SparkEvaluator.spark_context.sc().emptyRDD(classtag);
};
new_env = new Environment(vars.get(i++),new MR_rdd(new JavaRDD(rdd,classtag)),new_env);
};
match plan {
- case lambda(tuple(...vs),tuple(...as)):
- int k = 0;
- for ( Tree v: vs )
- new_env.replace(v.toString(),ef.evalE(as.nth(k++),new_env));
- f.eval(null);
- case lambda(`v,`b):
- new_env.replace(v.toString(),ef.evalE(b,new_env));
+ case lambda(`pat,`e):
+ new_env = bind_list(pat,e,new_env);
f.eval(null);
case _: // non-incremental streaming
f.eval(ef.evalE(plan,new_env));
[2/2] incubator-mrql git commit: [MRQL-84] Improve performance of Incremental MRQL
Posted by fe...@apache.org.
[MRQL-84] Improve performance of Incremental MRQL
Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/a624ffc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/a624ffc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/a624ffc5
Branch: refs/heads/master
Commit: a624ffc5063bac59b752c4eb92d3ac6073cc30cd
Parents: 807df96
Author: fegaras <fe...@cse.uta.edu>
Authored: Sat Feb 20 18:12:45 2016 -0600
Committer: fegaras <fe...@cse.uta.edu>
Committed: Sat Feb 20 18:12:45 2016 -0600
----------------------------------------------------------------------
.../org/apache/mrql/AlgebraicOptimization.gen | 2 +-
core/src/main/java/org/apache/mrql/Compiler.gen | 3 +-
.../main/java/org/apache/mrql/Interpreter.gen | 23 +-
.../java/org/apache/mrql/Materialization.gen | 2 +
.../java/org/apache/mrql/PlanGeneration.gen | 27 +-
core/src/main/java/org/apache/mrql/Printer.gen | 30 +-
.../src/main/java/org/apache/mrql/Streaming.gen | 1045 ++++++++++++------
.../java/org/apache/mrql/SystemFunctions.java | 8 +
core/src/main/java/org/apache/mrql/TopLevel.gen | 70 +-
.../main/java/org/apache/mrql/TypeInference.gen | 27 +-
.../java/org/apache/mrql/FlinkEvaluator.gen | 8 +
queries/incremental-pagerank.mrql | 16 +-
.../java/org/apache/mrql/SparkEvaluator.gen | 34 +-
.../org/apache/mrql/SparkFileInputStream.java | 2 +-
.../java/org/apache/mrql/SparkStreaming.gen | 33 +-
15 files changed, 914 insertions(+), 416 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen b/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
index b89dd3c..7f1fbcc 100644
--- a/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
+++ b/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
@@ -93,7 +93,7 @@ public class AlgebraicOptimization extends Simplification {
return translate(#<crossProduct(`mx,`my,lambda(`v,cmap(`m,`b)),`X,`Y)>);
case cmap(`r,`groupBy1(cmap(`m,`groupBy2(`s)))):
if (! #[groupBy,orderBy].member(#<`groupBy1>)
- && ! #[groupBy,orderBy].member(#<`groupBy2>))
+ || ! #[groupBy,orderBy].member(#<`groupBy2>))
fail;
return #<mapReduce(`(identity()),
`(translate(r)),
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/Compiler.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Compiler.gen b/core/src/main/java/org/apache/mrql/Compiler.gen
index 531205a..0c2ba3a 100644
--- a/core/src/main/java/org/apache/mrql/Compiler.gen
+++ b/core/src/main/java/org/apache/mrql/Compiler.gen
@@ -624,8 +624,7 @@ final public class Compiler extends Translator {
+compileF(superstep)+","+compileE(state)+","+o+","
+"new Bag[]{"+ds.substring(1)+"})";
case DataSetCollect(`s):
- return "(new Bag(((MR_dataset)(Interpreter.lookup_global_binding(\""+s.toString()
- +"\"))).dataset().take(Integer.MAX_VALUE)))";
+ return "Interpreter.dataSetCollect(\""+s+"\")";
case `v:
if (v.is_variable())
return v.toString();
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/Interpreter.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index 6627a12..9d99e8b 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -72,6 +72,8 @@ public class Interpreter extends TypeInference {
/** insert a new global variable binding visible to all nodes */
public final static void new_distributed_binding ( final String var, final MRData value ) {
+ if (value == null)
+ return;
new_global_binding(var,value);
if (Config.hadoop_mode) { // insert the binding in the hadoop configuration
String gvs = Plan.conf.get("mrql.global.vars");
@@ -222,6 +224,16 @@ public class Interpreter extends TypeInference {
return b.toString();
}
+ /** implements the DataSetCollect operation */
+ public static Bag dataSetCollect ( String var ) {
+ MRData val = lookup_global_binding(var);
+ if (val instanceof Bag)
+ return (Bag)val;
+ else if (val instanceof MR_dataset)
+ return new Bag(((MR_dataset)val).dataset().take(Integer.MAX_VALUE));
+ throw new Error("Cannot collect the MRQL value into a bag: "+val);
+ }
+
public static long pre_trace ( String msg ) {
tab_count += 3;
trace_count++;
@@ -302,6 +314,14 @@ public class Interpreter extends TypeInference {
return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
case materialize(`u):
return MapReduceAlgebra.materialize(evalE(u,env));
+ case let(`v,DataSetCollect(`s),`body):
+ MRData x = evalE(s,env);
+ if (x instanceof MR_dataset)
+ x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
+ else if (x instanceof Bag)
+ ((Bag)x).materialize();
+ new_distributed_binding(v.toString(),x);
+ return evalE(body,new Environment(v.toString(),x,env));
case let(`v,`u,`body):
MRData x = evalE(u,env);
if (x instanceof Bag)
@@ -414,8 +434,7 @@ public class Interpreter extends TypeInference {
return b;
} catch (Exception ex) { throw new Error(ex); }
case DataSetCollect(`s):
- DataSet ds = Evaluator.evaluator.eval(s,env,"-");
- return new Bag(ds.take(Integer.MAX_VALUE));
+ return dataSetCollect(s.toString());
case dataset_size(`x):
return new MR_long(Plan.size(Evaluator.evaluator.eval(x,env,"-")) / (1024*1024));
case synchronize(`peer,`b):
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/Materialization.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Materialization.gen b/core/src/main/java/org/apache/mrql/Materialization.gen
index 4869d2e..c056e20 100644
--- a/core/src/main/java/org/apache/mrql/Materialization.gen
+++ b/core/src/main/java/org/apache/mrql/Materialization.gen
@@ -119,6 +119,8 @@ final public class Materialization extends Translator {
if (((LongLeaf)k).value() != unionM)
fail;
return new_domain(vars,x,new_domain(vars,y,d));
+ case callM(join_key,_,`x,`y):
+ return new_domain(vars,x,new_domain(vars,y,d));
case `f(...as):
Domains nd = new Domains(d.domains,d.repeats);
for ( Tree a: as )
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/PlanGeneration.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/PlanGeneration.gen b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
index e3901a8..1b724da 100644
--- a/core/src/main/java/org/apache/mrql/PlanGeneration.gen
+++ b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
@@ -230,6 +230,12 @@ final public class PlanGeneration extends AlgebraicOptimization {
if (!free_variables(b,#[`var,`v]).is_empty())
fail;
return e;
+ case mapReduce(lambda(`v,`b),`r,`s,...):
+ if (!repeat_variables.member(s))
+ fail;
+ if (!free_variables(b,#[`var,`v]).is_empty())
+ fail;
+ return e;
case `f(...r):
for ( Tree a: r )
match get_nested_query(var,a) {
@@ -309,7 +315,7 @@ final public class PlanGeneration extends AlgebraicOptimization {
};
fail
case mapReduce(lambda(`vm,`bm),`nr,`s,`o):
- if (!is_dataset_expr(s))
+ if (!is_dataset_expr(s) || Config.bsp_mode)
fail;
Tree pm = makePlan(bm);
match get_nested_query(vm,pm) {
@@ -324,12 +330,22 @@ final public class PlanGeneration extends AlgebraicOptimization {
`(makePlan(nr)),
`(makePlan(s)),
`o))>;
+ case mapReduce(`km,`kr,`ks,`ko):
+ Tree nv = new_var();
+ pm = subst(ks,nv,pm);
+ TypeInference.global_type_env.insert(nv.toString(),TypeInference.type_inference(ks));
+ return #<let(`nv,DataSetCollect(`ks),
+ MapReduce(lambda(`vm,`pm),
+ `(makePlan(nr)),
+ `(makePlan(s)),
+ `o))>;
};
fail
// extract the mapReduce combiner
case mapReduce(lambda(`vm,`bm),lambda(`vr,`br),`s,`o):
if (!Config.use_combiner || !is_dataset_expr(s))
fail;
+ TypeInference.type_inference(e);
Aggregates.clear();
Tree nv = new_var();
match TypeInference.type_inference(bm) {
@@ -472,6 +488,9 @@ final public class PlanGeneration extends AlgebraicOptimization {
fail;
repeat_variables = repeat_variables.append(vs);
return #<Loop(lambda(tuple(...vs),`(makePlan(b))),`(makePlan(#<tuple(...s)>)),`(makePlan(n)))>;
+ case incr(`z,lambda(tuple(`s,`v),`m),`a):
+ repeat_variables = repeat_variables.append(v);
+ return #<incr(`(makePlan(z)),lambda(tuple(`s,`v),`(makePlan(m))),`(makePlan(a)))>;
case record(...bl):
Trees el = #[];
for ( Tree b: bl )
@@ -731,7 +750,9 @@ final public class PlanGeneration extends AlgebraicOptimization {
case MapReduce2(`mx,`my,`r,`x,`y,`o):
return physical_plan(#<MapAggregateReduce2(`mx,`my,`r,null,null,`x,`y,`o)>);
case MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,`x,`y,false):
- if (!Config.flink_mode)
+ // disable mapjoins in Flink and Spark mode because, to select between a mapjoin
+ // and a coGroup at run-time, it needs to know the input sizes
+ if (!Config.flink_mode && !Config.spark_mode)
fail;
return #<MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,
`(physical_plan(x)),
@@ -814,7 +835,7 @@ final public class PlanGeneration extends AlgebraicOptimization {
case CrossProduct(`mx,`my,`r,`x,`y):
return physical_plan(#<CrossAggregateProduct(`mx,`my,`r,null,null,`x,`y)>);
case CrossAggregateProduct(`mx,`my,lambda(`v,`b),`acc,`zero,`x,`y):
- if (Config.flink_mode)
+ if (Config.flink_mode || Config.spark_mode)
fail;
Tree X = new_var();
Tree Y = new_var();
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/Printer.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Printer.gen b/core/src/main/java/org/apache/mrql/Printer.gen
index b75de46..18975c9 100644
--- a/core/src/main/java/org/apache/mrql/Printer.gen
+++ b/core/src/main/java/org/apache/mrql/Printer.gen
@@ -398,8 +398,17 @@ public class Printer {
return f+" ("+v+"):\n"+tab(n+3)+"init: "+print_plan(s,n+9,true)+"\n"
+tab(n+3)+"step: "+print_plan(b,n+9,true);
case Let(`v,`u,`body):
- return "let "+v+" = "+print_plan(u,n+10+v.toString().length(),pv)+"\n"
+ return "Let "+v+" = "+print_plan(u,n+10+v.toString().length(),pv)+"\n"
+tab(n)+print_plan(body,n,pv);
+ case let(`v,`u,`body):
+ if (print_plan(u,0,pv).length() > 0)
+ return "let "+v+" = "+print_plan(u,n+10+v.toString().length(),pv)+"\n"
+ +tab(n)+print_plan(body,n,pv);
+ else return print_plan(body,n,pv);
+ case lambda(tuple(...vs),`b):
+ return "lambda "+vs+":\n"+tab(n+3)+print_plan(b,n+9,true);
+ case lambda(`v,`b):
+ return "lambda ("+v+"):\n"+tab(n+3)+print_plan(b,n+9,true);
case If(_,`x1,If(_,`x2,If(_,`x3,`x4))):
return "Choice 1: "+print_plan(x1,n+10,pv)+"\n"
+tab(n)+"Choice 2: "+print_plan(x2,n+10,pv)+"\n"
@@ -412,6 +421,25 @@ public class Printer {
case If(`c,`x,`y):
return "Choice 1: "+print_plan(x,n+10,pv)+"\n"
+tab(n)+"Choice 2: "+print_plan(y,n+10,pv);
+ case tuple():
+ return "( )";
+ case tuple(...ts):
+ String s = "( ";
+ if (print_plan(ts.head(),0,pv).length() > 0)
+ s += print_plan(ts.head(),n+2,pv)+"\n";
+ for ( Tree t: ts.tail() )
+ if (print_plan(t,0,pv).length() > 0)
+ s += tab(n+2)+print_plan(t,n+2,pv)+"\n";
+ if (s.equals("( "))
+ return "";
+ else return s+tab(n)+")";
+ case incr(`z,lambda(tuple(...s),`h),lambda(tuple(...t),`a)):
+ return "Incremental:\n"+tab(n+3)+"State Transformer "+s+": "+print_plan(h,n+21+s.toString().length(),true)+"\n"
+ +tab(n+3)+"Answer "+t+": "+print_plan(a,n+10+t.toString().length(),true);
+ case incr(`z,lambda(`s,`h),lambda(`t,`a)):
+ return "Incremental:\n"+tab(n+3)+"Zero: "+print_plan(z,n+9,false)+"\n"
+ +tab(n+3)+"State Transformer ("+s+"): "+print_plan(h,n+23+s.toString().length(),true)+"\n"
+ +tab(n+3)+"Answer ("+t+"): "+print_plan(a,n+12+t.toString().length(),true);
case `f(...as):
String s = "";
for (Tree a: as) {
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/Streaming.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Streaming.gen b/core/src/main/java/org/apache/mrql/Streaming.gen
index 01e754c..99ce205 100644
--- a/core/src/main/java/org/apache/mrql/Streaming.gen
+++ b/core/src/main/java/org/apache/mrql/Streaming.gen
@@ -21,6 +21,7 @@ import org.apache.mrql.gen.*;
/** Generates code for streaming queries */
final public class Streaming extends AlgebraicOptimization {
+ private static int istate = 0;
// set it to true to debug the monoid inference
private static boolean inference_tracing = false;
// set it to true to debug the split function
@@ -122,9 +123,7 @@ final public class Streaming extends AlgebraicOptimization {
return #<none>;
else if (mx.equals(#<fixed>) && my.equals(#<fixed>))
return #<fixed>;
- else if (my.equals(#<fixed>))
- return #<groupBy(product(`mx,`my))>;
- else return #<groupBy(product(fixed,`my))>;
+ else return #<groupBy(product(`mx,`my))>;
case cmap(lambda(`v,bag(`w)),`x):
if (!w.equals(v))
fail;
@@ -160,12 +159,15 @@ final public class Streaming extends AlgebraicOptimization {
return #<`gb2(`m2)>;
case union:
return #<union>;
- case fixed: return #<fixed>;
+ case fixed:
+ return #<fixed>;
}
case fixed:
match inference(b,new Environment(v.toString(),#<fixed>,env)) {
- case union: return #<union>;
- case fixed: return #<fixed>;
+ case union:
+ return #<union>;
+ case fixed:
+ return #<fixed>;
case `gb(`m):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
@@ -239,31 +241,25 @@ final public class Streaming extends AlgebraicOptimization {
case product(...ms):
return ms.nth(n);
};
- fail
+ return #<none>;
case project(`u,`a):
match inference(u,env) {
case fixed:
return #<fixed>;
};
- fail
+ return #<none>;
+ case if(`p,`e1,`e2):
+ return inference(e1,env);
case bag(`u):
match inference(u,env) {
case fixed:
return #<fixed>;
};
- fail
+ return #<none>;
case call(stream,...):
return #<union>;
case call(source,...):
return #<fixed>;
- case call(`f,`s):
- for ( Tree monoid: monoids )
- match monoid {
- case `aggr(`mtp,`plus,`zero,`unit):
- if (aggr.equals(f.toString()))
- return #<monoid(`plus,`zero,`unit)>;
- };
- fail
case call(`f,...s):
Trees ms = #[ ];
for ( Tree x: s )
@@ -278,16 +274,20 @@ final public class Streaming extends AlgebraicOptimization {
fixed &= mm.equals(#<fixed>);
if (fixed)
return #<fixed>;
- fail
+ return #<none>;
case bind(_,`u):
return inference(u,env);
+ case typed(`u,_):
+ return inference(u,env);
case _:
if (e.is_string() || e.is_long() || e.is_double())
return #<fixed>;
else if (e.is_variable())
if (repeat_var(e))
return find(e.toString(),repeat_environment);
- else return find(e.toString(),env);
+ else if (member(e.toString(),env))
+ return find(e.toString(),env);
+ else return #<fixed>;
};
return #<none>;
}
@@ -296,7 +296,7 @@ final public class Streaming extends AlgebraicOptimization {
return inference(e,null);
}
- private static Tree cmap1 ( Tree f, Tree e ) {
+ private static Tree smap1 ( Tree f, Tree e ) {
Tree nv = new_var();
Tree b = new_var();
return #<cmap(lambda(`nv,cmap(lambda(`b,bag(tuple(nth(`nv,0),`b))),
@@ -304,47 +304,64 @@ final public class Streaming extends AlgebraicOptimization {
`e)>;
}
- private static Tree cmap2 ( Tree f, Tree e ) {
+ private static Tree smap2 ( Tree f, Tree e ) {
Tree nv = new_var();
Tree nw = new_var();
- return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(tuple(nth(`nw,0),nth(`nv,0)),nth(`nw,1)))),
+ return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(nth(`nw,0),
+ tuple(nth(`nv,0),nth(`nw,1))))),
apply(`f,tuple(nth(nth(`nv,0),0),nth(`nv,1))))),
`e)>;
}
- private static Tree cmap3 ( Tree f, Tree e ) {
+ private static Tree smap3 ( Tree f, Tree e ) {
Tree nv = new_var();
Tree nw = new_var();
- return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(tuple(nth(`nw,0),tuple()),nth(`nw,1)))),
+ return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(nth(`nw,0),
+ tuple(tuple(),nth(`nw,1))))),
apply(`f,`nv))),
`e)>;
}
+ private static Tree swap ( Tree e ) {
+ Tree nv = new_var();
+ return #<cmap(lambda(`nv,bag(tuple(tuple(nth(`nv,0),nth(nth(`nv,1),0)),
+ nth(nth(`nv,1),1)))),
+ `e)>;
+ }
+
+ private static Tree mix ( Tree e ) {
+ Tree nv = new_var();
+ Tree x = new_var();
+ Tree y = new_var();
+ Tree b = #<bag(tuple(tuple(nth(`nv,0),
+ tuple(nth(`x,0),nth(`y,0))),
+ tuple(nth(`x,1),nth(`y,1))))>;
+ return #<cmap(lambda(`nv,cmap(lambda(`x,cmap(lambda(`y,`b),
+ groupBy(nth(nth(`nv,1),1)))),
+ groupBy(nth(nth(`nv,1),0)))),
+ `e)>;
+ }
+
/** inject lineage (all groupBy/join keys) to a query e
* @param e an MRQL query
* @return an algebraic term that associates lineage to every query result
*/
public static Tree inject_q ( Tree e ) {
match e {
- case tuple(...as):
- Trees bs = #[ ];
- for ( Tree a: as )
- bs = bs.append(inject_q(a));
- return #<tuple(...bs)>;
case reduce(`m,cmap(`f,call(stream,...s))):
- return e;
+ return #<bag(tuple(tuple(),`e))>;
case reduce(`m,cmap(`f,`v)):
if (!repeat_var(v))
fail;
- return e;
+ return #<bag(tuple(tuple(),`e))>;
case reduce(`m,cmap(`f,`u)):
- return #<reduce(groupBy(`m),`(cmap1(f,inject_e(u))))>;
+ return #<reduce(groupBy(`m),`(smap1(f,inject_e(u))))>;
case reduce(`m,call(stream,...s)):
- return e;
+ return #<bag(tuple(tuple(),`e))>;
case reduce(`m,`v):
if (!repeat_var(v))
fail;
- return e;
+ return #<bag(tuple(tuple(),`e))>;
case cmap(`f,call(stream,...s)):
Tree a = new_var();
Tree b = new_var();
@@ -360,14 +377,18 @@ final public class Streaming extends AlgebraicOptimization {
apply(`f,`a))),
`v)>;
case cmap(`f,`u):
- return cmap1(f,inject_e(u));
- case `f(...as):
- Trees bs = #[ ];
- for ( Tree a: as )
- bs = bs.append(inject_q(a));
- return #<`f(...bs)>;
+ return smap1(f,inject_e(u));
};
- return e;
+ // otherwise, it's a constant
+ return #<bag(tuple(tuple(),`e))>;
+ }
+
+ private static boolean stream_source ( Tree e ) {
+ match e {
+ case call(stream,...):
+ return true;
+ };
+ return repeat_var(e);
}
/** inject lineage (all groupBy/join keys) to a groupBy/join term e
@@ -377,10 +398,46 @@ final public class Streaming extends AlgebraicOptimization {
static Tree inject_e ( Tree e ) {
match e {
case `gb(`c):
- if (! #[groupBy,orderBy].member(#<`gb>) ) fail;
- return #<`gb(`(inject_c(c)))>;
+ if (! #[groupBy,orderBy].member(#<`gb>) )
+ fail;
+ return #<`gb(`(swap(inject_c(c))))>;
case coGroup(`c1,`c2):
- return #<coGroup(`(inject_c(c1)),`(inject_c(c2)))>;
+ match c1 {
+ case cmap(_,`u):
+ if (!stream_source(u))
+ fail;
+ match c2 {
+ case cmap(_,`s):
+ if (!stream_source(s))
+ fail;
+ Tree nv = new_var();
+ return #<cmap(lambda(`nv,bag(tuple(tuple(nth(`nv,0),tuple(tuple(),tuple())),
+ nth(`nv,1)))),
+ coGroup(`c1,`c2))>;
+ };
+ Tree nv = new_var();
+ Tree y = new_var();
+ return #<cmap(lambda(`nv,cmap(lambda(`y,bag(tuple(tuple(nth(`nv,0),
+ tuple(tuple(),nth(`y,0))),
+ tuple(nth(nth(`nv,1),0),
+ nth(`y,1))))),
+ groupBy(nth(nth(`nv,1),1)))),
+ coGroup(`c1,`(inject_c(c2))))>;
+ };
+ match c2 {
+ case cmap(_,`u):
+ if (!stream_source(u))
+ fail;
+ Tree nv = new_var();
+ Tree x = new_var();
+ return #<cmap(lambda(`nv,cmap(lambda(`x,bag(tuple(tuple(nth(`nv,0),
+ tuple(nth(`x,0),tuple())),
+ tuple(nth(`x,1),
+ nth(nth(`nv,1),1))))),
+ groupBy(nth(nth(`nv,1),0)))),
+ coGroup(`(inject_c(c1)),`c2))>;
+ };
+ return mix(#<coGroup(`(inject_c(c1)),`(inject_c(c2)))>);
};
return inject_c(e);
}
@@ -392,108 +449,30 @@ final public class Streaming extends AlgebraicOptimization {
static Tree inject_c ( Tree e ) {
match e {
case cmap(`f,call(stream,...s)):
- return cmap3(f,#<call(stream,...s)>);
+ return smap3(f,#<call(stream,...s)>);
case cmap(`f,`v):
if (!repeat_var(v))
fail;
- return cmap3(f,v);
+ return smap3(f,v);
case cmap(`f,`u):
- return cmap2(f,inject_e(u));
+ return smap2(f,inject_e(u));
case call(stream,...s):
- return cmap3(#<lambda(x,bag(x))>,#<call(stream,...s)>);
+ return smap3(#<lambda(x,bag(x))>,#<call(stream,...s)>);
case `v:
if (!repeat_var(v))
fail;
- return cmap3(#<lambda(x,bag(x))>,v);
+ return smap3(#<lambda(x,bag(x))>,v);
};
return e;
}
- /** Return the answer function of a query e
- * @param e a query
- * @param var the input of the answer function
- * @param merge the monoid associated with the input
- * @return the answer function
- */
- static Tree answer ( Tree e, Tree var, Tree merge ) {
- match e {
- case tuple(...as):
- match merge {
- case product(...ms):
- Trees bs = #[ ];
- int i = 0;
- for ( Tree a: as )
- bs = bs.append(answer(a,#<nth(`var,`i)>,ms.nth(i++)));
- return #<tuple(...bs)>;
- }
- case record(...as):
- match merge {
- case record(...ms):
- Trees bs = #[ ];
- int i = 0;
- for ( Tree a: as )
- match a {
- case bind(`v,`u):
- match ms.nth(i++) {
- case bind(_,`m):
- bs = bs.append(#<bind(`v,`(answer(u,#<project(`var,`v)>,m)))>);
- }
- }
- return #<record(...bs)>;
- }
- case reduce(`m,cmap(`f,`u)):
- match merge {
- case `gb(_):
- if (! #[groupBy,orderBy].member(#<`gb>) )
- fail;
- Tree nv = new_var();
- return #<reduce(`m,cmap(lambda(`nv,bag(nth(`nv,1))),`var))>;
- case _: // total aggregation
- return var;
- }
- case reduce(`m,call(stream,...s)):
- return var;
- case reduce(`m,`v):
- if (!repeat_var(v))
- fail;
- return var;
- case cmap(`f,call(stream,...s)):
- Tree nv = new_var();
- return #<cmap(lambda(`nv,bag(nth(`nv,1))),`var)>;
- case cmap(`f,`v):
- if (!repeat_var(v))
- fail;
- Tree nv = new_var();
- return #<cmap(lambda(`nv,bag(nth(`nv,1))),`var)>;
- case cmap(`f,`u):
- match merge {
- case `gb(`m):
- if (! #[groupBy,orderBy].member(#<`gb>) )
- fail;
- Tree nv = new_var();
- match TypeInference.type_inference(var) {
- case `T(tuple(tuple(_,tuple()),_)):
- // only one key => don't group-by
- return #<cmap(lambda(`nv,bag(nth(`nv,1))),`var)>;
- };
- Tree nw = new_var();
- Tree nn = new_var();
- return #<cmap(lambda(`nv,cmap(lambda(`nn,bag(reduce(`m,`nn))),
- nth(`nv,1))),
- `gb(cmap(lambda(`nw,bag(tuple(nth(nth(`nw,0),0),nth(`nw,1)))),
- `var)))>;
- }
- };
- return var;
- }
-
/** find all homomorphic terms in the algebraic term e
* @param e an algebraic term
* @param env binds variables to monoids
* @return a list of homomorphic terms
*/
static Trees find_homomorphisms ( Tree e, Environment env ) {
- if (! #[none].member(inference(e,env)))
+ if (! #[none,fixed].member(inference(e,env)))
return #[ `e ];
match e {
case cmap(lambda(`v,bag(`b)),`x):
@@ -520,97 +499,85 @@ final public class Streaming extends AlgebraicOptimization {
for ( Tree d: ds )
match d {
case bind(`n,`a):
- bs = bs.append(find_homomorphisms(a,env));
+ bs = union(bs,find_homomorphisms(a,env));
};
return bs;
case call(`f,...as):
Trees bs = #[ ];
for ( Tree a: as )
- bs = bs.append(find_homomorphisms(a,env));
+ bs = union(bs,find_homomorphisms(a,env));
+ return bs;
+ case nth(`u,`n):
+ Trees bs = #[ ];
+ for ( Tree x: find_homomorphisms(u,env) )
+ bs = union(bs,#[nth(`x,`n)]);
+ return bs;
+ case project(`u,`a):
+ Trees bs = #[ ];
+ for ( Tree x: find_homomorphisms(u,env) )
+ bs = union(bs,#[project(`x,`a)]);
return bs;
case `f(...as):
Trees bs = #[ ];
for ( Tree a: as )
- bs = bs.append(find_homomorphisms(a,env));
+ bs = union(bs,find_homomorphisms(a,env));
return bs;
};
return #[ ];
}
- /** Split a cMap term e into a homomorhism and an answer function
- * @param e a cMap term
+ /** Split the term e into a homomorphism and an answer function
+ * @param e an algebraic term
+ * Delegates to split_trace; when split_tracing is set, the input and the result
+ * are printed indented by tab_count (which is increased by 3 for the duration of the call).
 * @param var the input variable of the answer function
- * @param cmap_var the cmap variable
 * @param env binds variables to monoids
 * @return a pair (answer,homomorphism)
 */
- static Tree split ( Tree e, Tree var, Tree cmap_var, Environment env ) {
+ static Tree split ( Tree e, Tree var, Environment env ) {
 if (split_tracing) {
 tab_count += 3;
- System.out.println(Interpreter.tabs(tab_count)+var+" "+cmap_var+"\n"+e.pretty(0));
+ System.out.println(Interpreter.tabs(tab_count)+var+"\n"
+ +Interpreter.tabs(tab_count)+e.pretty(tab_count));
 };
- Tree res = split_trace(e,var,cmap_var,env);
+ Tree res = split_trace(e,var,env);
 if (split_tracing) {
- System.out.println(Interpreter.tabs(tab_count)+"-> "+res.pretty(0));
+ System.out.println(Interpreter.tabs(tab_count)+"-> "+res.pretty(tab_count+3));
 tab_count -= 3;
 };
 return res;
 }
- private static Tree split_trace ( Tree e, Tree var, Tree cmap_var, Environment env ) {
+ /** Split the term e into a homomorhism and an answer function
+ * @param e an algebraic term
+ * @param var the input variable of the answer function
+ * @param env binds variables to monoids
+ * @return a pair (answer,homomorphism)
+ */
+ static Tree split_trace ( Tree e, Tree var, Environment env ) {
match e {
- case cmap(lambda(`v,bag(tuple(nth(`k,0),`b))),`u):
- if (!cmap_var.equals(k))
- fail;
- match inference(u,env) {
- case `gb(`m):
- if (! #[groupBy,orderBy].member(#<`gb>) )
- fail;
- Trees hs = find_homomorphisms(b,new Environment(v.toString(),#<product(fixed,`m)>,env));
- if (hs.length() == 1 && hs.nth(0).equals(b))
- return #<pair(`var,`e)>;
- Tree nv = new_var();
- Tree ne = b;
- int i = 0;
- for ( Tree h: hs ) {
- ne = subst(h,#<nth(`nv,`i)>,ne);
- i++;
- };
- return #<pair(cmap(lambda(`nv,bag(`ne)),`var),
- cmap(lambda(`v,bag(tuple(nth(`k,0),tuple(...hs)))),`u))>;
- case fixed:
- Trees hs = find_homomorphisms(b,new Environment(v.toString(),#<fixed>,env));
- if (hs.length() == 1 && hs.nth(0).equals(b))
- return #<pair(`var,`e)>;
- Tree nv = new_var();
- Tree ne = b;
- int i = 0;
- for ( Tree h: hs ) {
- ne = subst(h,#<nth(`nv,`i)>,ne);
- i++;
- };
- return #<pair(cmap(lambda(`nv,bag(`ne)),`var),
- cmap(lambda(`v,bag(tuple(nth(`k,0),tuple(...hs)))),`u))>;
- }
case cmap(lambda(`v,bag(tuple(`k,`b))),`u):
+ Environment nenv = env;
+ Tree nv = new_var();
match inference(u,env) {
+ case union:
+ nv = #<nth(`nv,1)>;
+ nenv = new Environment(v.toString(),#<union>,nenv);
case `gb(`m):
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
- Trees hs = find_homomorphisms(b,new Environment(v.toString(),#<product(fixed,`m)>,env));
- if (hs.length() == 1 && hs.nth(0).equals(b))
- return #<pair(`var,`e)>;
- Tree nv = new_var();
- Tree ne = b;
- int i = 0;
- for ( Tree h: hs ) {
- ne = subst(h,#<nth(`nv,`i)>,ne);
- i++;
- };
- return #<pair(cmap(lambda(`nv,bag(`ne)),`var),
- cmap(lambda(`v,bag(tuple(`k,tuple(...hs)))),`u))>;
- }
+ nenv = new Environment(v.toString(),#<product(fixed,`m)>,nenv);
+ };
+ Trees hs = find_homomorphisms(b,nenv);
+ if (hs.length() == 1 && hs.nth(0).equals(b))
+ return #<pair(`var,`e)>;
+ Tree ne = b;
+ int i = 0;
+ for ( Tree h: hs ) {
+ ne = subst(h,#<nth(nth(`nv,1),`i)>,ne);
+ i++;
+ };
+ return #<pair(cmap(lambda(`nv,bag(tuple(nth(`nv,0),`ne))),`var),
+ cmap(lambda(`v,bag(tuple(`k,tuple(...hs)))),`u))>;
case cmap(lambda(`v,`b),`u):
match inference(u,env) {
case `gb(`m):
@@ -626,38 +593,54 @@ final public class Streaming extends AlgebraicOptimization {
Tree nv = new_var();
match split(b,nv,nenv) {
case pair(`a,`h):
- a = subst(nv,var,a);
- return #<pair(`a,cmap(lambda(`v,`h),`u))>;
+ Tree nw = new_var();
+ if (occurences(v,a) == 0) {
+ a = subst(nv,var,a);
+ return #<pair(`a,cmap(lambda(`v,`h),`u))>;
+ };
+ // The answer function needs v; need to piggyback v along with the state
+ a = subst(nv,#<bag(tuple(nth(`nw,0),nth(nth(`nw,1),1)))>,
+ subst(v,#<nth(nth(`nw,1),0)>,a));
+ return #<pair(cmap(lambda(`nw,`a),`var),
+ cmap(lambda(`v,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
+ tuple(`v,nth(`nv,1))))),
+ `h)),
+ `u))>;
}
}
- case fixed:
+ case `m:
+ if (! #[union,fixed].member(m))
+ fail;
Tree nv = new_var();
- match split(b,nv,new Environment(v.toString(),#<fixed>,env)) {
+ match split(b,nv,new Environment(v.toString(),m,env)) {
case pair(`a,`h):
- a = subst(nv,var,a);
- a = subst(cmap_var,nv,a);
- return #<pair(`a,cmap(lambda(`v,`h),`u))>;
+ Tree nw = new_var();
+ if (occurences(v,a) == 0) {
+ a = subst(nv,var,a);
+ return #<pair(`a,cmap(lambda(`v,`h),`u))>;
+ };
+ // The answer function needs v; need to piggyback v along with the state
+ a = subst(nv,#<bag(tuple(nth(`nw,0),nth(nth(`nw,1),1)))>,
+ subst(v,#<nth(nth(`nw,1),0)>,a));
+ return #<pair(cmap(lambda(`nw,`a),`var),
+ cmap(lambda(`v,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
+ tuple(`v,nth(`nv,1))))),
+ `h)),
+ `u))>;
}
}
- };
- return #<fail>;
- }
-
- /** Split the term e into a homomorhism and an answer function
- * @param e an algebraic term
- * @param var the input variable of the answer function
- * @param env binds variables to monoids
- * @return a pair (answer,homomorphism)
- */
- static Tree split ( Tree e, Tree var, Environment env ) {
- match e {
- case cmap(lambda(`v,`b),`u):
- match split(e,var,v,env) {
- case fail:
- return #<pair(cmap(lambda(`v,`b),`var),`u)>;
- case `t:
- return t;
- }
+ case call(stream,...):
+ return #<pair(`var,`e)>;
+ case reduce(`m,`u):
+ match inference(u,env) {
+ case union:
+ return #<pair(`var,`e)>;
+ case _:
+ match split(u,var,env) {
+ case pair(`b,`h):
+ return #<pair(reduce(`m,`b),`h)>;
+ }
+ };
case tuple(...as):
Trees bs = #[ ];
Trees hs = #[ ];
@@ -682,8 +665,15 @@ final public class Streaming extends AlgebraicOptimization {
}
};
return #<pair(record(...bs),record(...hs))>;
- case reduce(`m,`u):
- return #<pair(`var,`e)>;
+ case call(`f,...as):
+ if (! #[fixed].member(inference(e,env)))
+ fail;
+ return #<pair(`e,tuple())>;
+ case call(`f,`a):
+ match split(a,var,env) {
+ case pair(`b,`h):
+ return #<pair(call(`f,`b),`h)>;
+ };
case call(`f,...as):
Tree nv = new_var();
Trees bs = #[ ];
@@ -696,6 +686,33 @@ final public class Streaming extends AlgebraicOptimization {
hs = hs.append(h);
};
return #<pair(apply(lambda(`nv,call(`f,...bs)),`var),tuple(...hs))>;
+ case nth(`u,`n):
+ match split(u,var,env) {
+ case pair(`b,`h):
+ return #<pair(nth(`b,`n),`h)>;
+ };
+ case project(`u,`a):
+ match split(u,var,env) {
+ case pair(`b,`h):
+ return #<pair(project(`b,`a),`h)>;
+ };
+ case bag(tuple(`k,`u)):
+ Tree ne = #<nth(call(elem,`var),1)>;
+ match TypeInference.type_inference(u) {
+ case `S(_):
+ if (!is_persistent_collection(S))
+ fail;
+ ne = #<nth(call(elem,Collect(`var)),1)>;
+ };
+ match split(u,ne,env) {
+ case pair(`b,`h):
+ return #<pair(`b,bag(tuple(`k,`h)))>;
+ };
+ case if(`p,`u,bag()):
+ match split(u,var,env) {
+ case pair(`b,`h):
+ return #<pair(`b,if(`p,`h,bag()))>;
+ };
case `f(...as):
Tree nv = new_var();
Trees bs = #[ ];
@@ -709,7 +726,82 @@ final public class Streaming extends AlgebraicOptimization {
};
return #<pair(apply(lambda(`nv,`f(...bs)),`var),tuple(...hs))>;
};
- return #<pair(`e,tuple())>;
+ return #<pair(`var,`e)>;
+ }
+
+ /** Return the monoid of a query e (in generate_code, e is the homomorphism produced
+ * by split; the resulting monoid is used to build its zero and merge functions).
+ * @param e a query
+ * @param env binds variables to monoids
+ * @return the monoid of e
+ * @throws Error if no case below applies to e
+ */
+ private static Tree get_monoid ( Tree e, Environment env ) {
+ match e {
+ case reduce(`m,`u):
+ // a reduce over the monoid m is mapped to groupBy(m)
+ return #<groupBy(`m)>;
+ case cmap(`f,call(stream,...s)):
+ // a cmap directly over a stream source is a plain union
+ return #<union>;
+ case cmap(`f,`v):
+ // a cmap over a repeat variable is a plain union; otherwise try the next case
+ if (!repeat_var(v))
+ fail;
+ return #<union>;
+ case cmap(lambda(`v,`b),`u):
+ // the monoid comes from the cmap body; when the input u is grouped, the body sees
+ // v as product(fixed,m) (first component fixed, second component over m)
+ match inference(u,env) {
+ case groupBy(`m):
+ return get_monoid(b,new Environment(v.toString(),#<product(fixed,`m)>,env));
+ case `m:
+ return get_monoid(b,env);
+ }
+ case bag(tuple(`k,`v)):
+ // a single (key,value) pair: grouped by the monoid inferred for the value
+ Tree m = inference(v,env);
+ return #<groupBy(`m)>;
+ case if(`p,`e1,bag()):
+ // a filter: the monoid of the then-branch
+ return get_monoid(e1,env);
+ };
+ throw new Error("Unable to find the merge function for: "+e);
+ //return #<groupBy(fixed)>;
+ }
+
+ /** Convert a total reduction of the collection e over the monoid m into an
+ * expression that computes the aggregated value directly.
+ * @param m the monoid of the reduction
+ * @param e the collection to reduce
+ * @return the aggregation expression
+ */
+ private static Tree convert_reduce_all ( Tree m, Tree e ) {
+ match m {
+ case fixed:
+ // a fixed value: take an element of e (SystemFunctions.elem returns the first one)
+ return #<call(elem,`e)>;
+ case union:
+ // each element of e is itself a collection: the identity cmap unions them
+ Tree v = new_var();
+ return #<cmap(lambda(`v,`v),`e)>;
+ case product(...ps):
+ // reduce the i-th tuple component of every element separately, using the i-th monoid
+ int i = 0;
+ Trees as = #[ ];
+ for ( Tree p: ps ) {
+ Tree nv = new_var();
+ as = as.append(convert_reduce_all(p,#<cmap(lambda(`nv,bag(nth(`nv,`(i++)))),`e)>));
+ };
+ return #<tuple(...as)>;
+ case record(...bs):
+ // reduce every record field separately, using the field's monoid
+ Trees as = #[ ];
+ for ( Tree b: bs )
+ match b {
+ case bind(`n,`p):
+ Tree nv = new_var();
+ Tree nb = convert_reduce_all(p,#<cmap(lambda(`nv,bag(project(`nv,`n))),`e)>);
+ as = as.append(#<bind(`n,`nb)>);
+ };
+ return #<record(...as)>;
+ case `gb(`gm):
+ // group e, then reduce the second component of each group (index 1) with gm,
+ // keeping the group key (index 0)
+ if (! #[groupBy,orderBy].member(#<`gb>) )
+ fail;
+ Tree v = new_var();
+ Tree me = convert_reduce_all(gm,#<nth(`v,1)>);
+ return #<cmap(lambda(`v,bag(tuple(nth(`v,0),`me))),
+ `gb(`e))>;
+ case _:
+ // a user-defined aggregation registered in monoids: fold e with its plus and zero
+ // (plus is passed through Normalization.rename first)
+ if (!m.is_variable())
+ fail;
+ for ( Tree monoid: monoids )
+ match monoid {
+ case `aggr(`mtp,`plus,`zero,`unit):
+ if (#<`aggr>.equals(m)) {
+ plus = Normalization.rename(plus);
+ return #<aggregate(`plus,`zero,`e)>;
+ }
+ };
+ };
+ // fall-back (no case above returned): take an element of e
+ return #<call(elem,`e)>;
 }
/** Convert a reduce on the monoid m to an aggregation */
@@ -717,6 +809,8 @@ final public class Streaming extends AlgebraicOptimization {
match m {
case fixed:
return e;
+ case union:
+ return e;
case product(...ps):
int i = 0;
Trees as = #[ ];
@@ -732,15 +826,21 @@ final public class Streaming extends AlgebraicOptimization {
};
return #<record(...as)>;
case `gb(`gm):
- if (! #[groupBy,orderBy].member(#<`gb>) ) fail;
+ if (! #[groupBy,orderBy].member(#<`gb>) )
+ fail;
Tree v = new_var();
- Tree me = convert_reduce(gm,#<nth(`v,1)>);
- return #<cmap(lambda(`v,bag(tuple(nth(`v,0),`me))),`gb(`e))>;
- };
- match e {
- // FIX: may have more cases
- case cmap(...):
- return #<call(`m,`e)>;
+ Tree me = convert_reduce_all(gm,#<nth(`v,1)>);
+ return #<cmap(lambda(`v,bag(tuple(nth(`v,0),`me))),
+ `gb(`e))>;
+ case _:
+ if (!m.is_variable())
+ fail;
+ for ( Tree monoid: monoids )
+ match monoid {
+ case `aggr(`mtp,`plus,`zero,`unit):
+ if (#<`aggr>.equals(m))
+ return #<call(`aggr,`e)>;
+ };
};
return e;
}
@@ -754,12 +854,14 @@ final public class Streaming extends AlgebraicOptimization {
Tree vy = new_var();
Tree cx = convert_to_algebra(x);
Tree cy = convert_to_algebra(y);
+ Tree mx = new_var();
+ Tree my = new_var();
return #<join(lambda(`vx,nth(`vx,0)),lambda(`vy,nth(`vy,0)),
lambda(`v,bag(tuple(call(join_key,nth(`v,0),nth(`v,1)),
tuple(cmap(lambda(`vx,bag(nth(`vx,1))),nth(`v,0)),
cmap(lambda(`vy,bag(nth(`vy,1))),nth(`v,1)))))),
`cx, `cy)>;
- case reduce(`aggr,`s):
+ case reduce(`aggr,`s):
return convert_reduce(aggr,convert_to_algebra(s));
case `f(...as):
Trees bs = #[ ];
@@ -803,31 +905,72 @@ final public class Streaming extends AlgebraicOptimization {
return e;
}
+ /** true if there is only one instance value of type tp.
+ * merge() uses this on group keys: when it holds, X and Y are combined without an outer join.
+ * NOTE(review): a collection type is treated as unique when its element type is, but
+ * collections of that element type still differ by cardinality (e.g. bags of empty
+ * tuples) -- confirm this is intended for key types.
+ */
+ private static boolean unique_key ( Tree tp ) {
+ match tp {
+ case tuple():
+ return true;
+ case `T(`t):
+ // matches any one-argument node; non-collections (e.g. a one-component
+ // tuple or record) fail through to the cases below
+ if (!is_collection(T))
+ fail;
+ return unique_key(t);
+ case tuple(...as):
+ for ( Tree a: as )
+ if (!unique_key(a))
+ return false;
+ return true;
+ case record(...as):
+ for ( Tree a: as )
+ match a {
+ case bind(_,`b):
+ if (!unique_key(b))
+ return false;
+ }
+ return true;
+ };
+ return false;
+ }
+
/** Return the merge function (over X and Y) of the monoid m; type is used for key equality */
private static Tree merge ( Tree m, Tree type, Tree X, Tree Y ) {
match m {
case `gb(`n):
- // needs outer-join in function r
if (! #[groupBy,orderBy].member(#<`gb>) )
fail;
- Tree v = new_var();
- Tree vx = new_var();
- Tree vy = new_var();
- Tree mx = new_var();
- Tree my = new_var();
match type {
case `T(tuple(`keytp,`tp)):
+ if (unique_key(keytp)) {
+ Tree vx = new_var();
+ Tree vy = new_var();
+ Tree v = merge(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`vy),1)>);
+ if (is_persistent_collection(T)) {
+ X = #<Collect(`X)>;
+ Y = #<Collect(`Y)>;
+ };
+ return #<let(`vx,`X,
+ let(`vy,`Y,
+ if(call(exists,`vx),
+ if(call(exists,`vy),
+ bag(tuple(`keytp,`v)),
+ `vx),
+ `vy)))>;
+ };
+ // needs an outer-join in the reducer function
+ Tree v = new_var();
+ Tree vx = new_var();
+ Tree vy = new_var();
+ Tree mx = new_var();
+ Tree my = new_var();
+ Tree mb = merge(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>);
+ Tree b = #<cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),`mb))),
+ `my)),
+ `mx)>;
return #<join(lambda(`vx,`(key_equality(keytp,#<nth(`vx,0)>))),
lambda(`vy,`(key_equality(keytp,#<nth(`vy,0)>))),
lambda(`v,let(`mx,nth(`v,0),
let(`my,nth(`v,1),
if(call(exists,`mx),
- if(call(exists,`my),
- cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),
- `(merge(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>))))),
- `my)),
- `mx),
- `mx),
+ if(call(exists,`my),`b,`mx),
`my)))),
`X, `Y)>;
};
@@ -862,7 +1005,7 @@ final public class Streaming extends AlgebraicOptimization {
for ( Tree monoid: monoids )
match monoid {
case `aggr(`mtp,`plus,`zero,`unit):
- if (aggr.equals(m.toString()))
+ if (#<`aggr>.equals(m))
return #<apply(`plus,tuple(`X,`Y))>;
};
};
@@ -899,7 +1042,7 @@ final public class Streaming extends AlgebraicOptimization {
for ( Tree monoid: monoids )
match monoid {
case `aggr(`mtp,`plus,`zero,`unit):
- if (aggr.equals(m.toString()))
+ if (#<`aggr>.equals(m))
return zero;
};
};
@@ -909,26 +1052,78 @@ final public class Streaming extends AlgebraicOptimization {
/** Convert joins to coGroups, plus other transformations */
private static Tree normalize_term ( Tree e ) {
match e {
+ case join(`kx,`ky,`r,cmap(`f,`x),cmap(`g,`y)):
+ if (!x.equals(y))
+ fail;
+ Tree v = new_var(), w = new_var(),
+ wx = new_var(), wy = new_var(),
+ vx = new_var(), vy = new_var();
+ Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
+ Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
+ Tree T = #<union(inL(`xtp),inR(`ytp))>;
+ Tree X = #<cmap(lambda(`w,call(plus,cmap(lambda(`wx,bag(tuple(apply(`kx,`wx),
+ typed(tagged_union(0,`wx),`T)))),
+ apply(`f,`w)),
+ cmap(lambda(`wy,bag(tuple(apply(`ky,`wy),
+ typed(tagged_union(1,`wy),`T)))),
+ apply(`g,`w)))),
+ `x)>;
+ Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
+ nth(`v,1))>;
+ Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
+ nth(`v,1))>;
+ return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
+ groupBy(`X))>);
+ case join(`kx,`ky,`r,`x,cmap(`g,`y)):
+ if (!x.equals(y))
+ fail;
+ Tree v = new_var(), w = new_var(), wy = new_var(),
+ vx = new_var(), vy = new_var();
+ Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
+ Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
+ Tree T = #<union(inL(`xtp),inR(`ytp))>;
+ Tree X = #<cmap(lambda(`w,call(plus,bag(tuple(apply(`kx,`w),typed(tagged_union(0,`w),`T))),
+ cmap(lambda(`wy,bag(tuple(apply(`ky,`wy),
+ typed(tagged_union(1,`wy),`T)))),
+ apply(`g,`w)))),
+ `x)>;
+ Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
+ nth(`v,1))>;
+ Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
+ nth(`v,1))>;
+ return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
+ groupBy(`X))>);
+ case join(`kx,`ky,`r,cmap(`f,`x),`y):
+ if (!x.equals(y))
+ fail;
+ Tree v = new_var(), w = new_var(), wx = new_var(),
+ vx = new_var(), vy = new_var();
+ Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
+ Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
+ Tree T = #<union(inL(`xtp),inR(`ytp))>;
+ Tree X = #<cmap(lambda(`w,call(plus,cmap(lambda(`wx,bag(tuple(apply(`kx,`wx),
+ typed(tagged_union(0,`wx),`T)))),
+ apply(`f,`w)),
+ bag(tuple(apply(`ky,`w),typed(tagged_union(1,`w),`T))))),
+ `x)>;
+ Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
+ nth(`v,1))>;
+ Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
+ nth(`v,1))>;
+ return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
+ groupBy(`X))>);
case join(`kx,`ky,`r,`x,`y):
- match TypeInference.type_inference(x) {
- case `T(`xtp):
- match TypeInference.type_inference(x) {
- case `S(`ytp):
- Tree v = new_var();
- Tree vx = new_var();
- Tree vy = new_var();
- type_env.insert(vx.toString(),xtp);
- type_env.insert(vy.toString(),ytp);
- type_env.insert(v.toString(),#<tuple(none,tuple(`T(`xtp),`S(`ytp)))>);
- return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(nth(nth(`v,1),0),
- nth(nth(`v,1),1)))),
- coGroup(cmap(lambda(`vx,bag(tuple(apply(`kx,`vx),`vx))),`x),
- cmap(lambda(`vy,bag(tuple(apply(`ky,`vy),`vy))),`y)))>);
- }
- };
- fail
+ Tree v = new_var();
+ Tree vx = new_var();
+ Tree vy = new_var();
+ Tree X = #<cmap(lambda(`vx,bag(tuple(apply(`kx,`vx),`vx))),`x)>;
+ Tree Y = #<cmap(lambda(`vy,bag(tuple(apply(`ky,`vy),`vy))),`y)>;
+ return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(nth(nth(`v,1),0),
+ nth(nth(`v,1),1)))),
+ coGroup(`X,`Y))>);
case `gb(`u):
- if (! #[groupBy,orderBy].member(#<`gb>) ) fail;
+ if (! #[groupBy,orderBy].member(#<`gb>) )
+ fail;
match u {
case cmap(...):
return #<`gb(`(normalize_term(u)))>;
@@ -956,6 +1151,32 @@ final public class Streaming extends AlgebraicOptimization {
};
Tree nv = new_var();
return #<reduce(`m,cmap(lambda(`nv,bag(`nv)),`(normalize_term(u))))>;
+ case call(`f,`s):
+ for ( Tree monoid: monoids )
+ match monoid {
+ case `aggr(`mtp,`plus,`zero,`unit):
+ if (#<`aggr>.equals(f)) {
+ match s {
+ case cmap(...):
+ return #<reduce(`aggr,`(normalize_term(s)))>;
+ };
+ Tree nv = new_var();
+ return #<reduce(`aggr,cmap(lambda(`nv,bag(`nv)),`(normalize_term(s))))>;
+ }
+ };
+ fail
+ case `f(...as):
+ Trees bs = #[ ];
+ for ( Tree a: as )
+ bs = bs.append(normalize_term(a));
+ return #<`f(...bs)>;
+ };
+ return e;
+ }
+
+ /** Embed missing cmaps */
+ private static Tree embed_missing_cmaps ( Tree e ) {
+ match e {
case project(`x,`a):
match TypeInference.type_inference(x) {
case `T(`tp):
@@ -963,7 +1184,7 @@ final public class Streaming extends AlgebraicOptimization {
fail;
Tree v = new_var();
type_env.insert(v.toString(),tp);
- return normalize_term(#<cmap(lambda(`v,bag(project(`v,`a))),`x)>);
+ return embed_missing_cmaps(#<cmap(lambda(`v,bag(project(`v,`a))),`x)>);
};
fail
case nth(`x,`n):
@@ -973,32 +1194,30 @@ final public class Streaming extends AlgebraicOptimization {
fail;
Tree v = new_var();
type_env.insert(v.toString(),tp);
- return normalize_term(#<cmap(lambda(`v,bag(nth(`v,`n))),`x)>);
+ return embed_missing_cmaps(#<cmap(lambda(`v,bag(nth(`v,`n))),`x)>);
};
fail
- case call(`f,`s):
- for ( Tree monoid: monoids )
- match monoid {
- case `aggr(`mtp,`plus,`zero,`unit):
- if (aggr.equals(f.toString())) {
- match s {
- case cmap(...):
- return #<reduce(`aggr,`(normalize_term(s)))>;
- };
- Tree nv = new_var();
- return #<reduce(`aggr,cmap(lambda(`nv,bag(`nv)),`(normalize_term(s))))>;
- }
- };
- fail
case `f(...as):
Trees bs = #[ ];
for ( Tree a: as )
- bs = bs.append(normalize_term(a));
+ bs = bs.append(embed_missing_cmaps(a));
return #<`f(...bs)>;
};
return e;
}
+ /** Substitute the variables of pattern in dst by the matching components of src.
+ * A tuple pattern is matched component-wise (component i of src is nth(src,i));
+ * any other pattern is treated as a single variable.
+ * The substitution is simultaneous: pattern variables that occur inside src are
+ * not substituted again, so apply(lambda(tuple(x,y),b),tuple(y,x)) really swaps x and y.
+ * @param pattern a variable or a (nested) tuple of variables
+ * @param src the term matched against the pattern
+ * @param dst the term in which the pattern variables are substituted
+ * @return dst after the substitution
+ */
+ private static Tree subst_pattern ( Tree pattern, Tree src, Tree dst ) {
+ Trees bindings = pattern_bindings(pattern,src);
+ Trees renamed = #[ ];
+ Tree res = dst;
+ // phase 1: rename every pattern variable to a fresh variable, which cannot occur in src
+ for ( Tree b: bindings )
+ match b {
+ case bind(`v,`t):
+ Tree nv = new_var();
+ res = subst_var(v,nv,res);
+ renamed = renamed.append(#<bind(`nv,`t)>);
+ };
+ // phase 2: replace each fresh variable by its component of src
+ for ( Tree r: renamed )
+ match r {
+ case bind(`w,`u):
+ res = subst_var(w,u,res);
+ };
+ return res;
+ }
+
+ /** Pair each variable of pattern with the component of src it matches, as bind(variable,term) */
+ private static Trees pattern_bindings ( Tree pattern, Tree src ) {
+ match pattern {
+ case tuple(...as):
+ Trees bs = #[ ];
+ int i = 0;
+ for ( Tree a: as )
+ bs = bs.append(pattern_bindings(a,#<nth(`src,`(i++))>));
+ return bs;
+ };
+ return #[bind(`pattern,`src)];
+ }
+
/** Simplify the term e using rewrite rules */
static Tree simplify_term ( Tree e ) {
match e {
@@ -1008,12 +1227,22 @@ final public class Streaming extends AlgebraicOptimization {
return simplify_term(subst_var(x,a,b));
case cmap(lambda(`x,`b),bag()):
return #<bag()>;
+ case cmap(`f,if(`p,`e1,`e2)):
+ return simplify_term(#<if(`p,cmap(`f,`e1),cmap(`f,`e2))>);
+ case cmap(`f,call(plus,`x,`y)):
+ return simplify_term(#<call(plus,cmap(`f,`x),cmap(`f,`y))>);
case groupBy(bag()):
return #<bag()>;
+ case cmap(`f,join(`kx,`ky,lambda(`v,`r),`X,`Y)):
+ return simplify_term(#<join(`kx,`ky,lambda(`v,cmap(`f,`r)),`X,`Y)>);
case apply(lambda(`v,`b),`u):
if (!v.is_variable())
fail;
return simplify_term(subst_var(v,u,b));
+ case apply(lambda(`pattern,`b),`u):
+ return simplify_term(subst_pattern(pattern,u,b));
+ case call(elem,bag(`u)):
+ return simplify_term(u);
case nth(tuple(...al),`n):
if (!n.is_long())
fail;
@@ -1074,7 +1303,7 @@ final public class Streaming extends AlgebraicOptimization {
rs = rs.cons(b);
return rs;
};
- if (repeat_var(e))
+ if (false && repeat_var(e))
return #[`e];
else return #[];
}
@@ -1094,6 +1323,69 @@ final public class Streaming extends AlgebraicOptimization {
return ne;
}
+ /** Replace the stream sources in e by the corresponding non-streaming sources:
+ * call(stream,...) becomes call(source,...), BinaryStream becomes BinarySource and
+ * ParsedStream becomes ParsedSource; all other nodes are rebuilt with their children
+ * converted recursively. The arguments of a replaced source are kept as they are.
+ * @param e a term
+ * @return e with its stream sources replaced
+ */
+ public static Tree unstreamify ( Tree e ) {
+ match e {
+ case call(stream,...r):
+ return #<call(source,...r)>;
+ case BinaryStream(...r):
+ return #<BinarySource(...r)>;
+ case ParsedStream(...r):
+ return #<ParsedSource(...r)>;
+ case `f(...al):
+ Trees bs = #[ ];
+ for ( Tree a: al )
+ bs = bs.append(unstreamify(a));
+ return #<`f(...bs)>;
+ };
+ // leaves are returned unchanged
+ return e;
+ }
+
+ /** Return the answer function of a query e
+ * @param e a query
+ * @param var the input of the answer function
+ * @param merge the monoid associated with the input
+ * @return the answer function, an expression over var (var itself unless e is a
+ * cmap whose input monoid is groupBy/orderBy)
+ */
+ static Tree answer ( Tree e, Tree var, Tree merge ) {
+ match e {
+ case cmap(`f,call(stream,...s)):
+ // a cmap directly over a stream: the state already is the answer
+ return var;
+ case cmap(`f,`v):
+ // a cmap over a repeat variable: the state already is the answer
+ if (!repeat_var(v))
+ fail;
+ return var;
+ case cmap(lambda(`v,`b),`u):
+ match merge {
+ case `gb(`m):
+ if (! #[groupBy,orderBy].member(#<`gb>) )
+ fail;
+ // re-group the elements of var by the first component of their key and
+ // reduce the values with gb(m); presumably the key built by split is a
+ // tuple whose first component is the original group key -- confirm
+ Tree nv = new_var();
+ return #<reduce(`gb(`m),
+ cmap(lambda(`nv,bag(tuple(nth(nth(`nv,0),0),nth(`nv,1)))),
+ `var))>;
+ }
+ };
+ // any other query (or a non-grouping merge monoid): the answer is var itself
+ return var;
+ }
+
+ /** Remove the state from the query output.
+ * For a cmap query only the second component (index 1) of every element of var
+ * is kept; for any other query (including every reduce) var is returned unchanged.
+ * @param e the query
+ * @param var the query output
+ * @return the output without the state
+ */
+ private static Tree strip_state ( Tree e, Tree var ) {
+ match e {
+ case cmap(_,_):
+ Tree nn = new_var();
+ return #<cmap(lambda(`nn,bag(nth(`nn,1))),`var)>;
+ };
+ // reductions (over streams, repeat variables or anything else) carry no state to strip
+ return var;
+ }
+
/** Convert a stream-based query to an incremental stream-based program.
* It returns the quadruple (zero,merge,homomorphism,answer),
* where (zero,merge) is the monoid for the homomorphism
@@ -1103,16 +1395,15 @@ final public class Streaming extends AlgebraicOptimization {
* @return a quadruple that represents the incremental stream-based program
*/
static Tree generate_code ( Tree e, Environment env ) {
- if (!is_streaming(e)) {
- Tree x = new_var();
- Tree s = new_var();
- TypeInference.type_env.insert(x.toString(),#<tuple(tuple(),tuple())>);
- TypeInference.type_env.insert(s.toString(),#<tuple()>);
- return #<tuple(tuple(),lambda(`x,tuple()),tuple(),lambda(`s,`e))>;
- }
+ if (Config.trace) {
+ System.out.println("Subterm:");
+ System.out.println(e.pretty(0));
+ };
Tree ne = SimplifyTerm(normalize_term(e));
+ TypeInference.type_inference(ne);
+ ne = SimplifyTerm(embed_missing_cmaps(ne));
if (Config.trace) {
- System.out.println("After converting joins to coGroup and simplifying:");
+ System.out.println("Subterm normalized:");
System.out.println(ne.pretty(0));
};
Tree qe = SimplifyTerm(inject_q(ne));
@@ -1122,40 +1413,128 @@ final public class Streaming extends AlgebraicOptimization {
};
Tree nv = new_var();
match split(qe,nv,env) {
- case pair(`a,`q):
+ case pair(`a,`h):
if (Config.trace) {
System.out.println("After split:");
- System.out.println(a.pretty(0)+"\n"+q.pretty(0));
+ System.out.println(a.pretty(0)+"\n"+h.pretty(0));
+ };
+ h = SimplifyTerm(h);
+ if (Config.trace) {
System.out.println("After split (simplified):");
- System.out.println(SimplifyTerm(a).pretty(0)+"\n"+SimplifyTerm(q).pretty(0));
+ System.out.println(SimplifyTerm(a).pretty(0)+"\n"+h.pretty(0));
};
- Tree m = inference(q,env);
+ Tree m = get_monoid(h,env);
if (Config.trace)
- System.out.println("monoid: "+m.pretty(0));
- q = convert_to_algebra(SimplifyTerm(q));
+ System.out.println("Subterm monoid: "+m.pretty(0));
+ Tree q = convert_to_algebra(h);
Tree tp = TypeInference.type_inference(q);
if (Config.trace) {
- System.out.println("Query type: "+tp);
+ System.out.println("Subterm type: "+tp);
System.out.println("Merge function over X and Y: ");
System.out.println(SimplifyTerm(merge(m,tp,#<X>,#<Y>)).pretty(0));
};
type_env.insert(nv.toString(),tp);
Tree answer = answer(ne,nv,m);
answer = subst(nv,answer,a);
+ answer = strip_state(ne,answer);
answer = SimplifyTerm(convert_to_algebra(SimplifyTerm(answer)));
if (Config.trace) {
System.out.println("Answer function:");
System.out.println(answer.pretty(0));
};
Tree zero = zero(m);
- Tree mv = new_var();
- Tree merge = convert_to_algebra(SimplifyTerm(merge(m,tp,#<nth(`mv,0)>,#<nth(`mv,1)>)));
- Tree res = #<tuple(`zero,lambda(`mv,`merge),`q,lambda(`nv,`answer))>;
+ Tree my = new_var();
+ Tree merge = convert_to_algebra(SimplifyTerm(merge(m,tp,nv,my)));
+ Tree res = #<tuple(`zero,lambda(tuple(`nv,`my),`merge),`q,lambda(`nv,`answer))>;
+ if (Config.trace) {
+ System.out.println("Incremental subterm:");
+ System.out.println(res.pretty(0));
+ };
return res;
};
throw new Error("Cannot generate incremental code: "+e);
}
+ /** Generate incremental code for the query e, combining the incremental code of the
+ * components of tuples, records and non-aggregation function calls.
+ * @param e a query
+ * @param env binds variables to monoids
+ * @return a quadruple tuple(zero,lambda(tuple(v,w),merge),homomorphism,lambda(v,answer)),
+ * in the same shape as the result of generate_code
+ */
+ private static Tree generate_incremental_code ( Tree e, Environment env ) {
+ // a query without stream sources has an empty state; its answer is e itself
+ if (!is_streaming(e))
+ return #<tuple(tuple(),lambda(tuple(tuple(),tuple()),tuple()),tuple(),lambda(tuple(),`e))>;
+ match e {
+ case call(stream,...):
+ // a bare stream source is handled as the identity cmap over the stream
+ Tree v = new_var();
+ return generate_code(#<cmap(lambda(`v,bag(`v)),`e)>,env);
+ case call(`f,...qs):
+ // aggregations fail through to the generate_code call after the match
+ boolean is_monoid = false;
+ for ( Tree monoid: monoids )
+ match monoid {
+ case `aggr(`mtp,`plus,`zero,`unit):
+ if (#<`aggr>.equals(f))
+ is_monoid = true;
+ };
+ if (is_monoid)
+ fail;
+ Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+ for ( Tree q: qs )
+ match generate_incremental_code(q,env) {
+ case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+ zs = zs.append(z); hs = hs.append(h); as = as.append(a);
+ vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
+ };
+ return #<tuple(tuple(...zs),
+ lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
+ tuple(...hs),
+ lambda(tuple(...vs),call(`f,...as)))>;
+ case record(...rs):
+ Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+ for ( Tree r: rs )
+ match r {
+ case bind(`k,`q):
+ match generate_incremental_code(q,env) {
+ case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+ zs = zs.append(z); hs = hs.append(h);
+ // keep the field name: record components must be bind(name,value)
+ as = as.append(#<bind(`k,`a)>);
+ vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
+ }
+ };
+ return #<tuple(tuple(...zs),
+ lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
+ tuple(...hs),
+ lambda(tuple(...vs),record(...as)))>;
+ case tuple(...qs):
+ Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+ for ( Tree q: qs )
+ match generate_incremental_code(q,env) {
+ case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+ zs = zs.append(z); hs = hs.append(h); as = as.append(a);
+ vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
+ };
+ return #<tuple(tuple(...zs),
+ lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
+ tuple(...hs),
+ lambda(tuple(...vs),tuple(...as)))>;
+ };
+ return generate_code(e,env);
+ }
+
+ /** Bind the pattern variables to types in TypeInference.type_env.
+ * A tuple pattern matched against a tuple type binds each component to the
+ * corresponding component type; any other pattern is bound as a single name
+ * (pattern.toString()) to tp.
+ * NOTE(review): if pattern is a tuple but tp is not a tuple type, the match falls
+ * through and the printed form of the whole pattern is inserted as one name; a pattern
+ * with more components than tp relies on the out-of-range behavior of Trees.nth --
+ * confirm that callers always pass matching shapes.
+ */
+ private static void bind_pattern2type ( Tree pattern, Tree tp ) {
+ match pattern {
+ case tuple(...pl):
+ match tp {
+ case tuple(...tl):
+ int i = 0;
+ for ( Tree p: pl )
+ bind_pattern2type(p,tl.nth(i++));
+ return;
+ }
+ };
+ TypeInference.type_env.insert(pattern.toString(),tp);
+ }
+
/** Convert a stream-based query to an incremental stream-based program.
* @param e a stream-based query
* @return the incremental stream-based program
@@ -1163,7 +1542,7 @@ final public class Streaming extends AlgebraicOptimization {
public static Tree generate_incremental_code ( Tree e ) {
match e {
case repeat(lambda(`v,`u),`x,`n):
- if (!is_streaming(x))
+ if (false && !is_streaming(x))
fail;
if (!(n instanceof LongLeaf))
throw new Error("The repeat must have a constant repetition: "+n);
@@ -1174,68 +1553,40 @@ final public class Streaming extends AlgebraicOptimization {
u = #<cmap(lambda(`nv,bag(nth(`nv,0))),`u)>;
TypeInference.type_inference(u);
repeat_environment = new Environment(v.toString(),#<union>,repeat_environment);
- match generate_code(u,null) {
- case tuple(`z,`m,`h,lambda(`s,`a)):
- match generate_code(x,null) {
- case tuple(`xz,`xm,`xh,lambda(`xs,`xa)):
- Tree state = new_var();
- Tree w = new_var();
- TypeInference.type_env.insert(state.toString(),TypeInference.type_inference(s));
- TypeInference.type_env.insert(w.toString(),TypeInference.type_inference(s));
- Tree hh = subst(v,xa,h);
- h = subst(v,a,h);
- Tree q = #<repeat(lambda(`s,cmap(lambda(`w,bag(tuple(`w,true))),
- apply(`m,tuple(`state,`h)))),
- apply(`m,tuple(`state,`hh)),
- `(nn-1))>;
- Tree xq = #<apply(`xm,tuple(`xs,`xh))>;
- a = subst(s,state,a);
- TypeInference.type_inference(a);
- Tree res = #<tuple(tuple(`xz,`z),
- lambda(tuple(`xs,`state),tuple(`xq,`q)),
- lambda(tuple(`xs,`state),`a))>;
- res = SimplifyTerm(res);
- Tree tp = TypeInference.type_inference(res);
- return res;
- }
- }
- case repeat(lambda(`v,`u),`x,`n):
- // when x is invariant (doesn't contain a stream source)
- if (!(n instanceof LongLeaf))
- throw new Error("wrong repeat: "+n);
- int nn = (int)((LongLeaf)n).value();
- if (nn < 1)
- throw new Error("wrong repeat number: "+n);
- Tree nv = new_var();
- u = #<cmap(lambda(`nv,bag(nth(`nv,0))),`u)>;
- TypeInference.type_inference(u);
- repeat_environment = new Environment(v.toString(),#<union>,repeat_environment);
- match generate_code(u,null) {
- case tuple(`z,`m,`h,lambda(`s,`a)):
- Tree state = new_var();
+ match generate_incremental_code(u,null) {
+ case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(`s,`a)):
+ Tree deltaT = new_var();
Tree w = new_var();
- TypeInference.type_env.insert(state.toString(),TypeInference.type_inference(s));
+ TypeInference.type_env.insert(deltaT.toString(),TypeInference.type_inference(s));
TypeInference.type_env.insert(w.toString(),TypeInference.type_inference(s));
- Tree hh = subst(v,x,h);
- h = subst(v,a,h);
- Tree q = #<repeat(lambda(`s,cmap(lambda(`w,bag(tuple(`w,true))),
- apply(`m,tuple(`state,`h)))),
- apply(`m,tuple(`state,`hh)),
+ Tree nm = subst(v1,deltaT,subst(v2,h,m));
+ Tree m0 = subst(v,x,h);
+ Tree q = #<repeat(lambda(`deltaT,cmap(lambda(`w,bag(tuple(`w,true))),`nm)),
+ `m0,
`(nn-1))>;
- a = subst(s,state,a);
- TypeInference.type_inference(a);
- Tree res = #<tuple(`z,
- lambda(`state,`q),
- lambda(`state,`a))>;
+ Tree res = #<incr(tuple(`z,`(unstreamify(x))),
+ lambda(tuple(`s,`v),tuple(apply(lambda(tuple(`v1,`v2),`m),tuple(`s,`q)),`a)),
+ lambda(tuple(`s,`v),`v))>;
res = SimplifyTerm(res);
- Tree tp = TypeInference.type_inference(res);
+ if (Config.trace) {
+ System.out.println("Incremental program:");
+ System.out.println(res.pretty(0));
+ };
+ TypeInference.type_inference(res);
return res;
}
};
- match generate_code(e,null) {
- case tuple(`z,`m,`h,lambda(`v,`a)):
- Tree q = SimplifyTerm(#<apply(`m,tuple(`v,`h))>);
- Tree res = #<tuple(`z,lambda(`v,`q),lambda(`v,`a))>;
+ match generate_incremental_code(e,null) {
+ case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(_,`a)):
+ Tree tp = TypeInference.type_inference(h);
+ bind_pattern2type(v1,tp);
+ Tree nm = SimplifyTerm(#<apply(lambda(`v2,`m),`h)>);
+ a = SimplifyTerm(a);
+ Tree res = #<incr(`z,lambda(`v1,`nm),lambda(`v1,`a))>;
+ if (Config.trace) {
+ System.out.println("Incremental program:");
+ System.out.println(res.pretty(0));
+ };
TypeInference.type_inference(res);
return res;
};
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/SystemFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/SystemFunctions.java b/core/src/main/java/org/apache/mrql/SystemFunctions.java
index c19e344..62fdd50 100644
--- a/core/src/main/java/org/apache/mrql/SystemFunctions.java
+++ b/core/src/main/java/org/apache/mrql/SystemFunctions.java
@@ -219,6 +219,14 @@ final public class SystemFunctions {
return (s.contains(e)) ? true_value : false_value;
}
+ /** return the first element produced by iterating over the bag s (s is materialized first);
+  *  applying elem to an empty bag raises an Error */
+ public static MRData elem ( Bag s ) {
+ s.materialize();
+ final Iterator<MRData> iter = s.iterator();
+ if (!iter.hasNext())
+ throw new Error("method elem was applied on an empty bag");
+ return iter.next();
+ }
+
public static MR_long count ( Bag s ) {
if (s.materialized())
return new MR_long(s.size());
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/TopLevel.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/TopLevel.gen b/core/src/main/java/org/apache/mrql/TopLevel.gen
index 7e4f38e..5838ed5 100644
--- a/core/src/main/java/org/apache/mrql/TopLevel.gen
+++ b/core/src/main/java/org/apache/mrql/TopLevel.gen
@@ -17,6 +17,7 @@
*/
package org.apache.mrql;
+import java.util.List;
import org.apache.mrql.gen.*;
@@ -265,6 +266,36 @@ final public class TopLevel extends Interpreter {
monoids = monoids.append(#<`name(`type,`plus,`zero,`unit)>);
}
+ /** bind the variables of a (possibly nested) tuple pattern to the corresponding components of value,
+  *  prepending one new Environment entry per variable to env; a non-tuple pattern is bound as a single name */
+ private static Environment bind_list ( Tree pattern, MRData value, Environment env ) {
+ match pattern {
+ case tuple(...ps):
+ final Tuple components = (Tuple)value;
+ int pos = 0;
+ for ( Tree sub: ps )
+ env = bind_list(sub,components.get(pos++),env);
+ return env;
+ };
+ return new Environment(pattern.toString(),value,env);
+ }
+
+ /** rebind the variables of pattern to the values of the expression src, evaluated in env.
+  *  - tuple pattern vs. tuple-literal src: each component expression is rebound to its sub-pattern
+  *    (components are evaluated left to right, each seeing the bindings made before it);
+  *  - tuple pattern vs. any other src: src is evaluated and the resulting Tuple is destructured
+  *    by the MRData overload of bind_list (new bindings prepended to env);
+  *  - variable pattern: its binding is updated through Environment.replace.
+  *  NOTE(review): the variable case assumes env is non-null and already holds a binding for the name */
+ private static Environment bind_list ( Tree pattern, Tree src, Environment env ) {
+ match pattern {
+ case tuple(...ps):
+ match src {
+ case tuple(...es):
+ int i = 0;
+ for ( Tree p: ps )
+ env = bind_list(p,es.nth(i++),env);
+ return env;
+ };
+ // src is not a tuple literal: evaluate it and destructure the resulting Tuple value.
+ // Falling through to env.replace below would replace a binding named after the whole
+ // tuple pattern (e.g. "tuple(x,y)") and leave the pattern variables unbound.
+ return bind_list(pattern,Evaluator.evaluator.evalE(src,env),env);
+ };
+ env.replace(pattern.toString(),Evaluator.evaluator.evalE(src,env));
+ return env;
+ }
+
/** the MRQL top-level interfacse to evaluate a single MRQL expression or command */
public final static void evaluate_top_level ( Tree expr ) {
if (expr == null)
@@ -330,30 +361,31 @@ final public class TopLevel extends Interpreter {
return;
Evaluator.evaluator.initialize_query();
match plan {
- case tuple(`zero,lambda(tuple(`state1,`state2),`merge),lambda(_,`answer)):
- final Tree qt = make_persistent_type(query_type);
- Tuple z = (Tuple)evalE(zero,null);
- final Environment env =
- new Environment(state1.toString(),z.get(0),
- new Environment(state2.toString(),z.get(1),null));
- final Tree ans = answer;
- merge = Streaming.streamify(merge);
- Evaluator.evaluator.streaming(#<lambda(tuple(`state1,`state2),`merge)>,env,
- new Function(){
- public MRData eval ( final MRData state ) {
- MRData a = Evaluator.evaluator.evalE(ans,env);
- System.out.println(print(a,qt));
- return state;
- }
- });
- case tuple(`zero,lambda(`state,`merge),lambda(_,`answer)):
+ case incr(`zero,lambda(`state,`merge),lambda(_,`answer)):
final Tree qt = make_persistent_type(query_type);
MRData z = evalE(zero,null);
- final Environment env = new Environment(state.toString(),z,null);
+ final Environment env = bind_list(state,z,null);
+ final boolean incr_test = false;
+ if (incr_test) {
+ // for code testing only
+ long t = System.currentTimeMillis();
+ Environment nenv = bind_list(state,Streaming.unstreamify(merge),env);
+ for ( Environment ev = nenv; ev != null; ev = ev.next )
+ if (ev.value instanceof MR_dataset) {
+ DataSet ds = ((MR_dataset)ev.value).dataset();
+ List<MRData> vals = ds.take(Config.max_bag_size_print);
+ System.out.println(ev.name+" = "+vals.size()+ " "+vals);
+ };
+ MRData a = Evaluator.evaluator.evalE(answer,nenv);
+ System.out.println(print(a,qt));
+ if (!Config.quiet_execution)
+ System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
+ return;
+ };
final Tree ans = answer;
merge = Streaming.streamify(merge);
Evaluator.evaluator.streaming(#<lambda(`state,`merge)>,env,
- new Function(){
+ new Function() {
public MRData eval ( final MRData state ) {
MRData a = Evaluator.evaluator.evalE(ans,env);
System.out.println(print(a,qt));
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/TypeInference.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 7fe483d..7c80240 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -543,6 +543,9 @@ public class TypeInference extends Translator {
case let(`p,`u,`b):
bind_pattern_type(p,type_inference2(u));
return type_inference2(b);
+ case Let(`p,`u,`b):
+ bind_pattern_type(p,type_inference2(u));
+ return type_inference2(b);
case case(`u,...cs):
Tree tp = type_inference2(u);
Trees ts = #[];
@@ -667,10 +670,12 @@ public class TypeInference extends Translator {
return type_inference2(b);
case typed(null,`tp):
return tp;
- case typed(`f(...),`tp):
- if (f.equals("tagged_union") || f.equals("union_value"))
- return tp;
- else fail
+ case typed(tagged_union(`n,`u),`tp):
+ type_inference2(u);
+ return tp;
+ case typed(union_value(`u),`tp):
+ type_inference2(u);
+ return tp;
case typed(`x,`tp):
if (tp.is_variable() && !tp.equals(#<string>)
&& MRContainer.type_code(tp.toString()) >= 0) {
@@ -927,9 +932,9 @@ public class TypeInference extends Translator {
case `T(tuple(`kx,`a)):
if (!is_collection(T))
type_error(e,"Wrong coGroup left operand: "+print_query(x));
- match xtp {
+ match ytp {
case `S(tuple(`ky,`b)):
- if (!is_collection(T))
+ if (!is_collection(S))
type_error(e,"Wrong coGroup right operand: "+print_query(y));
if (unify(kx,ky) == null)
type_error(e,"incompatible keys in coGroup: "+print_query(e));
@@ -1206,6 +1211,8 @@ public class TypeInference extends Translator {
return rt;
case call(inv,`x):
return type_inference(x);
+ case incr(...s):
+ return type_inference(#<tuple(...s)>);
case call(plus,`x,`y):
Tree tx = type_inference2(x);
Tree ty = type_inference2(y);
@@ -1250,6 +1257,14 @@ public class TypeInference extends Translator {
coerce(tx,t1,((Node)e).children.tail);
return #<bool>;
};
+ case call(elem,`x):
+ Tree tp = type_inference2(x);
+ match tp {
+ case `T(`t):
+ if (is_collection(T))
+ return t;
+ };
+ type_error(e,"method elem must be applied to a collection: "+tp);
case call(`f,`x,`y):
if (! #[eq,neq,lt,leq,gt,geq].member(f))
fail;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index 1f2637f..eb5098a 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -854,6 +854,14 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
IterativeDataSet<FData> startOfIteration = eval(s,env).iterate(max_num);
DataSet<FData> toBeFedBack = eval(b,new Environment(v.toString(),new MR_flink(startOfIteration),env));
return startOfIteration.closeWith(toBeFedBack);
+ case let(`v,DataSetCollect(`s),`body):
+ MRData x = evalE(s,env);
+ if (x instanceof MR_dataset)
+ x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
+ else if (x instanceof Bag)
+ ((Bag)x).materialize();
+ new_distributed_binding(v.toString(),x);
+ return eval(body,new Environment(v.toString(),x,env));
case let(`v,`u,`body):
return eval(body,new Environment(v.toString(),evalE(u,null),env));
case Let(`v,`u,`body):