You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2016/02/22 01:26:41 UTC

[2/2] incubator-mrql git commit: [MRQL-84] Improve performance of Incremental MRQL

[MRQL-84] Improve performance of Incremental MRQL


Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/a624ffc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/a624ffc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/a624ffc5

Branch: refs/heads/master
Commit: a624ffc5063bac59b752c4eb92d3ac6073cc30cd
Parents: 807df96
Author: fegaras <fe...@cse.uta.edu>
Authored: Sat Feb 20 18:12:45 2016 -0600
Committer: fegaras <fe...@cse.uta.edu>
Committed: Sat Feb 20 18:12:45 2016 -0600

----------------------------------------------------------------------
 .../org/apache/mrql/AlgebraicOptimization.gen   |    2 +-
 core/src/main/java/org/apache/mrql/Compiler.gen |    3 +-
 .../main/java/org/apache/mrql/Interpreter.gen   |   23 +-
 .../java/org/apache/mrql/Materialization.gen    |    2 +
 .../java/org/apache/mrql/PlanGeneration.gen     |   27 +-
 core/src/main/java/org/apache/mrql/Printer.gen  |   30 +-
 .../src/main/java/org/apache/mrql/Streaming.gen | 1045 ++++++++++++------
 .../java/org/apache/mrql/SystemFunctions.java   |    8 +
 core/src/main/java/org/apache/mrql/TopLevel.gen |   70 +-
 .../main/java/org/apache/mrql/TypeInference.gen |   27 +-
 .../java/org/apache/mrql/FlinkEvaluator.gen     |    8 +
 queries/incremental-pagerank.mrql               |   16 +-
 .../java/org/apache/mrql/SparkEvaluator.gen     |   34 +-
 .../org/apache/mrql/SparkFileInputStream.java   |    2 +-
 .../java/org/apache/mrql/SparkStreaming.gen     |   33 +-
 15 files changed, 914 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen b/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
index b89dd3c..7f1fbcc 100644
--- a/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
+++ b/core/src/main/java/org/apache/mrql/AlgebraicOptimization.gen
@@ -93,7 +93,7 @@ public class AlgebraicOptimization extends Simplification {
             return translate(#<crossProduct(`mx,`my,lambda(`v,cmap(`m,`b)),`X,`Y)>);
         case cmap(`r,`groupBy1(cmap(`m,`groupBy2(`s)))):
             if (! #[groupBy,orderBy].member(#<`groupBy1>)
-                && ! #[groupBy,orderBy].member(#<`groupBy2>))
+                || ! #[groupBy,orderBy].member(#<`groupBy2>))
                 fail;
             return #<mapReduce(`(identity()),
                                `(translate(r)),

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/Compiler.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Compiler.gen b/core/src/main/java/org/apache/mrql/Compiler.gen
index 531205a..0c2ba3a 100644
--- a/core/src/main/java/org/apache/mrql/Compiler.gen
+++ b/core/src/main/java/org/apache/mrql/Compiler.gen
@@ -624,8 +624,7 @@ final public class Compiler extends Translator {
                    +compileF(superstep)+","+compileE(state)+","+o+","
                    +"new Bag[]{"+ds.substring(1)+"})";
         case DataSetCollect(`s):
-            return "(new Bag(((MR_dataset)(Interpreter.lookup_global_binding(\""+s.toString()
-                +"\"))).dataset().take(Integer.MAX_VALUE)))";
+            return "Interpreter.dataSetCollect(\""+s+"\")";
         case `v:
             if (v.is_variable())
                 return v.toString();

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/Interpreter.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
index 6627a12..9d99e8b 100644
--- a/core/src/main/java/org/apache/mrql/Interpreter.gen
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -72,6 +72,8 @@ public class Interpreter extends TypeInference {
 
     /** insert a new global variable binding visible to all nodes */
     public final static void new_distributed_binding ( final String var, final MRData value ) {
+        if (value == null)
+            return;
         new_global_binding(var,value);
         if (Config.hadoop_mode) {   // insert the binding in the hadoop configuration
             String gvs = Plan.conf.get("mrql.global.vars");
@@ -222,6 +224,16 @@ public class Interpreter extends TypeInference {
         return b.toString();
     }
 
+    /** implements the DataSetCollect operation: looks up the global binding of var and returns it as a Bag (a Bag is returned as is, an MR_dataset is collected with take(Integer.MAX_VALUE)); throws an Error for any other value, including null */
+    public static Bag dataSetCollect ( String var ) {
+        MRData val = lookup_global_binding(var);
+        if (val instanceof Bag)
+            return (Bag)val;
+        else if (val instanceof MR_dataset)
+            return new Bag(((MR_dataset)val).dataset().take(Integer.MAX_VALUE));
+        throw new Error("Cannot collect the MRQL value into a bag: "+val);
+    }
+
     public static long pre_trace ( String msg ) {
         tab_count += 3;
         trace_count++;
@@ -302,6 +314,14 @@ public class Interpreter extends TypeInference {
             return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
         case materialize(`u):
             return MapReduceAlgebra.materialize(evalE(u,env));
+        case let(`v,DataSetCollect(`s),`body):
+            MRData x = evalE(s,env);
+            if (x instanceof MR_dataset)
+                x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
+            else if (x instanceof Bag)
+                ((Bag)x).materialize();
+            new_distributed_binding(v.toString(),x);
+            return evalE(body,new Environment(v.toString(),x,env));
         case let(`v,`u,`body):
             MRData x = evalE(u,env);
             if (x instanceof Bag)
@@ -414,8 +434,7 @@ public class Interpreter extends TypeInference {
                 return b;
             } catch (Exception ex) { throw new Error(ex); }
         case DataSetCollect(`s):
-            DataSet ds = Evaluator.evaluator.eval(s,env,"-");
-            return new Bag(ds.take(Integer.MAX_VALUE));
+            return dataSetCollect(s.toString());
         case dataset_size(`x):
             return new MR_long(Plan.size(Evaluator.evaluator.eval(x,env,"-")) / (1024*1024));
         case synchronize(`peer,`b):

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/Materialization.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Materialization.gen b/core/src/main/java/org/apache/mrql/Materialization.gen
index 4869d2e..c056e20 100644
--- a/core/src/main/java/org/apache/mrql/Materialization.gen
+++ b/core/src/main/java/org/apache/mrql/Materialization.gen
@@ -119,6 +119,8 @@ final public class Materialization extends Translator {
             if (((LongLeaf)k).value() != unionM)
                 fail;
             return new_domain(vars,x,new_domain(vars,y,d));
+        case callM(join_key,_,`x,`y):
+            return new_domain(vars,x,new_domain(vars,y,d));
         case `f(...as):
             Domains nd = new Domains(d.domains,d.repeats);
             for ( Tree a: as )

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/PlanGeneration.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/PlanGeneration.gen b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
index e3901a8..1b724da 100644
--- a/core/src/main/java/org/apache/mrql/PlanGeneration.gen
+++ b/core/src/main/java/org/apache/mrql/PlanGeneration.gen
@@ -230,6 +230,12 @@ final public class PlanGeneration extends AlgebraicOptimization {
             if (!free_variables(b,#[`var,`v]).is_empty())
                 fail;
             return e;
+        case mapReduce(lambda(`v,`b),`r,`s,...):
+            if (!repeat_variables.member(s))
+                fail;
+            if (!free_variables(b,#[`var,`v]).is_empty())
+                fail;
+            return e;
         case `f(...r):
             for ( Tree a: r )
                 match get_nested_query(var,a) {
@@ -309,7 +315,7 @@ final public class PlanGeneration extends AlgebraicOptimization {
            };
            fail
        case mapReduce(lambda(`vm,`bm),`nr,`s,`o):
-           if (!is_dataset_expr(s))
+           if (!is_dataset_expr(s) || Config.bsp_mode)
                fail;
            Tree pm = makePlan(bm);
            match get_nested_query(vm,pm) {
@@ -324,12 +330,22 @@ final public class PlanGeneration extends AlgebraicOptimization {
                                       `(makePlan(nr)),
                                       `(makePlan(s)),
                                       `o))>;
+           case mapReduce(`km,`kr,`ks,`ko):
+               Tree nv = new_var();
+               pm = subst(ks,nv,pm);
+               TypeInference.global_type_env.insert(nv.toString(),TypeInference.type_inference(ks));
+               return #<let(`nv,DataSetCollect(`ks),
+                            MapReduce(lambda(`vm,`pm),
+                                      `(makePlan(nr)),
+                                      `(makePlan(s)),
+                                      `o))>;
            };
            fail
        // extract the mapReduce combiner
        case mapReduce(lambda(`vm,`bm),lambda(`vr,`br),`s,`o):
            if (!Config.use_combiner || !is_dataset_expr(s))
                fail;
+           TypeInference.type_inference(e);
            Aggregates.clear();
            Tree nv = new_var();
            match TypeInference.type_inference(bm) {
@@ -472,6 +488,9 @@ final public class PlanGeneration extends AlgebraicOptimization {
                    fail;
            repeat_variables = repeat_variables.append(vs);
            return #<Loop(lambda(tuple(...vs),`(makePlan(b))),`(makePlan(#<tuple(...s)>)),`(makePlan(n)))>;
+       case incr(`z,lambda(tuple(`s,`v),`m),`a):
+           repeat_variables = repeat_variables.append(v);
+           return #<incr(`(makePlan(z)),lambda(tuple(`s,`v),`(makePlan(m))),`(makePlan(a)))>;
        case record(...bl):
            Trees el = #[];
            for ( Tree b: bl )
@@ -731,7 +750,9 @@ final public class PlanGeneration extends AlgebraicOptimization {
         case MapReduce2(`mx,`my,`r,`x,`y,`o):
             return physical_plan(#<MapAggregateReduce2(`mx,`my,`r,null,null,`x,`y,`o)>);
         case MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,`x,`y,false):
-            if (!Config.flink_mode)
+            // disable mapjoins in Flink and Spark mode, because to select between a mapjoin
+            //    and a coGroup at run-time, it needs to know the input sizes
+            if (!Config.flink_mode  && !Config.spark_mode)
                 fail;
             return #<MapAggregateReduce2(`mx,`my,lambda(`v,`b),`acc,`zero,
                                          `(physical_plan(x)),
@@ -814,7 +835,7 @@ final public class PlanGeneration extends AlgebraicOptimization {
         case CrossProduct(`mx,`my,`r,`x,`y):
             return physical_plan(#<CrossAggregateProduct(`mx,`my,`r,null,null,`x,`y)>);
         case CrossAggregateProduct(`mx,`my,lambda(`v,`b),`acc,`zero,`x,`y):
-            if (Config.flink_mode)
+            if (Config.flink_mode || Config.spark_mode)
                 fail;
             Tree X = new_var();
             Tree Y = new_var();

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/Printer.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Printer.gen b/core/src/main/java/org/apache/mrql/Printer.gen
index b75de46..18975c9 100644
--- a/core/src/main/java/org/apache/mrql/Printer.gen
+++ b/core/src/main/java/org/apache/mrql/Printer.gen
@@ -398,8 +398,17 @@ public class Printer {
             return f+" ("+v+"):\n"+tab(n+3)+"init: "+print_plan(s,n+9,true)+"\n"
                    +tab(n+3)+"step: "+print_plan(b,n+9,true);
         case Let(`v,`u,`body):
-            return "let "+v+" = "+print_plan(u,n+10+v.toString().length(),pv)+"\n"
+            return "Let "+v+" = "+print_plan(u,n+10+v.toString().length(),pv)+"\n"
                    +tab(n)+print_plan(body,n,pv);
+        case let(`v,`u,`body):
+            if (print_plan(u,0,pv).length() > 0)
+                return "let "+v+" = "+print_plan(u,n+10+v.toString().length(),pv)+"\n"
+                    +tab(n)+print_plan(body,n,pv);
+            else return print_plan(body,n,pv);
+        case lambda(tuple(...vs),`b):
+            return "lambda "+vs+":\n"+tab(n+3)+print_plan(b,n+9,true);
+        case lambda(`v,`b):
+            return "lambda ("+v+"):\n"+tab(n+3)+print_plan(b,n+9,true);
         case If(_,`x1,If(_,`x2,If(_,`x3,`x4))):
             return "Choice 1: "+print_plan(x1,n+10,pv)+"\n"
                    +tab(n)+"Choice 2: "+print_plan(x2,n+10,pv)+"\n"
@@ -412,6 +421,25 @@ public class Printer {
         case If(`c,`x,`y):
             return "Choice 1: "+print_plan(x,n+10,pv)+"\n"
                    +tab(n)+"Choice 2: "+print_plan(y,n+10,pv);
+        case tuple():
+            return "( )";
+        case tuple(...ts):
+            String s = "( ";
+            if (print_plan(ts.head(),0,pv).length() > 0)
+                s += print_plan(ts.head(),n+2,pv)+"\n";
+            for ( Tree t: ts.tail() )
+                if (print_plan(t,0,pv).length() > 0)
+                    s += tab(n+2)+print_plan(t,n+2,pv)+"\n";
+            if (s.equals("( "))
+                return "";
+            else return s+tab(n)+")";
+        case incr(`z,lambda(tuple(...s),`h),lambda(tuple(...t),`a)):
+            return "Incremental:\n"+tab(n+3)+"State Transformer "+s+": "+print_plan(h,n+21+s.toString().length(),true)+"\n"
+                +tab(n+3)+"Answer "+t+": "+print_plan(a,n+10+t.toString().length(),true);
+        case incr(`z,lambda(`s,`h),lambda(`t,`a)):
+            return "Incremental:\n"+tab(n+3)+"Zero: "+print_plan(z,n+9,false)+"\n"
+                +tab(n+3)+"State Transformer ("+s+"): "+print_plan(h,n+23+s.toString().length(),true)+"\n"
+                +tab(n+3)+"Answer ("+t+"): "+print_plan(a,n+12+t.toString().length(),true);
         case `f(...as):
             String s = "";
             for (Tree a: as) {

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/Streaming.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Streaming.gen b/core/src/main/java/org/apache/mrql/Streaming.gen
index 01e754c..99ce205 100644
--- a/core/src/main/java/org/apache/mrql/Streaming.gen
+++ b/core/src/main/java/org/apache/mrql/Streaming.gen
@@ -21,6 +21,7 @@ import org.apache.mrql.gen.*;
 
 /** Generates code for streaming queries */
 final public class Streaming extends AlgebraicOptimization {
+    private static int istate = 0; 
     // set it to true to debug the monoid inference
     private static boolean inference_tracing = false;
     // set it to true to debug the split function
@@ -122,9 +123,7 @@ final public class Streaming extends AlgebraicOptimization {
                 return #<none>;
             else if (mx.equals(#<fixed>) && my.equals(#<fixed>))
                 return #<fixed>;
-            else if (my.equals(#<fixed>))
-                return #<groupBy(product(`mx,`my))>;
-            else return #<groupBy(product(fixed,`my))>;
+            else return #<groupBy(product(`mx,`my))>;
         case cmap(lambda(`v,bag(`w)),`x):
             if (!w.equals(v))
                 fail;
@@ -160,12 +159,15 @@ final public class Streaming extends AlgebraicOptimization {
                     return #<`gb2(`m2)>;
                 case union:
                     return #<union>;
-                case fixed: return #<fixed>;
+                case fixed:
+                    return #<fixed>;
                 }
             case fixed:
                 match inference(b,new Environment(v.toString(),#<fixed>,env)) {
-                case union: return #<union>;
-                case fixed: return #<fixed>;
+                case union:
+                    return #<union>;
+                case fixed:
+                    return #<fixed>;
                 case `gb(`m):
                     if (! #[groupBy,orderBy].member(#<`gb>) )
                         fail;
@@ -239,31 +241,25 @@ final public class Streaming extends AlgebraicOptimization {
             case product(...ms):
                 return ms.nth(n);
             };
-            fail
+            return #<none>;
         case project(`u,`a):
             match inference(u,env) {
             case fixed:
                 return #<fixed>;
             };
-            fail
+            return #<none>;
+        case if(`p,`e1,`e2):
+            return inference(e1,env);
         case bag(`u):
             match inference(u,env) {
             case fixed:
                 return #<fixed>;
             };
-            fail
+            return #<none>;
         case call(stream,...):
             return #<union>;
         case call(source,...):
             return #<fixed>;
-        case call(`f,`s):
-            for ( Tree monoid: monoids )
-                match monoid {
-                case `aggr(`mtp,`plus,`zero,`unit):
-                    if (aggr.equals(f.toString()))
-                        return #<monoid(`plus,`zero,`unit)>;
-                };
-            fail
         case call(`f,...s):
             Trees ms = #[ ];
             for ( Tree x: s )
@@ -278,16 +274,20 @@ final public class Streaming extends AlgebraicOptimization {
                 fixed &= mm.equals(#<fixed>);
             if (fixed)
                 return #<fixed>;
-            fail
+            return #<none>;
         case bind(_,`u):
             return inference(u,env);
+        case typed(`u,_):
+            return inference(u,env);
         case _:
             if (e.is_string() || e.is_long() || e.is_double())
                 return #<fixed>;
             else if (e.is_variable())
                 if (repeat_var(e))
                     return find(e.toString(),repeat_environment);
-                else return find(e.toString(),env);
+                else if (member(e.toString(),env))
+                    return find(e.toString(),env);
+                else return #<fixed>;
         };
         return #<none>;
     }
@@ -296,7 +296,7 @@ final public class Streaming extends AlgebraicOptimization {
         return inference(e,null);
     }
 
-    private static Tree cmap1 ( Tree f, Tree e ) {
+    private static Tree smap1 ( Tree f, Tree e ) {
         Tree nv = new_var();
         Tree b = new_var();
         return #<cmap(lambda(`nv,cmap(lambda(`b,bag(tuple(nth(`nv,0),`b))),
@@ -304,47 +304,64 @@ final public class Streaming extends AlgebraicOptimization {
                       `e)>;
     }
 
-    private static Tree cmap2 ( Tree f, Tree e ) {
+    private static Tree smap2 ( Tree f, Tree e ) {
         Tree nv = new_var();
         Tree nw = new_var();
-        return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(tuple(nth(`nw,0),nth(`nv,0)),nth(`nw,1)))),
+        return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(nth(`nw,0),
+                                                           tuple(nth(`nv,0),nth(`nw,1))))),
                                       apply(`f,tuple(nth(nth(`nv,0),0),nth(`nv,1))))),
                       `e)>;
     }
 
-    private static Tree cmap3 ( Tree f, Tree e ) {
+    private static Tree smap3 ( Tree f, Tree e ) {
         Tree nv = new_var();
         Tree nw = new_var();
-        return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(tuple(nth(`nw,0),tuple()),nth(`nw,1)))),
+        return #<cmap(lambda(`nv,cmap(lambda(`nw,bag(tuple(nth(`nw,0),
+                                                           tuple(tuple(),nth(`nw,1))))),
                                       apply(`f,`nv))),
                       `e)>;
     }
 
+    private static Tree swap ( Tree e ) {   // wraps e in a cmap that rewrites each element (k,(a,b)) into ((k,a),b)
+        Tree nv = new_var();   // fresh variable bound to each element of e
+        return #<cmap(lambda(`nv,bag(tuple(tuple(nth(`nv,0),nth(nth(`nv,1),0)),
+                                           nth(nth(`nv,1),1)))),
+                      `e)>;
+    }
+
+    private static Tree mix ( Tree e ) {   // wraps e in nested cmaps: for each element nv of e, pairs every x from groupBy(nth(nth(nv,1),0)) with every y from groupBy(nth(nth(nv,1),1))
+        Tree nv = new_var();   // fresh variable bound to each element of e
+        Tree x = new_var();   // fresh variable ranging over groupBy of component 0 of nth(nv,1)
+        Tree y = new_var();   // fresh variable ranging over groupBy of component 1 of nth(nv,1)
+        Tree b = #<bag(tuple(tuple(nth(`nv,0),
+                                   tuple(nth(`x,0),nth(`y,0))),
+                             tuple(nth(`x,1),nth(`y,1))))>;
+        return #<cmap(lambda(`nv,cmap(lambda(`x,cmap(lambda(`y,`b),
+                                                     groupBy(nth(nth(`nv,1),1)))),
+                                      groupBy(nth(nth(`nv,1),0)))),
+                      `e)>;
+    }
+
     /** inject lineage (all groupBy/join keys) to a query e
      * @param e an MRQL query
      * @return an algebraic term that associates lineage to every query result
      */
     public static Tree inject_q ( Tree e ) {
         match e {
-        case tuple(...as):
-            Trees bs = #[ ];
-            for ( Tree a: as )
-                bs = bs.append(inject_q(a));
-            return #<tuple(...bs)>;
         case reduce(`m,cmap(`f,call(stream,...s))):
-            return e;
+            return #<bag(tuple(tuple(),`e))>;
         case reduce(`m,cmap(`f,`v)):
             if (!repeat_var(v))
                 fail;
-            return e;
+            return #<bag(tuple(tuple(),`e))>;
         case reduce(`m,cmap(`f,`u)):
-            return #<reduce(groupBy(`m),`(cmap1(f,inject_e(u))))>;
+            return #<reduce(groupBy(`m),`(smap1(f,inject_e(u))))>;
         case reduce(`m,call(stream,...s)):
-            return e;
+            return #<bag(tuple(tuple(),`e))>;
         case reduce(`m,`v):
             if (!repeat_var(v))
                 fail;
-            return e;
+            return #<bag(tuple(tuple(),`e))>;
         case cmap(`f,call(stream,...s)):
             Tree a = new_var();
             Tree b = new_var();
@@ -360,14 +377,18 @@ final public class Streaming extends AlgebraicOptimization {
                                          apply(`f,`a))),
                           `v)>;
         case cmap(`f,`u):
-            return cmap1(f,inject_e(u));
-        case `f(...as):
-            Trees bs = #[ ];
-            for ( Tree a: as )
-                bs = bs.append(inject_q(a));
-            return #<`f(...bs)>;
+            return smap1(f,inject_e(u));
         };
-        return e;
+        // otherwise, it's a constant
+        return #<bag(tuple(tuple(),`e))>;
+    }
+
+    private static boolean stream_source ( Tree e ) {   // true if e is a call(stream,...) term or if repeat_var(e) holds
+        match e {
+        case call(stream,...):
+            return true;
+        };
+        return repeat_var(e);   // not a stream call: the answer is whatever repeat_var reports for e
+    }
 
     /** inject lineage (all groupBy/join keys) to a groupBy/join term e
@@ -377,10 +398,46 @@ final public class Streaming extends AlgebraicOptimization {
     static Tree inject_e ( Tree e ) {
         match e {
         case `gb(`c):
-            if (! #[groupBy,orderBy].member(#<`gb>) ) fail;
-            return #<`gb(`(inject_c(c)))>;
+            if (! #[groupBy,orderBy].member(#<`gb>) )
+                fail;
+            return #<`gb(`(swap(inject_c(c))))>;
         case coGroup(`c1,`c2):
-            return #<coGroup(`(inject_c(c1)),`(inject_c(c2)))>;
+            match c1 {
+            case cmap(_,`u):
+                if (!stream_source(u))
+                    fail;
+                match c2 {
+                case cmap(_,`s):
+                    if (!stream_source(s))
+                        fail;
+                    Tree nv = new_var();
+                    return #<cmap(lambda(`nv,bag(tuple(tuple(nth(`nv,0),tuple(tuple(),tuple())),
+                                                       nth(`nv,1)))),
+                                  coGroup(`c1,`c2))>;
+                };
+                Tree nv = new_var();
+                Tree y = new_var();
+                return #<cmap(lambda(`nv,cmap(lambda(`y,bag(tuple(tuple(nth(`nv,0),
+                                                                        tuple(tuple(),nth(`y,0))),
+                                                                  tuple(nth(nth(`nv,1),0),
+                                                                        nth(`y,1))))),
+                                              groupBy(nth(nth(`nv,1),1)))),
+                              coGroup(`c1,`(inject_c(c2))))>;
+            };
+            match c2 {
+            case cmap(_,`u):
+                if (!stream_source(u))
+                    fail;
+                Tree nv = new_var();
+                Tree x = new_var();
+                return #<cmap(lambda(`nv,cmap(lambda(`x,bag(tuple(tuple(nth(`nv,0),
+                                                                        tuple(nth(`x,0),tuple())),
+                                                                  tuple(nth(`x,1),
+                                                                        nth(nth(`nv,1),1))))),
+                                              groupBy(nth(nth(`nv,1),0)))),
+                              coGroup(`(inject_c(c1)),`c2))>;
+            };
+            return mix(#<coGroup(`(inject_c(c1)),`(inject_c(c2)))>);
         };
         return inject_c(e);
     }
@@ -392,108 +449,30 @@ final public class Streaming extends AlgebraicOptimization {
     static Tree inject_c ( Tree e ) {
         match e {
         case cmap(`f,call(stream,...s)):
-            return cmap3(f,#<call(stream,...s)>);
+            return smap3(f,#<call(stream,...s)>);
         case cmap(`f,`v):
             if (!repeat_var(v))
                 fail;
-            return cmap3(f,v);
+            return smap3(f,v);
         case cmap(`f,`u):
-            return cmap2(f,inject_e(u));
+            return smap2(f,inject_e(u));
         case call(stream,...s):
-            return cmap3(#<lambda(x,bag(x))>,#<call(stream,...s)>);
+            return smap3(#<lambda(x,bag(x))>,#<call(stream,...s)>);
         case `v:
             if (!repeat_var(v))
                 fail;
-            return cmap3(#<lambda(x,bag(x))>,v);
+            return smap3(#<lambda(x,bag(x))>,v);
         };
         return e;
     }
 
-    /** Return the answer function of a query e
-     * @param e a query
-     * @param var the input of the answer function
-     * @param merge the monoid associated with the input
-     * @return the answer function
-     */
-    static Tree answer ( Tree e, Tree var, Tree merge ) {
-        match e {
-        case tuple(...as):
-            match merge {
-            case product(...ms):
-                Trees bs = #[ ];
-                int i = 0;
-                for ( Tree a: as )
-                    bs = bs.append(answer(a,#<nth(`var,`i)>,ms.nth(i++)));
-                return #<tuple(...bs)>;
-            }
-        case record(...as):
-            match merge {
-            case record(...ms):
-                Trees bs = #[ ];
-                int i = 0;
-                for ( Tree a: as )
-                    match a {
-                    case bind(`v,`u):
-                        match ms.nth(i++) {
-                        case bind(_,`m):
-                            bs = bs.append(#<bind(`v,`(answer(u,#<project(`var,`v)>,m)))>);
-                        }
-                    }
-                return #<record(...bs)>;
-            }
-        case reduce(`m,cmap(`f,`u)):
-            match merge {
-            case `gb(_):
-                if (! #[groupBy,orderBy].member(#<`gb>) )
-                    fail;
-                Tree nv = new_var();
-                return #<reduce(`m,cmap(lambda(`nv,bag(nth(`nv,1))),`var))>;
-            case _:  // total aggregation
-                return var;
-            }
-        case reduce(`m,call(stream,...s)):
-            return var;
-        case reduce(`m,`v):
-            if (!repeat_var(v))
-                fail;
-            return var;
-        case cmap(`f,call(stream,...s)):
-            Tree nv = new_var();
-            return #<cmap(lambda(`nv,bag(nth(`nv,1))),`var)>;
-        case cmap(`f,`v):
-            if (!repeat_var(v))
-                fail;
-            Tree nv = new_var();
-            return #<cmap(lambda(`nv,bag(nth(`nv,1))),`var)>;
-        case cmap(`f,`u):
-            match merge {
-            case `gb(`m):
-                if (! #[groupBy,orderBy].member(#<`gb>) )
-                    fail;
-                Tree nv = new_var();
-                match TypeInference.type_inference(var) {
-                case `T(tuple(tuple(_,tuple()),_)):
-                    // only one key => don't group-by
-                    return #<cmap(lambda(`nv,bag(nth(`nv,1))),`var)>;
-                };
-                Tree nw = new_var();
-                Tree nn = new_var();
-                return #<cmap(lambda(`nv,cmap(lambda(`nn,bag(reduce(`m,`nn))),
-                                              nth(`nv,1))),
-                              `gb(cmap(lambda(`nw,bag(tuple(nth(nth(`nw,0),0),nth(`nw,1)))),
-                                       `var)))>;
-            }
-        };
-        return var;
-    }
-
     /** find all homomorphic terms in the algebraic term e
      * @param e an algebraic term
      * @param env binds variables to monoids
      * @return a list of homomorphic terms
      */
     static Trees find_homomorphisms ( Tree e, Environment env ) {
-        if (! #[none].member(inference(e,env)))
+        if (! #[none,fixed].member(inference(e,env)))
             return #[ `e ];
         match e {
         case cmap(lambda(`v,bag(`b)),`x):
@@ -520,97 +499,85 @@ final public class Streaming extends AlgebraicOptimization {
             for ( Tree d: ds )
                 match d {
                 case bind(`n,`a):
-                    bs = bs.append(find_homomorphisms(a,env));
+                    bs = union(bs,find_homomorphisms(a,env));
                 };
             return bs;
         case call(`f,...as):
             Trees bs = #[ ];
             for ( Tree a: as )
-                bs = bs.append(find_homomorphisms(a,env));
+                bs = union(bs,find_homomorphisms(a,env));
+            return bs;
+        case nth(`u,`n):
+            Trees bs = #[ ];
+            for ( Tree x: find_homomorphisms(u,env) )
+                bs = union(bs,#[nth(`x,`n)]);
+            return bs;
+        case project(`u,`a):
+            Trees bs = #[ ];
+            for ( Tree x: find_homomorphisms(u,env) )
+                bs = union(bs,#[project(`x,`a)]);
             return bs;
         case `f(...as):
             Trees bs = #[ ];
             for ( Tree a: as )
-                bs = bs.append(find_homomorphisms(a,env));
+                bs = union(bs,find_homomorphisms(a,env));
             return bs;
         };
         return #[ ];
     }
 
-    /** Split a cMap term e into a homomorhism and an answer function
-     * @param e a cMap term
+    /** Split the term e into a homomorphism and an answer function
+     * @param e an algebraic term
      * @param var the input variable of the answer function
-     * @param cmap_var the cmap variable
      * @param env binds variables to monoids
      * @return a pair (answer,homomorphism)
      */
-    static Tree split ( Tree e, Tree var, Tree cmap_var, Environment env ) {
+    static Tree split ( Tree e, Tree var, Environment env ) {  // tracing wrapper: the splitting itself is done by split_trace
         if (split_tracing) {  // tracing only: deepen the indentation by 3 and print var and e before splitting
             tab_count += 3;
-            System.out.println(Interpreter.tabs(tab_count)+var+" "+cmap_var+"\n"+e.pretty(0));
+            System.out.println(Interpreter.tabs(tab_count)+var+"\n"
+                               +Interpreter.tabs(tab_count)+e.pretty(tab_count));
 
         };
-        Tree res = split_trace(e,var,cmap_var,env);
+        Tree res = split_trace(e,var,env);  // the result that is returned unchanged below
         if (split_tracing) {  // tracing only: print the result, then undo the indentation added above
-            System.out.println(Interpreter.tabs(tab_count)+"-> "+res.pretty(0));
+            System.out.println(Interpreter.tabs(tab_count)+"-> "+res.pretty(tab_count+3));
             tab_count -= 3;
         };
         return res;
     }
 
-    private static Tree split_trace ( Tree e, Tree var, Tree cmap_var, Environment env ) {
+    /** Split the term e into a homomorphism and an answer function
+     * @param e an algebraic term
+     * @param var the input variable of the answer function
+     * @param env binds variables to monoids
+     * @return a pair (answer,homomorphism)
+     */
+    static Tree split_trace ( Tree e, Tree var, Environment env ) {
         match e {
-        case cmap(lambda(`v,bag(tuple(nth(`k,0),`b))),`u):
-            if (!cmap_var.equals(k))
-                fail;
-            match inference(u,env) {
-            case `gb(`m):
-                if (! #[groupBy,orderBy].member(#<`gb>) )
-                    fail;
-                Trees hs = find_homomorphisms(b,new Environment(v.toString(),#<product(fixed,`m)>,env));
-                if (hs.length() == 1 && hs.nth(0).equals(b))
-                    return #<pair(`var,`e)>;
-                Tree nv = new_var();
-                Tree ne = b;
-                int i = 0;
-                for ( Tree h: hs ) {
-                    ne = subst(h,#<nth(`nv,`i)>,ne);
-                    i++;
-                };
-                return #<pair(cmap(lambda(`nv,bag(`ne)),`var),
-                              cmap(lambda(`v,bag(tuple(nth(`k,0),tuple(...hs)))),`u))>;
-            case fixed:
-                Trees hs = find_homomorphisms(b,new Environment(v.toString(),#<fixed>,env));
-                if (hs.length() == 1 && hs.nth(0).equals(b))
-                    return #<pair(`var,`e)>;
-                Tree nv = new_var();
-                Tree ne = b;
-                int i = 0;
-                for ( Tree h: hs ) {
-                    ne = subst(h,#<nth(`nv,`i)>,ne);
-                    i++;
-                };
-                return #<pair(cmap(lambda(`nv,bag(`ne)),`var),
-                              cmap(lambda(`v,bag(tuple(nth(`k,0),tuple(...hs)))),`u))>;
-            }
         case cmap(lambda(`v,bag(tuple(`k,`b))),`u):
+            Environment nenv = env;
+            Tree nv = new_var();
             match inference(u,env) {
+            case union:
+                nv = #<nth(`nv,1)>;
+                nenv = new Environment(v.toString(),#<union>,nenv);
             case `gb(`m):
                 if (! #[groupBy,orderBy].member(#<`gb>) )
                     fail;
-                Trees hs = find_homomorphisms(b,new Environment(v.toString(),#<product(fixed,`m)>,env));
-                if (hs.length() == 1 && hs.nth(0).equals(b))
-                    return #<pair(`var,`e)>;
-                Tree nv = new_var();
-                Tree ne = b;
-                int i = 0;
-                for ( Tree h: hs ) {
-                    ne = subst(h,#<nth(`nv,`i)>,ne);
-                    i++;
-                };
-                return #<pair(cmap(lambda(`nv,bag(`ne)),`var),
-                              cmap(lambda(`v,bag(tuple(`k,tuple(...hs)))),`u))>;
-            }
+                nenv = new Environment(v.toString(),#<product(fixed,`m)>,nenv);
+            };
+            Trees hs = find_homomorphisms(b,nenv);
+            if (hs.length() == 1 && hs.nth(0).equals(b))
+                return #<pair(`var,`e)>;
+            Tree ne = b;
+            int i = 0;
+            for ( Tree h: hs ) {
+                ne = subst(h,#<nth(nth(`nv,1),`i)>,ne);
+                i++;
+            };
+            return #<pair(cmap(lambda(`nv,bag(tuple(nth(`nv,0),`ne))),`var),
+                          cmap(lambda(`v,bag(tuple(`k,tuple(...hs)))),`u))>;
         case cmap(lambda(`v,`b),`u):
             match inference(u,env) {
             case `gb(`m):
@@ -626,38 +593,54 @@ final public class Streaming extends AlgebraicOptimization {
                     Tree nv = new_var();
                     match split(b,nv,nenv) {
                     case pair(`a,`h):
-                        a = subst(nv,var,a);
-                        return #<pair(`a,cmap(lambda(`v,`h),`u))>;
+                        Tree nw = new_var();
+                        if (occurences(v,a) == 0) {
+                            a = subst(nv,var,a);
+                            return #<pair(`a,cmap(lambda(`v,`h),`u))>;
+                        };
+                        // The answer function needs v; need to piggyback v along with the state
+                        a = subst(nv,#<bag(tuple(nth(`nw,0),nth(nth(`nw,1),1)))>,
+                                  subst(v,#<nth(nth(`nw,1),0)>,a));
+                        return #<pair(cmap(lambda(`nw,`a),`var),
+                                      cmap(lambda(`v,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
+                                                                               tuple(`v,nth(`nv,1))))),
+                                                          `h)),
+                                           `u))>;
                     }
                 }
-            case fixed:
+            case `m:
+                if (! #[union,fixed].member(m))
+                    fail;
                 Tree nv = new_var();
-                match split(b,nv,new Environment(v.toString(),#<fixed>,env)) {
+                match split(b,nv,new Environment(v.toString(),m,env)) {
                 case pair(`a,`h):
-                    a = subst(nv,var,a);
-                    a = subst(cmap_var,nv,a);
-                    return #<pair(`a,cmap(lambda(`v,`h),`u))>;
+                    Tree nw = new_var();
+                    if (occurences(v,a) == 0) {
+                        a = subst(nv,var,a);
+                        return #<pair(`a,cmap(lambda(`v,`h),`u))>;
+                    };
+                    // The answer function needs v; need to piggyback v along with the state
+                    a = subst(nv,#<bag(tuple(nth(`nw,0),nth(nth(`nw,1),1)))>,
+                              subst(v,#<nth(nth(`nw,1),0)>,a));
+                    return #<pair(cmap(lambda(`nw,`a),`var),
+                                  cmap(lambda(`v,cmap(lambda(`nv,bag(tuple(nth(`nv,0),
+                                                                           tuple(`v,nth(`nv,1))))),
+                                                      `h)),
+                                       `u))>;
                 }
             }
-        };
-        return #<fail>;
-    }
-
-    /** Split the term e into a homomorhism and an answer function
-     * @param e an algebraic term
-     * @param var the input variable of the answer function
-     * @param env binds variables to monoids
-     * @return a pair (answer,homomorphism)
-     */
-    static Tree split ( Tree e, Tree var, Environment env ) {
-        match e {
-        case cmap(lambda(`v,`b),`u):
-            match split(e,var,v,env) {
-            case fail:
-                return #<pair(cmap(lambda(`v,`b),`var),`u)>;
-            case `t:
-                return t;
-            }
+        case call(stream,...):
+            return #<pair(`var,`e)>;
+        case reduce(`m,`u):
+            match inference(u,env) {
+            case union:
+                return #<pair(`var,`e)>;
+            case _:
+                match split(u,var,env) {
+                case pair(`b,`h):
+                    return #<pair(reduce(`m,`b),`h)>;
+                }
+            };
         case tuple(...as):
             Trees bs = #[ ];
             Trees hs = #[ ];
@@ -682,8 +665,15 @@ final public class Streaming extends AlgebraicOptimization {
                     }
                 };
             return #<pair(record(...bs),record(...hs))>;
-        case reduce(`m,`u):
-            return #<pair(`var,`e)>;
+        case call(`f,...as):
+            if (! #[fixed].member(inference(e,env)))
+                fail;
+            return #<pair(`e,tuple())>;
+        case call(`f,`a):
+            match split(a,var,env) {
+            case pair(`b,`h):
+                return #<pair(call(`f,`b),`h)>;
+            };
         case call(`f,...as):
             Tree nv = new_var();
             Trees bs = #[ ];
@@ -696,6 +686,33 @@ final public class Streaming extends AlgebraicOptimization {
                     hs = hs.append(h);
                 };
             return #<pair(apply(lambda(`nv,call(`f,...bs)),`var),tuple(...hs))>;
+        case nth(`u,`n):
+            match split(u,var,env) {
+            case pair(`b,`h):
+                return #<pair(nth(`b,`n),`h)>;
+            };
+        case project(`u,`a):
+            match split(u,var,env) {
+            case pair(`b,`h):
+                return #<pair(project(`b,`a),`h)>;
+            };
+        case bag(tuple(`k,`u)):
+            Tree ne = #<nth(call(elem,`var),1)>;
+            match TypeInference.type_inference(u) {
+            case `S(_):
+                if (!is_persistent_collection(S))
+                    fail;
+                ne = #<nth(call(elem,Collect(`var)),1)>;
+            };
+            match split(u,ne,env) {
+            case pair(`b,`h):
+                return #<pair(`b,bag(tuple(`k,`h)))>;
+            };
+        case if(`p,`u,bag()):
+            match split(u,var,env) {
+            case pair(`b,`h):
+                return #<pair(`b,if(`p,`h,bag()))>;
+            };
         case `f(...as):
             Tree nv = new_var();
             Trees bs = #[ ];
@@ -709,7 +726,82 @@ final public class Streaming extends AlgebraicOptimization {
                 };
             return #<pair(apply(lambda(`nv,`f(...bs)),`var),tuple(...hs))>;
         };
-        return #<pair(`e,tuple())>;
+        return #<pair(`var,`e)>;
+    }
+
+    /** Return the monoid of a query e (env is the environment passed on to inference); throws an Error if no case returns */
+    private static Tree get_monoid ( Tree e, Environment env ) {
+        match e {
+        case reduce(`m,`u):  // a reduce over monoid m gives groupBy(m)
+            return #<groupBy(`m)>;
+        case cmap(`f,call(stream,...s)):  // cmap applied directly to a call(stream,...) term
+            return #<union>;
+        case cmap(`f,`v):
+            if (!repeat_var(v))  // this case is only for inputs accepted by repeat_var
+                fail;  // NOTE(review): presumably abandons this case so that the next pattern is tried - confirm
+            return #<union>;
+        case cmap(lambda(`v,`b),`u):  // general cmap: the answer depends on the monoid inferred for the input u
+            match inference(u,env) {
+            case groupBy(`m):  // analyze the body b with v bound to product(fixed,m)
+                return get_monoid(b,new Environment(v.toString(),#<product(fixed,`m)>,env));
+            case `m:  // any other inferred monoid: analyze b in the unchanged environment
+                return get_monoid(b,env);
+            }
+        case bag(tuple(`k,`v)):  // singleton bag holding a pair: groupBy over the monoid inferred for the second component
+            Tree m = inference(v,env);
+            return #<groupBy(`m)>;
+        case if(`p,`e1,bag()):  // conditional whose else-branch is the empty bag: use the then-branch
+            return get_monoid(e1,env);
+        };
+        throw new Error("Unable to find the merge function for: "+e);  // reached when no case above returned
+        //return #<groupBy(fixed)>;
+    }
+
+    private static Tree convert_reduce_all ( Tree m, Tree e ) {  // returns a term built around e, chosen by cases on the monoid m
+        match m {
+        case fixed:  // fixed: wrap e as call(elem,e)
+            return #<call(elem,`e)>;
+        case union:  // union: cmap over e with the identity function lambda(v,v)
+            Tree v = new_var();
+            return #<cmap(lambda(`v,`v),`e)>;
+        case product(...ps):  // product: a tuple with one component per monoid p
+            int i = 0;
+            Trees as = #[ ];
+            for ( Tree p: ps ) {
+                Tree nv = new_var();
+                as = as.append(convert_reduce_all(p,#<cmap(lambda(`nv,bag(nth(`nv,`(i++)))),`e)>));  // convert the i-th projection of e under p
+            };
+            return #<tuple(...as)>;
+        case record(...bs):  // record: like product, but projecting by the field name n
+            Trees as = #[ ];
+            for ( Tree b: bs )
+                match b {
+                case bind(`n,`p):
+                    Tree nv = new_var();
+                    Tree nb = convert_reduce_all(p,#<cmap(lambda(`nv,bag(project(`nv,`n))),`e)>);
+                    as = as.append(#<bind(`n,`nb)>);
+                };
+            return #<record(...as)>;
+        case `gb(`gm):
+            if (! #[groupBy,orderBy].member(#<`gb>) )  // only groupBy and orderBy are handled here
+                fail;  // NOTE(review): presumably abandons this case so that the next pattern is tried - confirm
+            Tree v = new_var();
+            Tree me = convert_reduce_all(gm,#<nth(`v,1)>);  // convert nth(v,1) under the inner monoid gm
+            return #<cmap(lambda(`v,bag(tuple(nth(`v,0),`me))),
+                          `gb(`e))>;
+        case _:
+            if (!m.is_variable())  // only a monoid given as a variable is looked up in monoids
+                fail;
+            for ( Tree monoid: monoids )  // search for an entry whose head aggr equals m
+                match monoid {
+                case `aggr(`mtp,`plus,`zero,`unit):
+                    if (#<`aggr>.equals(m)) {
+                        plus = Normalization.rename(plus);  // NOTE(review): presumably renames variables in plus to avoid clashes - confirm
+                        return #<aggregate(`plus,`zero,`e)>;
+                    }
+                };
+        };
+        return #<call(elem,`e)>;  // fallback when no case above returned: the same term as the fixed case
+    }
 
     /** Convert a reduce on the monoid m to an aggregation */
@@ -717,6 +809,8 @@ final public class Streaming extends AlgebraicOptimization {
         match m {
         case fixed:
             return e;
+        case union:
+            return e;
         case product(...ps):
             int i = 0;
             Trees as = #[ ];
@@ -732,15 +826,21 @@ final public class Streaming extends AlgebraicOptimization {
                 };
             return #<record(...as)>;
         case `gb(`gm):
-            if (! #[groupBy,orderBy].member(#<`gb>) ) fail;
+            if (! #[groupBy,orderBy].member(#<`gb>) )
+                fail;
             Tree v = new_var();
-            Tree me = convert_reduce(gm,#<nth(`v,1)>);
-            return #<cmap(lambda(`v,bag(tuple(nth(`v,0),`me))),`gb(`e))>;
-        };
-        match e {
-        // FIX: may have more cases
-        case cmap(...):
-            return #<call(`m,`e)>;
+            Tree me = convert_reduce_all(gm,#<nth(`v,1)>);
+            return #<cmap(lambda(`v,bag(tuple(nth(`v,0),`me))),
+                          `gb(`e))>;
+        case _:
+            if (!m.is_variable())
+                fail;
+            for ( Tree monoid: monoids )
+                match monoid {
+                case `aggr(`mtp,`plus,`zero,`unit):
+                    if (#<`aggr>.equals(m))
+                        return #<call(`aggr,`e)>;
+                };
         };
         return e;
     }
@@ -754,12 +854,14 @@ final public class Streaming extends AlgebraicOptimization {
             Tree vy = new_var();
             Tree cx = convert_to_algebra(x);
             Tree cy = convert_to_algebra(y);
+            Tree mx = new_var();
+            Tree my = new_var();
             return #<join(lambda(`vx,nth(`vx,0)),lambda(`vy,nth(`vy,0)),
                           lambda(`v,bag(tuple(call(join_key,nth(`v,0),nth(`v,1)),
                                               tuple(cmap(lambda(`vx,bag(nth(`vx,1))),nth(`v,0)),
                                                     cmap(lambda(`vy,bag(nth(`vy,1))),nth(`v,1)))))),
                           `cx, `cy)>;
-        case reduce(`aggr,`s):
+      case reduce(`aggr,`s):
             return convert_reduce(aggr,convert_to_algebra(s));
         case `f(...as):
             Trees bs = #[ ];
@@ -803,31 +905,72 @@ final public class Streaming extends AlgebraicOptimization {
         return e;
     }
 
+    /** true if there is only one instance value of type tp */
+    private static boolean unique_key ( Tree tp ) {
+        match tp {
+        case tuple():  // the empty tuple type
+            return true;
+        case `T(`t):
+            if (!is_collection(T))  // this case is only for constructors accepted by is_collection
+                fail;  // NOTE(review): presumably abandons this case so that the next pattern is tried - confirm
+            return unique_key(t);  // the answer is that of the argument type t
+        case tuple(...as):  // tuple type: every component type must qualify
+            for ( Tree a: as )
+                if (!unique_key(a))
+                    return false;
+            return true;
+        case record(...as):  // record type: every field type must qualify
+            for ( Tree a: as )
+                match a {
+                case bind(_,`b):
+                    if (!unique_key(b))
+                        return false;
+                }
+            return true;
+        };
+        return false;  // any type not covered above
+    }
+
     /** Return the merge function (over X and Y) of the monoid m; type is used for key equality */
     private static Tree merge ( Tree m, Tree type, Tree X, Tree Y ) {
         match m {
         case `gb(`n):
-            // needs outer-join in function r
             if (! #[groupBy,orderBy].member(#<`gb>) )
                 fail;
-            Tree v = new_var();
-            Tree vx = new_var();
-            Tree vy = new_var();
-            Tree mx = new_var();
-            Tree my = new_var();
             match type {
             case `T(tuple(`keytp,`tp)):
+                if (unique_key(keytp)) {
+                    Tree vx = new_var();
+                    Tree vy = new_var();
+                    Tree v = merge(n,tp,#<nth(call(elem,`vx),1)>,#<nth(call(elem,`vy),1)>);
+                    if (is_persistent_collection(T)) {
+                        X = #<Collect(`X)>;
+                        Y = #<Collect(`Y)>;
+                    };
+                    return #<let(`vx,`X,
+                                 let(`vy,`Y,
+                                     if(call(exists,`vx),
+                                        if(call(exists,`vy),
+                                           bag(tuple(`keytp,`v)),
+                                           `vx),
+                                        `vy)))>;
+                };
+                // needs an outer-join in the reducer function
+                Tree v = new_var();
+                Tree vx = new_var();
+                Tree vy = new_var();
+                Tree mx = new_var();
+                Tree my = new_var();
+                Tree mb = merge(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>);
+                Tree b = #<cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),`mb))),
+                                                `my)),
+                                `mx)>;
                 return #<join(lambda(`vx,`(key_equality(keytp,#<nth(`vx,0)>))),
                               lambda(`vy,`(key_equality(keytp,#<nth(`vy,0)>))),
                               lambda(`v,let(`mx,nth(`v,0),
                                             let(`my,nth(`v,1),
                                                 if(call(exists,`mx),
-                                                   if(call(exists,`my),
-                                                      cmap(lambda(`vx,cmap(lambda(`vy,bag(tuple(nth(`vx,0),
-                                                                              `(merge(n,tp,#<nth(`vx,1)>,#<nth(`vy,1)>))))),
-                                                                           `my)),
-                                                           `mx),
-                                                      `mx),
+                                                   if(call(exists,`my),`b,`mx),
                                                    `my)))),
                               `X, `Y)>;
             };
@@ -862,7 +1005,7 @@ final public class Streaming extends AlgebraicOptimization {
             for ( Tree monoid: monoids )
                 match monoid {
                 case `aggr(`mtp,`plus,`zero,`unit):
-                    if (aggr.equals(m.toString()))
+                    if (#<`aggr>.equals(m))
                         return #<apply(`plus,tuple(`X,`Y))>;
                 };
         };
@@ -899,7 +1042,7 @@ final public class Streaming extends AlgebraicOptimization {
             for ( Tree monoid: monoids )
                 match monoid {
                 case `aggr(`mtp,`plus,`zero,`unit):
-                    if (aggr.equals(m.toString()))
+                    if (#<`aggr>.equals(m))
                         return zero;
                 };
         };
@@ -909,26 +1052,78 @@ final public class Streaming extends AlgebraicOptimization {
     /** Convert joins to coGroups, plus other transformations */
     private static Tree normalize_term ( Tree e ) {
         match e {
+        case join(`kx,`ky,`r,cmap(`f,`x),cmap(`g,`y)):
+            if (!x.equals(y))
+                fail;
+            Tree v = new_var(), w = new_var(),
+                wx = new_var(), wy = new_var(),
+                vx = new_var(), vy = new_var();
+            Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
+            Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
+            Tree T = #<union(inL(`xtp),inR(`ytp))>;
+            Tree X = #<cmap(lambda(`w,call(plus,cmap(lambda(`wx,bag(tuple(apply(`kx,`wx),
+                                                                          typed(tagged_union(0,`wx),`T)))),
+                                                     apply(`f,`w)),
+                                           cmap(lambda(`wy,bag(tuple(apply(`ky,`wy),
+                                                                     typed(tagged_union(1,`wy),`T)))),
+                                                apply(`g,`w)))),
+                            `x)>;
+            Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
+                             nth(`v,1))>;
+            Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
+                             nth(`v,1))>;
+            return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
+                                         groupBy(`X))>);
+        case join(`kx,`ky,`r,`x,cmap(`g,`y)):
+            if (!x.equals(y))
+                fail;
+            Tree v = new_var(), w = new_var(), wy = new_var(),
+                vx = new_var(), vy = new_var();
+            Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
+            Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
+            Tree T = #<union(inL(`xtp),inR(`ytp))>;
+            Tree X = #<cmap(lambda(`w,call(plus,bag(tuple(apply(`kx,`w),typed(tagged_union(0,`w),`T))),
+                                           cmap(lambda(`wy,bag(tuple(apply(`ky,`wy),
+                                                                     typed(tagged_union(1,`wy),`T)))),
+                                                apply(`g,`w)))),
+                            `x)>;
+            Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
+                             nth(`v,1))>;
+            Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
+                             nth(`v,1))>;
+            return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
+                                         groupBy(`X))>);
+        case join(`kx,`ky,`r,cmap(`f,`x),`y):
+            if (!x.equals(y))
+                fail;
+            Tree v = new_var(), w = new_var(), wx = new_var(),
+                vx = new_var(), vy = new_var();
+            Tree xtp = TypeInference.type_inference(((Node)kx).children.head);
+            Tree ytp = TypeInference.type_inference(((Node)ky).children.head);
+            Tree T = #<union(inL(`xtp),inR(`ytp))>;
+            Tree X = #<cmap(lambda(`w,call(plus,cmap(lambda(`wx,bag(tuple(apply(`kx,`wx),
+                                                                          typed(tagged_union(0,`wx),`T)))),
+                                                     apply(`f,`w)),
+                                           bag(tuple(apply(`ky,`w),typed(tagged_union(1,`w),`T))))),
+                            `x)>;
+            Tree sx = #<cmap(lambda(`vx,if(call(eq,union_tag(`vx),0),bag(typed(union_value(`vx),`xtp)),bag())),
+                             nth(`v,1))>;
+            Tree sy = #<cmap(lambda(`vy,if(call(eq,union_tag(`vy),1),bag(typed(union_value(`vy),`ytp)),bag())),
+                             nth(`v,1))>;
+            return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(`sx,`sy))),
+                                         groupBy(`X))>);
         case join(`kx,`ky,`r,`x,`y):
-            match TypeInference.type_inference(x) {
-            case `T(`xtp):
-                match TypeInference.type_inference(x) {
-                case `S(`ytp):
-                    Tree v = new_var();
-                    Tree vx = new_var();
-                    Tree vy = new_var();
-                    type_env.insert(vx.toString(),xtp);
-                    type_env.insert(vy.toString(),ytp);
-                    type_env.insert(v.toString(),#<tuple(none,tuple(`T(`xtp),`S(`ytp)))>);
-                    return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(nth(nth(`v,1),0),
-                                                                          nth(nth(`v,1),1)))),
-                                                 coGroup(cmap(lambda(`vx,bag(tuple(apply(`kx,`vx),`vx))),`x),
-                                                         cmap(lambda(`vy,bag(tuple(apply(`ky,`vy),`vy))),`y)))>);
-                }
-            };
-            fail
+            Tree v = new_var();
+            Tree vx = new_var();
+            Tree vy = new_var();
+            Tree X = #<cmap(lambda(`vx,bag(tuple(apply(`kx,`vx),`vx))),`x)>;
+            Tree Y = #<cmap(lambda(`vy,bag(tuple(apply(`ky,`vy),`vy))),`y)>;
+            return normalize_term(#<cmap(lambda(`v,apply(`r,tuple(nth(nth(`v,1),0),
+                                                                  nth(nth(`v,1),1)))),
+                                         coGroup(`X,`Y))>);
         case `gb(`u):
-            if (! #[groupBy,orderBy].member(#<`gb>) ) fail;
+            if (! #[groupBy,orderBy].member(#<`gb>) )
+                fail;
             match u {
             case cmap(...):
                 return #<`gb(`(normalize_term(u)))>;
@@ -956,6 +1151,32 @@ final public class Streaming extends AlgebraicOptimization {
             };
             Tree nv = new_var();
             return #<reduce(`m,cmap(lambda(`nv,bag(`nv)),`(normalize_term(u))))>;
+        case call(`f,`s):
+            for ( Tree monoid: monoids )
+                match monoid {
+                case `aggr(`mtp,`plus,`zero,`unit):
+                    if (#<`aggr>.equals(f)) {
+                        match s {
+                        case cmap(...):
+                            return #<reduce(`aggr,`(normalize_term(s)))>;
+                        };
+                        Tree nv = new_var();
+                        return #<reduce(`aggr,cmap(lambda(`nv,bag(`nv)),`(normalize_term(s))))>;
+                    }
+                };
+            fail
+        case `f(...as):
+            Trees bs = #[ ];
+            for ( Tree a: as )
+                bs = bs.append(normalize_term(a));
+            return #<`f(...bs)>;
+        };
+        return e;
+    }
+
+    /** Embed missing cmaps */
+    private static Tree embed_missing_cmaps ( Tree e ) {
+        match e {
         case project(`x,`a):
             match TypeInference.type_inference(x) {
             case `T(`tp):
@@ -963,7 +1184,7 @@ final public class Streaming extends AlgebraicOptimization {
                     fail;
                 Tree v = new_var();
                 type_env.insert(v.toString(),tp);
-                return normalize_term(#<cmap(lambda(`v,bag(project(`v,`a))),`x)>);
+                return embed_missing_cmaps(#<cmap(lambda(`v,bag(project(`v,`a))),`x)>);
             };
             fail
         case nth(`x,`n):
@@ -973,32 +1194,30 @@ final public class Streaming extends AlgebraicOptimization {
                     fail;
                 Tree v = new_var();
                 type_env.insert(v.toString(),tp);
-                return normalize_term(#<cmap(lambda(`v,bag(nth(`v,`n))),`x)>);
+                return embed_missing_cmaps(#<cmap(lambda(`v,bag(nth(`v,`n))),`x)>);
             };
             fail
-        case call(`f,`s):
-            for ( Tree monoid: monoids )
-                match monoid {
-                case `aggr(`mtp,`plus,`zero,`unit):
-                    if (aggr.equals(f.toString())) {
-                        match s {
-                        case cmap(...):
-                            return #<reduce(`aggr,`(normalize_term(s)))>;
-                        };
-                        Tree nv = new_var();
-                        return #<reduce(`aggr,cmap(lambda(`nv,bag(`nv)),`(normalize_term(s))))>;
-		    }
-                };
-            fail
         case `f(...as):
             Trees bs = #[ ];
             for ( Tree a: as )
-                bs = bs.append(normalize_term(a));
+                bs = bs.append(embed_missing_cmaps(a));
             return #<`f(...bs)>;
         };
         return e;
     }
 
+    private static Tree subst_pattern ( Tree pattern, Tree src, Tree dst ) {  // substitute src for pattern in dst; tuple patterns are handled component by component
+        match pattern {
+        case tuple(...as):
+            Tree res = dst;
+            int i = 0;
+            for ( Tree a: as )
+                res = subst_pattern(a,#<nth(`src,`(i++))>,res);  // the i-th component a is substituted by nth(src,i)
+            return res;
+        };
+        return subst_var(pattern,src,dst);  // non-tuple pattern: delegate to subst_var
+    }
+
     /** Simplify the term e using rewrite rules */
     static Tree simplify_term ( Tree e ) {
         match e {
@@ -1008,12 +1227,22 @@ final public class Streaming extends AlgebraicOptimization {
             return simplify_term(subst_var(x,a,b));
         case cmap(lambda(`x,`b),bag()):
             return #<bag()>;
+        case cmap(`f,if(`p,`e1,`e2)):
+            return simplify_term(#<if(`p,cmap(`f,`e1),cmap(`f,`e2))>);
+        case cmap(`f,call(plus,`x,`y)):
+            return simplify_term(#<call(plus,cmap(`f,`x),cmap(`f,`y))>);
         case groupBy(bag()):
             return #<bag()>;
+        case cmap(`f,join(`kx,`ky,lambda(`v,`r),`X,`Y)):
+            return simplify_term(#<join(`kx,`ky,lambda(`v,cmap(`f,`r)),`X,`Y)>);
         case apply(lambda(`v,`b),`u):
             if (!v.is_variable())
                 fail;
             return simplify_term(subst_var(v,u,b));
+        case apply(lambda(`pattern,`b),`u):
+            return simplify_term(subst_pattern(pattern,u,b));
+        case call(elem,bag(`u)):
+            return simplify_term(u);
         case nth(tuple(...al),`n):
             if (!n.is_long())
                 fail;
@@ -1074,7 +1303,7 @@ final public class Streaming extends AlgebraicOptimization {
                         rs = rs.cons(b);
             return rs;
         };
-        if (repeat_var(e))
+        if (false && repeat_var(e))
             return #[`e];
         else return #[];
     }
@@ -1094,6 +1323,69 @@ final public class Streaming extends AlgebraicOptimization {
         return ne;
     }
 
+    public static Tree unstreamify ( Tree e ) {  // rewrite the stream constructors found in e to the corresponding source constructors
+        match e {
+        case call(stream,...r):
+            return #<call(source,...r)>;  // the arguments r are copied as they are, without recursing into them
+        case BinaryStream(...r):
+            return #<BinarySource(...r)>;
+        case ParsedStream(...r):
+            return #<ParsedSource(...r)>;
+        case `f(...al):
+            Trees bs = #[ ];
+            for ( Tree a: al )
+                bs = bs.append(unstreamify(a));  // any other node: rebuild it from its rewritten children
+            return #<`f(...bs)>;
+        };
+        return e;  // no case matched: e is returned unchanged
+    }
+
+    /** Return the answer function of a query e (only the shapes of e and of merge are inspected)
+     * @param e a query
+     * @param var the input of the answer function
+     * @param merge the monoid associated with the input
+     * @return var itself, or a reduce over var when e is a cmap with a lambda and merge is groupBy(m) or orderBy(m)
+     */
+    static Tree answer ( Tree e, Tree var, Tree merge ) {
+        match e {
+        case cmap(`f,call(stream,...s)):
+            return var;
+        case cmap(`f,`v):
+            if (!repeat_var(v))
+                fail;
+            return var;
+        case cmap(lambda(`v,`b),`u):
+            match merge {
+            case `gb(`m):
+                if (! #[groupBy,orderBy].member(#<`gb>) )
+                    fail;
+                Tree nv = new_var();  // nv ranges over the elements of var
+                return #<reduce(`gb(`m),
+                                cmap(lambda(`nv,bag(tuple(nth(nth(`nv,0),0),nth(`nv,1)))),
+                                     `var))>;  // each nv becomes (nth(nth(nv,0),0), nth(nv,1)) and the result is reduced with gb(m)
+            }
+        };
+        return var;
+    }
+
+    /** Remove the state from the query output: a cmap query keeps nth(x,1) of each element x of var; anything else returns var as is */
+    private static Tree strip_state ( Tree e, Tree var ) {
+        match e {
+        case reduce(`m,call(stream,...s)):
+            return var;
+        case reduce(`m,`v):
+            if (!repeat_var(v))
+                fail;
+            return var;
+        case reduce(`m,`u):
+            return var;  // NOTE(review): every reduce ends up returning var, so the two cases above look subsumed by this one (assuming fail moves on to the next case)
+        case cmap(`f,`u):
+            Tree nn = new_var();  // nn ranges over the elements of var
+            return #<cmap(lambda(`nn,bag(nth(`nn,1))),`var)>;
+        };
+        return var;
+    }
+
     /** Convert a stream-based query to an incremental stream-based program.
      *    It returns the quadruple (zero,merge,homomorphism,answer),
      *    where (zero,merge) is the monoid for the homomorphism
@@ -1103,16 +1395,15 @@ final public class Streaming extends AlgebraicOptimization {
      * @return a quadruple that represents the incremental stream-based program
      */
     static Tree generate_code ( Tree e, Environment env ) {
-        if (!is_streaming(e)) {
-            Tree x = new_var();
-            Tree s = new_var();
-            TypeInference.type_env.insert(x.toString(),#<tuple(tuple(),tuple())>);
-            TypeInference.type_env.insert(s.toString(),#<tuple()>);
-            return #<tuple(tuple(),lambda(`x,tuple()),tuple(),lambda(`s,`e))>;
-        }
+        if (Config.trace) {
+            System.out.println("Subterm:");
+            System.out.println(e.pretty(0));
+        };
         Tree ne = SimplifyTerm(normalize_term(e));
+        TypeInference.type_inference(ne);
+        ne = SimplifyTerm(embed_missing_cmaps(ne));
         if (Config.trace) {
-            System.out.println("After converting joins to coGroup and simplifying:");
+            System.out.println("Subterm normalized:");
             System.out.println(ne.pretty(0));
         };
         Tree qe = SimplifyTerm(inject_q(ne));
@@ -1122,40 +1413,128 @@ final public class Streaming extends AlgebraicOptimization {
         };
         Tree nv = new_var();
         match split(qe,nv,env) {
-        case pair(`a,`q):
+        case pair(`a,`h):
             if (Config.trace) {
                 System.out.println("After split:");
-                System.out.println(a.pretty(0)+"\n"+q.pretty(0));
+                System.out.println(a.pretty(0)+"\n"+h.pretty(0));
+            };
+            h = SimplifyTerm(h);
+            if (Config.trace) {
                 System.out.println("After split (simplified):");
-                System.out.println(SimplifyTerm(a).pretty(0)+"\n"+SimplifyTerm(q).pretty(0));
+                System.out.println(SimplifyTerm(a).pretty(0)+"\n"+h.pretty(0));
             };
-            Tree m = inference(q,env);
+            Tree m = get_monoid(h,env);
             if (Config.trace)
-                System.out.println("monoid: "+m.pretty(0));
-            q = convert_to_algebra(SimplifyTerm(q));
+                System.out.println("Subterm monoid: "+m.pretty(0));
+            Tree q = convert_to_algebra(h);
             Tree tp = TypeInference.type_inference(q);
             if (Config.trace) {
-                System.out.println("Query type: "+tp);
+                System.out.println("Subterm type: "+tp);
                 System.out.println("Merge function over X and Y: ");
                 System.out.println(SimplifyTerm(merge(m,tp,#<X>,#<Y>)).pretty(0));
             };
             type_env.insert(nv.toString(),tp);
             Tree answer = answer(ne,nv,m);
             answer = subst(nv,answer,a);
+            answer = strip_state(ne,answer);
             answer = SimplifyTerm(convert_to_algebra(SimplifyTerm(answer)));
             if (Config.trace) {
                 System.out.println("Answer function:");
                 System.out.println(answer.pretty(0));
             };
             Tree zero = zero(m);
-            Tree mv = new_var();
-            Tree merge = convert_to_algebra(SimplifyTerm(merge(m,tp,#<nth(`mv,0)>,#<nth(`mv,1)>)));
-            Tree res = #<tuple(`zero,lambda(`mv,`merge),`q,lambda(`nv,`answer))>;
+            Tree my = new_var();
+            Tree merge = convert_to_algebra(SimplifyTerm(merge(m,tp,nv,my)));
+            Tree res = #<tuple(`zero,lambda(tuple(`nv,`my),`merge),`q,lambda(`nv,`answer))>;
+            if (Config.trace) {
+                System.out.println("Incremental subterm:");
+                System.out.println(res.pretty(0));
+            };
             return res;
         };
         throw new Error("Cannot generate incremental code: "+e);
     }
 
+    private static Tree generate_incremental_code ( Tree e, Environment env ) {  // build the quadruple tuple(zero,lambda(tuple(v,w),merge),homomorphism,lambda(_,answer)) for e, component-wise for calls, records and tuples
+        if (!is_streaming(e))
+            return #<tuple(tuple(),lambda(tuple(tuple(),tuple()),tuple()),tuple(),lambda(tuple(),`e))>;  // non-streaming e: every state component is the empty tuple and the answer is e itself
+        match e {
+        case call(stream,...):
+            Tree v = new_var();
+            return generate_code(#<cmap(lambda(`v,bag(`v)),`e)>,env);  // a bare stream is wrapped in an identity cmap before calling generate_code
+        case call(`f,...qs):
+            boolean is_monoid = false;  // set when f is the name of one of the entries in monoids
+            for ( Tree monoid: monoids )
+                match monoid {
+                case `aggr(`mtp,`plus,`zero,`unit):
+                    if (#<`aggr>.equals(f))
+                        is_monoid = true;
+                };
+            if (is_monoid)
+                fail;  // presumably this falls through to generate_code(e,env) at the end, since no later case matches a call
+            Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+            int i = 0;  // NOTE(review): i is only incremented, never read
+            for ( Tree q: qs )
+                match generate_incremental_code(q,env) {
+                case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+                    zs = zs.append(z); hs = hs.append(h); as = as.append(a);  // collect the zero, homomorphism and answer of each argument
+                    vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);  // collect the two merge parameters and the merge body of each argument
+                    i++;
+                };
+            return #<tuple(tuple(...zs),
+                           lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
+                           tuple(...hs),
+                           lambda(tuple(...vs),call(`f,...as)))>;
+        case record(...rs):
+            Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+            int i = 0;  // NOTE(review): i is only incremented, never read
+            for ( Tree r: rs )
+                match r {
+                case bind(`k,`q):
+                    match generate_incremental_code(q,env) {
+                    case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+                        zs = zs.append(z); hs = hs.append(h); as = as.append(a);  // NOTE(review): k is not recorded, so record(...as) below carries no bind(k,..) field names - confirm this is intended
+                        vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
+                        i++;
+                    }
+                };
+            return #<tuple(tuple(...zs),
+                           lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
+                           tuple(...hs),
+                           lambda(tuple(...vs),record(...as)))>;
+        case tuple(...qs):
+            Trees zs = #[], vs = #[], ws = #[], ms = #[], hs = #[], as = #[];
+            int i = 0;  // NOTE(review): i is only incremented, never read
+            for ( Tree q: qs )
+                match generate_incremental_code(q,env) {
+                case tuple(`z,lambda(tuple(`v,`w),`m),`h,lambda(_,`a)):
+                    zs = zs.append(z); hs = hs.append(h); as = as.append(a);  // same component-wise collection as in the call case
+                    vs = vs.append(v); ws = ws.append(w); ms = ms.append(m);
+                    i++;
+                };
+            return #<tuple(tuple(...zs),
+                           lambda(tuple(tuple(...vs),tuple(...ws)),tuple(...ms)),
+                           tuple(...hs),
+                           lambda(tuple(...vs),tuple(...as)))>;
+        };
+        return generate_code(e,env);  // every other streaming term is handled as a whole by generate_code
+    }
+
+    /** bind the pattern variables to types: a tuple pattern is matched component-wise against a tuple type */
+    private static void bind_pattern2type ( Tree pattern, Tree tp ) {
+        match pattern {
+        case tuple(...pl):
+            match tp {
+            case tuple(...tl):
+                int i = 0;
+                for ( Tree p: pl )
+                    bind_pattern2type(p,tl.nth(i++));  // NOTE(review): nothing checks that tl has at least as many components as pl
+                return;
+            }
+        };
+        TypeInference.type_env.insert(pattern.toString(),tp);  // otherwise the text of the whole pattern is bound to tp in TypeInference.type_env
+    }
+
     /** Convert a stream-based query to an incremental stream-based program.
      * @param e a stream-based query
      * @return the incremental stream-based program
@@ -1163,7 +1542,7 @@ final public class Streaming extends AlgebraicOptimization {
     public static Tree generate_incremental_code ( Tree e ) {
         match e {
         case repeat(lambda(`v,`u),`x,`n):
-            if (!is_streaming(x))
+            if (false && !is_streaming(x))
                 fail;
             if (!(n instanceof LongLeaf))
                 throw new Error("The repeat must have a constant repetition: "+n);
@@ -1174,68 +1553,40 @@ final public class Streaming extends AlgebraicOptimization {
             u = #<cmap(lambda(`nv,bag(nth(`nv,0))),`u)>;
             TypeInference.type_inference(u);
             repeat_environment = new Environment(v.toString(),#<union>,repeat_environment);
-            match generate_code(u,null) {
-            case tuple(`z,`m,`h,lambda(`s,`a)):
-                match generate_code(x,null) {
-                case tuple(`xz,`xm,`xh,lambda(`xs,`xa)):
-                    Tree state = new_var();
-                    Tree w = new_var();
-                    TypeInference.type_env.insert(state.toString(),TypeInference.type_inference(s));
-                    TypeInference.type_env.insert(w.toString(),TypeInference.type_inference(s));
-                    Tree hh = subst(v,xa,h);
-                    h = subst(v,a,h);
-                    Tree q = #<repeat(lambda(`s,cmap(lambda(`w,bag(tuple(`w,true))),
-                                                     apply(`m,tuple(`state,`h)))),
-                                      apply(`m,tuple(`state,`hh)),
-                                      `(nn-1))>;
-                    Tree xq = #<apply(`xm,tuple(`xs,`xh))>;
-                    a = subst(s,state,a);
-                    TypeInference.type_inference(a);
-                    Tree res = #<tuple(tuple(`xz,`z),
-                                       lambda(tuple(`xs,`state),tuple(`xq,`q)),
-                                       lambda(tuple(`xs,`state),`a))>;
-                    res = SimplifyTerm(res);
-                    Tree tp = TypeInference.type_inference(res);
-                    return res;
-                }
-            }
-        case repeat(lambda(`v,`u),`x,`n):
-            // when x is invariant (doesn't contain a stream source)
-            if (!(n instanceof LongLeaf))
-                throw new Error("wrong repeat: "+n);
-            int nn = (int)((LongLeaf)n).value();
-            if (nn < 1)
-                throw new Error("wrong repeat number: "+n);
-            Tree nv = new_var();
-            u = #<cmap(lambda(`nv,bag(nth(`nv,0))),`u)>;
-            TypeInference.type_inference(u);
-            repeat_environment = new Environment(v.toString(),#<union>,repeat_environment);
-            match generate_code(u,null) {
-            case tuple(`z,`m,`h,lambda(`s,`a)):
-                Tree state = new_var();
+            match generate_incremental_code(u,null) {
+            case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(`s,`a)):
+                Tree deltaT = new_var();
                 Tree w = new_var();
-                TypeInference.type_env.insert(state.toString(),TypeInference.type_inference(s));
+                TypeInference.type_env.insert(deltaT.toString(),TypeInference.type_inference(s));
                 TypeInference.type_env.insert(w.toString(),TypeInference.type_inference(s));
-                Tree hh = subst(v,x,h);
-                h = subst(v,a,h);
-                Tree q = #<repeat(lambda(`s,cmap(lambda(`w,bag(tuple(`w,true))),
-                                                 apply(`m,tuple(`state,`h)))),
-                                  apply(`m,tuple(`state,`hh)),
+                Tree nm = subst(v1,deltaT,subst(v2,h,m));
+                Tree m0 = subst(v,x,h);
+                Tree q = #<repeat(lambda(`deltaT,cmap(lambda(`w,bag(tuple(`w,true))),`nm)),
+                                  `m0,
                                   `(nn-1))>;
-                a = subst(s,state,a);
-                TypeInference.type_inference(a);
-                Tree res = #<tuple(`z,
-                                   lambda(`state,`q),
-                                   lambda(`state,`a))>;
+                Tree res = #<incr(tuple(`z,`(unstreamify(x))),
+                                  lambda(tuple(`s,`v),tuple(apply(lambda(tuple(`v1,`v2),`m),tuple(`s,`q)),`a)),
+                                  lambda(tuple(`s,`v),`v))>;
                 res = SimplifyTerm(res);
-                Tree tp = TypeInference.type_inference(res);
+                if (Config.trace) {
+                    System.out.println("Incremental program:");
+                    System.out.println(res.pretty(0));
+                };
+                TypeInference.type_inference(res);
                 return res;
             }
         };
-        match generate_code(e,null) {
-        case tuple(`z,`m,`h,lambda(`v,`a)):
-            Tree q = SimplifyTerm(#<apply(`m,tuple(`v,`h))>);
-            Tree res = #<tuple(`z,lambda(`v,`q),lambda(`v,`a))>;
+        match generate_incremental_code(e,null) {
+        case tuple(`z,lambda(tuple(`v1,`v2),`m),`h,lambda(_,`a)):
+            Tree tp = TypeInference.type_inference(h);
+            bind_pattern2type(v1,tp);
+            Tree nm = SimplifyTerm(#<apply(lambda(`v2,`m),`h)>);
+            a = SimplifyTerm(a);
+            Tree res = #<incr(`z,lambda(`v1,`nm),lambda(`v1,`a))>;
+            if (Config.trace) {
+                System.out.println("Incremental program:");
+                System.out.println(res.pretty(0));
+            };
             TypeInference.type_inference(res);
             return res;
         };

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/SystemFunctions.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/SystemFunctions.java b/core/src/main/java/org/apache/mrql/SystemFunctions.java
index c19e344..62fdd50 100644
--- a/core/src/main/java/org/apache/mrql/SystemFunctions.java
+++ b/core/src/main/java/org/apache/mrql/SystemFunctions.java
@@ -219,6 +219,14 @@ final public class SystemFunctions {
         return (s.contains(e)) ? true_value : false_value;
     }
 
+    public static MRData elem ( Bag s ) {  // return the first element produced by the iterator of bag s
+        s.materialize();  // the bag is materialized before it is iterated
+        Iterator<MRData> it = s.iterator();
+        if (it.hasNext())
+            return it.next();
+        throw new Error("method elem was applied on an empty bag");  // reached only when the iterator yields no element
+    }
+
     public static MR_long count ( Bag s ) {
         if (s.materialized())
             return new MR_long(s.size());

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/TopLevel.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/TopLevel.gen b/core/src/main/java/org/apache/mrql/TopLevel.gen
index 7e4f38e..5838ed5 100644
--- a/core/src/main/java/org/apache/mrql/TopLevel.gen
+++ b/core/src/main/java/org/apache/mrql/TopLevel.gen
@@ -17,6 +17,7 @@
  */
 package org.apache.mrql;
 
+import java.util.List;
 import org.apache.mrql.gen.*;
 
 
@@ -265,6 +266,36 @@ final public class TopLevel extends Interpreter {
         monoids = monoids.append(#<`name(`type,`plus,`zero,`unit)>);
     }
 
+    /** bind the pattern variables to values: a tuple pattern is matched component-wise against a Tuple; returns env extended with the new bindings */
+    private static Environment bind_list ( Tree pattern, MRData value, Environment env ) {
+        match pattern {
+        case tuple(...ps):
+            int i = 0;
+            Tuple t = (Tuple)value;  // a tuple pattern requires value to be a Tuple (the cast fails otherwise)
+            for ( Tree p: ps )
+                env = bind_list(p,t.get(i++),env);  // the i-th sub-pattern is bound to the i-th tuple component
+            return env;
+        };
+        return new Environment(pattern.toString(),value,env);  // non-tuple pattern: put a new binding named after the pattern in front of env
+    }
+
+    /** bind the pattern variables to the values of the src expression: tuple patterns are matched against tuple expressions, the rest is evaluated */
+    private static Environment bind_list ( Tree pattern, Tree src, Environment env ) {
+        match pattern {
+        case tuple(...ps):
+            int i = 0;
+            match src {
+            case tuple(...es):
+                for ( Tree p: ps )
+                    env = bind_list(p,es.nth(i++),env);  // pair the i-th sub-pattern with the i-th sub-expression
+                return env;
+            }
+        };
+        MRData value = Evaluator.evaluator.evalE(src,env);  // otherwise src is evaluated as a whole in env
+        env.replace(pattern.toString(),value);  // unlike the MRData overload this calls env.replace instead of creating a new Environment; NOTE(review): fails if env is null
+        return env;
+    }
+
     /** the MRQL top-level interfacse to evaluate a single MRQL expression or command */
     public final static void evaluate_top_level ( Tree expr ) {
         if (expr == null)
@@ -330,30 +361,31 @@ final public class TopLevel extends Interpreter {
                 return;
             Evaluator.evaluator.initialize_query();
             match plan {
-            case tuple(`zero,lambda(tuple(`state1,`state2),`merge),lambda(_,`answer)):
-                final Tree qt = make_persistent_type(query_type);
-                Tuple z = (Tuple)evalE(zero,null);
-                final Environment env = 
-                    new Environment(state1.toString(),z.get(0),
-                                    new Environment(state2.toString(),z.get(1),null));
-                final Tree ans = answer;
-                merge = Streaming.streamify(merge);
-                Evaluator.evaluator.streaming(#<lambda(tuple(`state1,`state2),`merge)>,env,
-                   new Function(){
-                        public MRData eval ( final MRData state ) {
-                            MRData a = Evaluator.evaluator.evalE(ans,env);
-                            System.out.println(print(a,qt));
-                            return state;
-                        }
-                    });
-            case tuple(`zero,lambda(`state,`merge),lambda(_,`answer)):
+            case incr(`zero,lambda(`state,`merge),lambda(_,`answer)):
                 final Tree qt = make_persistent_type(query_type);
                 MRData z = evalE(zero,null);
-                final Environment env = new Environment(state.toString(),z,null);
+                final Environment env = bind_list(state,z,null);
+                final boolean incr_test = false;
+                if (incr_test) {
+                    // for code testing only
+                    long t = System.currentTimeMillis();
+                    Environment nenv = bind_list(state,Streaming.unstreamify(merge),env);
+                    for ( Environment ev = nenv; ev != null; ev = ev.next )
+                        if (ev.value instanceof MR_dataset) {
+                            DataSet ds = ((MR_dataset)ev.value).dataset();
+                            List<MRData> vals = ds.take(Config.max_bag_size_print);
+                            System.out.println(ev.name+" = "+vals.size()+ " "+vals);
+                        };
+                    MRData a = Evaluator.evaluator.evalE(answer,nenv);
+                    System.out.println(print(a,qt));
+                    if (!Config.quiet_execution)
+                        System.out.println("Run time: "+(System.currentTimeMillis()-t)/1000.0+" secs");
+                    return;
+                };
                 final Tree ans = answer;
                 merge = Streaming.streamify(merge);
                 Evaluator.evaluator.streaming(#<lambda(`state,`merge)>,env,
-                   new Function(){
+                   new Function() {
                         public MRData eval ( final MRData state ) {
                             MRData a = Evaluator.evaluator.evalE(ans,env);
                             System.out.println(print(a,qt));

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/core/src/main/java/org/apache/mrql/TypeInference.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/TypeInference.gen b/core/src/main/java/org/apache/mrql/TypeInference.gen
index 7fe483d..7c80240 100644
--- a/core/src/main/java/org/apache/mrql/TypeInference.gen
+++ b/core/src/main/java/org/apache/mrql/TypeInference.gen
@@ -543,6 +543,9 @@ public class TypeInference extends Translator {
         case let(`p,`u,`b):
             bind_pattern_type(p,type_inference2(u));
             return type_inference2(b);
+        case Let(`p,`u,`b):
+            bind_pattern_type(p,type_inference2(u));
+            return type_inference2(b);
         case case(`u,...cs):
             Tree tp = type_inference2(u);
             Trees ts = #[];
@@ -667,10 +670,12 @@ public class TypeInference extends Translator {
             return type_inference2(b);
         case typed(null,`tp):
             return tp;
-        case typed(`f(...),`tp):
-            if (f.equals("tagged_union") || f.equals("union_value"))
-                return tp;
-            else fail
+        case typed(tagged_union(`n,`u),`tp):
+            type_inference2(u);
+            return tp;
+        case typed(union_value(`u),`tp):
+            type_inference2(u);
+            return tp;
         case typed(`x,`tp):
             if (tp.is_variable() && !tp.equals(#<string>)
                  && MRContainer.type_code(tp.toString()) >= 0) {
@@ -927,9 +932,9 @@ public class TypeInference extends Translator {
             case `T(tuple(`kx,`a)):
                 if (!is_collection(T))
                     type_error(e,"Wrong coGroup left operand: "+print_query(x));
-                match xtp {
+                match ytp {
                 case `S(tuple(`ky,`b)):
-                    if (!is_collection(T))
+                    if (!is_collection(S))
                         type_error(e,"Wrong coGroup right operand: "+print_query(y));
                     if (unify(kx,ky) == null)
                         type_error(e,"incompatible keys in coGroup: "+print_query(e));
@@ -1206,6 +1211,8 @@ public class TypeInference extends Translator {
             return rt;
         case call(inv,`x):
             return type_inference(x);
+        case incr(...s):
+            return type_inference(#<tuple(...s)>);
         case call(plus,`x,`y):
             Tree tx = type_inference2(x);
             Tree ty = type_inference2(y);
@@ -1250,6 +1257,14 @@ public class TypeInference extends Translator {
                 coerce(tx,t1,((Node)e).children.tail);
                 return #<bool>;
             };
+        case call(elem,`x):
+            Tree tp = type_inference2(x);
+            match tp {
+            case `T(`t):
+                if (is_collection(T))
+                    return t;
+            };
+            type_error(e,"method elem must be applied to a collection: "+tp);
         case call(`f,`x,`y):
             if (! #[eq,neq,lt,leq,gt,geq].member(f))
                 fail;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/a624ffc5/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
----------------------------------------------------------------------
diff --git a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
index 1f2637f..eb5098a 100644
--- a/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
+++ b/flink/src/main/java/org/apache/mrql/FlinkEvaluator.gen
@@ -854,6 +854,14 @@ public class FlinkEvaluator extends Evaluator implements Serializable {
                 IterativeDataSet<FData> startOfIteration = eval(s,env).iterate(max_num);
                 DataSet<FData> toBeFedBack = eval(b,new Environment(v.toString(),new MR_flink(startOfIteration),env));
                 return startOfIteration.closeWith(toBeFedBack);
+            case let(`v,DataSetCollect(`s),`body):
+                MRData x = evalE(s,env);
+                if (x instanceof MR_dataset)
+                    x = new Bag(((MR_dataset)x).dataset().take(Integer.MAX_VALUE));
+                else if (x instanceof Bag)
+                    ((Bag)x).materialize();
+                new_distributed_binding(v.toString(),x);
+                return eval(body,new Environment(v.toString(),x,env));
             case let(`v,`u,`body):
                 return eval(body,new Environment(v.toString(),evalE(u,null),env));
             case Let(`v,`u,`body):