You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2013/06/19 00:21:32 UTC

git commit: Fixed matrix multiplication using GroupByJoin

Updated Branches:
  refs/heads/master 0868e7110 -> 869eefd5b


Fixed matrix multiplication using GroupByJoin


Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/869eefd5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/869eefd5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/869eefd5

Branch: refs/heads/master
Commit: 869eefd5bd2d8b088d88903ea5979f8d05ea28e6
Parents: 0868e71
Author: fegaras <fe...@cse.uta.edu>
Authored: Tue Jun 18 17:20:52 2013 -0500
Committer: fegaras <fe...@cse.uta.edu>
Committed: Tue Jun 18 17:20:52 2013 -0500

----------------------------------------------------------------------
 conf/mrql-env.sh                   |   4 +-
 src/AlgebraicOptimization.gen      |   3 -
 src/BSP/BSPPlan.java               |  16 ++--
 src/BSPTranslator.gen              | 164 +++++++++++++++++++-------------
 src/Compiler.gen                   |  10 +-
 src/Interpreter.gen                |  17 ++--
 src/MapReduceAlgebra.java          |  12 +--
 src/Normalization.gen              |   2 +-
 src/PlanGeneration.gen             | 130 ++++++++++++++++---------
 src/QueryPlan.gen                  |  20 +++-
 src/Simplification.gen             |  93 +++++++++---------
 src/TopLevel.gen                   |   2 +
 src/Tuple.java                     |   7 +-
 src/TypeInference.gen              |   6 +-
 tests/queries/factozization_1.mrql |  38 ++++++++
 tests/queries/xml_2.mrql           |   3 +-
 16 files changed, 328 insertions(+), 199 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/conf/mrql-env.sh
----------------------------------------------------------------------
diff --git a/conf/mrql-env.sh b/conf/mrql-env.sh
index 53bb0da..ca43f51 100644
--- a/conf/mrql-env.sh
+++ b/conf/mrql-env.sh
@@ -37,10 +37,10 @@ HAMA_HOME=${HOME}/hama-${HAMA_VERSION}
 HAMA_JAR=${HAMA_HOME}/hama-core-${HAMA_VERSION}.jar
 
 # The Hadoop configuration directory.  Required.
-HADOOP_CONF=${MRQL_HOME}/conf/conf-hadoop
+HADOOP_CONF=${MRQL_HOME}/conf
 
 # Optional: the Hama configuration directory
-HAMA_CONF=${MRQL_HOME}/conf/conf-hama
+HAMA_CONF=${MRQL_HOME}/conf-hama
 
 # The CUP parser library.
 # You may install it as a linux package or download it from http://www2.cs.tum.edu/projects/cup/

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/AlgebraicOptimization.gen
----------------------------------------------------------------------
diff --git a/src/AlgebraicOptimization.gen b/src/AlgebraicOptimization.gen
index 6111c6d..56a0bbf 100644
--- a/src/AlgebraicOptimization.gen
+++ b/src/AlgebraicOptimization.gen
@@ -92,9 +92,6 @@ public class AlgebraicOptimization extends Simplification {
 	    res = simplify_all(rename(res));
 	    TypeInference.type_inference(res);
 	    return translate(res);
-	case mapReduce2(`mx,`my,`r,cmap(lambda(`vx,`bx),`X),cmap(lambda(`vy,`by),`Y),`o):
-	    return translate(#<mapReduce2(lambda(`vx,cmap(`mx,`bx)),
-					  lambda(`vy,cmap(`my,`by)),`r,`X,`Y,`o)>);
 	case mapReduce2(`mx,`my,`r,cmap(lambda(`v,`b),`X),`Y,`o):
 	    match X {
 	    case groupBy(...): ;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/BSP/BSPPlan.java
----------------------------------------------------------------------
diff --git a/src/BSP/BSPPlan.java b/src/BSP/BSPPlan.java
index 8168789..f9ec7e6 100644
--- a/src/BSP/BSPPlan.java
+++ b/src/BSP/BSPPlan.java
@@ -74,7 +74,9 @@ final public class BSPPlan extends Plan {
 	    try {
 		// this case is only used for checking the exit condition of repeat/closure
 		boolean exit = mr_exit.get();
-		if (!exit && all_peers.length > 1)
+		if (all_peers.length <= 1)
+		    return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
+		if (!exit)
 		    // this peer is not ready to exit, so no peer should exit
 		    for ( String p: this_peer.getAllPeerNames() )
 			this_peer.send(p,new MRContainer(more_supersteps));
@@ -204,7 +206,7 @@ final public class BSPPlan extends Plan {
 	@Override
 	public void bsp ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
 	       throws IOException, SyncException, InterruptedException {
-	    final Tuple pair = new Tuple(2);
+	    final Tuple triple = new Tuple(3);
 	    Tuple result;
 	    boolean skip = false;
 	    String tabs = "";
@@ -221,14 +223,14 @@ final public class BSPPlan extends Plan {
 		    System.err.println(tabs+"      state ["+peer.getPeerName()+"]: "+state);
 		    for ( int i = 0; i < local_cache.size(); i++)
 			if (local_cache.get(i) instanceof Bag && ((Bag)local_cache.get(i)).size() > 0)
-			    System.out.println(tabs+"      cache "+i+": "+local_cache.get(i));
+			    System.out.println(tabs+"      cache ["+peer.getPeerName()+"] "+i+": "+local_cache.get(i));
 		};
-		pair.set(0,msg_cache);
-		pair.set(1,state);
+		triple.set(0,local_cache);
+		triple.set(1,msg_cache);
+		triple.set(2,state);
 		this_peer = peer;
-		cache = local_cache;
 		// evaluate one superstep
-		result = (Tuple)superstep_fnc.eval(pair);
+		result = (Tuple)superstep_fnc.eval(triple);
 		Bag msgs = (Bag)result.get(0);
 		exit = ((MR_bool)result.get(2)).get();
 		state = result.get(1);

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/BSPTranslator.gen
----------------------------------------------------------------------
diff --git a/src/BSPTranslator.gen b/src/BSPTranslator.gen
index ab0244f..0f363e1 100644
--- a/src/BSPTranslator.gen
+++ b/src/BSPTranslator.gen
@@ -35,7 +35,7 @@ public class BSPTranslator extends TypeInference {
     final static int coerceM = ClassImporter.find_method_number("coerce",#[any,int]);
 
     final static Trees planNames = plans_with_distributed_lambdas
-	            .append(#[ParsedSource,Generator,BinarySource,Repeat,Closure,repeat]);
+	             .append(#[ParsedSource,Generator,BinarySource,GroupByJoin,Repeat,Closure,repeat]);
 
     private static int source_num = 0;
 
@@ -103,6 +103,10 @@ public class BSPTranslator extends TypeInference {
 	    return #<CrossAggregateProduct(`mx,`my,`r,`acc,`zero,
 					   `(new_source(x)),
 					   `(new_source(y)))>;
+	case GroupByJoin(`kx,`ky,`gx,`gy,`mr,`c,`r,`x,`y,`o):
+	    return #<GroupByJoin(`kx,`ky,`gx,`gy,`mr,`c,`r,
+				 `(new_source(x)),
+				 `(new_source(y)),`o)>;
 	case Repeat(`f,`ds,`max):
 	    return #<Repeat(`(preprocess(f)),`(preprocess(ds)),`max)>;
 	case repeat(`f,`ds,`max):
@@ -133,16 +137,31 @@ public class BSPTranslator extends TypeInference {
 	return -1;
     }
 
-    private static Tree subst_cache_num ( int n, int m, Tree e ) {
+    private static Tree subst_getCache_num ( int n, int m, Tree e ) {
 	match e {
-	case setCache(`k,...r):
+	case getCache(`cache,`k):
 	    if (!k.equals(#<`n>))
 		fail;
-	    return #<setCache(`m,...r)>;
+	    return #<getCache(`cache,`m)>;
 	case `f(...as):
 	    Trees bs = #[];
 	    for (Tree a: as)
-		bs = bs.append(subst_cache_num(n,m,a));
+		bs = bs.append(subst_getCache_num(n,m,a));
+	    return #<`f(...bs)>;
+	};
+	return e;
+    }
+
+    private static Tree subst_setCache_num ( int n, int m, Tree e ) {
+	match e {
+	case setCache(`cache,`k,...r):
+	    if (!k.equals(#<`n>))
+		fail;
+	    return #<setCache(`cache,`m,...r)>;
+	case `f(...as):
+	    Trees bs = #[];
+	    for (Tree a: as)
+		bs = bs.append(subst_setCache_num(n,m,a));
 	    return #<`f(...bs)>;
 	};
 	return e;
@@ -151,7 +170,7 @@ public class BSPTranslator extends TypeInference {
     private static Tree set_cache_num ( Tree e, int n ) {
 	match e {
 	case BSP(`m,...r):
-	    return subst_cache_num((int)((LongLeaf)m).value(),n,#<BSP(`n,...r)>);
+	    return subst_setCache_num((int)((LongLeaf)m).value(),n,#<BSP(`n,...r)>);
 	case BSPSource(_,`x):
 	    return #<BSPSource(`n,`x)>;
 	};
@@ -181,13 +200,13 @@ public class BSPTranslator extends TypeInference {
     private static Tree getCache ( Tree var, Tree e, Tree body ) {
 	Trees ns = source_nums(e);
 	if (ns.length() > 0)
-	    return subst(var,#<getCache(...ns)>,body);
+	    return subst(var,#<getCache(cache,...ns)>,body);
 	return body;
     }
 
     private static Tree getCache ( Tree var, Trees s, Tree body ) {
 	Trees ns = source_nums(s);
-	return subst(var,#<getCache(...ns)>,body);
+	return subst(var,#<getCache(cache,...ns)>,body);
     }
 
     /** optimize a BSP plan after BSP fusion */
@@ -197,11 +216,6 @@ public class BSPTranslator extends TypeInference {
 	    if (is_collection(S))
 		return post_simplify(#<map(lambda(`x,`a),`b)>);
 	    else fail
-	case getCache(`a,...as):
-	    Tree z = #<getCache(`a)>;
-	    for ( Tree x: as )
-		z = #<callM(union,`unionM,`z,getCache(`x))>;
-	    return z;
 	case `f(...as):
 	    Trees bs = #[];
 	    for (Tree a: as)
@@ -256,28 +270,28 @@ public class BSPTranslator extends TypeInference {
 	case Aggregate(`acc,`zero,`S):
 	    return #<Aggregate(`acc,`zero,`(mr2bsp(S)))>;
 	case cMap(`m,`S):
-	    return mkBSP(#<lambda(tuple(ms,k),
-				  setCache(o_,cmap(`m,i_),
+	    return mkBSP(#<lambda(tuple(cache,ms,k),
+				  setCache(cache,o_,cmap(`m,i_),
 					   tuple(BAG(),tuple(),TRUE())))>,
 			 #<tuple()>,
 			 #<false>,
 			 mr2bsp(S));
 	case AggregateMap(`m,`acc,`zero,`S):
-	    return mkBSP(#<lambda(tuple(ms,k),
-				  setCache(o_,aggregate(`acc,`zero,cmap(`m,i_)),
+	    return mkBSP(#<lambda(tuple(cache,ms,k),
+				  setCache(cache,o_,aggregate(`acc,`zero,cmap(`m,i_)),
 					   tuple(BAG(),tuple(),TRUE())))>,
 			 #<tuple()>,
 			 #<false>,
 			 mr2bsp(S));
 	case MapReduce(`m,`r,`S,`o):
-	    return mkBSP(#<lambda(tuple(ms,map_step),
+	    return mkBSP(#<lambda(tuple(cache,ms,map_step),
 				  if(map_step,
 				     tuple(cmap(lambda(tuple(k,c),
                                                        bag(tuple(k,tuple(k,c)))),
 						cmap(`m,i_)),
 					   false,
 					   FALSE()),
-				     setCache(o_,`(o.equals(#<true>)   // need to sort the result?
+				     setCache(cache,o_,`(o.equals(#<true>)   // need to sort the result?
 						   ? #<cmap(lambda(tuple(k,s),
 								   cmap(lambda(x,bag(tuple(x,k))),
 									apply(`r,tuple(k,s)))),
@@ -288,20 +302,20 @@ public class BSPTranslator extends TypeInference {
 			 o,
 			 mr2bsp(S));
         case MapAggregateReduce(`m,`r,`acc,`zero,`S,_):
-            return mkBSP(#<lambda(tuple(ms,map_step),
+            return mkBSP(#<lambda(tuple(cache,ms,map_step),
                                   if(map_step,
                                      tuple(cmap(lambda(tuple(k,c),
                                                        bag(tuple(k,tuple(k,c)))),
 						cmap(`m,i_)),
 					   false,
 					   FALSE()),
-				     setCache(o_,aggregate(`acc,`zero,cmap(`r,groupBy(ms))),
+				     setCache(cache,o_,aggregate(`acc,`zero,cmap(`r,groupBy(ms))),
 					      tuple(BAG(),false,TRUE()))))>,
 			 #<true>,
                          #<false>,
                          mr2bsp(S));
 	case MapCombineReduce(`m,`c,`r,`S,`o):
-	    return mkBSP(#<lambda(tuple(ms,map_step),
+	    return mkBSP(#<lambda(tuple(cache,ms,map_step),
 				  if(map_step,
 				     tuple(cmap(lambda(tuple(k,s),
 						       cmap(lambda(x,bag(tuple(k,tuple(k,x)))),
@@ -309,7 +323,7 @@ public class BSPTranslator extends TypeInference {
 						groupBy(cmap(`m,i_))),
 					   false,
 					   FALSE()),
-				     setCache(o_,`(o.equals(#<true>)   // need to sort the result?
+				     setCache(cache,o_,`(o.equals(#<true>)   // need to sort the result?
 						   ? #<cmap(lambda(tuple(k,s),
 								   cmap(lambda(x,bag(tuple(x,k))),
 									apply(`r,tuple(k,s)))),
@@ -320,7 +334,7 @@ public class BSPTranslator extends TypeInference {
 			 o,
 			 mr2bsp(S));
 	case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
-	    return mkBSP2(#<lambda(tuple(ms,map_step),
+	    return mkBSP2(#<lambda(tuple(cache,ms,map_step),
 				   if(map_step,
 				      tuple(callM(union,`unionM,
 						  cmap(lambda(tuple(kx,x),
@@ -331,7 +345,7 @@ public class BSPTranslator extends TypeInference {
 						       cmap(`my,j_))),
 					    false,
 					    FALSE()),
-				      setCache(o_,cmap(lambda(tuple(k,s),
+				      setCache(cache,o_,cmap(lambda(tuple(k,s),
 							      cmap(lambda(tuple(kk,ss),
 									  cmap(lambda(x,bag(tuple(kk,x))),
 									       apply(`c,tuple(kk,ss)))),
@@ -353,7 +367,7 @@ public class BSPTranslator extends TypeInference {
 			  mr2bsp(x),
 			  mr2bsp(y));
 	case MapReduce2(`mx,`my,`r,`x,`y,`o):
-	    return mkBSP2(#<lambda(tuple(ms,map_step),
+	    return mkBSP2(#<lambda(tuple(cache,ms,map_step),
 				   if(map_step,
 				      tuple(callM(union,`unionM,
 						  cmap(lambda(tuple(kx,x),
@@ -364,7 +378,7 @@ public class BSPTranslator extends TypeInference {
 						       cmap(`my,j_))),
 					    false,
 					    FALSE()),
-				      setCache(o_,cmap(lambda(tuple(k,s),
+				      setCache(cache,o_,cmap(lambda(tuple(k,s),
 							      cmap(lambda(x,bag(`(o.equals(#<true>) ? #<tuple(x,k)> : #<x>))),
 								   apply(`r,
 									 tuple(cmap(lambda(tuple(kx,x),
@@ -384,7 +398,7 @@ public class BSPTranslator extends TypeInference {
 			  mr2bsp(x),
 			  mr2bsp(y));
 	case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
-	    return mkBSP2(#<lambda(tuple(ms,map_step),
+	    return mkBSP2(#<lambda(tuple(cache,ms,map_step),
 				   if(map_step,
 				      tuple(callM(union,`unionM,
 						  cmap(lambda(tuple(kx,x),
@@ -395,7 +409,7 @@ public class BSPTranslator extends TypeInference {
 						       cmap(`my,j_))),
 					    false,
 					    FALSE()),
-				      setCache(o_,aggregate(`acc,`zero,
+				      setCache(cache,o_,aggregate(`acc,`zero,
 						      cmap(lambda(tuple(k,s),
 							      apply(`r,tuple(cmap(lambda(tuple(kx,x),
 											if(callM(eq,`eqM,kx,1),
@@ -435,14 +449,15 @@ public class BSPTranslator extends TypeInference {
 	    type_inference(ykey);
 	    xkey = PlanGeneration.makePlan(xkey);
 	    ykey = PlanGeneration.makePlan(ykey);
-	    return mkBSP2(#<lambda(tuple(ms,map_step),
+	    return mkBSP2(#<lambda(tuple(cache,ms,map_step),
 				   if(map_step,
 				      tuple(callM(union,`unionM,
 						  cmap(lambda(`vx,`xkey),i_),
 						  cmap(lambda(`vy,`ykey),j_)),
 					    false,
 					    FALSE()),
-				      setCache(o_,mergeGroupByJoin(`kx,`ky,lambda(`vx,`gx),lambda(`vy,`gy),`mr,`c,`r,
+				      setCache(cache,o_,
+					       mergeGroupByJoin(`kx,`ky,lambda(`vx,`gx),lambda(`vy,`gy),`mr,`c,`r,
 						      cmap(lambda(tuple(kx,p,x),
 								  if(callM(eq,`eqM,kx,1),bag(tuple(p,x)),bag())),ms),
 						      cmap(lambda(tuple(ky,p,y),
@@ -453,9 +468,9 @@ public class BSPTranslator extends TypeInference {
 			  mr2bsp(x),
 			  mr2bsp(y));
 	case CrossProduct(`mx,`my,`r,`x,`y):
-	    return mkBSP(#<lambda(tuple(ms,ys),
+	    return mkBSP(#<lambda(tuple(cache,ms,ys),
 				  tuple(BAG(),
-					setCache(o_,
+					setCache(cache,o_,
 						 cmap(lambda(x,
 							     cmap(lambda(y,apply(`r,tuple(x,y))),
 								  cmap(`my,ys))),
@@ -466,9 +481,9 @@ public class BSPTranslator extends TypeInference {
 			 #<false>,
 			 mr2bsp(x));
 	case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
-	    return mkBSP(#<lambda(tuple(ms,ys),
+	    return mkBSP(#<lambda(tuple(cache,ms,ys),
 				  tuple(BAG(),
-					setCache(o_,
+					setCache(cache,o_,
 						 aggregate(`acc,`zero,
 							   cmap(lambda(x,
 								       cmap(lambda(y,apply(`r,tuple(x,y))),
@@ -498,15 +513,15 @@ public class BSPTranslator extends TypeInference {
 			else sources = sources.append(x);
 		    case _: sources = sources.append(x);
 		    };
-		s = subst(#<getCache(`step_input)>,#<getCache(`step_cache_num)>,s);
+		s = subst_getCache_num((int)((LongLeaf)step_input).value(),step_cache_num,s);
 		return mkBSPL(step_cache_num,
-			      #<lambda(tuple(ms,tuple(k,steps)),
+			      #<lambda(tuple(cache,ms,tuple(k,steps)),
 				       let(tuple(ts,kk,step_end),
-					   apply(`s,tuple(ms,k)),
+					   apply(`s,tuple(cache,ms,k)),
 					   if(step_end,   // end of repeat step
-					      setCache(`step_cache_num,
+					      setCache(cache,`step_cache_num,
 						       map(lambda(tuple(x,bb),x),
-							   getCache(`step_cache_num)),
+							   getCache(cache,`step_cache_num)),
 						      tuple(bag(),
 							    tuple(`k0,callM(plus,`plusM,steps,1)),
 							    if(callM(gt,`gtM,steps,`max),
@@ -515,7 +530,7 @@ public class BSPTranslator extends TypeInference {
 								     aggregate(lambda(tuple(x,y),callM(or,`orM,x,y)),
 									       false,
 									       map(lambda(tuple(x,bb),bb),
-										   getCache(`step_cache_num))))))),
+										   getCache(cache,`step_cache_num))))))),
 					      tuple(ts,tuple(kk,steps),FALSE()))))>,
 			      #<tuple(`k0,1)>,
 			      #<false>,
@@ -526,7 +541,7 @@ public class BSPTranslator extends TypeInference {
 	case repeat(lambda(`v,`b),`ds,`max):
 	    Tree step = bspSimplify(mr2bsp(b));
 	    int step_cache_num = source_num(step);
-	    step = subst(v,#<map(lambda(tuple(x,b),x),getCache(`step_cache_num))>,step);
+	    step = subst(v,#<map(lambda(tuple(x,b),x),getCache(cache,`step_cache_num))>,step);
 	    match step {
 	    case BSP(`n,`s,`k0,_,...as):
 		// the initial values of all data sources
@@ -534,14 +549,14 @@ public class BSPTranslator extends TypeInference {
 		for ( Tree x: as )
 		    if (!x.equals(v))
 			sources = sources.append(x);
-		Tree res = mkBSPL(#<lambda(tuple(ms,tuple(k,steps,firstp,S)),
-				      let(ignore,if(firstp,setCache(`step_cache_num,S,0),0),
+		Tree res = mkBSPL(#<lambda(tuple(cache,ms,tuple(k,steps,firstp,S)),
+				      let(ignore,if(firstp,setCache(cache,`step_cache_num,S,0),0),
 					  let(tuple(ts,kk,step_end),
-					      apply(`s,tuple(ms,k)),
+					      apply(`s,tuple(cache,ms,k)),
 					      if(step_end,   // end of repeat step
-						 setCache(o_,map(lambda(tuple(x,b),x),getCache(`step_cache_num)),
-						    setCache(`step_cache_num,
-						       distribute(getCache(`step_cache_num)),
+						 setCache(cache,o_,map(lambda(tuple(x,b),x),getCache(cache,`step_cache_num)),
+						    setCache(cache,`step_cache_num,
+						       distribute(getCache(cache,`step_cache_num)),
 							     tuple(bag(),
 								   tuple(`k0,callM(plus,`plusM,steps,1),false,bag()),
 								   if(callM(gt,`gtM,steps,`max),
@@ -550,7 +565,7 @@ public class BSPTranslator extends TypeInference {
 									    aggregate(lambda(tuple(x,y),callM(or,`orM,x,y)),
 										      false,
 										      map(lambda(tuple(x,bb),bb),
-											  getCache(`step_cache_num)))))))),
+											  getCache(cache,`step_cache_num)))))))),
 						 tuple(ts,tuple(kk,steps,false,bag()),FALSE())))))>,
 				  #<tuple(`k0,1,true,map(lambda(x,tuple(x,false)),`ds))>,
 				  #<false>,
@@ -577,13 +592,13 @@ public class BSPTranslator extends TypeInference {
 			else sources = sources.append(x);
 		    case _: sources = sources.append(x);
 		    };
-		s = subst(#<getCache(`step_input)>,#<getCache(`step_cache_num)>,s);
+		s = subst_getCache_num((int)((LongLeaf)step_input).value(),step_cache_num,s);
 		return mkBSPL(step_cache_num,
-			      #<lambda(tuple(ms,tuple(k,steps,len)),
+			      #<lambda(tuple(cache,ms,tuple(k,steps,len)),
 				       let(tuple(ts,kk,step_end),
-					   apply(`s,tuple(ms,k)),
+					   apply(`s,tuple(cache,ms,k)),
 					   if(step_end,   // end of repeat step
-					      let(newLen,callM(count,`countM,getCache(`step_cache_num)),
+					      let(newLen,callM(count,`countM,getCache(cache,`step_cache_num)),
 						  tuple(bag(),
 							tuple(`k0,callM(plus,`plusM,steps,1),newLen),
 							if(callM(gt,`gtM,steps,`max),
@@ -596,7 +611,6 @@ public class BSPTranslator extends TypeInference {
 			      sources);
 	    case `x: throw new Error("Cannot compile the closure function: "+x);
 	    }
-
 	case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`max):
 	    Tree[] steps = new Tree[vs.length()];
 	    Tree[] inits = new Tree[vs.length()];
@@ -608,12 +622,12 @@ public class BSPTranslator extends TypeInference {
 		inits[i] = mr2bsp(ss.nth(i));
 		sources = sources.append(inits[i]);
 		cache_num[i] = source_num(inits[i]);
-		all_cache = all_cache.append(#<getCache(`(cache_num[i]))>);
+		all_cache = all_cache.append(#<getCache(cache,`(cache_num[i]))>);
 	    };
 	    for ( int i = 0; i < vs.length(); i++ )
 		match bspSimplify(mr2bsp(bs.nth(i))) {
 		case BSP(`n,`s,`k0,_,...as):
-		    steps[i] = subst_cache_num((int)((LongLeaf)n).value(),cache_num[i],s);
+		    steps[i] = subst_setCache_num((int)((LongLeaf)n).value(),cache_num[i],s);
 		    k[i] = k0;
 		    loop: for ( Tree x: as )
 			match x {
@@ -622,13 +636,14 @@ public class BSPTranslator extends TypeInference {
 				fail;
 			    for ( int j = 0; j < vs.length(); j++ )
 				if (w.equals(vs.nth(j)))
-				    steps[i] = subst(#<getCache(`m)>,#<getCache(`(cache_num[j]))>,steps[i]);
+				    steps[i] = subst_getCache_num((int)((LongLeaf)m).value(),cache_num[j],steps[i]);
 			case BSPSource(`n1,`d1):
 			    for ( Tree y: sources )
 				match y {
 				case BSPSource(`n2,`d2):
 				    if (d1.equals(d2)) {
-					steps[i] = subst(#<getCache(`n1)>,#<getCache(`n2)>,steps[i]);
+					steps[i] = subst_getCache_num((int)((LongLeaf)n1).value(),
+								      (int)((LongLeaf)n2).value(),steps[i]);
 					continue loop;
 				    }
 				};
@@ -640,7 +655,7 @@ public class BSPTranslator extends TypeInference {
 	    for ( int i = 0; i < vs.length(); i++ )
 		code = #<if(callM(eq,`eqM,i,`i),
 			    let(tuple(ts,kk,step_end),
-				apply(`(steps[i]),tuple(ms,k)),
+				apply(`(steps[i]),tuple(cache,ms,k)),
 				if(step_end,   // end of repeat step
 				   `((i+1 < vs.length())
 				     ? #<tuple(bag(),tuple(`(k[i+1]),`(i+1),steps),FALSE())>
@@ -652,7 +667,7 @@ public class BSPTranslator extends TypeInference {
 				   tuple(ts,tuple(kk,i,steps),FALSE()))),
 			    `code)>;
 	    return mkBSPL(cache_num,
-			  #<lambda(tuple(ms,tuple(k,i,steps)),`code)>,
+			  #<lambda(tuple(cache,ms,tuple(k,i,steps)),`code)>,
 			  #<tuple(`(k[0]),0,2)>,
 			  #<false>,
 			  sources);
@@ -678,7 +693,8 @@ public class BSPTranslator extends TypeInference {
 			match y {
 			case BSPSource(`n2,`d2):
 			    if (d1.equals(d2)) {
-				s1 = subst(#<getCache(`n1)>,#<getCache(`n2)>,s1);
+				s1 = subst_getCache_num((int)((LongLeaf)n1).value(),
+							(int)((LongLeaf)n2).value(),s1);
 				continue loop;
 			    }
 			};
@@ -686,16 +702,16 @@ public class BSPTranslator extends TypeInference {
 		case _: ns = ns.append(x);
 		};
 	    return bspSimplify(mkBSPL((int)((LongLeaf)n).value(),
-				      #<lambda(tuple(ms,tuple(first,k)),
+				      #<lambda(tuple(cache,ms,tuple(first,k)),
 					       if(first,
-						  let(tuple(ts,kk,b),apply(`s1,tuple(ms,k)),
+						  let(tuple(ts,kk,b),apply(`s1,tuple(cache,ms,k)),
                                                       let(exit,
 							  synchronize(b),   // poll all peers: do you want to exit?
 							  // all peers must aggree to exit the inner BSP
 							  //    and proceed to the outer BSP
 							  tuple(ts,tuple(callM(not,`notM,exit),
 									 if(exit,`k2,kk)),FALSE()))),
-						  let(tuple(ts,kk,bb),apply(`s2,tuple(ms,k)),
+						  let(tuple(ts,kk,bb),apply(`s2,tuple(cache,ms,k)),
 						      tuple(ts,tuple(false,kk),bb))))>,
 				      #<tuple(true,`k1)>,
 				      o,
@@ -709,11 +725,29 @@ public class BSPTranslator extends TypeInference {
 	return e;
     }
 
+    private static Tree post_simplify_plan ( Tree e ) {
+	match e {
+        case setCache(`cache,`a,`v,`ret):
+	    return post_simplify_plan(#<setNth(`cache,`a,materialize(`v),`ret)>);
+	case getCache(`cache,`a,...as):
+	    Tree z = #<nth(`cache,`a)>;
+	    for ( Tree x: as )
+		z = #<callM(union,`unionM,`z,nth(`cache,`x))>;
+	    return z;
+	case `f(...as):
+	    Trees bs = #[];
+	    for (Tree a: as)
+		bs = bs.append(post_simplify_plan(a));
+	    return #<`f(...bs)>;
+	};
+	return e;
+    }
+
     /** construct and simplify the BSP plan from a physical plan
      * @param plan the physical plan
      * @return the BSP plan
      */
     public static Tree constructBSPplan ( Tree plan ) {
-	return bspSimplify(mr2bsp(preprocess(plan)));
+	return post_simplify_plan(bspSimplify(mr2bsp(preprocess(plan))));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/Compiler.gen
----------------------------------------------------------------------
diff --git a/src/Compiler.gen b/src/Compiler.gen
index 65954f2..f584d3c 100644
--- a/src/Compiler.gen
+++ b/src/Compiler.gen
@@ -348,6 +348,8 @@ final public class Compiler extends Translator {
 	    return "new Lambda("+compilef(v.toString(),body)+")";
 	case nth(`x,`n):
 	    return "(((Tuple)("+compileE(x)+")).get("+((LongLeaf)n).value()+"))";
+	case setNth(`x,`n,`v,`ret):
+	    return "(((Tuple)("+compileE(x)+")).set("+((LongLeaf)n).value()+","+compileE(v)+","+compileE(ret)+"))";
 	case materialize(`u):
 	    return "MapReduceAlgebra.materialize("+compileE(u)+")";
 	case let(`v,`u,`body):
@@ -409,14 +411,6 @@ final public class Compiler extends Translator {
 	    return "SystemFunctions.synchronize((MR_bool)"+compileE(b)+")";
 	case distribute(`s):
 	    return "SystemFunctions.distribute((Bag)"+compileE(s)+")";
-	case getCache(`n):
-	    String loc = "((MR_int)"+compileE(n)+").get()";
-	    return (Config.hadoop_mode) ? "Plan.getCache("+loc+")" : "MapReduceAlgebra.getCache("+loc+")";
-	case setCache(`n,`v,`rv):
-	    String loc = "((MR_int)"+compileE(n)+").get()";
-	    return (Config.hadoop_mode)
-		   ? "Plan.setCache("+loc+","+compileE(v)+","+compileE(rv)+")"
-		   : "MapReduceAlgebra.setCache("+loc+","+compileE(v)+","+compileE(rv)+")";
 	case mapReduce(`mx,`my,`s,_):
 	    return "MapReduceAlgebra.mapReduce("+compileF(mx)+","+compileF(my)+",(Bag)("+compileE(s)+"))";
 	case mapReduce2(`mx,`my,`r,`x,`y,_):

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/Interpreter.gen
----------------------------------------------------------------------
diff --git a/src/Interpreter.gen b/src/Interpreter.gen
index e254792..b43f403 100644
--- a/src/Interpreter.gen
+++ b/src/Interpreter.gen
@@ -203,6 +203,8 @@ public class Interpreter extends TypeInference {
 	    return new Lambda(evalf(v.toString(),body,env));
 	case nth(`x,`n):
 	    return ((Tuple)evalE(x,env)).get((int)n.longValue());
+	case setNth(`x,`n,`v,`ret):
+	    return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
 	case materialize(`u):
 	    return MapReduceAlgebra.materialize(evalE(u,env));
 	case let(`v,`u,`body):
@@ -295,14 +297,6 @@ public class Interpreter extends TypeInference {
 	    return Evaluator.synchronize((MR_bool)evalE(b,env));
 	case distribute(`s):
 	    return Evaluator.distribute((Bag)evalE(s,env));
-	case getCache(`n):
-	    int loc = ((MR_int)evalE(n,env)).get();
-	    return (Config.hadoop_mode) ? Plan.getCache(loc) : MapReduceAlgebra.getCache(loc);
-	case setCache(`n,`v,`rv):
-	    int loc = ((MR_int)evalE(n,env)).get();
-	    return (Config.hadoop_mode)
-		   ? Plan.setCache(loc,evalE(v,env),evalE(rv,env))
-		   : MapReduceAlgebra.setCache(loc,evalE(v,env),evalE(rv,env));
 	case mapReduce(`m,`r,`s,_):
 	    return MapReduceAlgebra.mapReduce(evalF(m,env),
 					      evalF(r,env),
@@ -467,7 +461,8 @@ public class Interpreter extends TypeInference {
 	    msg.printStackTrace();
 	    throw new Error("Evaluation error in: "+print_query(e));
 	} catch (Exception ex) {
-	    System.err.println(ex.getMessage());
+	    if (Config.trace)
+		System.err.println(ex.getMessage());
 	    throw new Error("Evaluation error in: "+print_query(e));
 	}
     }
@@ -743,6 +738,7 @@ public class Interpreter extends TypeInference {
 	    ne = Simplification.rename(Translator.translate_select(ne));
 	    if (Config.trace)
 		System.out.println("After removing select-queries:\n"+ne.pretty(0));
+	    type_inference(ne);
 	    ne = Simplification.simplify_all(ne);
 	    if (Config.trace)
 		System.out.println("Algebra expression:\n"+ne.pretty(0));
@@ -757,7 +753,8 @@ public class Interpreter extends TypeInference {
 	    if (Config.trace)
 		System.out.println("Physical plan type: "+print_type(et));
 	    repeat_variables = #[];
-	    Tree plan = PlanGeneration.makePlan(Simplification.simplify_all(ne));
+	    ne = Simplification.simplify_all(ne);
+	    Tree plan = PlanGeneration.makePlan(ne);
 	    if (Config.bsp_mode) {
 		BSPTranslator.reset();
 		if (Config.trace)

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/MapReduceAlgebra.java
----------------------------------------------------------------------
diff --git a/src/MapReduceAlgebra.java b/src/MapReduceAlgebra.java
index 676ecc5..f7529b5 100644
--- a/src/MapReduceAlgebra.java
+++ b/src/MapReduceAlgebra.java
@@ -724,12 +724,6 @@ final public class MapReduceAlgebra {
     /** the cache that holds all local data in memory */
     private static Tuple cache;
 
-    private static void cleanCache () {
-	cache = new Tuple(100);
-	for ( int i = 0; i < 100; i++ )
-	    cache.set(i,new Bag());
-    }
-
     /** return the cache element at location loc */
     public static MRData getCache ( int loc ) {
 	return cache.get(loc);
@@ -763,7 +757,9 @@ final public class MapReduceAlgebra {
 	boolean skip = false;
 	String tabs = "";
 	int step = 0;
-	cleanCache();
+	cache = new Tuple(100);
+	for ( int i = 0; i < 100; i++ )
+	    cache.set(i,new Bag());
 	for ( Bag x: inputs ) {
 	    Tuple p = (Tuple)(x.get(0));
 	    cache.set(((MR_int)p.first()).get(),
@@ -781,7 +777,7 @@ final public class MapReduceAlgebra {
 		    if (cache.get(i) instanceof Bag && ((Bag)cache.get(i)).size() > 0)
 			System.out.println(tabs+"      cache "+i+": "+cache.get(i));
 	    };
-	    result = (Tuple)superstep.eval(new Tuple(msgs,state));
+	    result = (Tuple)superstep.eval(new Tuple(cache,msgs,state));
 	    Bag new_msgs = (Bag)result.get(0);
 	    state = result.get(1);
 	    exit = ((MR_bool)result.get(2)).get();

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/Normalization.gen
----------------------------------------------------------------------
diff --git a/src/Normalization.gen b/src/Normalization.gen
index ed243b7..4d3d85b 100644
--- a/src/Normalization.gen
+++ b/src/Normalization.gen
@@ -23,7 +23,7 @@ import Gen.*;
 /** normalize algebraic expressions to more efficient forms using heuristic rules */
 public class Normalization extends Translator {
 
-    /** given that patter=e, find the bindings of the pattern variables */
+    /** given that pattern=e, find the bindings of the pattern variables */
     static Trees bind_pattern ( Tree pattern, Tree e ) {
         Trees args = #[];
 	match pattern {

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/PlanGeneration.gen
----------------------------------------------------------------------
diff --git a/src/PlanGeneration.gen b/src/PlanGeneration.gen
index daa4fb2..ca72b15 100644
--- a/src/PlanGeneration.gen
+++ b/src/PlanGeneration.gen
@@ -185,24 +185,41 @@ final public class PlanGeneration extends AlgebraicOptimization {
      */
     public static Tree makePlan ( Tree e ) {
        match e {
-       // extract the mapReduce combiner
+       // combine groupBy with Join (experimental)
        case mapReduce(lambda(`vm,`bm),lambda(`vr,`br),`s,`o):
 	   if (!Config.use_combiner || !is_dataset_expr(s))
 	       fail;
 	   Tree splan = makePlan(s);
-	   Aggregates.clear();
-	   Tree nv = new_var();
-	   match TypeInference.type_inference(bm) {
-	      case `S(`tp):
-		  if (!is_collection(S))
-		      fail;
-		  type_env.insert(nv.toString(),tp);
-	   };
-	   Tree rd = Aggregates.derive_combiner(br,#<lambda(`vm,`bm)>,vm,vr);
-	   if (Aggregates.can_use_combiner) {
+	   match splan {
+	   case MapReduce2(lambda(`mvx,bag(tuple(`kx,`mx))),
+			   lambda(`mvy,bag(tuple(`ky,`my))),
+			   lambda(`v,cmap(lambda(`x,cmap(lambda(`y,bag(tuple(tuple(`gx,`gy),`b))),
+							 nth(`vx,1))),
+					  nth(`vy,0))),
+			   `X,`Y,`o2):
+	       if (!vx.equals(v) || !vy.equals(v) || !mx.equals(mvx) || !my.equals(mvy))
+		   fail;
+	       Tree gxx = gx;
+	       Tree gyy = gy;
+	       if (free_variables(gx,#[]).equals(#[`y]) && free_variables(gy,#[]).equals(#[`x])) {
+		   gxx = gy;
+		   gyy = gx;
+	       } else if (!free_variables(gx,#[]).equals(#[`x]) || !free_variables(gy,#[]).equals(#[`y]))
+		   fail;
+	       Aggregates.clear();
+	       Tree nv = new_var();
+	       match TypeInference.type_inference(bm) {
+	       case `S(`tp):
+		   if (!is_collection(S))
+		       fail;
+		   type_env.insert(nv.toString(),tp);
+	       };
+	       Tree rd = Aggregates.derive_combiner(br,#<lambda(`vm,`bm)>,vm,vr);
+	       if (Aggregates.reduces.is_empty())
+		   fail;
 	       Tree vr2 = new_var();
 	       Tree m = simplify_all(#<lambda(`vm,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
-							      tuple(...(Aggregates.maps))))),`bm))>);
+	       					    tuple(...(Aggregates.maps))))),`bm))>);
 	       Tree c = subst(vr,vr2,#<bag(tuple(...(Aggregates.combines)))>);
 	       c = simplify_all(#<lambda(`vr2,`c)>);
 	       Tree r = simplify_all(#<lambda(`vr,`rd)>);
@@ -215,38 +232,61 @@ final public class PlanGeneration extends AlgebraicOptimization {
 	       TypeInference.type_inference(r);
 	       Tree combiner = makePlan(c);
 	       Tree reducer = makePlan(r);
-	       match splan {
-	       // combine groupBy with Join (experimental)
-	       case MapReduce2(lambda(`mvx,bag(tuple(`kx,`mx))),
-			       lambda(`mvy,bag(tuple(`ky,`my))),
-			       lambda(`v,cmap(lambda(`x,cmap(lambda(`y,bag(tuple(tuple(`gx,`gy),`b))),
-							     nth(`vx,1))),
-					      nth(`vy,0))),
-			       `X,`Y,`o2):
-		   if (!vx.equals(v) || !vy.equals(v) || !mx.equals(mvx) || !my.equals(mvy))
-		       fail;
-		   Tree nm = simplify_all(subst(x,#<nth(`vx,0)>,
-					  subst(y,#<nth(`vy,1)>,
-						#<cmap(lambda(x,bag(nth(x,1))),
-						       cmap(`m,bag(tuple(tuple(`gx,`gy),`b))))>)));
-		   type_env.insert(v.toString(),TypeInference.type_inference(#<tuple(`mvx,`mvy)>));
-		   nm = makePlan(nm);
-		   return #<GroupByJoin(lambda(`mvx,`kx),
-					lambda(`mvy,`ky),
-					lambda(`x,`gx),
-					lambda(`y,`gy),
-					lambda(`v,`nm),
-					`combiner,
-					`reducer,
-					`X,`Y,`o)>;
-	       // if the MapCombineReduce input is a join, push the combiner to the join
-	       case MapReduce2(`mx,`my,lambda(`rv,`rb),`x,`y,`o2):
-		   Tree nr = makePlan(simplify_all(#<lambda(`rv,cmap(`m,`rb))>));
-		   return #<MapReduce(lambda(`vm,bag(`vm)),`reducer,
-				      MapCombineReduce2(`mx,`my,`combiner,`nr,`x,`y,`o2),`o)>;
-	       case `input:
-		   return #<MapCombineReduce(`(makePlan(m)),`combiner,`reducer,`input,`o)>;
-	       }
+	       Tree nm = simplify_all(subst(x,#<nth(`vx,0)>,
+					    subst(y,#<nth(`vy,1)>,
+						  #<cmap(lambda(x,bag(nth(x,1))),
+							 cmap(`m,bag(tuple(tuple(`gx,`gy),`b))))>)));
+	       type_env.insert(v.toString(),TypeInference.type_inference(#<tuple(`mvx,`mvy)>));
+	       nm = makePlan(nm);
+	       return #<GroupByJoin(lambda(`mvx,`kx),
+				    lambda(`mvy,`ky),
+				    lambda(`x,`gxx),
+				    lambda(`y,`gyy),
+				    lambda(`v,`nm),
+				    `combiner,
+				    `reducer,
+				    `X,`Y,`o)>;
+	   };
+	   fail
+       // extract the mapReduce combiner
+       case mapReduce(lambda(`vm,`bm),lambda(`vr,`br),`s,`o):
+	   if (!Config.use_combiner || !is_dataset_expr(s))
+	       fail;
+	   Tree splan = makePlan(s);
+	   Aggregates.clear();
+	   Tree nv = new_var();
+	   match TypeInference.type_inference(bm) {
+	      case `S(`tp):
+		  if (!is_collection(S))
+		      fail;
+		  type_env.insert(nv.toString(),tp);
+	   };
+	   Tree rd = Aggregates.derive_combiner(br,#<lambda(`vm,`bm)>,vm,vr);
+	   if (!Aggregates.can_use_combiner || Aggregates.reduces.is_empty())
+	       fail;
+	   Tree vr2 = new_var();
+	   Tree m = simplify_all(#<lambda(`vm,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
+					       tuple(...(Aggregates.maps))))),`bm))>);
+	   Tree c = subst(vr,vr2,#<bag(tuple(...(Aggregates.combines)))>);
+	   c = simplify_all(#<lambda(`vr2,`c)>);
+	   Tree r = simplify_all(#<lambda(`vr,`rd)>);
+	   Tree mtp = TypeInference.type_inference(#<bag(tuple(...(Aggregates.maps)))>);
+	   Tree rtp = #<tuple(`(TypeInference.type_inference(#<nth(`vr,0)>)),`mtp)>;
+	   type_env.insert(vr.toString(),rtp);
+	   type_env.insert(vr2.toString(),rtp);
+	   TypeInference.type_inference(m);
+	   TypeInference.type_inference(c);
+	   TypeInference.type_inference(r);
+	   Tree combiner = makePlan(c);
+	   Tree reducer = makePlan(r);
+	   match splan {
+	   // if the MapCombineReduce input is a join, push the combiner to the join
+	   case MapReduce2(`mx,`my,lambda(`rv,`rb),`x,`y,`o2):
+	       Tree nr = makePlan(simplify_all(#<lambda(`rv,cmap(`m,`rb))>));
+	       return #<MapReduce(lambda(`vm,bag(`vm)),`reducer,
+				  MapCombineReduce2(`mx,`my,`combiner,`nr,`x,`y,`o2),`o)>;
+	   case `input:
+	       return #<MapCombineReduce(`(makePlan(m)),`combiner,`reducer,`input,`o)>;
 	   };
 	   fail
        case mapReduce(`m,`r,`s,`o):
@@ -481,7 +521,7 @@ final public class PlanGeneration extends AlgebraicOptimization {
 		       };
 		       plus = makePlan(simplify_all(#<lambda(`np,apply(`plus,`np))>));
 		       Tree acc = makePlan(simplify_all(#<lambda(`na,apply(`plus,tuple(nth(`na,0),
-										       apply(`unit,nth(`na,1)))))>));
+								    apply(`unit,nth(`na,1)))))>));
 		       zero = makePlan((f.equals(#<avg>)) ? zero : #<typed(`zero,`tp)>);
 		       match plan {
 		       case MapCombineReduce(`m,`c,`r,`s,_):

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/QueryPlan.gen
----------------------------------------------------------------------
diff --git a/src/QueryPlan.gen b/src/QueryPlan.gen
index 9969964..0c6fd8f 100644
--- a/src/QueryPlan.gen
+++ b/src/QueryPlan.gen
@@ -647,12 +647,30 @@ private final static class SingleQueryPlan {
 		      `(plan[j]))>;
     }
 
+    private static Tree top_pattern_variables ( Tree pat ) {  // summarize pat by its top-level components, reducing each nested component to its head
+	match pat {
+	case _(...ts):  // pat is a node; ts are its children
+	    Trees ps = #[];
+	    for ( Tree t: ts )
+		match t {
+		case `f(...): ps = ps.append(#<`f>);  // nested node: keep only its head f
+		case _: ps = ps.append(t);  // anything else is kept as-is
+		};
+	    if (ps.length() > 1)
+		return #<tuple(...ps)>;  // several components: wrap them in a tuple
+	    else return ps.head();  // NOTE(review): ps is empty when ts is empty -- confirm head() is safe in that case
+	};
+	return pat;  // pat did not match the node pattern above: return it unchanged
+    }
+
     static Tree make_unnest ( int i, int j ) {
 	Tree body = null;
 	if (Config.trace)
 	    System.out.print("unnest "+pattern[i]+" -> "+pattern[j]);
 	if (!no_grouping && depth[i] < depth[j]) {
-	    body = subst_expr(pattern_head(pattern[j],false),plan[j],plan[i]);
+	    // Changed 6/13/13: must rearrange binding variables in nested queries based on join order
+	    //body = subst_expr(pattern_head(pattern[j],false),plan[j],plan[i]);
+	    body = subst_header(pattern_head(pattern[j],false),top_pattern_variables(pattern[j]),plan[j],plan[i]);
 	    // new pattern[i] is the old pattern[i]
 	} else {
 	    body = join_body(pattern[j],pattern[i],predicate[i][j]);

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/Simplification.gen
----------------------------------------------------------------------
diff --git a/src/Simplification.gen b/src/Simplification.gen
index 3c56759..289dea9 100644
--- a/src/Simplification.gen
+++ b/src/Simplification.gen
@@ -53,25 +53,18 @@ public class Simplification extends Normalization {
 	return e.equals(v);
     }
 
-    private static Tree get_accessor ( Tree v, Tree d, Tree e ) {
-	match d {
-	case nth(`u,`n):
-	    int nn = (int)((LongLeaf)n).value();
-	    Trees bl = #[];
-	    for ( int i = 0; i < 2; i++ )
-		bl = bl.append(i == nn ? get_accessor(#<nth(`v,`i)>,u,e) : #<nth(`v,`i)>);
-	    return #<tuple(...bl)>;
-	case project(`u,`a): return v;
-	};
-	return e;
-    }
-
     private static Trees factor_out_aggr ( Tree e, Tree v ) {
 	match e {
 	case call(`g,cmap(`f,`d)):
 	    if (!simple_accessor(v,d) || !free_variables(f,#[]).is_empty())
 		fail;
-	    return #[bind(`e,`d)];
+	    for ( Tree monoid: monoids )
+		match monoid {
+		case `aggr(...):
+		    if (aggr.equals(g.toString()))
+			return #[bind(`e,`d)];
+		};
+	    fail
        case `f(...al):
 	   Trees bl = #[];
 	   for ( Tree a: al )
@@ -97,7 +90,7 @@ public class Simplification extends Normalization {
 		    for ( Tree b: bl )
 			match b {
 			case bind(`uu,`dd):
-			    if (d.equals(dd))
+			    if (d.equals(dd) && !u.equals(uu))
 				if (alpha_equivalent(u,uu))
 				    exists = true;
 			};
@@ -158,43 +151,53 @@ public class Simplification extends Normalization {
 	case cmap(`f,if(`p,`x,`y)):
 	    return simplify(#<if(`p,cmap(`f,`x),cmap(`f,`y))>);
 	// if the join reducer contains an independent aggregation push it to the input
-	// NOT USED YET
-	case xxxjoin(`kx,`ky,lambda(`v,cmap(lambda(`v1,cmap(lambda(`v2,bag(tuple(`k,`b))),
-							 nth(`vy,1))),
-					 nth(`vx,0))),
+	case join(`kx,`ky,
+		  lambda(`v,cmap(lambda(`v1,cmap(lambda(`v2,bag(tuple(`k,`b))),
+						 nth(`vy,1))),
+				 nth(`vx,0))),
 		  `X,`Y):
 	    if (!vx.equals(v) || !vy.equals(v))
 		fail;
 	    Trees l1 = factor_out_aggregations(b,v1);
-	    Trees dl = #[];
-	    if (!l1.is_empty())
-		for ( Tree a: l1 )
-		    match a {
-		    case bind(`u,`d):
-			Tree nv = new_var();
-			b = subst(u,d,b);
-			if (!dl.member(d))
-			    X = #<cmap(lambda(`nv,bag(`(subst(v1,nv,get_accessor(v1,d,u))))),`X)>;
-			dl = dl.append(d);
-		    };
 	    Trees l2 = factor_out_aggregations(b,v2);
-	    dl = #[];
-	    if (!l2.is_empty())
-		for ( Tree a: l2 )
-		    match a {
-		    case bind(`u,`d):
-			Tree nv = new_var();
-			b = subst(u,d,b);
-			if (!dl.member(d))
-			    Y = #<cmap(lambda(`nv,bag(`(subst(v2,nv,get_accessor(v2,d,u))))),`Y)>;
-			dl = dl.append(d);
-		    };
 	    if (l1.is_empty() && l2.is_empty())
 		fail;
-	    return simplify(#<join(`kx,`ky,lambda(`v,cmap(lambda(`v1,cmap(lambda(`v2,bag(tuple(`k,`b))),
-									  nth(`vy,1))),
-							  nth(`vx,0))),
-				   `X,`Y)>);
+	    Tree px = v1;
+	    Trees dl = #[];
+	    for ( Tree a: l1 )
+		match a {
+		case bind(`u,`d):
+		    Tree nv = new_var();
+		    b = subst(u,nv,b);
+		    if (true || !dl.member(d)) {
+			Tree vv = new_var();
+			X = #<cmap(lambda(`px,bag(tuple(`px,`u))),`X)>;
+			px = #<tuple(`px,`nv)>;
+		    }
+		    dl = dl.append(d);
+		};
+	    Tree py = v2;
+	    dl = #[];
+	    for ( Tree a: l2 )
+		match a {
+		case bind(`u,`d):
+		    Tree nv = new_var();
+		    b = subst(u,nv,b);
+		    if (true || !dl.member(d)) {
+			Tree vv = new_var();
+			Y = #<cmap(lambda(`py,bag(tuple(`py,`u))),`Y)>;
+			py = #<tuple(`py,`nv)>;
+		    };
+		    dl = dl.append(d);
+		};
+	    Tree res = #<join(lambda(`px,apply(`kx,`v1)),
+			      lambda(`py,apply(`ky,`v2)),
+			      lambda(`v,cmap(lambda(`px,cmap(lambda(`py,bag(tuple(`k,`b))),
+							     nth(`vy,1))),
+					     nth(`vx,0))),
+			      `X,`Y)>;
+	    res = rename(res);
+	    return simplify(res);
         // if the reducer of a join generates pairs (k,v), where k is functional dependent
         // on a join key, then the outer groupBy just groups the v values
 	case groupBy(join(lambda(`vx,`bx),`ky,

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/TopLevel.gen
----------------------------------------------------------------------
diff --git a/src/TopLevel.gen b/src/TopLevel.gen
index 4c0e5a4..86543c3 100644
--- a/src/TopLevel.gen
+++ b/src/TopLevel.gen
@@ -23,6 +23,7 @@ import java.io.PrintStream;
 
 /** Provides the API for compilation/code-generation */
 final public class TopLevel extends Interpreter {
+    static Tree xml_type;
 
     public TopLevel () {
 	// XML and JSON are user-defined types:
@@ -36,6 +37,7 @@ final public class TopLevel extends Interpreter {
 			       Jbool(bool),
 			       Jnull(tuple()))>);
 	constant(#<PI>,#<double>,new MR_double(Math.PI));
+	xml_type = global_datatype_env.lookup("XML");
 	DataSource.loadParsers();
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/Tuple.java
----------------------------------------------------------------------
diff --git a/src/Tuple.java b/src/Tuple.java
index 1ba7e63..d2ca6d2 100644
--- a/src/Tuple.java
+++ b/src/Tuple.java
@@ -55,9 +55,14 @@ final public class Tuple implements MRData {
     /** the second element of the tuple */
     public MRData second () { return tuple[1]; }
 
+    /** replace the i'th element of the tuple with data and return ret, the caller-supplied value, unchanged */
+    public MRData set ( int i, MRData data, MRData ret ) {
+	tuple[i] = data;
+	return ret;
+    }
+
     /** replace the i'th element of a tuple with new data */
     public Tuple set ( int i, MRData data ) {
-	assert(i >= 0 && i < tuple.length);
 	tuple[i] = data;
 	return this;
     }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/src/TypeInference.gen
----------------------------------------------------------------------
diff --git a/src/TypeInference.gen b/src/TypeInference.gen
index d780df2..6aa4258 100644
--- a/src/TypeInference.gen
+++ b/src/TypeInference.gen
@@ -654,10 +654,12 @@ public class TypeInference extends Translator {
 	    match ta {
 	    case XML:
 		return #<list(XML)>;
-	    case `S(XML):
-		if (is_collection(S))
+	    case `S(`tp):
+		if (is_collection(S) && (tp.equals(#<XML>) || tp.equals(TopLevel.xml_type)))
 		    return #<`S(XML)>;
 	    };
+	    if (ta.equals(TopLevel.xml_type))
+		return #<list(XML)>;
 	    match expand(ta) {
 	    case record(...ts):
 		for ( Tree t: ts )

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/tests/queries/factozization_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/factozization_1.mrql b/tests/queries/factozization_1.mrql
new file mode 100644
index 0000000..13b86d2
--- /dev/null
+++ b/tests/queries/factozization_1.mrql
@@ -0,0 +1,38 @@
+Mmatrix = source(line,"tests/data/Xmatrix.txt",",",type( (double,long,long) ));
+Hmatrix = source(line,"tests/data/Ymatrix.txt",",",type( (double,long,long) ));
+Wmatrix = source(line,"tests/data/Ymatrix.txt",",",type( (double,long,long) ));
+
+
+macro transpose ( X ) {
+  select (x,j,i)
+    from (x,i,j) in X
+};
+
+// matrix multiplication:
+macro multiply ( X, Y ) {
+  select (sum(z),i,j)
+    from (x,i,k) in X, (y,k,j) in Y, z = x*y
+   group by (i,j)
+};
+
+// cell-wise multiplication:
+macro Cmult ( X, Y ) {
+  select ( x*y, i, j )
+    from (x,i,j) in X, (y,i,j) in Y
+};
+
+// cell-wise division:
+macro Cdiv ( X, Y ) {
+  select ( x/y, i, j )
+    from (x,i,j) in X, (y,i,j) in Y
+};
+
+// Gaussian non-negative matrix factorization (from SystemML paper)
+macro factorize ( V, Hinit, Winit ) {
+  repeat (H,W) = (Hinit,Winit)
+    step ( Cmult(H,Cdiv(multiply(transpose(W),V),multiply(transpose(W),multiply(W,H)))),
+           Cmult(W,Cdiv(multiply(V,transpose(H)),multiply(W,multiply(H,transpose(H))))) )
+   limit 2
+};
+
+//factorize(Mmatrix,Hmatrix,Wmatrix);

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/869eefd5/tests/queries/xml_2.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/xml_2.mrql b/tests/queries/xml_2.mrql
index bdd73b6..08c60c4 100644
--- a/tests/queries/xml_2.mrql
+++ b/tests/queries/xml_2.mrql
@@ -1 +1,2 @@
-source(xml,"tests/data/cs.xml",{"gradstudent"},xpath(name[firstname="Leonidas"]));
+select n.lastname
+from n in source(xml,"tests/data/cs.xml",{"gradstudent"},xpath(name[firstname="Leonidas"]));