You are viewing a plain-text version of this content. The canonical link for it was a hyperlink ("here") that is not preserved in this plain-text copy.
Posted to commits@mrql.apache.org by fe...@apache.org on 2013/06/19 00:21:32 UTC
git commit: Fixed matrix multiplication using GroupByJoin
Updated Branches:
refs/heads/master 0868e7110 -> 869eefd5b
Fixed matrix multiplication using GroupByJoin
Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/869eefd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/869eefd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/869eefd5
Branch: refs/heads/master
Commit: 869eefd5bd2d8b088d88903ea5979f8d05ea28e6
Parents: 0868e71
Author: fegaras <fe...@cse.uta.edu>
Authored: Tue Jun 18 17:20:52 2013 -0500
Committer: fegaras <fe...@cse.uta.edu>
Committed: Tue Jun 18 17:20:52 2013 -0500
----------------------------------------------------------------------
conf/mrql-env.sh | 4 +-
src/AlgebraicOptimization.gen | 3 -
src/BSP/BSPPlan.java | 16 ++--
src/BSPTranslator.gen | 164 +++++++++++++++++++-------------
src/Compiler.gen | 10 +-
src/Interpreter.gen | 17 ++--
src/MapReduceAlgebra.java | 12 +--
src/Normalization.gen | 2 +-
src/PlanGeneration.gen | 130 ++++++++++++++++---------
src/QueryPlan.gen | 20 +++-
src/Simplification.gen | 93 +++++++++---------
src/TopLevel.gen | 2 +
src/Tuple.java | 7 +-
src/TypeInference.gen | 6 +-
tests/queries/factozization_1.mrql | 38 ++++++++
tests/queries/xml_2.mrql | 3 +-
16 files changed, 328 insertions(+), 199 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/conf/mrql-env.sh
----------------------------------------------------------------------
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 53bb0da..ca43f51 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -37,10 +37,10 @@ HAMA_HOME=${HOME}/hama-${HAMA_VERSION}
HAMA_JAR=${HAMA_HOME}/hama-core-${HAMA_VERSION}.jar
# The Hadoop configuration directory. Required.
-HADOOP_CONF=${MRQL_HOME}/conf/conf-hadoop
+HADOOP_CONF=${MRQL_HOME}/conf
# Optional: the Hama configuration directory
-HAMA_CONF=${MRQL_HOME}/conf/conf-hama
+HAMA_CONF=${MRQL_HOME}/conf-hama
# The CUP parser library.
# You may install it as a linux package or download it from http://www2.cs.tum.edu/projects/cup/
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/AlgebraicOptimization.gen
----------------------------------------------------------------------
diff --git a/src/AlgebraicOptimization.gen b/src/AlgebraicOptimization.gen
index 6111c6d..56a0bbf 100644
--- a/src/AlgebraicOptimization.gen
+++ b/src/AlgebraicOptimization.gen
@@ -92,9 +92,6 @@ public class AlgebraicOptimization extends Simplification {
res = simplify_all(rename(res));
TypeInference.type_inference(res);
return translate(res);
- case mapReduce2(`mx,`my,`r,cmap(lambda(`vx,`bx),`X),cmap(lambda(`vy,`by),`Y),`o):
- return translate(#<mapReduce2(lambda(`vx,cmap(`mx,`bx)),
- lambda(`vy,cmap(`my,`by)),`r,`X,`Y,`o)>);
case mapReduce2(`mx,`my,`r,cmap(lambda(`v,`b),`X),`Y,`o):
match X {
case groupBy(...): ;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/BSP/BSPPlan.java
----------------------------------------------------------------------
diff --git a/src/BSP/BSPPlan.java b/src/BSP/BSPPlan.java
index 8168789..f9ec7e6 100644
--- a/src/BSP/BSPPlan.java
+++ b/src/BSP/BSPPlan.java
@@ -74,7 +74,9 @@ final public class BSPPlan extends Plan {
try {
// this case is only used for checking the exit condition of repeat/closure
boolean exit = mr_exit.get();
- if (!exit && all_peers.length > 1)
+ if (all_peers.length <= 1)
+ return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
+ if (!exit)
// this peer is not ready to exit, so no peer should exit
for ( String p: this_peer.getAllPeerNames() )
this_peer.send(p,new MRContainer(more_supersteps));
@@ -204,7 +206,7 @@ final public class BSPPlan extends Plan {
@Override
public void bsp ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
throws IOException, SyncException, InterruptedException {
- final Tuple pair = new Tuple(2);
+ final Tuple triple = new Tuple(3);
Tuple result;
boolean skip = false;
String tabs = "";
@@ -221,14 +223,14 @@ final public class BSPPlan extends Plan {
System.err.println(tabs+" state ["+peer.getPeerName()+"]: "+state);
for ( int i = 0; i < local_cache.size(); i++)
if (local_cache.get(i) instanceof Bag && ((Bag)local_cache.get(i)).size() > 0)
- System.out.println(tabs+" cache "+i+": "+local_cache.get(i));
+ System.out.println(tabs+" cache ["+peer.getPeerName()+"] "+i+": "+local_cache.get(i));
};
- pair.set(0,msg_cache);
- pair.set(1,state);
+ triple.set(0,local_cache);
+ triple.set(1,msg_cache);
+ triple.set(2,state);
this_peer = peer;
- cache = local_cache;
// evaluate one superstep
- result = (Tuple)superstep_fnc.eval(pair);
+ result = (Tuple)superstep_fnc.eval(triple);
Bag msgs = (Bag)result.get(0);
exit = ((MR_bool)result.get(2)).get();
state = result.get(1);
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/BSPTranslator.gen
----------------------------------------------------------------------
diff --git a/src/BSPTranslator.gen b/src/BSPTranslator.gen
index ab0244f..0f363e1 100644
--- a/src/BSPTranslator.gen
+++ b/src/BSPTranslator.gen
@@ -35,7 +35,7 @@ public class BSPTranslator extends TypeInference {
final static int coerceM = ClassImporter.find_method_number("coerce",#[any,int]);
final static Trees planNames = plans_with_distributed_lambdas
- .append(#[ParsedSource,Generator,BinarySource,Repeat,Closure,repeat]);
+ .append(#[ParsedSource,Generator,BinarySource,GroupByJoin,Repeat,Closure,repeat]);
private static int source_num = 0;
@@ -103,6 +103,10 @@ public class BSPTranslator extends TypeInference {
return #<CrossAggregateProduct(`mx,`my,`r,`acc,`zero,
`(new_source(x)),
`(new_source(y)))>;
+ case GroupByJoin(`kx,`ky,`gx,`gy,`mr,`c,`r,`x,`y,`o):
+ return #<GroupByJoin(`kx,`ky,`gx,`gy,`mr,`c,`r,
+ `(new_source(x)),
+ `(new_source(y)),`o)>;
case Repeat(`f,`ds,`max):
return #<Repeat(`(preprocess(f)),`(preprocess(ds)),`max)>;
case repeat(`f,`ds,`max):
@@ -133,16 +137,31 @@ public class BSPTranslator extends TypeInference {
return -1;
}
- private static Tree subst_cache_num ( int n, int m, Tree e ) {
+ private static Tree subst_getCache_num ( int n, int m, Tree e ) {
match e {
- case setCache(`k,...r):
+ case getCache(`cache,`k):
if (!k.equals(#<`n>))
fail;
- return #<setCache(`m,...r)>;
+ return #<getCache(`cache,`m)>;
case `f(...as):
Trees bs = #[];
for (Tree a: as)
- bs = bs.append(subst_cache_num(n,m,a));
+ bs = bs.append(subst_getCache_num(n,m,a));
+ return #<`f(...bs)>;
+ };
+ return e;
+ }
+
+ private static Tree subst_setCache_num ( int n, int m, Tree e ) {
+ match e {
+ case setCache(`cache,`k,...r):
+ if (!k.equals(#<`n>))
+ fail;
+ return #<setCache(`cache,`m,...r)>;
+ case `f(...as):
+ Trees bs = #[];
+ for (Tree a: as)
+ bs = bs.append(subst_setCache_num(n,m,a));
return #<`f(...bs)>;
};
return e;
@@ -151,7 +170,7 @@ public class BSPTranslator extends TypeInference {
private static Tree set_cache_num ( Tree e, int n ) {
match e {
case BSP(`m,...r):
- return subst_cache_num((int)((LongLeaf)m).value(),n,#<BSP(`n,...r)>);
+ return subst_setCache_num((int)((LongLeaf)m).value(),n,#<BSP(`n,...r)>);
case BSPSource(_,`x):
return #<BSPSource(`n,`x)>;
};
@@ -181,13 +200,13 @@ public class BSPTranslator extends TypeInference {
private static Tree getCache ( Tree var, Tree e, Tree body ) {
Trees ns = source_nums(e);
if (ns.length() > 0)
- return subst(var,#<getCache(...ns)>,body);
+ return subst(var,#<getCache(cache,...ns)>,body);
return body;
}
private static Tree getCache ( Tree var, Trees s, Tree body ) {
Trees ns = source_nums(s);
- return subst(var,#<getCache(...ns)>,body);
+ return subst(var,#<getCache(cache,...ns)>,body);
}
/** optimize a BSP plan after BSP fusion */
@@ -197,11 +216,6 @@ public class BSPTranslator extends TypeInference {
if (is_collection(S))
return post_simplify(#<map(lambda(`x,`a),`b)>);
else fail
- case getCache(`a,...as):
- Tree z = #<getCache(`a)>;
- for ( Tree x: as )
- z = #<callM(union,`unionM,`z,getCache(`x))>;
- return z;
case `f(...as):
Trees bs = #[];
for (Tree a: as)
@@ -256,28 +270,28 @@ public class BSPTranslator extends TypeInference {
case Aggregate(`acc,`zero,`S):
return #<Aggregate(`acc,`zero,`(mr2bsp(S)))>;
case cMap(`m,`S):
- return mkBSP(#<lambda(tuple(ms,k),
- setCache(o_,cmap(`m,i_),
+ return mkBSP(#<lambda(tuple(cache,ms,k),
+ setCache(cache,o_,cmap(`m,i_),
tuple(BAG(),tuple(),TRUE())))>,
#<tuple()>,
#<false>,
mr2bsp(S));
case AggregateMap(`m,`acc,`zero,`S):
- return mkBSP(#<lambda(tuple(ms,k),
- setCache(o_,aggregate(`acc,`zero,cmap(`m,i_)),
+ return mkBSP(#<lambda(tuple(cache,ms,k),
+ setCache(cache,o_,aggregate(`acc,`zero,cmap(`m,i_)),
tuple(BAG(),tuple(),TRUE())))>,
#<tuple()>,
#<false>,
mr2bsp(S));
case MapReduce(`m,`r,`S,`o):
- return mkBSP(#<lambda(tuple(ms,map_step),
+ return mkBSP(#<lambda(tuple(cache,ms,map_step),
if(map_step,
tuple(cmap(lambda(tuple(k,c),
bag(tuple(k,tuple(k,c)))),
cmap(`m,i_)),
false,
FALSE()),
- setCache(o_,`(o.equals(#<true>) // need to sort the result?
+ setCache(cache,o_,`(o.equals(#<true>) // need to sort the result?
? #<cmap(lambda(tuple(k,s),
cmap(lambda(x,bag(tuple(x,k))),
apply(`r,tuple(k,s)))),
@@ -288,20 +302,20 @@ public class BSPTranslator extends TypeInference {
o,
mr2bsp(S));
case MapAggregateReduce(`m,`r,`acc,`zero,`S,_):
- return mkBSP(#<lambda(tuple(ms,map_step),
+ return mkBSP(#<lambda(tuple(cache,ms,map_step),
if(map_step,
tuple(cmap(lambda(tuple(k,c),
bag(tuple(k,tuple(k,c)))),
cmap(`m,i_)),
false,
FALSE()),
- setCache(o_,aggregate(`acc,`zero,cmap(`r,groupBy(ms))),
+ setCache(cache,o_,aggregate(`acc,`zero,cmap(`r,groupBy(ms))),
tuple(BAG(),false,TRUE()))))>,
#<true>,
#<false>,
mr2bsp(S));
case MapCombineReduce(`m,`c,`r,`S,`o):
- return mkBSP(#<lambda(tuple(ms,map_step),
+ return mkBSP(#<lambda(tuple(cache,ms,map_step),
if(map_step,
tuple(cmap(lambda(tuple(k,s),
cmap(lambda(x,bag(tuple(k,tuple(k,x)))),
@@ -309,7 +323,7 @@ public class BSPTranslator extends TypeInference {
groupBy(cmap(`m,i_))),
false,
FALSE()),
- setCache(o_,`(o.equals(#<true>) // need to sort the result?
+ setCache(cache,o_,`(o.equals(#<true>) // need to sort the result?
? #<cmap(lambda(tuple(k,s),
cmap(lambda(x,bag(tuple(x,k))),
apply(`r,tuple(k,s)))),
@@ -320,7 +334,7 @@ public class BSPTranslator extends TypeInference {
o,
mr2bsp(S));
case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
- return mkBSP2(#<lambda(tuple(ms,map_step),
+ return mkBSP2(#<lambda(tuple(cache,ms,map_step),
if(map_step,
tuple(callM(union,`unionM,
cmap(lambda(tuple(kx,x),
@@ -331,7 +345,7 @@ public class BSPTranslator extends TypeInference {
cmap(`my,j_))),
false,
FALSE()),
- setCache(o_,cmap(lambda(tuple(k,s),
+ setCache(cache,o_,cmap(lambda(tuple(k,s),
cmap(lambda(tuple(kk,ss),
cmap(lambda(x,bag(tuple(kk,x))),
apply(`c,tuple(kk,ss)))),
@@ -353,7 +367,7 @@ public class BSPTranslator extends TypeInference {
mr2bsp(x),
mr2bsp(y));
case MapReduce2(`mx,`my,`r,`x,`y,`o):
- return mkBSP2(#<lambda(tuple(ms,map_step),
+ return mkBSP2(#<lambda(tuple(cache,ms,map_step),
if(map_step,
tuple(callM(union,`unionM,
cmap(lambda(tuple(kx,x),
@@ -364,7 +378,7 @@ public class BSPTranslator extends TypeInference {
cmap(`my,j_))),
false,
FALSE()),
- setCache(o_,cmap(lambda(tuple(k,s),
+ setCache(cache,o_,cmap(lambda(tuple(k,s),
cmap(lambda(x,bag(`(o.equals(#<true>) ? #<tuple(x,k)> : #<x>))),
apply(`r,
tuple(cmap(lambda(tuple(kx,x),
@@ -384,7 +398,7 @@ public class BSPTranslator extends TypeInference {
mr2bsp(x),
mr2bsp(y));
case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
- return mkBSP2(#<lambda(tuple(ms,map_step),
+ return mkBSP2(#<lambda(tuple(cache,ms,map_step),
if(map_step,
tuple(callM(union,`unionM,
cmap(lambda(tuple(kx,x),
@@ -395,7 +409,7 @@ public class BSPTranslator extends TypeInference {
cmap(`my,j_))),
false,
FALSE()),
- setCache(o_,aggregate(`acc,`zero,
+ setCache(cache,o_,aggregate(`acc,`zero,
cmap(lambda(tuple(k,s),
apply(`r,tuple(cmap(lambda(tuple(kx,x),
if(callM(eq,`eqM,kx,1),
@@ -435,14 +449,15 @@ public class BSPTranslator extends TypeInference {
type_inference(ykey);
xkey = PlanGeneration.makePlan(xkey);
ykey = PlanGeneration.makePlan(ykey);
- return mkBSP2(#<lambda(tuple(ms,map_step),
+ return mkBSP2(#<lambda(tuple(cache,ms,map_step),
if(map_step,
tuple(callM(union,`unionM,
cmap(lambda(`vx,`xkey),i_),
cmap(lambda(`vy,`ykey),j_)),
false,
FALSE()),
- setCache(o_,mergeGroupByJoin(`kx,`ky,lambda(`vx,`gx),lambda(`vy,`gy),`mr,`c,`r,
+ setCache(cache,o_,
+ mergeGroupByJoin(`kx,`ky,lambda(`vx,`gx),lambda(`vy,`gy),`mr,`c,`r,
cmap(lambda(tuple(kx,p,x),
if(callM(eq,`eqM,kx,1),bag(tuple(p,x)),bag())),ms),
cmap(lambda(tuple(ky,p,y),
@@ -453,9 +468,9 @@ public class BSPTranslator extends TypeInference {
mr2bsp(x),
mr2bsp(y));
case CrossProduct(`mx,`my,`r,`x,`y):
- return mkBSP(#<lambda(tuple(ms,ys),
+ return mkBSP(#<lambda(tuple(cache,ms,ys),
tuple(BAG(),
- setCache(o_,
+ setCache(cache,o_,
cmap(lambda(x,
cmap(lambda(y,apply(`r,tuple(x,y))),
cmap(`my,ys))),
@@ -466,9 +481,9 @@ public class BSPTranslator extends TypeInference {
#<false>,
mr2bsp(x));
case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
- return mkBSP(#<lambda(tuple(ms,ys),
+ return mkBSP(#<lambda(tuple(cache,ms,ys),
tuple(BAG(),
- setCache(o_,
+ setCache(cache,o_,
aggregate(`acc,`zero,
cmap(lambda(x,
cmap(lambda(y,apply(`r,tuple(x,y))),
@@ -498,15 +513,15 @@ public class BSPTranslator extends TypeInference {
else sources = sources.append(x);
case _: sources = sources.append(x);
};
- s = subst(#<getCache(`step_input)>,#<getCache(`step_cache_num)>,s);
+ s = subst_getCache_num((int)((LongLeaf)step_input).value(),step_cache_num,s);
return mkBSPL(step_cache_num,
- #<lambda(tuple(ms,tuple(k,steps)),
+ #<lambda(tuple(cache,ms,tuple(k,steps)),
let(tuple(ts,kk,step_end),
- apply(`s,tuple(ms,k)),
+ apply(`s,tuple(cache,ms,k)),
if(step_end, // end of repeat step
- setCache(`step_cache_num,
+ setCache(cache,`step_cache_num,
map(lambda(tuple(x,bb),x),
- getCache(`step_cache_num)),
+ getCache(cache,`step_cache_num)),
tuple(bag(),
tuple(`k0,callM(plus,`plusM,steps,1)),
if(callM(gt,`gtM,steps,`max),
@@ -515,7 +530,7 @@ public class BSPTranslator extends TypeInference {
aggregate(lambda(tuple(x,y),callM(or,`orM,x,y)),
false,
map(lambda(tuple(x,bb),bb),
- getCache(`step_cache_num))))))),
+ getCache(cache,`step_cache_num))))))),
tuple(ts,tuple(kk,steps),FALSE()))))>,
#<tuple(`k0,1)>,
#<false>,
@@ -526,7 +541,7 @@ public class BSPTranslator extends TypeInference {
case repeat(lambda(`v,`b),`ds,`max):
Tree step = bspSimplify(mr2bsp(b));
int step_cache_num = source_num(step);
- step = subst(v,#<map(lambda(tuple(x,b),x),getCache(`step_cache_num))>,step);
+ step = subst(v,#<map(lambda(tuple(x,b),x),getCache(cache,`step_cache_num))>,step);
match step {
case BSP(`n,`s,`k0,_,...as):
// the initial values of all data sources
@@ -534,14 +549,14 @@ public class BSPTranslator extends TypeInference {
for ( Tree x: as )
if (!x.equals(v))
sources = sources.append(x);
- Tree res = mkBSPL(#<lambda(tuple(ms,tuple(k,steps,firstp,S)),
- let(ignore,if(firstp,setCache(`step_cache_num,S,0),0),
+ Tree res = mkBSPL(#<lambda(tuple(cache,ms,tuple(k,steps,firstp,S)),
+ let(ignore,if(firstp,setCache(cache,`step_cache_num,S,0),0),
let(tuple(ts,kk,step_end),
- apply(`s,tuple(ms,k)),
+ apply(`s,tuple(cache,ms,k)),
if(step_end, // end of repeat step
- setCache(o_,map(lambda(tuple(x,b),x),getCache(`step_cache_num)),
- setCache(`step_cache_num,
- distribute(getCache(`step_cache_num)),
+ setCache(cache,o_,map(lambda(tuple(x,b),x),getCache(cache,`step_cache_num)),
+ setCache(cache,`step_cache_num,
+ distribute(getCache(cache,`step_cache_num)),
tuple(bag(),
tuple(`k0,callM(plus,`plusM,steps,1),false,bag()),
if(callM(gt,`gtM,steps,`max),
@@ -550,7 +565,7 @@ public class BSPTranslator extends TypeInference {
aggregate(lambda(tuple(x,y),callM(or,`orM,x,y)),
false,
map(lambda(tuple(x,bb),bb),
- getCache(`step_cache_num)))))))),
+ getCache(cache,`step_cache_num)))))))),
tuple(ts,tuple(kk,steps,false,bag()),FALSE())))))>,
#<tuple(`k0,1,true,map(lambda(x,tuple(x,false)),`ds))>,
#<false>,
@@ -577,13 +592,13 @@ public class BSPTranslator extends TypeInference {
else sources = sources.append(x);
case _: sources = sources.append(x);
};
- s = subst(#<getCache(`step_input)>,#<getCache(`step_cache_num)>,s);
+ s = subst_getCache_num((int)((LongLeaf)step_input).value(),step_cache_num,s);
return mkBSPL(step_cache_num,
- #<lambda(tuple(ms,tuple(k,steps,len)),
+ #<lambda(tuple(cache,ms,tuple(k,steps,len)),
let(tuple(ts,kk,step_end),
- apply(`s,tuple(ms,k)),
+ apply(`s,tuple(cache,ms,k)),
if(step_end, // end of repeat step
- let(newLen,callM(count,`countM,getCache(`step_cache_num)),
+ let(newLen,callM(count,`countM,getCache(cache,`step_cache_num)),
tuple(bag(),
tuple(`k0,callM(plus,`plusM,steps,1),newLen),
if(callM(gt,`gtM,steps,`max),
@@ -596,7 +611,6 @@ public class BSPTranslator extends TypeInference {
sources);
case `x: throw new Error("Cannot compile the closure function: "+x);
}
-
case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`max):
Tree[] steps = new Tree[vs.length()];
Tree[] inits = new Tree[vs.length()];
@@ -608,12 +622,12 @@ public class BSPTranslator extends TypeInference {
inits[i] = mr2bsp(ss.nth(i));
sources = sources.append(inits[i]);
cache_num[i] = source_num(inits[i]);
- all_cache = all_cache.append(#<getCache(`(cache_num[i]))>);
+ all_cache = all_cache.append(#<getCache(cache,`(cache_num[i]))>);
};
for ( int i = 0; i < vs.length(); i++ )
match bspSimplify(mr2bsp(bs.nth(i))) {
case BSP(`n,`s,`k0,_,...as):
- steps[i] = subst_cache_num((int)((LongLeaf)n).value(),cache_num[i],s);
+ steps[i] = subst_setCache_num((int)((LongLeaf)n).value(),cache_num[i],s);
k[i] = k0;
loop: for ( Tree x: as )
match x {
@@ -622,13 +636,14 @@ public class BSPTranslator extends TypeInference {
fail;
for ( int j = 0; j < vs.length(); j++ )
if (w.equals(vs.nth(j)))
- steps[i] = subst(#<getCache(`m)>,#<getCache(`(cache_num[j]))>,steps[i]);
+ steps[i] = subst_getCache_num((int)((LongLeaf)m).value(),cache_num[j],steps[i]);
case BSPSource(`n1,`d1):
for ( Tree y: sources )
match y {
case BSPSource(`n2,`d2):
if (d1.equals(d2)) {
- steps[i] = subst(#<getCache(`n1)>,#<getCache(`n2)>,steps[i]);
+ steps[i] = subst_getCache_num((int)((LongLeaf)n1).value(),
+ (int)((LongLeaf)n2).value(),steps[i]);
continue loop;
}
};
@@ -640,7 +655,7 @@ public class BSPTranslator extends TypeInference {
for ( int i = 0; i < vs.length(); i++ )
code = #<if(callM(eq,`eqM,i,`i),
let(tuple(ts,kk,step_end),
- apply(`(steps[i]),tuple(ms,k)),
+ apply(`(steps[i]),tuple(cache,ms,k)),
if(step_end, // end of repeat step
`((i+1 < vs.length())
? #<tuple(bag(),tuple(`(k[i+1]),`(i+1),steps),FALSE())>
@@ -652,7 +667,7 @@ public class BSPTranslator extends TypeInference {
tuple(ts,tuple(kk,i,steps),FALSE()))),
`code)>;
return mkBSPL(cache_num,
- #<lambda(tuple(ms,tuple(k,i,steps)),`code)>,
+ #<lambda(tuple(cache,ms,tuple(k,i,steps)),`code)>,
#<tuple(`(k[0]),0,2)>,
#<false>,
sources);
@@ -678,7 +693,8 @@ public class BSPTranslator extends TypeInference {
match y {
case BSPSource(`n2,`d2):
if (d1.equals(d2)) {
- s1 = subst(#<getCache(`n1)>,#<getCache(`n2)>,s1);
+ s1 = subst_getCache_num((int)((LongLeaf)n1).value(),
+ (int)((LongLeaf)n2).value(),s1);
continue loop;
}
};
@@ -686,16 +702,16 @@ public class BSPTranslator extends TypeInference {
case _: ns = ns.append(x);
};
return bspSimplify(mkBSPL((int)((LongLeaf)n).value(),
- #<lambda(tuple(ms,tuple(first,k)),
+ #<lambda(tuple(cache,ms,tuple(first,k)),
if(first,
- let(tuple(ts,kk,b),apply(`s1,tuple(ms,k)),
+ let(tuple(ts,kk,b),apply(`s1,tuple(cache,ms,k)),
let(exit,
synchronize(b), // poll all peers: do you want to exit?
// all peers must aggree to exit the inner BSP
// and proceed to the outer BSP
tuple(ts,tuple(callM(not,`notM,exit),
if(exit,`k2,kk)),FALSE()))),
- let(tuple(ts,kk,bb),apply(`s2,tuple(ms,k)),
+ let(tuple(ts,kk,bb),apply(`s2,tuple(cache,ms,k)),
tuple(ts,tuple(false,kk),bb))))>,
#<tuple(true,`k1)>,
o,
@@ -709,11 +725,29 @@ public class BSPTranslator extends TypeInference {
return e;
}
+ private static Tree post_simplify_plan ( Tree e ) {
+ match e {
+ case setCache(`cache,`a,`v,`ret):
+ return post_simplify_plan(#<setNth(`cache,`a,materialize(`v),`ret)>);
+ case getCache(`cache,`a,...as):
+ Tree z = #<nth(`cache,`a)>;
+ for ( Tree x: as )
+ z = #<callM(union,`unionM,`z,nth(`cache,`x))>;
+ return z;
+ case `f(...as):
+ Trees bs = #[];
+ for (Tree a: as)
+ bs = bs.append(post_simplify_plan(a));
+ return #<`f(...bs)>;
+ };
+ return e;
+ }
+
/** construct and simplify the BSP plan from a physical plan
* @param plan the physical plan
* @return the BSP plan
*/
public static Tree constructBSPplan ( Tree plan ) {
- return bspSimplify(mr2bsp(preprocess(plan)));
+ return post_simplify_plan(bspSimplify(mr2bsp(preprocess(plan))));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/Compiler.gen
----------------------------------------------------------------------
diff --git a/src/Compiler.gen b/src/Compiler.gen
index 65954f2..f584d3c 100644
--- a/src/Compiler.gen
+++ b/src/Compiler.gen
@@ -348,6 +348,8 @@ final public class Compiler extends Translator {
return "new Lambda("+compilef(v.toString(),body)+")";
case nth(`x,`n):
return "(((Tuple)("+compileE(x)+")).get("+((LongLeaf)n).value()+"))";
+ case setNth(`x,`n,`v,`ret):
+ return "(((Tuple)("+compileE(x)+")).set("+((LongLeaf)n).value()+","+compileE(v)+","+compileE(ret)+"))";
case materialize(`u):
return "MapReduceAlgebra.materialize("+compileE(u)+")";
case let(`v,`u,`body):
@@ -409,14 +411,6 @@ final public class Compiler extends Translator {
return "SystemFunctions.synchronize((MR_bool)"+compileE(b)+")";
case distribute(`s):
return "SystemFunctions.distribute((Bag)"+compileE(s)+")";
- case getCache(`n):
- String loc = "((MR_int)"+compileE(n)+").get()";
- return (Config.hadoop_mode) ? "Plan.getCache("+loc+")" : "MapReduceAlgebra.getCache("+loc+")";
- case setCache(`n,`v,`rv):
- String loc = "((MR_int)"+compileE(n)+").get()";
- return (Config.hadoop_mode)
- ? "Plan.setCache("+loc+","+compileE(v)+","+compileE(rv)+")"
- : "MapReduceAlgebra.setCache("+loc+","+compileE(v)+","+compileE(rv)+")";
case mapReduce(`mx,`my,`s,_):
return "MapReduceAlgebra.mapReduce("+compileF(mx)+","+compileF(my)+",(Bag)("+compileE(s)+"))";
case mapReduce2(`mx,`my,`r,`x,`y,_):
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/Interpreter.gen
----------------------------------------------------------------------
diff --git a/src/Interpreter.gen b/src/Interpreter.gen
index e254792..b43f403 100644
--- a/src/Interpreter.gen
+++ b/src/Interpreter.gen
@@ -203,6 +203,8 @@ public class Interpreter extends TypeInference {
return new Lambda(evalf(v.toString(),body,env));
case nth(`x,`n):
return ((Tuple)evalE(x,env)).get((int)n.longValue());
+ case setNth(`x,`n,`v,`ret):
+ return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
case materialize(`u):
return MapReduceAlgebra.materialize(evalE(u,env));
case let(`v,`u,`body):
@@ -295,14 +297,6 @@ public class Interpreter extends TypeInference {
return Evaluator.synchronize((MR_bool)evalE(b,env));
case distribute(`s):
return Evaluator.distribute((Bag)evalE(s,env));
- case getCache(`n):
- int loc = ((MR_int)evalE(n,env)).get();
- return (Config.hadoop_mode) ? Plan.getCache(loc) : MapReduceAlgebra.getCache(loc);
- case setCache(`n,`v,`rv):
- int loc = ((MR_int)evalE(n,env)).get();
- return (Config.hadoop_mode)
- ? Plan.setCache(loc,evalE(v,env),evalE(rv,env))
- : MapReduceAlgebra.setCache(loc,evalE(v,env),evalE(rv,env));
case mapReduce(`m,`r,`s,_):
return MapReduceAlgebra.mapReduce(evalF(m,env),
evalF(r,env),
@@ -467,7 +461,8 @@ public class Interpreter extends TypeInference {
msg.printStackTrace();
throw new Error("Evaluation error in: "+print_query(e));
} catch (Exception ex) {
- System.err.println(ex.getMessage());
+ if (Config.trace)
+ System.err.println(ex.getMessage());
throw new Error("Evaluation error in: "+print_query(e));
}
}
@@ -743,6 +738,7 @@ public class Interpreter extends TypeInference {
ne = Simplification.rename(Translator.translate_select(ne));
if (Config.trace)
System.out.println("After removing select-queries:\n"+ne.pretty(0));
+ type_inference(ne);
ne = Simplification.simplify_all(ne);
if (Config.trace)
System.out.println("Algebra expression:\n"+ne.pretty(0));
@@ -757,7 +753,8 @@ public class Interpreter extends TypeInference {
if (Config.trace)
System.out.println("Physical plan type: "+print_type(et));
repeat_variables = #[];
- Tree plan = PlanGeneration.makePlan(Simplification.simplify_all(ne));
+ ne = Simplification.simplify_all(ne);
+ Tree plan = PlanGeneration.makePlan(ne);
if (Config.bsp_mode) {
BSPTranslator.reset();
if (Config.trace)
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/MapReduceAlgebra.java
----------------------------------------------------------------------
diff --git a/src/MapReduceAlgebra.java b/src/MapReduceAlgebra.java
index 676ecc5..f7529b5 100644
--- a/src/MapReduceAlgebra.java
+++ b/src/MapReduceAlgebra.java
@@ -724,12 +724,6 @@ final public class MapReduceAlgebra {
/** the cache that holds all local data in memory */
private static Tuple cache;
- private static void cleanCache () {
- cache = new Tuple(100);
- for ( int i = 0; i < 100; i++ )
- cache.set(i,new Bag());
- }
-
/** return the cache element at location loc */
public static MRData getCache ( int loc ) {
return cache.get(loc);
@@ -763,7 +757,9 @@ final public class MapReduceAlgebra {
boolean skip = false;
String tabs = "";
int step = 0;
- cleanCache();
+ cache = new Tuple(100);
+ for ( int i = 0; i < 100; i++ )
+ cache.set(i,new Bag());
for ( Bag x: inputs ) {
Tuple p = (Tuple)(x.get(0));
cache.set(((MR_int)p.first()).get(),
@@ -781,7 +777,7 @@ final public class MapReduceAlgebra {
if (cache.get(i) instanceof Bag && ((Bag)cache.get(i)).size() > 0)
System.out.println(tabs+" cache "+i+": "+cache.get(i));
};
- result = (Tuple)superstep.eval(new Tuple(msgs,state));
+ result = (Tuple)superstep.eval(new Tuple(cache,msgs,state));
Bag new_msgs = (Bag)result.get(0);
state = result.get(1);
exit = ((MR_bool)result.get(2)).get();
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/Normalization.gen
----------------------------------------------------------------------
diff --git a/src/Normalization.gen b/src/Normalization.gen
index ed243b7..4d3d85b 100644
--- a/src/Normalization.gen
+++ b/src/Normalization.gen
@@ -23,7 +23,7 @@ import Gen.*;
/** normalize algebraic expressions to more efficient forms using heuristic rules */
public class Normalization extends Translator {
- /** given that patter=e, find the bindings of the pattern variables */
+ /** given that pattern=e, find the bindings of the pattern variables */
static Trees bind_pattern ( Tree pattern, Tree e ) {
Trees args = #[];
match pattern {
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/PlanGeneration.gen
----------------------------------------------------------------------
diff --git a/src/PlanGeneration.gen b/src/PlanGeneration.gen
index daa4fb2..ca72b15 100644
--- a/src/PlanGeneration.gen
+++ b/src/PlanGeneration.gen
@@ -185,24 +185,41 @@ final public class PlanGeneration extends AlgebraicOptimization {
*/
public static Tree makePlan ( Tree e ) {
match e {
- // extract the mapReduce combiner
+ // combine groupBy with Join (experimental)
case mapReduce(lambda(`vm,`bm),lambda(`vr,`br),`s,`o):
if (!Config.use_combiner || !is_dataset_expr(s))
fail;
Tree splan = makePlan(s);
- Aggregates.clear();
- Tree nv = new_var();
- match TypeInference.type_inference(bm) {
- case `S(`tp):
- if (!is_collection(S))
- fail;
- type_env.insert(nv.toString(),tp);
- };
- Tree rd = Aggregates.derive_combiner(br,#<lambda(`vm,`bm)>,vm,vr);
- if (Aggregates.can_use_combiner) {
+ match splan {
+ case MapReduce2(lambda(`mvx,bag(tuple(`kx,`mx))),
+ lambda(`mvy,bag(tuple(`ky,`my))),
+ lambda(`v,cmap(lambda(`x,cmap(lambda(`y,bag(tuple(tuple(`gx,`gy),`b))),
+ nth(`vx,1))),
+ nth(`vy,0))),
+ `X,`Y,`o2):
+ if (!vx.equals(v) || !vy.equals(v) || !mx.equals(mvx) || !my.equals(mvy))
+ fail;
+ Tree gxx = gx;
+ Tree gyy = gy;
+ if (free_variables(gx,#[]).equals(#[`y]) && free_variables(gy,#[]).equals(#[`x])) {
+ gxx = gy;
+ gyy = gx;
+ } else if (!free_variables(gx,#[]).equals(#[`x]) || !free_variables(gy,#[]).equals(#[`y]))
+ fail;
+ Aggregates.clear();
+ Tree nv = new_var();
+ match TypeInference.type_inference(bm) {
+ case `S(`tp):
+ if (!is_collection(S))
+ fail;
+ type_env.insert(nv.toString(),tp);
+ };
+ Tree rd = Aggregates.derive_combiner(br,#<lambda(`vm,`bm)>,vm,vr);
+ if (Aggregates.reduces.is_empty())
+ fail;
Tree vr2 = new_var();
Tree m = simplify_all(#<lambda(`vm,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
- tuple(...(Aggregates.maps))))),`bm))>);
+ tuple(...(Aggregates.maps))))),`bm))>);
Tree c = subst(vr,vr2,#<bag(tuple(...(Aggregates.combines)))>);
c = simplify_all(#<lambda(`vr2,`c)>);
Tree r = simplify_all(#<lambda(`vr,`rd)>);
@@ -215,38 +232,61 @@ final public class PlanGeneration extends AlgebraicOptimization {
TypeInference.type_inference(r);
Tree combiner = makePlan(c);
Tree reducer = makePlan(r);
- match splan {
- // combine groupBy with Join (experimental)
- case MapReduce2(lambda(`mvx,bag(tuple(`kx,`mx))),
- lambda(`mvy,bag(tuple(`ky,`my))),
- lambda(`v,cmap(lambda(`x,cmap(lambda(`y,bag(tuple(tuple(`gx,`gy),`b))),
- nth(`vx,1))),
- nth(`vy,0))),
- `X,`Y,`o2):
- if (!vx.equals(v) || !vy.equals(v) || !mx.equals(mvx) || !my.equals(mvy))
- fail;
- Tree nm = simplify_all(subst(x,#<nth(`vx,0)>,
- subst(y,#<nth(`vy,1)>,
- #<cmap(lambda(x,bag(nth(x,1))),
- cmap(`m,bag(tuple(tuple(`gx,`gy),`b))))>)));
- type_env.insert(v.toString(),TypeInference.type_inference(#<tuple(`mvx,`mvy)>));
- nm = makePlan(nm);
- return #<GroupByJoin(lambda(`mvx,`kx),
- lambda(`mvy,`ky),
- lambda(`x,`gx),
- lambda(`y,`gy),
- lambda(`v,`nm),
- `combiner,
- `reducer,
- `X,`Y,`o)>;
- // if the MapCombineReduce input is a join, push the combiner to the join
- case MapReduce2(`mx,`my,lambda(`rv,`rb),`x,`y,`o2):
- Tree nr = makePlan(simplify_all(#<lambda(`rv,cmap(`m,`rb))>));
- return #<MapReduce(lambda(`vm,bag(`vm)),`reducer,
- MapCombineReduce2(`mx,`my,`combiner,`nr,`x,`y,`o2),`o)>;
- case `input:
- return #<MapCombineReduce(`(makePlan(m)),`combiner,`reducer,`input,`o)>;
- }
+ Tree nm = simplify_all(subst(x,#<nth(`vx,0)>,
+ subst(y,#<nth(`vy,1)>,
+ #<cmap(lambda(x,bag(nth(x,1))),
+ cmap(`m,bag(tuple(tuple(`gx,`gy),`b))))>)));
+ type_env.insert(v.toString(),TypeInference.type_inference(#<tuple(`mvx,`mvy)>));
+ nm = makePlan(nm);
+ return #<GroupByJoin(lambda(`mvx,`kx),
+ lambda(`mvy,`ky),
+ lambda(`x,`gxx),
+ lambda(`y,`gyy),
+ lambda(`v,`nm),
+ `combiner,
+ `reducer,
+ `X,`Y,`o)>;
+ };
+ fail
+ // extract the mapReduce combiner
+ case mapReduce(lambda(`vm,`bm),lambda(`vr,`br),`s,`o):
+ if (!Config.use_combiner || !is_dataset_expr(s))
+ fail;
+ Tree splan = makePlan(s);
+ Aggregates.clear();
+ Tree nv = new_var();
+ match TypeInference.type_inference(bm) {
+ case `S(`tp):
+ if (!is_collection(S))
+ fail;
+ type_env.insert(nv.toString(),tp);
+ };
+ Tree rd = Aggregates.derive_combiner(br,#<lambda(`vm,`bm)>,vm,vr);
+ if (!Aggregates.can_use_combiner || Aggregates.reduces.is_empty())
+ fail;
+ Tree vr2 = new_var();
+ Tree m = simplify_all(#<lambda(`vm,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
+ tuple(...(Aggregates.maps))))),`bm))>);
+ Tree c = subst(vr,vr2,#<bag(tuple(...(Aggregates.combines)))>);
+ c = simplify_all(#<lambda(`vr2,`c)>);
+ Tree r = simplify_all(#<lambda(`vr,`rd)>);
+ Tree mtp = TypeInference.type_inference(#<bag(tuple(...(Aggregates.maps)))>);
+ Tree rtp = #<tuple(`(TypeInference.type_inference(#<nth(`vr,0)>)),`mtp)>;
+ type_env.insert(vr.toString(),rtp);
+ type_env.insert(vr2.toString(),rtp);
+ TypeInference.type_inference(m);
+ TypeInference.type_inference(c);
+ TypeInference.type_inference(r);
+ Tree combiner = makePlan(c);
+ Tree reducer = makePlan(r);
+ match splan {
+ // if the MapCombineReduce input is a join, push the combiner to the join
+ case MapReduce2(`mx,`my,lambda(`rv,`rb),`x,`y,`o2):
+ Tree nr = makePlan(simplify_all(#<lambda(`rv,cmap(`m,`rb))>));
+ return #<MapReduce(lambda(`vm,bag(`vm)),`reducer,
+ MapCombineReduce2(`mx,`my,`combiner,`nr,`x,`y,`o2),`o)>;
+ case `input:
+ return #<MapCombineReduce(`(makePlan(m)),`combiner,`reducer,`input,`o)>;
};
fail
case mapReduce(`m,`r,`s,`o):
@@ -481,7 +521,7 @@ final public class PlanGeneration extends AlgebraicOptimization {
};
plus = makePlan(simplify_all(#<lambda(`np,apply(`plus,`np))>));
Tree acc = makePlan(simplify_all(#<lambda(`na,apply(`plus,tuple(nth(`na,0),
- apply(`unit,nth(`na,1)))))>));
+ apply(`unit,nth(`na,1)))))>));
zero = makePlan((f.equals(#<avg>)) ? zero : #<typed(`zero,`tp)>);
match plan {
case MapCombineReduce(`m,`c,`r,`s,_):
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/QueryPlan.gen
----------------------------------------------------------------------
diff --git a/src/QueryPlan.gen b/src/QueryPlan.gen
index 9969964..0c6fd8f 100644
--- a/src/QueryPlan.gen
+++ b/src/QueryPlan.gen
@@ -647,12 +647,30 @@ private final static class SingleQueryPlan {
`(plan[j]))>;
}
+ private static Tree top_pattern_variables ( Tree pat ) {
+ match pat {
+ case _(...ts):
+ Trees ps = #[];
+ for ( Tree t: ts )
+ match t {
+ case `f(...): ps = ps.append(#<`f>);
+ case _: ps = ps.append(t);
+ };
+ if (ps.length() > 1)
+ return #<tuple(...ps)>;
+ else return ps.head();
+ };
+ return pat;
+ }
+
static Tree make_unnest ( int i, int j ) {
Tree body = null;
if (Config.trace)
System.out.print("unnest "+pattern[i]+" -> "+pattern[j]);
if (!no_grouping && depth[i] < depth[j]) {
- body = subst_expr(pattern_head(pattern[j],false),plan[j],plan[i]);
+ // Changed 6/13/13: must rearrange binding variables in nested queries based on join order
+ //body = subst_expr(pattern_head(pattern[j],false),plan[j],plan[i]);
+ body = subst_header(pattern_head(pattern[j],false),top_pattern_variables(pattern[j]),plan[j],plan[i]);
// new pattern[i] is the old pattern[i]
} else {
body = join_body(pattern[j],pattern[i],predicate[i][j]);
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/Simplification.gen
----------------------------------------------------------------------
diff --git a/src/Simplification.gen b/src/Simplification.gen
index 3c56759..289dea9 100644
--- a/src/Simplification.gen
+++ b/src/Simplification.gen
@@ -53,25 +53,18 @@ public class Simplification extends Normalization {
return e.equals(v);
}
- private static Tree get_accessor ( Tree v, Tree d, Tree e ) {
- match d {
- case nth(`u,`n):
- int nn = (int)((LongLeaf)n).value();
- Trees bl = #[];
- for ( int i = 0; i < 2; i++ )
- bl = bl.append(i == nn ? get_accessor(#<nth(`v,`i)>,u,e) : #<nth(`v,`i)>);
- return #<tuple(...bl)>;
- case project(`u,`a): return v;
- };
- return e;
- }
-
private static Trees factor_out_aggr ( Tree e, Tree v ) {
match e {
case call(`g,cmap(`f,`d)):
if (!simple_accessor(v,d) || !free_variables(f,#[]).is_empty())
fail;
- return #[bind(`e,`d)];
+ for ( Tree monoid: monoids )
+ match monoid {
+ case `aggr(...):
+ if (aggr.equals(g.toString()))
+ return #[bind(`e,`d)];
+ };
+ fail
case `f(...al):
Trees bl = #[];
for ( Tree a: al )
@@ -97,7 +90,7 @@ public class Simplification extends Normalization {
for ( Tree b: bl )
match b {
case bind(`uu,`dd):
- if (d.equals(dd))
+ if (d.equals(dd) && !u.equals(uu))
if (alpha_equivalent(u,uu))
exists = true;
};
@@ -158,43 +151,53 @@ public class Simplification extends Normalization {
case cmap(`f,if(`p,`x,`y)):
return simplify(#<if(`p,cmap(`f,`x),cmap(`f,`y))>);
// if the join reducer contains an independent aggregation push it to the input
- // NOT USED YET
- case xxxjoin(`kx,`ky,lambda(`v,cmap(lambda(`v1,cmap(lambda(`v2,bag(tuple(`k,`b))),
- nth(`vy,1))),
- nth(`vx,0))),
+ case join(`kx,`ky,
+ lambda(`v,cmap(lambda(`v1,cmap(lambda(`v2,bag(tuple(`k,`b))),
+ nth(`vy,1))),
+ nth(`vx,0))),
`X,`Y):
if (!vx.equals(v) || !vy.equals(v))
fail;
Trees l1 = factor_out_aggregations(b,v1);
- Trees dl = #[];
- if (!l1.is_empty())
- for ( Tree a: l1 )
- match a {
- case bind(`u,`d):
- Tree nv = new_var();
- b = subst(u,d,b);
- if (!dl.member(d))
- X = #<cmap(lambda(`nv,bag(`(subst(v1,nv,get_accessor(v1,d,u))))),`X)>;
- dl = dl.append(d);
- };
Trees l2 = factor_out_aggregations(b,v2);
- dl = #[];
- if (!l2.is_empty())
- for ( Tree a: l2 )
- match a {
- case bind(`u,`d):
- Tree nv = new_var();
- b = subst(u,d,b);
- if (!dl.member(d))
- Y = #<cmap(lambda(`nv,bag(`(subst(v2,nv,get_accessor(v2,d,u))))),`Y)>;
- dl = dl.append(d);
- };
if (l1.is_empty() && l2.is_empty())
fail;
- return simplify(#<join(`kx,`ky,lambda(`v,cmap(lambda(`v1,cmap(lambda(`v2,bag(tuple(`k,`b))),
- nth(`vy,1))),
- nth(`vx,0))),
- `X,`Y)>);
+ Tree px = v1;
+ Trees dl = #[];
+ for ( Tree a: l1 )
+ match a {
+ case bind(`u,`d):
+ Tree nv = new_var();
+ b = subst(u,nv,b);
+ if (true || !dl.member(d)) {
+ Tree vv = new_var();
+ X = #<cmap(lambda(`px,bag(tuple(`px,`u))),`X)>;
+ px = #<tuple(`px,`nv)>;
+ }
+ dl = dl.append(d);
+ };
+ Tree py = v2;
+ dl = #[];
+ for ( Tree a: l2 )
+ match a {
+ case bind(`u,`d):
+ Tree nv = new_var();
+ b = subst(u,nv,b);
+ if (true || !dl.member(d)) {
+ Tree vv = new_var();
+ Y = #<cmap(lambda(`py,bag(tuple(`py,`u))),`Y)>;
+ py = #<tuple(`py,`nv)>;
+ };
+ dl = dl.append(d);
+ };
+ Tree res = #<join(lambda(`px,apply(`kx,`v1)),
+ lambda(`py,apply(`ky,`v2)),
+ lambda(`v,cmap(lambda(`px,cmap(lambda(`py,bag(tuple(`k,`b))),
+ nth(`vy,1))),
+ nth(`vx,0))),
+ `X,`Y)>;
+ res = rename(res);
+ return simplify(res);
// if the reducer of a join generates pairs (k,v), where k is functional dependent
// on a join key, then the outer groupBy just groups the v values
case groupBy(join(lambda(`vx,`bx),`ky,
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/TopLevel.gen
----------------------------------------------------------------------
diff --git a/src/TopLevel.gen b/src/TopLevel.gen
index 4c0e5a4..86543c3 100644
--- a/src/TopLevel.gen
+++ b/src/TopLevel.gen
@@ -23,6 +23,7 @@ import java.io.PrintStream;
/** Provides the API for compilation/code-generation */
final public class TopLevel extends Interpreter {
+ static Tree xml_type;
public TopLevel () {
// XML and JSON are user-defined types:
@@ -36,6 +37,7 @@ final public class TopLevel extends Interpreter {
Jbool(bool),
Jnull(tuple()))>);
constant(#<PI>,#<double>,new MR_double(Math.PI));
+ xml_type = global_datatype_env.lookup("XML");
DataSource.loadParsers();
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/Tuple.java
----------------------------------------------------------------------
diff --git a/src/Tuple.java b/src/Tuple.java
index 1ba7e63..d2ca6d2 100644
--- a/src/Tuple.java
+++ b/src/Tuple.java
@@ -55,9 +55,14 @@ final public class Tuple implements MRData {
/** the second element of the tuple */
public MRData second () { return tuple[1]; }
+ /** replace the i'th element of a tuple with new data and return a new value */
+ public MRData set ( int i, MRData data, MRData ret ) {
+ tuple[i] = data;
+ return ret;
+ }
+
/** replace the i'th element of a tuple with new data */
public Tuple set ( int i, MRData data ) {
- assert(i >= 0 && i < tuple.length);
tuple[i] = data;
return this;
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/TypeInference.gen
----------------------------------------------------------------------
diff --git a/src/TypeInference.gen b/src/TypeInference.gen
index d780df2..6aa4258 100644
--- a/src/TypeInference.gen
+++ b/src/TypeInference.gen
@@ -654,10 +654,12 @@ public class TypeInference extends Translator {
match ta {
case XML:
return #<list(XML)>;
- case `S(XML):
- if (is_collection(S))
+ case `S(`tp):
+ if (is_collection(S) && (tp.equals(#<XML>) || tp.equals(TopLevel.xml_type)))
return #<`S(XML)>;
};
+ if (ta.equals(TopLevel.xml_type))
+ return #<list(XML)>;
match expand(ta) {
case record(...ts):
for ( Tree t: ts )
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/tests/queries/factozization_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/factozization_1.mrql b/tests/queries/factozization_1.mrql
new file mode 100644
index 0000000..13b86d2
--- /dev/null
+++ b/tests/queries/factozization_1.mrql
@@ -0,0 +1,38 @@
+Mmatrix = source(line,"tests/data/Xmatrix.txt",",",type( (double,long,long) ));
+Hmatrix = source(line,"tests/data/Ymatrix.txt",",",type( (double,long,long) ));
+Wmatrix = source(line,"tests/data/Ymatrix.txt",",",type( (double,long,long) ));
+
+
+macro transpose ( X ) {
+ select (x,j,i)
+ from (x,i,j) in X
+};
+
+// matrix multiplication:
+macro multiply ( X, Y ) {
+ select (sum(z),i,j)
+ from (x,i,k) in X, (y,k,j) in Y, z = x*y
+ group by (i,j)
+};
+
+// cell-wise multiplication:
+macro Cmult ( X, Y ) {
+ select ( x*y, i, j )
+ from (x,i,j) in X, (y,i,j) in Y
+};
+
+// cell-wise division:
+macro Cdiv ( X, Y ) {
+ select ( x/y, i, j )
+ from (x,i,j) in X, (y,i,j) in Y
+};
+
+// Gaussian non-negative matrix factorization (from SystemML paper)
+macro factorize ( V, Hinit, Winit ) {
+ repeat (H,W) = (Hinit,Winit)
+ step ( Cmult(H,Cdiv(multiply(transpose(W),V),multiply(transpose(W),multiply(W,H)))),
+ Cmult(W,Cdiv(multiply(V,transpose(H)),multiply(W,multiply(H,transpose(H))))) )
+ limit 2
+};
+
+//factorize(Mmatrix,Hmatrix,Wmatrix);
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/tests/queries/xml_2.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/xml_2.mrql b/tests/queries/xml_2.mrql
index bdd73b6..08c60c4 100644
--- a/tests/queries/xml_2.mrql
+++ b/tests/queries/xml_2.mrql
@@ -1 +1,2 @@
-source(xml,"tests/data/cs.xml",{"gradstudent"},xpath(name[firstname="Leonidas"]));
+select n.lastname
+from n in source(xml,"tests/data/cs.xml",{"gradstudent"},xpath(name[firstname="Leonidas"]));