You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2013/09/06 22:57:28 UTC

[10/18] MRQL-16: correct source files, ASF licenses, and POMs for release

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/MapReduceAlgebra.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/MapReduceAlgebra.java b/src/main/java/core/MapReduceAlgebra.java
index 9c35efa..bc2990e 100644
--- a/src/main/java/core/MapReduceAlgebra.java
+++ b/src/main/java/core/MapReduceAlgebra.java
@@ -27,10 +27,10 @@ final public class MapReduceAlgebra {
 
     /** eager concat-map (not used) */
     private static Bag cmap_eager ( final Function f, final Bag s ) {
-	Bag res = new Bag();
-	for ( MRData e: s )
-	    res.addAll((Bag)f.eval(e));
-	return res;
+        Bag res = new Bag();
+        for ( MRData e: s )
+            res.addAll((Bag)f.eval(e));
+        return res;
     }
 
     /** lazy concat-map (stream-based)
@@ -39,33 +39,33 @@ final public class MapReduceAlgebra {
      * @return a value of type {b}
      */
     public static Bag cmap ( final Function f, final Bag s ) {
-	final Iterator<MRData> si = s.iterator();
-	return new Bag(new BagIterator() {
-		Iterator<MRData> data = null;
-		boolean more = false;
-		public boolean hasNext () {
-		    if (data == null) {
-			while (!more && si.hasNext()) {
-			    data = ((Bag)f.eval(si.next())).iterator();
-			    more = data.hasNext();
-			}
-		    } else {
-			if (more) {
-			    more = data.hasNext();
-			    if (more)
-				return true;
-			};
-			while (!more && si.hasNext()) {
-			    data = ((Bag)f.eval(si.next())).iterator();
-			    more = data.hasNext();
-			}
-		    };
-		    return more;
-		}
-		public MRData next () {
-		    return data.next();
-		}
-	    });
+        final Iterator<MRData> si = s.iterator();
+        return new Bag(new BagIterator() {
+                Iterator<MRData> data = null;
+                boolean more = false;
+                public boolean hasNext () {
+                    if (data == null) {
+                        while (!more && si.hasNext()) {
+                            data = ((Bag)f.eval(si.next())).iterator();
+                            more = data.hasNext();
+                        }
+                    } else {
+                        if (more) {
+                            more = data.hasNext();
+                            if (more)
+                                return true;
+                        };
+                        while (!more && si.hasNext()) {
+                            data = ((Bag)f.eval(si.next())).iterator();
+                            more = data.hasNext();
+                        }
+                    };
+                    return more;
+                }
+                public MRData next () {
+                    return data.next();
+                }
+            });
     }
 
     /** lazy map
@@ -74,11 +74,11 @@ final public class MapReduceAlgebra {
      * @return a value of type {b}
      */
     public static Bag map ( final Function f, final Bag s ) {
-	final Iterator<MRData> si = s.iterator();
-	return new Bag(new BagIterator() {
-		public boolean hasNext () { return si.hasNext(); }
-		public MRData next () { return f.eval(si.next()); }
-	    });
+        final Iterator<MRData> si = s.iterator();
+        return new Bag(new BagIterator() {
+                public boolean hasNext () { return si.hasNext(); }
+                public MRData next () { return f.eval(si.next()); }
+            });
     }
 
     /** lazy filter combined with a map
@@ -88,19 +88,19 @@ final public class MapReduceAlgebra {
      * @return a value of type {b}
      */
     public static Bag filter ( final Function p, final Function f, final Bag s ) {
-	final Iterator<MRData> si = s.iterator();
-	return new Bag(new BagIterator() {
-		MRData data = null;
-		public boolean hasNext () {
-		    while (si.hasNext()) {
-			data = si.next();
-			if (((MR_bool)p.eval(data)).get())
-			    return true;
-		    };
-		    return false;
-		}
-		public MRData next () { return f.eval(data); }
-	    });
+        final Iterator<MRData> si = s.iterator();
+        return new Bag(new BagIterator() {
+                MRData data = null;
+                public boolean hasNext () {
+                    while (si.hasNext()) {
+                        data = si.next();
+                        if (((MR_bool)p.eval(data)).get())
+                            return true;
+                    };
+                    return false;
+                }
+                public MRData next () { return f.eval(data); }
+            });
     }
 
     /** strict group-by
@@ -108,70 +108,70 @@ final public class MapReduceAlgebra {
      * @return a value of type {(a,{b})}
      */
     public static Bag groupBy ( Bag s ) {
-	Bag res = new Bag();
-	s.sort();
-	MRData last = null;
-	Bag group = new Bag();
-	for ( MRData e: s) {
-	    final Tuple p = (Tuple)e;
-	    if (last != null && p.first().equals(last))
-		group.add(p.second());
-	    else {
-		if (last != null) {
-		    group.trim();
-		    res.add(new Tuple(last,group));
-		};
-		last = p.first();
-		group = new Bag();
-		group.add(p.second());
-	    }
-	};
-	if (last != null) {
-	    group.trim();
-	    res.add(new Tuple(last,group));
-	};
-	//res.trim();
-	return res;
+        Bag res = new Bag();
+        s.sort();
+        MRData last = null;
+        Bag group = new Bag();
+        for ( MRData e: s) {
+            final Tuple p = (Tuple)e;
+            if (last != null && p.first().equals(last))
+                group.add(p.second());
+            else {
+                if (last != null) {
+                    group.trim();
+                    res.add(new Tuple(last,group));
+                };
+                last = p.first();
+                group = new Bag();
+                group.add(p.second());
+            }
+        };
+        if (last != null) {
+            group.trim();
+            res.add(new Tuple(last,group));
+        };
+        //res.trim();
+        return res;
     }
 
     /** lazy group-by (not used) */
     private static Bag groupBy_lazy ( Bag s ) {
-	s.sort();
-	final Iterator<MRData> it = s.iterator();
-	return new Bag(new BagIterator() {
-		MRData last = null;
-		MRData data = null;
-		Bag group = new Bag();
-		public boolean hasNext () {
-		    while (it.hasNext()) {
-			final Tuple p = (Tuple)it.next();
-			if (last != null && p.first().equals(last))
-			    group.add(p.second());
-			else if (last != null) {
-			    group.trim();
-			    data = new Tuple(last,group);
-			    last = p.first();
-			    group = new Bag();
-			    group.add(p.second());
-			    return true;
-			} else {
-			    last = p.first();
-			    group = new Bag();
-			    group.add(p.second());
-			}
-		    };
-		    if (last != null) {
-			group.trim();
-			data = new Tuple(last,group);
-			last = null;
-			return true;
-		    };
-		    return false;
-		}
-		public MRData next () {
-		    return data;
-		}
-	    });
+        s.sort();
+        final Iterator<MRData> it = s.iterator();
+        return new Bag(new BagIterator() {
+                MRData last = null;
+                MRData data = null;
+                Bag group = new Bag();
+                public boolean hasNext () {
+                    while (it.hasNext()) {
+                        final Tuple p = (Tuple)it.next();
+                        if (last != null && p.first().equals(last))
+                            group.add(p.second());
+                        else if (last != null) {
+                            group.trim();
+                            data = new Tuple(last,group);
+                            last = p.first();
+                            group = new Bag();
+                            group.add(p.second());
+                            return true;
+                        } else {
+                            last = p.first();
+                            group = new Bag();
+                            group.add(p.second());
+                        }
+                    };
+                    if (last != null) {
+                        group.trim();
+                        data = new Tuple(last,group);
+                        last = null;
+                        return true;
+                    };
+                    return false;
+                }
+                public MRData next () {
+                    return data;
+                }
+            });
     }
 
     /** the MapReduce operation
@@ -181,27 +181,27 @@ final public class MapReduceAlgebra {
      * @return a value of type {c}
      */
     public static Bag mapReduce ( final Function m, final Function r, final Bag s ) {
-	return cmap(r,groupBy(cmap(m,s)));
+        return cmap(r,groupBy(cmap(m,s)));
     }
 
     /** Not used: use mapReduce2 instead */
     private static Bag join ( final Function kx, final Function ky, final Function f,
-			      final Bag X, final Bag Y ) {
-	return cmap(new Function() {
-		public Bag eval ( final MRData e ) {
-		    final Tuple p = (Tuple)e;
-		    return (Bag)f.eval(new Tuple(p.second(),
-						 cmap(new Function() {
-							 public Bag eval ( final MRData y ) {
-							     return (ky.eval(y).equals(p.first()))
-								     ? new Bag(y)
-								     : new Bag();
-							 } }, Y))); }
-	    },
-	    groupBy(cmap(new Function() {
-		    public Bag eval ( final MRData x ) {
-			return new Bag(new Tuple(kx.eval(x),x));
-		    } }, X)));
+                              final Bag X, final Bag Y ) {
+        return cmap(new Function() {
+                public Bag eval ( final MRData e ) {
+                    final Tuple p = (Tuple)e;
+                    return (Bag)f.eval(new Tuple(p.second(),
+                                                 cmap(new Function() {
+                                                         public Bag eval ( final MRData y ) {
+                                                             return (ky.eval(y).equals(p.first()))
+                                                                     ? new Bag(y)
+                                                                     : new Bag();
+                                                         } }, Y))); }
+            },
+            groupBy(cmap(new Function() {
+                    public Bag eval ( final MRData x ) {
+                        return new Bag(new Tuple(kx.eval(x),x));
+                    } }, X)));
     }
 
     /** A hash-based equi-join
@@ -213,24 +213,24 @@ final public class MapReduceAlgebra {
      * @return a value of type {c}
      */
     public static Bag hash_join ( final Function kx, final Function ky, final Function f,
-				  final Bag X, final Bag Y ) {
-	Hashtable<MRData,Bag> hashTable = new Hashtable<MRData,Bag>(1000);
-	for ( MRData x: X ) {
-	    MRData key = kx.eval(x);
-	    Bag old = hashTable.get(key);
-	    if (old == null)
-		hashTable.put(key,new Bag(x));
-	    else old.add(x);
-	};
-	Bag res = new Bag();
-	for ( MRData y: Y ) {
-	    MRData key = ky.eval(y);
-	    Bag match = hashTable.get(key);
-	    if (match != null)
-		for ( MRData x: match )
-		    res.add(f.eval(new Tuple(x,y)));
-	};
-	return res;
+                                  final Bag X, final Bag Y ) {
+        Hashtable<MRData,Bag> hashTable = new Hashtable<MRData,Bag>(1000);
+        for ( MRData x: X ) {
+            MRData key = kx.eval(x);
+            Bag old = hashTable.get(key);
+            if (old == null)
+                hashTable.put(key,new Bag(x));
+            else old.add(x);
+        };
+        Bag res = new Bag();
+        for ( MRData y: Y ) {
+            MRData key = ky.eval(y);
+            Bag match = hashTable.get(key);
+            if (match != null)
+                for ( MRData x: match )
+                    res.add(f.eval(new Tuple(x,y)));
+        };
+        return res;
     }
 
     /** A cross-product
@@ -242,18 +242,18 @@ final public class MapReduceAlgebra {
      * @return a value of type {c}
      */
     public static Bag crossProduct ( final Function mx, final Function my, final Function r,
-				     final Bag X, final Bag Y ) {
-	Bag a = new Bag();
-	for ( MRData y: Y )
-	    for ( MRData v: (Bag)my.eval(y) )
-		a.add(v);
-	Bag b = new Bag();
-	for ( MRData x: X )
-	    for ( MRData xx: (Bag)mx.eval(x) )
-		for ( MRData y: a )
-		    for ( MRData v: (Bag)r.eval(new Tuple(xx,y)) )
-			b.add(v);
-	return b;
+                                     final Bag X, final Bag Y ) {
+        Bag a = new Bag();
+        for ( MRData y: Y )
+            for ( MRData v: (Bag)my.eval(y) )
+                a.add(v);
+        Bag b = new Bag();
+        for ( MRData x: X )
+            for ( MRData xx: (Bag)mx.eval(x) )
+                for ( MRData y: a )
+                    for ( MRData v: (Bag)r.eval(new Tuple(xx,y)) )
+                        b.add(v);
+        return b;
     }
 
     /** A map-reduce operation with two mappers (a join)
@@ -265,65 +265,65 @@ final public class MapReduceAlgebra {
      * @return a value of type {c}
      */
     public static Bag mapReduce2 ( final Function mx,   // left mapper
-				   final Function my,   // right mapper
-				   final Function r,    // reducer
-				   final Bag X, final Bag Y ) {
-	final Bag left = cmap(new Function() {
-		public Bag eval ( final MRData x ) {
-		    return cmap(new Function() {
-			    public Bag eval ( final MRData e ) {
-				final Tuple p = (Tuple)e;
-				return new Bag(new Tuple(p.first(),
-							 new Tuple(new MR_byte(1),p.second())));
-			    } }, (Bag)mx.eval(x));
-		} }, X);
-	final Bag right = cmap(new Function() {
-		public Bag eval ( final MRData y ) {
-		    return cmap(new Function() {
-			    public Bag eval ( final MRData e ) {
-				final Tuple p = (Tuple)e;
-				return new Bag(new Tuple(p.first(),
-							 new Tuple(new MR_byte(2),p.second())));
-			    } }, (Bag)my.eval(y));
-		} }, Y);
-	final Iterator<MRData> li = left.iterator();
-	final Iterator<MRData> ri = right.iterator();
-	final Bag mix = new Bag(new BagIterator () {
-		MRData data;
-		public boolean hasNext () {
-		    if (li.hasNext()) {
-			data = li.next();
-			return true;
-		    } else if (ri.hasNext()) {
-			data = ri.next();
-			return true;
-		    } else return false;
-		}
-		public MRData next () {
-		    return data;
-		}
-	    });
-	return cmap(new Function() {
-		public Bag eval ( final MRData e ) {
-		    final Tuple p = (Tuple)e;
-		    final Bag xs = cmap(new Function() {
-			    public Bag eval ( final MRData e ) {
-				final Tuple q = (Tuple)e;
-				return (((MR_byte)q.first()).get() == 1)
-				        ? new Bag(q.second())
-				        : new Bag();
-			    } }, (Bag)p.second());
-		    final Bag ys = cmap(new Function() {
-			    public Bag eval ( final MRData e ) {
-				final Tuple q = (Tuple)e;
-				return (((MR_byte)q.first()).get() == 2)
-				        ? new Bag(q.second())
-				        : new Bag();
-			    } }, (Bag)p.second());
-		    xs.materialize();
-		    ys.materialize();
-		    return (Bag)r.eval(new Tuple(xs,ys));
-		} }, groupBy(mix));
+                                   final Function my,   // right mapper
+                                   final Function r,    // reducer
+                                   final Bag X, final Bag Y ) {
+        final Bag left = cmap(new Function() {
+                public Bag eval ( final MRData x ) {
+                    return cmap(new Function() {
+                            public Bag eval ( final MRData e ) {
+                                final Tuple p = (Tuple)e;
+                                return new Bag(new Tuple(p.first(),
+                                                         new Tuple(new MR_byte(1),p.second())));
+                            } }, (Bag)mx.eval(x));
+                } }, X);
+        final Bag right = cmap(new Function() {
+                public Bag eval ( final MRData y ) {
+                    return cmap(new Function() {
+                            public Bag eval ( final MRData e ) {
+                                final Tuple p = (Tuple)e;
+                                return new Bag(new Tuple(p.first(),
+                                                         new Tuple(new MR_byte(2),p.second())));
+                            } }, (Bag)my.eval(y));
+                } }, Y);
+        final Iterator<MRData> li = left.iterator();
+        final Iterator<MRData> ri = right.iterator();
+        final Bag mix = new Bag(new BagIterator () {
+                MRData data;
+                public boolean hasNext () {
+                    if (li.hasNext()) {
+                        data = li.next();
+                        return true;
+                    } else if (ri.hasNext()) {
+                        data = ri.next();
+                        return true;
+                    } else return false;
+                }
+                public MRData next () {
+                    return data;
+                }
+            });
+        return cmap(new Function() {
+                public Bag eval ( final MRData e ) {
+                    final Tuple p = (Tuple)e;
+                    final Bag xs = cmap(new Function() {
+                            public Bag eval ( final MRData e ) {
+                                final Tuple q = (Tuple)e;
+                                return (((MR_byte)q.first()).get() == 1)
+                                        ? new Bag(q.second())
+                                        : new Bag();
+                            } }, (Bag)p.second());
+                    final Bag ys = cmap(new Function() {
+                            public Bag eval ( final MRData e ) {
+                                final Tuple q = (Tuple)e;
+                                return (((MR_byte)q.first()).get() == 2)
+                                        ? new Bag(q.second())
+                                        : new Bag();
+                            } }, (Bag)p.second());
+                    xs.materialize();
+                    ys.materialize();
+                    return (Bag)r.eval(new Tuple(xs,ys));
+                } }, groupBy(mix));
     }
 
     /** The fragment-replicate join (map-side join)
@@ -335,23 +335,23 @@ final public class MapReduceAlgebra {
      * @return a value of type {c}
      */
     public static Bag mapJoin ( final Function kx, final Function ky, final Function r,
-				final Bag X, final Bag Y ) {
-	X.materialize();
-	Y.materialize();
-	return cmap(new Function() {
-		public Bag eval ( final MRData e ) {
-		    final Tuple p = (Tuple)e;
-		    return cmap(new Function() {
-			            public Bag eval ( final MRData x ) {
-					return (kx.eval(x).equals(p.first()))
-					    ? (Bag)r.eval(new Tuple(x,p.second()))
-					    : new Bag();
-				    } }, X); }
-	    },
-	    groupBy(cmap(new Function() {
-		    public Bag eval ( final MRData y ) {
-			return new Bag(new Tuple(ky.eval(y),y));
-		    } }, Y)));
+                                final Bag X, final Bag Y ) {
+        X.materialize();
+        Y.materialize();
+        return cmap(new Function() {
+                public Bag eval ( final MRData e ) {
+                    final Tuple p = (Tuple)e;
+                    return cmap(new Function() {
+                                    public Bag eval ( final MRData x ) {
+                                        return (kx.eval(x).equals(p.first()))
+                                            ? (Bag)r.eval(new Tuple(x,p.second()))
+                                            : new Bag();
+                                    } }, X); }
+            },
+            groupBy(cmap(new Function() {
+                    public Bag eval ( final MRData y ) {
+                        return new Bag(new Tuple(ky.eval(y),y));
+                    } }, Y)));
     }
 
     /** An equi-join combined with a group-by (see GroupByJoinPlan)
@@ -367,40 +367,40 @@ final public class MapReduceAlgebra {
      * @return a value of type {e}
      */
     public static Bag groupByJoin ( final Function kx, final Function ky,
-				    final Function gx, final Function gy,
-				    final Function m, final Function c, final Function r,
-				    final Bag X, final Bag Y ) {
-	Bag s = groupBy(hash_join(kx,ky,
-				  new Function() {
-				      public MRData eval ( final MRData e ) {
-					  Tuple t = (Tuple)e;
-					  return new Tuple(new Tuple(gx.eval(t.first()),gy.eval(t.second())),t);
-				      } },
-				  X,Y));
-	Bag res = new Bag();
-	for ( MRData z: s ) {
-	    Tuple t = (Tuple)z;
-	    for ( MRData n: (Bag)r.eval(new Tuple(t.first(),c.eval(new Tuple(t.first(),cmap(m,(Bag)t.second()))))) )
-		res.add(n);
-	};
-	return res;
+                                    final Function gx, final Function gy,
+                                    final Function m, final Function c, final Function r,
+                                    final Bag X, final Bag Y ) {
+        Bag s = groupBy(hash_join(kx,ky,
+                                  new Function() {
+                                      public MRData eval ( final MRData e ) {
+                                          Tuple t = (Tuple)e;
+                                          return new Tuple(new Tuple(gx.eval(t.first()),gy.eval(t.second())),t);
+                                      } },
+                                  X,Y));
+        Bag res = new Bag();
+        for ( MRData z: s ) {
+            Tuple t = (Tuple)z;
+            for ( MRData n: (Bag)r.eval(new Tuple(t.first(),c.eval(new Tuple(t.first(),cmap(m,(Bag)t.second()))))) )
+                res.add(n);
+        };
+        return res;
     }
 
     private static void flush_table ( Hashtable<MRData,MRData> hashTable, Function r, Bag result ) throws IOException {
-	Bag tbag = new Bag(2);
-	Tuple pair = new Tuple(2);
-	Enumeration<MRData> en = hashTable.keys();
-	while (en.hasMoreElements()) {
-	    MRData key = en.nextElement();
-	    MRData value = hashTable.get(key);
-	    pair.set(0,key);
-	    tbag.clear();
-	    tbag.add_element(value);
-	    pair.set(1,tbag);
-	    for ( MRData e: (Bag)r.eval(pair) )
-		result.add(e);
-	};
-	hashTable.clear();
+        Bag tbag = new Bag(2);
+        Tuple pair = new Tuple(2);
+        Enumeration<MRData> en = hashTable.keys();
+        while (en.hasMoreElements()) {
+            MRData key = en.nextElement();
+            MRData value = hashTable.get(key);
+            pair.set(0,key);
+            tbag.clear();
+            tbag.add_element(value);
+            pair.set(1,tbag);
+            for ( MRData e: (Bag)r.eval(pair) )
+                result.add(e);
+        };
+        hashTable.clear();
     }
 
     /** An equi-join combined with a group-by implemented using hashing
@@ -416,68 +416,68 @@ final public class MapReduceAlgebra {
      * @return a value of type {e}
      */
     public static Bag mergeGroupByJoin ( final Function kx, final Function ky,
-					 final Function gx, final Function gy,
-					 final Function m, final Function c, final Function r,
-					 final Bag X, final Bag Y ) {
-	try {
-	    Bag tbag = new Bag(2);
-	    Tuple pair = new Tuple(2);
-	    Hashtable<MRData,MRData> hashTable = new Hashtable<MRData,MRData>(1000);
-	    Bag xs = groupBy(map(new Function() {
-		    public MRData eval ( final MRData e ) {
-			Tuple t = (Tuple)e;
-			return new Tuple(new Tuple(t.first(),kx.eval(t.second())),t.second());
-		    } }, X));
-	    Bag ys = groupBy(map(new Function() {
-		    public MRData eval ( final MRData e ) {
-			Tuple t = (Tuple)e;
-			return new Tuple(new Tuple(t.first(),ky.eval(t.second())),t.second());
-		    } }, Y));
-	    Bag res = new Bag();
-	    Iterator<MRData> xi = xs.iterator();
-	    Iterator<MRData> yi = ys.iterator();
-	    if ( !xi.hasNext() || !yi.hasNext() )
-		return res;
-	    Tuple x = (Tuple)xi.next();
-	    Tuple y = (Tuple)yi.next();
-	    MRData partition = null;
-	    while ( xi.hasNext() && yi.hasNext() ) {
-		int cmp = x.first().compareTo(y.first());
-		if (cmp < 0) { x = (Tuple)xi.next(); continue; };
-		if (cmp > 0) { y = (Tuple)yi.next(); continue; };
-		if (partition == null)
-		    partition = ((Tuple)x.first()).first();
-		else if (!partition.equals(((Tuple)x.first()).first())) {
-		    partition = ((Tuple)x.first()).first();
-		    flush_table(hashTable,r,res);
-		};
-		for ( MRData xx: (Bag)x.second() )
-		    for ( MRData yy: (Bag)y.second() ) {
-			Tuple key = new Tuple(gx.eval(xx),gy.eval(yy));
-			Tuple value = new Tuple(xx,yy);
-			MRData old = hashTable.get(key);
-			pair.set(0,key);
-			for ( MRData e: (Bag)m.eval(value) )
-			    if (old == null)
-				hashTable.put(key,e);
-			    else {
-				tbag.clear();
-				tbag.add_element(e).add_element(old);
-				pair.set(1,tbag);
-				for ( MRData z: (Bag)c.eval(pair) )
-				    hashTable.put(key,z);  // normally, done once
-			    }
-		    };
-		if (xi.hasNext())
-		    x = (Tuple)xi.next();
-		if (yi.hasNext())
-		    y = (Tuple)yi.next();
-	    };
-	    flush_table(hashTable,r,res);
-	    return res;
-	} catch (IOException ex) {
-	    throw new Error(ex);
-	}
+                                         final Function gx, final Function gy,
+                                         final Function m, final Function c, final Function r,
+                                         final Bag X, final Bag Y ) {
+        try {
+            Bag tbag = new Bag(2);
+            Tuple pair = new Tuple(2);
+            Hashtable<MRData,MRData> hashTable = new Hashtable<MRData,MRData>(1000);
+            Bag xs = groupBy(map(new Function() {
+                    public MRData eval ( final MRData e ) {
+                        Tuple t = (Tuple)e;
+                        return new Tuple(new Tuple(t.first(),kx.eval(t.second())),t.second());
+                    } }, X));
+            Bag ys = groupBy(map(new Function() {
+                    public MRData eval ( final MRData e ) {
+                        Tuple t = (Tuple)e;
+                        return new Tuple(new Tuple(t.first(),ky.eval(t.second())),t.second());
+                    } }, Y));
+            Bag res = new Bag();
+            Iterator<MRData> xi = xs.iterator();
+            Iterator<MRData> yi = ys.iterator();
+            if ( !xi.hasNext() || !yi.hasNext() )
+                return res;
+            Tuple x = (Tuple)xi.next();
+            Tuple y = (Tuple)yi.next();
+            MRData partition = null;
+            while ( xi.hasNext() && yi.hasNext() ) {
+                int cmp = x.first().compareTo(y.first());
+                if (cmp < 0) { x = (Tuple)xi.next(); continue; };
+                if (cmp > 0) { y = (Tuple)yi.next(); continue; };
+                if (partition == null)
+                    partition = ((Tuple)x.first()).first();
+                else if (!partition.equals(((Tuple)x.first()).first())) {
+                    partition = ((Tuple)x.first()).first();
+                    flush_table(hashTable,r,res);
+                };
+                for ( MRData xx: (Bag)x.second() )
+                    for ( MRData yy: (Bag)y.second() ) {
+                        Tuple key = new Tuple(gx.eval(xx),gy.eval(yy));
+                        Tuple value = new Tuple(xx,yy);
+                        MRData old = hashTable.get(key);
+                        pair.set(0,key);
+                        for ( MRData e: (Bag)m.eval(value) )
+                            if (old == null)
+                                hashTable.put(key,e);
+                            else {
+                                tbag.clear();
+                                tbag.add_element(e).add_element(old);
+                                pair.set(1,tbag);
+                                for ( MRData z: (Bag)c.eval(pair) )
+                                    hashTable.put(key,z);  // normally, done once
+                            }
+                    };
+                if (xi.hasNext())
+                    x = (Tuple)xi.next();
+                if (yi.hasNext())
+                    y = (Tuple)yi.next();
+            };
+            flush_table(hashTable,r,res);
+            return res;
+        } catch (IOException ex) {
+            throw new Error(ex);
+        }
     }
 
     /** repeat the loop until all termination conditions are true or until we reach the max num of steps
@@ -487,34 +487,34 @@ final public class MapReduceAlgebra {
      * @return a value of type {a}
      */
     public static Bag repeat ( final Function loop,
-			       final Bag init,
-			       final int max_num ) throws Exception {
-	boolean cont;
-	int i = 0;
-	Bag s = init;
-	s.materializeAll();
-	do {
-	    MRData d = loop.eval(s);
-	    i++;
-	    cont = false;
-	    if (d instanceof Bag) {
-		Bag bag = (Bag) d;
-		bag.materialize();
-		s.clear();
-		for ( MRData x: bag ) {
-		    Tuple t = (Tuple)x;
-		    cont |= ((MR_bool)t.second()).get();
-		    s.add(t.first());
-		}
-	    } else if (d instanceof MR_dataset) {
-		DataSet ds = ((MR_dataset)d).dataset();
-		if (ds.counter != 0)
-		    cont = true;
-		System.err.println("*** Repeat #"+i+": "+ds.counter+" true results");
-		s = Plan.collect(ds);
-	    } else throw new Error("Wrong repeat");
-	} while (cont && i <= max_num);
-	return s;
+                               final Bag init,
+                               final int max_num ) throws Exception {
+        boolean cont;
+        int i = 0;
+        Bag s = init;
+        s.materializeAll();
+        do {
+            MRData d = loop.eval(s);
+            i++;
+            cont = false;
+            if (d instanceof Bag) {
+                Bag bag = (Bag) d;
+                bag.materialize();
+                s.clear();
+                for ( MRData x: bag ) {
+                    Tuple t = (Tuple)x;
+                    cont |= ((MR_bool)t.second()).get();
+                    s.add(t.first());
+                }
+            } else if (d instanceof MR_dataset) {
+                DataSet ds = ((MR_dataset)d).dataset();
+                if (ds.counter != 0)
+                    cont = true;
+                System.err.println("*** Repeat #"+i+": "+ds.counter+" true results");
+                s = Plan.collect(ds);
+            } else throw new Error("Wrong repeat");
+        } while (cont && i <= max_num);
+        return s;
     }
 
     /** transitive closure: repeat the loop until the new set is equal to the previous set
@@ -525,30 +525,30 @@ final public class MapReduceAlgebra {
      * @return a value of type {a}
      */
     public static Bag closure ( final Function loop,
-				final Bag init,
-				final int max_num ) throws Exception {
-	int i = 0;
-	long n = 0;
-	long old = 0;
-	Bag s = init;
-	s.materializeAll();
-	do {
-	    MRData d = loop.eval(s);
-	    i++;
-	    if (d instanceof Bag) {
-		s = (Bag)d;
-		s.materialize();
-		old = n;
-		n = s.size();
-	    } else if (d instanceof MR_dataset) {
-		DataSet ds = ((MR_dataset)d).dataset();
-		System.err.println("*** Repeat #"+i+": "+(ds.records-n)+" new records");
-		old = n;
-		n = ds.records;
-		s = Plan.collect(ds);
-	    } else throw new Error("Wrong repeat");
-	} while (old < n && i <= max_num);
-	return s;
+                                final Bag init,
+                                final int max_num ) throws Exception {
+        int i = 0;
+        long n = 0;
+        long old = 0;
+        Bag s = init;
+        s.materializeAll();
+        do {
+            MRData d = loop.eval(s);
+            i++;
+            if (d instanceof Bag) {
+                s = (Bag)d;
+                s.materialize();
+                old = n;
+                n = s.size();
+            } else if (d instanceof MR_dataset) {
+                DataSet ds = ((MR_dataset)d).dataset();
+                System.err.println("*** Repeat #"+i+": "+(ds.records-n)+" new records");
+                old = n;
+                n = ds.records;
+                s = Plan.collect(ds);
+            } else throw new Error("Wrong repeat");
+        } while (old < n && i <= max_num);
+        return s;
     }
 
     /** parse a text document using a given parser
@@ -558,35 +558,35 @@ final public class MapReduceAlgebra {
      * @return a lazy bag that contains the parsed data
      */
     public static Bag parsedSource ( final Parser parser,
-				     final String file,
-				     Trees args ) {
-	try {
-	    parser.initialize(args);
-	    parser.open(file);
-	    return new Bag(new BagIterator() {
-		    Iterator<MRData> result = null;
-		    MRData data;
-		    public boolean hasNext () {
-			try {
-			    while (result == null || !result.hasNext()) {
-				String s = parser.slice();
-				if (s == null)
-				    return false;
-				result = parser.parse(s).iterator();
-			    };
-			    data = (MRData)result.next();
-			    return true;
-			} catch (Exception e) {
-			    throw new Error(e);
-			}
-		    }
-		    public MRData next () {
-			return data;
-		    }
-		});
-	} catch (Exception e) {
-	    throw new Error(e);
-	}
+                                     final String file,
+                                     Trees args ) {
+        try {
+            parser.initialize(args);
+            parser.open(file);
+            return new Bag(new BagIterator() {
+                    Iterator<MRData> result = null;
+                    MRData data;
+                    public boolean hasNext () {
+                        try {
+                            while (result == null || !result.hasNext()) {
+                                String s = parser.slice();
+                                if (s == null)
+                                    return false;
+                                result = parser.parse(s).iterator();
+                            };
+                            data = (MRData)result.next();
+                            return true;
+                        } catch (Exception e) {
+                            throw new Error(e);
+                        }
+                    }
+                    public MRData next () {
+                        return data;
+                    }
+                });
+        } catch (Exception e) {
+            throw new Error(e);
+        }
     }
 
     /** parse a text document using a given parser
@@ -596,15 +596,15 @@ final public class MapReduceAlgebra {
      * @return a lazy bag that contains the parsed data
      */
     public static Bag parsedSource ( String parser, String file, Trees args ) {
-	try {
-	    return parsedSource(DataSource.parserDirectory.get(parser).newInstance(),file,args);
-	} catch (Exception e) {
-	    throw new Error(e);
-	}
+        try {
+            return parsedSource(DataSource.parserDirectory.get(parser).newInstance(),file,args);
+        } catch (Exception e) {
+            throw new Error(e);
+        }
     }
 
     private static Bag add_source_num ( int source_num, Bag input ) {
-	return new Bag(new Tuple(new MR_int(source_num),input));
+        return new Bag(new Tuple(new MR_int(source_num),input));
     }
 
     /** parse a text document using a given parser and tag output data with a source num
@@ -615,10 +615,10 @@ final public class MapReduceAlgebra {
      * @return a lazy bag that contains the parsed data taged with the source id
      */
     public static Bag parsedSource ( int source_num,
-				     Parser parser,
-				     String file,
-				     Trees args ) {
-	return add_source_num(source_num,parsedSource(parser,file,args));
+                                     Parser parser,
+                                     String file,
+                                     Trees args ) {
+        return add_source_num(source_num,parsedSource(parser,file,args));
     }
 
     /** parse a text document using a given parser and tag output data with a source num
@@ -629,11 +629,11 @@ final public class MapReduceAlgebra {
      * @return a lazy bag that contains the parsed data taged with the source id
      */
     public static Bag parsedSource ( int source_num, String parser, String file, Trees args ) {
-	try {
-	    return parsedSource(source_num,DataSource.parserDirectory.get(parser).newInstance(),file,args);
-	} catch (Exception e) {
-	    throw new Error(e);
-	}
+        try {
+            return parsedSource(source_num,DataSource.parserDirectory.get(parser).newInstance(),file,args);
+        } catch (Exception e) {
+            throw new Error(e);
+        }
     }
 
     /** aggregate the Bag elements
@@ -643,82 +643,82 @@ final public class MapReduceAlgebra {
      * @return a value of type b
      */
     public static MRData aggregate ( final Function accumulator,
-				     final MRData zero,
-				     final Bag s ) {
-	MRData result = zero;
-	for ( MRData x: s )
-	    result = accumulator.eval(new Tuple(result,x));
-	return result;
+                                     final MRData zero,
+                                     final Bag s ) {
+        MRData result = zero;
+        for ( MRData x: s )
+            result = accumulator.eval(new Tuple(result,x));
+        return result;
     }
 
     public static MRData materialize ( MRData x ) {
-	if (x instanceof Bag)
-	    ((Bag)x).materialize();
-	return x;
+        if (x instanceof Bag)
+            ((Bag)x).materialize();
+        return x;
     }
 
     /** Dump the value of some type to a binary local file;
      *  The type is dumped to a separate file.type
      */
     public static void dump ( String file, Tree type, MRData value ) throws IOException {
-	PrintStream ftp = new PrintStream(file+".type");
-	ftp.print("1@"+type.toString()+"\n");
-	ftp.close();
-	DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(file)));
-	value.write(out);
-	out.close();
+        PrintStream ftp = new PrintStream(file+".type");
+        ftp.print("1@"+type.toString()+"\n");
+        ftp.close();
+        DataOutputStream out = new DataOutputStream(new FileOutputStream(new File(file)));
+        value.write(out);
+        out.close();
     }
 
     /** return the type of the dumped binary local file from file.type */
     public static Tree get_type ( String file ) {
-	try {
-	    BufferedReader ftp = new BufferedReader(new FileReader(new File(file+".type")));
-	    String s[] = ftp.readLine().split("@");
-	    ftp.close();
-	    if (s.length != 2)
-		return null;
-	    if (!s[0].equals("1"))
-		throw new Error("The binary file has been created in hadoop mode and cannot be read in java mode");
-	    return Tree.parse(s[1]);
-	} catch (Exception e) {
-	    return null;
-	}
+        try {
+            BufferedReader ftp = new BufferedReader(new FileReader(new File(file+".type")));
+            String s[] = ftp.readLine().split("@");
+            ftp.close();
+            if (s.length != 2)
+                return null;
+            if (!s[0].equals("1"))
+                throw new Error("The binary file has been created in hadoop mode and cannot be read in java mode");
+            return Tree.parse(s[1]);
+        } catch (Exception e) {
+            return null;
+        }
     }
 
     /** read the contents of a dumped local binary file */
     public static MRData read_binary ( String file ) {
-	try {
-	    Tree type = get_type(file);
-	    DataInputStream in = new DataInputStream(new FileInputStream(new File(file)));
-	    return MRContainer.read(in);
-	} catch (Exception e) {
-	    return null;
-	} 
+        try {
+            Tree type = get_type(file);
+            DataInputStream in = new DataInputStream(new FileInputStream(new File(file)));
+            return MRContainer.read(in);
+        } catch (Exception e) {
+            return null;
+        } 
     }
 
     /** read the contents of a dumped local binary file and tag data with a source num */
     public static Bag read_binary ( int source_num, String file ) {
-	return add_source_num(source_num,(Bag)read_binary(file));
+        return add_source_num(source_num,(Bag)read_binary(file));
     }
 
     /** generate a lazy bag of long numbers {min...max} */
     public static Bag generator ( final long min, final long max ) {
-	if (min > max)
-	    throw new Error("Min value ("+min+") is larger than max ("+max+") in generator");
-	return new Bag(new BagIterator() {
-		long index = min;
-		public boolean hasNext () {
-		    return index <= max;
-		}
-		public MRData next () {
-		    return new MR_long(index++);
-		}
-	    });
+        if (min > max)
+            throw new Error("Min value ("+min+") is larger than max ("+max+") in generator");
+        return new Bag(new BagIterator() {
+                long index = min;
+                public boolean hasNext () {
+                    return index <= max;
+                }
+                public MRData next () {
+                    return new MR_long(index++);
+                }
+            });
     }
 
     /** generate a lazy bag of long numbers {min...max} and tag each lon number with a source num */
     public static Bag generator ( int source_num, final long min, final long max ) {
-	return add_source_num(source_num,generator(min,max));
+        return add_source_num(source_num,generator(min,max));
     }
 
     /** the cache that holds all local data in memory */
@@ -726,15 +726,15 @@ final public class MapReduceAlgebra {
 
     /** return the cache element at location loc */
     public static MRData getCache ( int loc ) {
-	return cache.get(loc);
+        return cache.get(loc);
     }
 
     /** set the cache element at location loc to value and return ret */
     public static MRData setCache ( int loc, MRData value, MRData ret ) {
-	if (value instanceof Bag)
-	    materialize((Bag)value);
-	cache.set(loc,value);
-	return ret;
+        if (value instanceof Bag)
+            materialize((Bag)value);
+        cache.set(loc,value);
+        return ret;
     }
 
     /** The BSP operation
@@ -746,70 +746,70 @@ final public class MapReduceAlgebra {
      * @return return a Bag in cache[0]
      */
     public static MRData BSP ( final int[] source,
-			       final Function superstep,
-			       final MRData init_state,
-			       boolean order,
-			       final Bag[] inputs ) {
-	Bag msgs = new Bag();
-	MRData state = init_state;
-	Tuple result;
-	boolean exit;
-	boolean skip = false;
-	String tabs = "";
-	int step = 0;
-	cache = new Tuple(100);
-	for ( int i = 0; i < 100; i++ )
-	    cache.set(i,new Bag());
-	for ( Bag x: inputs ) {
-	    Tuple p = (Tuple)(x.get(0));
-	    cache.set(((MR_int)p.first()).get(),
-		      materialize(p.second()));
-	};
-	do {
-	    if (!skip)
-		step++;
-	    if (!skip && Config.trace_execution) {
-		tabs = Interpreter.tabs(Interpreter.tab_count);
-		System.out.println(tabs+"  Superstep "+step+":");
-		System.out.println(tabs+"      messages: "+msgs);
-		System.out.println(tabs+"      state: "+state);
-		for ( int i = 0; i < cache.size(); i++)
-		    if (cache.get(i) instanceof Bag && ((Bag)cache.get(i)).size() > 0)
-			System.out.println(tabs+"      cache "+i+": "+cache.get(i));
-	    };
-	    result = (Tuple)superstep.eval(new Tuple(cache,msgs,state,new MR_string("")));
-	    Bag new_msgs = (Bag)result.get(0);
-	    state = result.get(1);
-	    exit = ((MR_bool)result.get(2)).get();
-	    skip = new_msgs == SystemFunctions.bsp_empty_bag;
-	    if ((!skip || exit) && Config.trace_execution)
-		System.out.println(tabs+"      result: "+result);
-	    final Iterator<MRData> iter = new_msgs.iterator();
-	    msgs = new Bag(new BagIterator() {
-		    public boolean hasNext () {
-			return iter.hasNext();
-		    }
-		    public MRData next () {
-			return ((Tuple)iter.next()).get(1);
-		    }
-		});
-	} while (!exit);
-	MRData[] data = new MRData[source.length];
-	for ( int i = 0; i < data.length; i++ )
-	    data[i] = getCache(source[i]);
-	if (order && data[0] instanceof Bag) {
-	    final Iterator<MRData> iter = ((Bag)data[0]).iterator();
-	    return new Bag(new BagIterator() {
-		    public boolean hasNext () {
-			return iter.hasNext();
-		    }
-		    public MRData next () {
-			return ((Tuple)iter.next()).get(0);
-		    }
-		});
-	};
-	if (data.length == 1)
-	    return data[0];
-	else return new Tuple(data);
+                               final Function superstep,
+                               final MRData init_state,
+                               boolean order,
+                               final Bag[] inputs ) {
+        Bag msgs = new Bag();
+        MRData state = init_state;
+        Tuple result;
+        boolean exit;
+        boolean skip = false;
+        String tabs = "";
+        int step = 0;
+        cache = new Tuple(100);
+        for ( int i = 0; i < 100; i++ )
+            cache.set(i,new Bag());
+        for ( Bag x: inputs ) {
+            Tuple p = (Tuple)(x.get(0));
+            cache.set(((MR_int)p.first()).get(),
+                      materialize(p.second()));
+        };
+        do {
+            if (!skip)
+                step++;
+            if (!skip && Config.trace_execution) {
+                tabs = Interpreter.tabs(Interpreter.tab_count);
+                System.out.println(tabs+"  Superstep "+step+":");
+                System.out.println(tabs+"      messages: "+msgs);
+                System.out.println(tabs+"      state: "+state);
+                for ( int i = 0; i < cache.size(); i++)
+                    if (cache.get(i) instanceof Bag && ((Bag)cache.get(i)).size() > 0)
+                        System.out.println(tabs+"      cache "+i+": "+cache.get(i));
+            };
+            result = (Tuple)superstep.eval(new Tuple(cache,msgs,state,new MR_string("")));
+            Bag new_msgs = (Bag)result.get(0);
+            state = result.get(1);
+            exit = ((MR_bool)result.get(2)).get();
+            skip = new_msgs == SystemFunctions.bsp_empty_bag;
+            if ((!skip || exit) && Config.trace_execution)
+                System.out.println(tabs+"      result: "+result);
+            final Iterator<MRData> iter = new_msgs.iterator();
+            msgs = new Bag(new BagIterator() {
+                    public boolean hasNext () {
+                        return iter.hasNext();
+                    }
+                    public MRData next () {
+                        return ((Tuple)iter.next()).get(1);
+                    }
+                });
+        } while (!exit);
+        MRData[] data = new MRData[source.length];
+        for ( int i = 0; i < data.length; i++ )
+            data[i] = getCache(source[i]);
+        if (order && data[0] instanceof Bag) {
+            final Iterator<MRData> iter = ((Bag)data[0]).iterator();
+            return new Bag(new BagIterator() {
+                    public boolean hasNext () {
+                        return iter.hasNext();
+                    }
+                    public MRData next () {
+                        return ((Tuple)iter.next()).get(0);
+                    }
+                });
+        };
+        if (data.length == 1)
+            return data[0];
+        else return new Tuple(data);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/Materialization.gen
----------------------------------------------------------------------
diff --git a/src/main/java/core/Materialization.gen b/src/main/java/core/Materialization.gen
index 738dfa6..5683699 100644
--- a/src/main/java/core/Materialization.gen
+++ b/src/main/java/core/Materialization.gen
@@ -35,100 +35,100 @@ final public class Materialization extends Translator {
 
     // is this a direct-access term? (not the results of a bulk operation)
     private static boolean access_variable ( Tree e ) {
-	match e {
-	case nth(`x,_):
-	    return access_variable(x);
-	case union_value(`x):
-	    return access_variable(x);
-	case index(`x,`n):
-	    return access_variable(x);
-	case `v:
-	    if (v.is_variable())
-		return true;
-	};
-	return false;
+        match e {
+        case nth(`x,_):
+            return access_variable(x);
+        case union_value(`x):
+            return access_variable(x);
+        case index(`x,`n):
+            return access_variable(x);
+        case `v:
+            if (v.is_variable())
+                return true;
+        };
+        return false;
     }
 
     private static Domains new_domain ( Trees vars, Tree e, Domains d ) {
-	if (!access_variable(e))
-	    return materialize(vars,e,d);
-	Domains nd = new Domains(d.domains,d.repeats);
-	if ((d.domains.member(e) || !free_variables(e,vars).is_empty())
-	    && !d.repeats.member(e))
-	    nd.repeats = nd.repeats.cons(e);
-	nd.domains = nd.domains.cons(e);
-	return nd;
+        if (!access_variable(e))
+            return materialize(vars,e,d);
+        Domains nd = new Domains(d.domains,d.repeats);
+        if ((d.domains.member(e) || !free_variables(e,vars).is_empty())
+            && !d.repeats.member(e))
+            nd.repeats = nd.repeats.cons(e);
+        nd.domains = nd.domains.cons(e);
+        return nd;
     }
 
     private static Domains union ( Domains xd, Domains yd ) {
-	Domains nd = new Domains(xd.domains,xd.repeats);
-	for ( Tree y: yd.domains )
-	    if (!nd.domains.member(y))
-		nd.domains = nd.domains.cons(y);
-	for ( Tree y: yd.repeats )
-	    if (!nd.repeats.member(y))
-		nd.repeats = nd.repeats.cons(y);
-	return nd;
+        Domains nd = new Domains(xd.domains,xd.repeats);
+        for ( Tree y: yd.domains )
+            if (!nd.domains.member(y))
+                nd.domains = nd.domains.cons(y);
+        for ( Tree y: yd.repeats )
+            if (!nd.repeats.member(y))
+                nd.repeats = nd.repeats.cons(y);
+        return nd;
     }
 
     final static int unionM = ClassImporter.find_method_number("plus",#[bag(any),bag(any)]);
 
     private static Domains materialize ( Trees vars, Tree e, Domains d ) {
-	match e {
-	case lambda(`v,`b):
-	    return materialize(#[`v],b,d);
-	case cmap(lambda(`v,`b),`s):
-	    return materialize(#[`v],b,new_domain(vars,s,d));
-	case map(lambda(`v,`b),`s):
-	    return materialize(#[`v],b,new_domain(vars,s,d));
-	case filter(lambda(`v1,`b1),lambda(`v2,`b2),`s):
-	    return materialize(#[`v1],b1,materialize(#[`v2],b2,new_domain(vars,s,d)));
-	case aggregate(lambda(`v,`b),`z,`s):
-	    return materialize(#[`v],b,new_domain(vars,s,d));
-	case groupBy(`s):
-	    return new_domain(vars,s,d);
-	case orderBy(`s):
-	    return new_domain(vars,s,d);
-	case mapReduce2(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y,`o):
-	    return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
-					 new_domain(vars,x,new_domain(vars,y,d)))));
-	case join(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y):
-	    return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
-					 new_domain(vars,x,new_domain(vars,y,d)))));
-	case crossProduct(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y):
-	    return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
-					 new_domain(vars,x,new_domain(vars,y,d)))));
-	case let(`v,`x,`y):
-	    Domains nd = materialize(vars.cons(v),y,materialize(vars,x,d));
-	    Trees zs = #[];
-	    for ( Tree z: nd.repeats )
-		if (!v.equals(z))
-		    zs = zs.cons(z);
-	    nd.repeats = zs;
-	    return nd;
-	case if(`p,`x,`y):
-	    Domains nd = materialize(vars,p,d);
-	    return union(materialize(vars,x,nd),
-			 materialize(vars,y,nd));
-	case callM(union,_,`x,`y):
-	    return new_domain(vars,x,new_domain(vars,y,d));
-	case callM(_,`k,`x,`y):
-	    if (((LongLeaf)k).value() != unionM)
-		fail;
-	    return new_domain(vars,x,new_domain(vars,y,d));
-	case `f(...as):
-	    Domains nd = new Domains(d.domains,d.repeats);
-	    for ( Tree a: as )
-		nd = materialize(vars,a,nd);
-	    return nd;
-	};
-	return d;
+        match e {
+        case lambda(`v,`b):
+            return materialize(#[`v],b,d);
+        case cmap(lambda(`v,`b),`s):
+            return materialize(#[`v],b,new_domain(vars,s,d));
+        case map(lambda(`v,`b),`s):
+            return materialize(#[`v],b,new_domain(vars,s,d));
+        case filter(lambda(`v1,`b1),lambda(`v2,`b2),`s):
+            return materialize(#[`v1],b1,materialize(#[`v2],b2,new_domain(vars,s,d)));
+        case aggregate(lambda(`v,`b),`z,`s):
+            return materialize(#[`v],b,new_domain(vars,s,d));
+        case groupBy(`s):
+            return new_domain(vars,s,d);
+        case orderBy(`s):
+            return new_domain(vars,s,d);
+        case mapReduce2(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y,`o):
+            return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
+                                         new_domain(vars,x,new_domain(vars,y,d)))));
+        case join(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y):
+            return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
+                                         new_domain(vars,x,new_domain(vars,y,d)))));
+        case crossProduct(lambda(`v1,`b1),lambda(`v2,`b2),lambda(`v,`b),`x,`y):
+            return materialize(#[`v],b,materialize(#[`v1],b1,materialize(#[`v2],b2,
+                                         new_domain(vars,x,new_domain(vars,y,d)))));
+        case let(`v,`x,`y):
+            Domains nd = materialize(vars.cons(v),y,materialize(vars,x,d));
+            Trees zs = #[];
+            for ( Tree z: nd.repeats )
+                if (!v.equals(z))
+                    zs = zs.cons(z);
+            nd.repeats = zs;
+            return nd;
+        case if(`p,`x,`y):
+            Domains nd = materialize(vars,p,d);
+            return union(materialize(vars,x,nd),
+                         materialize(vars,y,nd));
+        case callM(union,_,`x,`y):
+            return new_domain(vars,x,new_domain(vars,y,d));
+        case callM(_,`k,`x,`y):
+            if (((LongLeaf)k).value() != unionM)
+                fail;
+            return new_domain(vars,x,new_domain(vars,y,d));
+        case `f(...as):
+            Domains nd = new Domains(d.domains,d.repeats);
+            for ( Tree a: as )
+                nd = materialize(vars,a,nd);
+            return nd;
+        };
+        return d;
     }
 
     public static Tree materialize_terms ( Tree e ) {
-	Domains d = materialize(#[],e,new Domains(#[],#[]));
-	for ( Tree x: d.repeats )
-	    e = subst(x,#<materialize(`x)>,e);
-	return e;
+        Domains d = materialize(#[],e,new Domains(#[],#[]));
+        for ( Tree x: d.repeats )
+            e = subst(x,#<materialize(`x)>,e);
+        return e;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/MethodInfo.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/MethodInfo.java b/src/main/java/core/MethodInfo.java
index 29d4aea..2dc9ad1 100644
--- a/src/main/java/core/MethodInfo.java
+++ b/src/main/java/core/MethodInfo.java
@@ -28,30 +28,30 @@ final public class MethodInfo implements Comparable<MethodInfo> {
     public Method method;
 
     MethodInfo ( String n, Trees s, Method m ) {
-	name = n;
-	signature = s;
-	method = m;
+        name = n;
+        signature = s;
+        method = m;
     }
 
     public int compareTo ( MethodInfo x )  {
-	int c = name.compareTo(x.name);
-	if (c != 0)
-	    return c;
-	if (signature.length() < x.signature.length())
-	    return -1;
-	if (signature.length() > x.signature.length())
-	    return 1;
-	// handles overloading: more specific method signatures first
-	for ( int i = 1; i < signature.length(); i++ ) {
-	    int ct = TypeInference.compare_types(signature.nth(i),x.signature.nth(i));
-	    if (ct != 0)
-		return ct;
-	};
-	return TypeInference.compare_types(signature.nth(0),x.signature.nth(0));
+        int c = name.compareTo(x.name);
+        if (c != 0)
+            return c;
+        if (signature.length() < x.signature.length())
+            return -1;
+        if (signature.length() > x.signature.length())
+            return 1;
+        // handles overloading: more specific method signatures first
+        for ( int i = 1; i < signature.length(); i++ ) {
+            int ct = TypeInference.compare_types(signature.nth(i),x.signature.nth(i));
+            if (ct != 0)
+                return ct;
+        };
+        return TypeInference.compare_types(signature.nth(0),x.signature.nth(0));
     }
 
     public boolean equals ( Object x ) {
-	return name.equals(((MethodInfo)x).name)
-	       && signature.equals(((MethodInfo)x).signature);
+        return name.equals(((MethodInfo)x).name)
+               && signature.equals(((MethodInfo)x).signature);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/Normalization.gen
----------------------------------------------------------------------
diff --git a/src/main/java/core/Normalization.gen b/src/main/java/core/Normalization.gen
index 1f8b05c..37d628e 100644
--- a/src/main/java/core/Normalization.gen
+++ b/src/main/java/core/Normalization.gen
@@ -26,343 +26,343 @@ public class Normalization extends Translator {
     /** given that pattern=e, find the bindings of the pattern variables */
     static Trees bind_pattern ( Tree pattern, Tree e ) {
         Trees args = #[];
-	match pattern {
-	case tuple(...pl):
-	    int i = 0;
-	    for ( Tree p: pl ) {
-		args = args.append(bind_pattern(p,#<nth(`e,`i)>));
-		i++;
-	    }
+        match pattern {
+        case tuple(...pl):
+            int i = 0;
+            for ( Tree p: pl ) {
+                args = args.append(bind_pattern(p,#<nth(`e,`i)>));
+                i++;
+            }
         case record(...bl):
-	    Trees attrs = #[];
-	    for ( Tree b: bl )
-		match b {
-		case bind(`n,`p):
-		    args = args.append(bind_pattern(p,#<project(`e,`n)>));
-		    if (attrs.member(n))
-			error("Duplicate record attribute name: "+n);
-		    attrs = attrs.append(n);
-		};
-	case typed(`p,`t):
-	    args = bind_pattern(p,#<typed(`e,`t)>);
-	case list(...pl):
-	    int i = 0;
-	    for ( Tree p: pl ) {
-		args = args.append(bind_pattern(p,#<index(`e,`i)>));
-		i++;
-	    };
-	    args = args.append(#<call(eq,call(count,`e),`i)>);
-	case call(`c,...s):
-	    Tree ci = data_constructors.lookup(c.toString());
-	    if (ci == null)
-		error("Undefined data constructor: "+c);
-	    match ci {
-	    case `dname(`n,`tp):
-		args = args.append(#<call(eq,union_tag(`e),`n)>);
-		args = args.append(bind_pattern(s.length() == 1 ? s.head() : #<tuple(...s)>,
-						#<typed(union_value(`e),`tp)>));
-	    };
+            Trees attrs = #[];
+            for ( Tree b: bl )
+                match b {
+                case bind(`n,`p):
+                    args = args.append(bind_pattern(p,#<project(`e,`n)>));
+                    if (attrs.member(n))
+                        error("Duplicate record attribute name: "+n);
+                    attrs = attrs.append(n);
+                };
+        case typed(`p,`t):
+            args = bind_pattern(p,#<typed(`e,`t)>);
+        case list(...pl):
+            int i = 0;
+            for ( Tree p: pl ) {
+                args = args.append(bind_pattern(p,#<index(`e,`i)>));
+                i++;
+            };
+            args = args.append(#<call(eq,call(count,`e),`i)>);
+        case call(`c,...s):
+            Tree ci = data_constructors.lookup(c.toString());
+            if (ci == null)
+                error("Undefined data constructor: "+c);
+            match ci {
+            case `dname(`n,`tp):
+                args = args.append(#<call(eq,union_tag(`e),`n)>);
+                args = args.append(bind_pattern(s.length() == 1 ? s.head() : #<tuple(...s)>,
+                                                #<typed(union_value(`e),`tp)>));
+            };
         case any: ;
-	case `v:
-	    if (!v.is_variable())    // constant in pattern
-		args = #[call(eq,`e,`v)];
-	    else if (st.lookup(v.toString()) != null    // repeated pattern variable
-		     && !(e.is_variable() && st.lookup(v.toString()).is_variable()))  // exception
-		args = #[call(eq,`e,`(st.lookup(v.toString())))];
-	    else st.insert(v.toString(),e);    // new pattern variable
-	};
-	return args;
+        case `v:
+            if (!v.is_variable())    // constant in pattern
+                args = #[call(eq,`e,`v)];
+            else if (st.lookup(v.toString()) != null    // repeated pattern variable
+                     && !(e.is_variable() && st.lookup(v.toString()).is_variable()))  // exception
+                args = #[call(eq,`e,`(st.lookup(v.toString())))];
+            else st.insert(v.toString(),e);    // new pattern variable
+        };
+        return args;
     }
 
     private static Tree make_tuple ( Trees pl ) {
-    	if (pl.length() == 1)
-	    return pl.head();
-	return #<tuple(...pl)>;
+        if (pl.length() == 1)
+            return pl.head();
+        return #<tuple(...pl)>;
     }
 
     /** remove group-bys and order-bys from the MRQL queries */
     static Tree remove_groupby ( Tree e ) {
-    	Tree ret = #<error>;
-	match e {
-	case select(distinct,`u,from(...bl),where(`c),groupby(...gl),orderby(...ol)):
-	    ret = #<select(none,tuple(`u,`u),from(...bl),where(`c),groupby(...gl),orderby(...ol))>;
-	    ret = #<cmap(lambda(tuple(key,group),list(key)),groupBy(`ret))>;
-	    return remove_groupby(ret);
-	case select(none,`u,from(...bl),where(`c),groupby(),orderby()):
-	    return remove_groupby(#<select(`u,from(...bl),where(`c))>);
-	case select(none,`u,from(...bl),where(`c),groupby(...gl),orderby(`l,...ol)):
-	    Tree tol = make_tuple(ol);
-	    ret = #<cmap(lambda(tuple(key,group),group),
-			 orderBy(select(none,tuple(`tol,`u),
-					from(...bl),
-					where(`c),groupby(...gl),orderby())))>;
-	    return (l.equals(#<none>))
-		   ? remove_groupby(ret)
-		   : #<range(`(remove_groupby(ret)),0,`l)>;
-	case select(none,`u,from(...bl),where(`c),groupby(`h,...gl),orderby()):
-	    Trees pl = #[];
-	    Trees ul = #[];
-	    Trees ql = #[];
-	    for ( Tree b: bl )
-		match b {
-		case bind(`p,`d):
-		    pl = pl.append(p);
-		};
-	    Trees pvs = #[];
-	    for ( Tree g: gl )
-		match g {
-		case bind(`p,`d):
-		    ql = ql.append(p);
-		    ul = ul.append(d);
-		    pvs = pvs.append(pattern_variables(p));
-		};
-	    Tree tql = make_tuple(ql);
-	    Tree tul = make_tuple(ul);
-	    Tree tpl = make_tuple(pl);
-	    Trees xl = #[];
-	    Trees partl = #[];
-	    for ( Tree x: pattern_variables(#<tuple(...pl)>) )
-		if (!pvs.member(x)) {
-		    partl = partl.append(#<bind(`x,`x)>);
-		    match rename(#<select(`x,from(bind(`tpl,group)),where(true))>) {
-		    case select(`hd,`binds,...):
-			xl = xl.append(#<bind(`x,bag(select(`hd,`binds,where(true))))>);
-		    }
-		};
-	    match rename(#<select(record(...partl),from(bind(`tpl,group)),where(true))>) {
-	    case select(`hd,`binds,...):
-		xl = xl.cons(#<bind(partition,bag(select(`hd,`binds,where(true))))>);
-	    }
-	    tpl = subst(#<any>,#<0>,tpl);
-	    ret = #<select(`u,from(bind(tuple(`tql,group),
-					groupBy(select(tuple(`tul,`tpl),from(...bl),where(`c)))),
-				   ...xl),where(`h))>;
-	    return remove_groupby(ret);
-	case intersect(`x,`y):
-	    return remove_groupby(#<select(x,from(bind(x,`x),bind(y,`y)),
-					   where(call(eq,x,y)))>);
-	case except(`x,`y):
-	    return remove_groupby(#<select(x,from(bind(x,`x)),
-					   where(call(not,call(exists,select(y,from(bind(y,`y)),
-									     where(call(eq,x,y)))))))>);
-	case member(`x,`y):
-	    return remove_groupby(#<call(exists,select(y,from(bind(y,`y)),
-						       where(call(eq,y,`x))))>);
-	case call(gen,`min,`max,`size):
-	    return #<gen(`(remove_groupby(min)),`(remove_groupby(max)),`(remove_groupby(size)))>;
-	case call(avg,`s):
-	    return remove_groupby(#<call(avg_value,call(avg_aggr,`s))>);
-	case call(`f,...al):
-	    Tree macro = global_macros.lookup(f.toString());
-	    if (macro == null)
-		fail;
-	    match macro {
-	    case macro(params(...pl),`body):
-		Tree b = rename(remove_groupby(body));
-		if (pl.length() != al.length())
-		    fail;
-		for ( ; !pl.is_empty(); pl = pl.tail(), al = al.tail() )
-		    b = subst(pl.head(),remove_groupby(al.head()),b);
-		return b;
-	    }
-	case call(`f,...al):
-	    if (#[cmap,join,mapReduce,mapReduce2,groupBy,orderBy,tuple,bag,list,set].member(f))
-		return remove_groupby(#<`(f.toString())(...al)>);
-	    else fail
-	case project(`x,`a):
-	    return #<project(`(remove_groupby(x)),`a)>;
-	case `f(...al):
-	    Trees bl = #[];
-	    for ( Tree a: al )
-		bl = bl.append(remove_groupby(a));
-	    return #<`f(...bl)>;
-	case `v:
-	    if (v.is_variable()) {
-		ret = global_vars.lookup(v.toString());
-		if (ret == null)
-		    return v;
-		else if (!v.equals(ret))
-		    return remove_groupby(ret);
-	    }
-	};
-	return e;
+        Tree ret = #<error>;
+        match e {
+        case select(distinct,`u,from(...bl),where(`c),groupby(...gl),orderby(...ol)):
+            ret = #<select(none,tuple(`u,`u),from(...bl),where(`c),groupby(...gl),orderby(...ol))>;
+            ret = #<cmap(lambda(tuple(key,group),list(key)),groupBy(`ret))>;
+            return remove_groupby(ret);
+        case select(none,`u,from(...bl),where(`c),groupby(),orderby()):
+            return remove_groupby(#<select(`u,from(...bl),where(`c))>);
+        case select(none,`u,from(...bl),where(`c),groupby(...gl),orderby(`l,...ol)):
+            Tree tol = make_tuple(ol);
+            ret = #<cmap(lambda(tuple(key,group),group),
+                         orderBy(select(none,tuple(`tol,`u),
+                                        from(...bl),
+                                        where(`c),groupby(...gl),orderby())))>;
+            return (l.equals(#<none>))
+                   ? remove_groupby(ret)
+                   : #<range(`(remove_groupby(ret)),0,`l)>;
+        case select(none,`u,from(...bl),where(`c),groupby(`h,...gl),orderby()):
+            Trees pl = #[];
+            Trees ul = #[];
+            Trees ql = #[];
+            for ( Tree b: bl )
+                match b {
+                case bind(`p,`d):
+                    pl = pl.append(p);
+                };
+            Trees pvs = #[];
+            for ( Tree g: gl )
+                match g {
+                case bind(`p,`d):
+                    ql = ql.append(p);
+                    ul = ul.append(d);
+                    pvs = pvs.append(pattern_variables(p));
+                };
+            Tree tql = make_tuple(ql);
+            Tree tul = make_tuple(ul);
+            Tree tpl = make_tuple(pl);
+            Trees xl = #[];
+            Trees partl = #[];
+            for ( Tree x: pattern_variables(#<tuple(...pl)>) )
+                if (!pvs.member(x)) {
+                    partl = partl.append(#<bind(`x,`x)>);
+                    match rename(#<select(`x,from(bind(`tpl,group)),where(true))>) {
+                    case select(`hd,`binds,...):
+                        xl = xl.append(#<bind(`x,bag(select(`hd,`binds,where(true))))>);
+                    }
+                };
+            match rename(#<select(record(...partl),from(bind(`tpl,group)),where(true))>) {
+            case select(`hd,`binds,...):
+                xl = xl.cons(#<bind(partition,bag(select(`hd,`binds,where(true))))>);
+            }
+            tpl = subst(#<any>,#<0>,tpl);
+            ret = #<select(`u,from(bind(tuple(`tql,group),
+                                        groupBy(select(tuple(`tul,`tpl),from(...bl),where(`c)))),
+                                   ...xl),where(`h))>;
+            return remove_groupby(ret);
+        case intersect(`x,`y):
+            return remove_groupby(#<select(x,from(bind(x,`x),bind(y,`y)),
+                                           where(call(eq,x,y)))>);
+        case except(`x,`y):
+            return remove_groupby(#<select(x,from(bind(x,`x)),
+                                           where(call(not,call(exists,select(y,from(bind(y,`y)),
+                                                                             where(call(eq,x,y)))))))>);
+        case member(`x,`y):
+            return remove_groupby(#<call(exists,select(y,from(bind(y,`y)),
+                                                       where(call(eq,y,`x))))>);
+        case call(gen,`min,`max,`size):
+            return #<gen(`(remove_groupby(min)),`(remove_groupby(max)),`(remove_groupby(size)))>;
+        case call(avg,`s):
+            return remove_groupby(#<call(avg_value,call(avg_aggr,`s))>);
+        case call(`f,...al):
+            Tree macro = global_macros.lookup(f.toString());
+            if (macro == null)
+                fail;
+            match macro {
+            case macro(params(...pl),`body):
+                Tree b = rename(remove_groupby(body));
+                if (pl.length() != al.length())
+                    fail;
+                for ( ; !pl.is_empty(); pl = pl.tail(), al = al.tail() )
+                    b = subst(pl.head(),remove_groupby(al.head()),b);
+                return b;
+            }
+        case call(`f,...al):
+            if (#[cmap,join,mapReduce,mapReduce2,groupBy,orderBy,tuple,bag,list,set].member(f))
+                return remove_groupby(#<`(f.toString())(...al)>);
+            else fail
+        case project(`x,`a):
+            return #<project(`(remove_groupby(x)),`a)>;
+        case `f(...al):
+            Trees bl = #[];
+            for ( Tree a: al )
+                bl = bl.append(remove_groupby(a));
+            return #<`f(...bl)>;
+        case `v:
+            if (v.is_variable()) {
+                ret = global_vars.lookup(v.toString());
+                if (ret == null)
+                    return v;
+                else if (!v.equals(ret))
+                    return remove_groupby(ret);
+            }
+        };
+        return e;
     }
 
     private static Tree make_and ( Trees tests ) {
         if (tests.is_empty())
-	    return #<true>;
-	Tree e = tests.head();
-	for ( Tree t: tests.tail() )
-	    e = #<call(and,`e,`t)>;
-	return e;
+            return #<true>;
+        Tree e = tests.head();
+        for ( Tree t: tests.tail() )
+            e = #<call(and,`e,`t)>;
+        return e;
     }
 
     private static Trees rename_list ( Trees al ) {
-	Trees bl = #[];
-	for ( Tree a: al )
-	    bl = bl.append(rename(a));
-	return bl;
+        Trees bl = #[];
+        for ( Tree a: al )
+            bl = bl.append(rename(a));
+        return bl;
     }
 
     /** compile away patterns and rename local variables of an MRQL expression e with unique names */
     static Tree rename ( Tree e ) {
-    	Tree ret = #<error>;
-	match e {
-	case `v:
-	    if (!v.is_variable())
-		fail;
-	    ret = st.lookup(v.toString());
-	    if (ret==null)
-		return v;
-	    else return ret;
-	case select(`u,from(...bl),where(`c)):
-	    st.begin_scope();
-	    Trees binds = #[];
-	    Trees tests = #[];
-	    for ( Tree b: bl )
-		match b {
-		case bind(`p,`d):
-		    Tree x = new_var();
-		    binds = binds.append(#<bind(`x,`(rename(d)))>);
-		    tests = tests.append(bind_pattern(p,x));
-		};
-	    c = make_and(tests.cons(c));
-	    ret = #<select(`(rename(u)),
-			   from(...binds),
-			   where(`(rename(c))))>;
-	    st.end_scope();
-	    return ret;
+        Tree ret = #<error>;
+        match e {
+        case `v:
+            if (!v.is_variable())
+                fail;
+            ret = st.lookup(v.toString());
+            if (ret==null)
+                return v;
+            else return ret;
+        case select(`u,from(...bl),where(`c)):
+            st.begin_scope();
+            Trees binds = #[];
+            Trees tests = #[];
+            for ( Tree b: bl )
+                match b {
+                case bind(`p,`d):
+                    Tree x = new_var();
+                    binds = binds.append(#<bind(`x,`(rename(d)))>);
+                    tests = tests.append(bind_pattern(p,x));
+                };
+            c = make_and(tests.cons(c));
+            ret = #<select(`(rename(u)),
+                           from(...binds),
+                           where(`(rename(c))))>;
+            st.end_scope();
+            return ret;
         case lambda(`p,`b):
-	    st.begin_scope();
-	    Tree nv = new_var();
-	    if (!bind_pattern(p,nv).is_empty())
-		error("Lambda patterns must be irrefutable: "+print_query(e));
-	    ret = #<lambda(`nv,`(rename(b)))>;
-	    st.end_scope();
-	    return ret;
+            st.begin_scope();
+            Tree nv = new_var();
+            if (!bind_pattern(p,nv).is_empty())
+                error("Lambda patterns must be irrefutable: "+print_query(e));
+            ret = #<lambda(`nv,`(rename(b)))>;
+            st.end_scope();
+            return ret;
        case function(tuple(...params),`outp,`body):
-	    st.begin_scope();
-	    Trees ps = #[];
-	    Trees vs = #[];
-	    for ( Tree p: params )
-		match p {
-		case `bind(`v,`tp):
-		    Tree nv = new_var();
-		    if (vs.member(v))
-			error("Duplicate function parameters: "+print_query(e));
-		    vs = vs.append(v);
-		    ps = ps.append(#<`bind(`nv,`tp)>);
-		    st.insert(v.toString(),nv);
-		};
-	    ret = #<function(tuple(...ps),`outp,`(rename(body)))>;
-	    st.end_scope();
-	    return ret;
-	case let(`p,`u,`b):
-	    Tree ne = rename(u);
-	    st.begin_scope();
-	    Tree nv = new_var();
-	    if (!bind_pattern(p,nv).is_empty())
-		error("Let patterns must be irrefutable: "+print_query(e));
-	    ret = #<let(`nv,`ne,`(rename(b)))>;
-	    st.end_scope();
-	    return ret;
-	case case(`u,...cs):
-	    Trees rs = cs.reverse();
-	    Tree nu = rename(u);
-	    match rs.head() {
-	    case case(`p,`b):
-		Trees conds = bind_pattern(p,nu);
-		if (!conds.is_empty())
-		    error("Non-exhaustive case "+print_query(p)+" in "+print_query(e));
-		ret = b;
-	    };
-	    for ( Tree c: rs.tail() )
-		match c {
-		case case(`p,`b):
-		    Trees conds = bind_pattern(p,nu);
-		    if (!conds.is_empty())
-			ret = #<if(`(make_and(conds)),`b,`ret)>;
-		    else error("Unreachable case "+print_query(p)+" in "+print_query(e));
-		};
-	    return rename(ret);
-	case project(`u,`a):
-	    return #<project(`(rename(u)),`a)>;
-	case bind(`a,`u):
-	    return #<bind(`a,`(rename(u)))>;
-	case loop(lambda(tuple(...vs),`b),`s,`n):
-	    return #<loop(lambda(tuple(...vs),`(rename(b))),`(rename(s)),`n)>;
-	case `f(...al):
-	    Trees bl = rename_list(al);
-	    return #<`f(...bl)>;
-	};
-	return e;
+            st.begin_scope();
+            Trees ps = #[];
+            Trees vs = #[];
+            for ( Tree p: params )
+                match p {
+                case `bind(`v,`tp):
+                    Tree nv = new_var();
+                    if (vs.member(v))
+                        error("Duplicate function parameters: "+print_query(e));
+                    vs = vs.append(v);
+                    ps = ps.append(#<`bind(`nv,`tp)>);
+                    st.insert(v.toString(),nv);
+                };
+            ret = #<function(tuple(...ps),`outp,`(rename(body)))>;
+            st.end_scope();
+            return ret;
+        case let(`p,`u,`b):
+            Tree ne = rename(u);
+            st.begin_scope();
+            Tree nv = new_var();
+            if (!bind_pattern(p,nv).is_empty())
+                error("Let patterns must be irrefutable: "+print_query(e));
+            ret = #<let(`nv,`ne,`(rename(b)))>;
+            st.end_scope();
+            return ret;
+        case case(`u,...cs):
+            Trees rs = cs.reverse();
+            Tree nu = rename(u);
+            match rs.head() {
+            case case(`p,`b):
+                Trees conds = bind_pattern(p,nu);
+                if (!conds.is_empty())
+                    error("Non-exhaustive case "+print_query(p)+" in "+print_query(e));
+                ret = b;
+            };
+            for ( Tree c: rs.tail() )
+                match c {
+                case case(`p,`b):
+                    Trees conds = bind_pattern(p,nu);
+                    if (!conds.is_empty())
+                        ret = #<if(`(make_and(conds)),`b,`ret)>;
+                    else error("Unreachable case "+print_query(p)+" in "+print_query(e));
+                };
+            return rename(ret);
+        case project(`u,`a):
+            return #<project(`(rename(u)),`a)>;
+        case bind(`a,`u):
+            return #<bind(`a,`(rename(u)))>;
+        case loop(lambda(tuple(...vs),`b),`s,`n):
+            return #<loop(lambda(tuple(...vs),`(rename(b))),`(rename(s)),`n)>;
+        case `f(...al):
+            Trees bl = rename_list(al);
+            return #<`f(...bl)>;
+        };
+        return e;
     }
 
     private static Trees has_existential ( Tree e ) {
-	match e {
+        match e {
         case call(and(`x,`y)):
-	    Trees xs = has_existential(x);
-	    Trees ys = has_existential(y);
-	    return #[call(and(`(xs.head()),`(ys.head())),...(xs.tail()),...(ys.tail()))];
-	case call(exists,select(...)):
-	    return #[true,`e];
-	case call(not,call(all,select(...l))):
-	    return #[true,call(exists,select(...l))];
-	};
-	return #[`e];
+            Trees xs = has_existential(x);
+            Trees ys = has_existential(y);
+            return #[call(and(`(xs.head()),`(ys.head())),...(xs.tail()),...(ys.tail()))];
+        case call(exists,select(...)):
+            return #[true,`e];
+        case call(not,call(all,select(...l))):
+            return #[true,call(exists,select(...l))];
+        };
+        return #[`e];
     }
 
     /** normalize algebraic expressions to more efficient forms using heuristic rules */
     public static Tree normalize ( Tree e ) {
-	match e {
-	case select(`u,from(),where(true)):
-	    return normalize(#<bag(`u)>);
-	case select(`u,from(),where(`p)):
-	    return normalize(#<if(`p,bag(`u),bag())>);
-	case select(`u,from(bind(`v,`d)),where(true)):
-	    if (u.equals(v))
-		return normalize(d);
-	    else fail
-	case select(`u,from(...bl,bind(`v,select(`iu,from(...ibl),where(`ic))),...al),where(`c)):
-	    return normalize(#<select(`u,from(...bl,...ibl,bind(`v,bag(`iu)),...al),
-				      where(call(and,`c,`ic)))>);
+        match e {
+        case select(`u,from(),where(true)):
+            return normalize(#<bag(`u)>);
+        case select(`u,from(),where(`p)):
+            return normalize(#<if(`p,bag(`u),bag())>);
+        case select(`u,from(bind(`v,`d)),where(true)):
+            if (u.equals(v))
+                return normalize(d);
+            else fail
+        case select(`u,from(...bl,bind(`v,select(`iu,from(...ibl),where(`ic))),...al),where(`c)):
+            return normalize(#<select(`u,from(...bl,...ibl,bind(`v,bag(`iu)),...al),
+                                      where(call(and,`c,`ic)))>);
         case select(`u,from(...bl,bind(`v,bag(`d)),...al),`c):
-	    if (!is_pure(d) && occurences(v,#<f(`c,`u,...al)>) > 1)  // duplicated side-effects
-		fail;
-	    return normalize(#<select(`(subst(v,d,u)),
-				      from(...bl,...(subst_list(v,d,al))),
-				      `(subst(v,d,c)))>);
+            if (!is_pure(d) && occurences(v,#<f(`c,`u,...al)>) > 1)  // duplicated side-effects
+                fail;
+            return normalize(#<select(`(subst(v,d,u)),
+                                      from(...bl,...(subst_list(v,d,al))),
+                                      `(subst(v,d,c)))>);
         case select(`u,from(...bl),where(`c)):
-	    Trees es = has_existential(c);
-	    if (es.length() <= 1)
-		fail;
-	    Trees binds = bl;
-	    Trees preds = #[`(es.head())];
-	    for ( Tree x: es.tail() )
-		match x {
-		case call(exists,select(`p,from(...bl2),where(`c2))):
-		    preds = preds.cons(p).cons(c2);
-		    binds = binds.append(bl2);
-		};
-	    return normalize(#<select(`u,from(...binds),where(`(make_and(preds))))>);
-	case let_bind(`v,`x,`y):
-	    return #<let(`v,`(normalize(x)),`(normalize(y)))>;
-	case call(eq,tuple(...l),`x):
-	    Tree pl = #<true>;
-	    int i = 0;
-	    for ( Tree y: l ) {
-		pl = #<call(and,`pl,call(eq,`y,nth(`x,`i)))>;
-		i++;
-	    };
-	    return normalize(pl);
-	case call(eq,`x,tuple(...l)):
-	    Tree pl = #<true>;
-	    int i = 0;
-	    for (Tree y: l) {
-		pl = #<call(and,`pl,call(eq,nth(`x,`i),`y))>;
-		i++;
-	    };
-	    return normalize(pl);
+            Trees es = has_existential(c);
+            if (es.length() <= 1)
+                fail;
+            Trees binds = bl;
+            Trees preds = #[`(es.head())];
+            for ( Tree x: es.tail() )
+                match x {
+                case call(exists,select(`p,from(...bl2),where(`c2))):
+                    preds = preds.cons(p).cons(c2);
+                    binds = binds.append(bl2);
+                };
+            return normalize(#<select(`u,from(...binds),where(`(make_and(preds))))>);
+        case let_bind(`v,`x,`y):
+            return #<let(`v,`(normalize(x)),`(normalize(y)))>;
+        case call(eq,tuple(...l),`x):
+            Tree pl = #<true>;
+            int i = 0;
+            for ( Tree y: l ) {
+                pl = #<call(and,`pl,call(eq,`y,nth(`x,`i)))>;
+                i++;
+            };
+            return normalize(pl);
+        case call(eq,`x,tuple(...l)):
+            Tree pl = #<true>;
+            int i = 0;
+            for (Tree y: l) {
+                pl = #<call(and,`pl,call(eq,nth(`x,`i),`y))>;
+                i++;
+            };
+            return normalize(pl);
         case call(and,true,`u): return normalize(u);
         case call(and,`u,true): return normalize(u);
         case call(and,false,`u): return #<false>;
@@ -371,36 +371,36 @@ public class Normalization extends Translator {
         case call(or,`u,true): return #<true>;
         case call(or,false,`u): return normalize(u);
         case call(or,`u,false): return normalize(u);
-	case call(not,true): return #<false>;
-	case call(not,false): return #<true>;
-	case if(true,`e1,`e2): return normalize(e1);
-	case if(false,`e1,`e2): return normalize(e2);
-	case nth(tuple(...al),`n):
-	    if (!n.is_long())
-		fail;
-	    int i = (int)n.longValue();
-	    if ( i >= 0 && i < al.length() )
-		return normalize(al.nth(i));
-	case project(record(...bl),`a):
-	    for ( Tree b: bl )
-		match b {
-		case bind(`v,`u): if (v.equals(a)) return normalize(u);
-		};
-	    error("Wrong projection: "+print_query(e));
-	case `f(...al):
-	    Trees bl = #[];
-	    for ( Tree a: al )
-		bl = bl.append(normalize(a));
-	    return #<`f(...bl)>;
-	};
-	return e;
+        case call(not,true): return #<false>;
+        case call(not,false): return #<true>;
+        case if(true,`e1,`e2): return normalize(e1);
+        case if(false,`e1,`e2): return normalize(e2);
+        case nth(tuple(...al),`n):
+            if (!n.is_long())
+                fail;
+            int i = (int)n.longValue();
+            if ( i >= 0 && i < al.length() )
+                return normalize(al.nth(i));
+        case project(record(...bl),`a):
+            for ( Tree b: bl )
+                match b {
+                case bind(`v,`u): if (v.equals(a)) return normalize(u);
+                };
+            error("Wrong projection: "+print_query(e));
+        case `f(...al):
+            Trees bl = #[];
+            for ( Tree a: al )
+                bl = bl.append(normalize(a));
+            return #<`f(...bl)>;
+        };
+        return e;
     }
 
     /** normalize algebraic expressions to more efficient forms using heuristic rules */
     public static Tree normalize_all ( Tree e ) {
-	Tree ne = normalize(e);
-	if (e.equals(ne))
-	    return e;
-	else return normalize(ne);
+        Tree ne = normalize(e);
+        if (e.equals(ne))
+            return e;
+        else return normalize(ne);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/ParsedDataSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/ParsedDataSource.java b/src/main/java/core/ParsedDataSource.java
index ce5220b..7ac4c16 100644
--- a/src/main/java/core/ParsedDataSource.java
+++ b/src/main/java/core/ParsedDataSource.java
@@ -29,34 +29,34 @@ public class ParsedDataSource extends DataSource {
     public Trees args;
 
     ParsedDataSource ( int source_num,
-		       String path,
-		       Class<? extends Parser> parser,
-		       Trees args,
-		       Configuration conf ) {
-	super(source_num,path,ParsedInputFormat.class,conf);
-	this.parser = parser;
-	this.args = args;
+                       String path,
+                       Class<? extends Parser> parser,
+                       Trees args,
+                       Configuration conf ) {
+        super(source_num,path,ParsedInputFormat.class,conf);
+        this.parser = parser;
+        this.args = args;
     }
 
     ParsedDataSource ( String path,
-		       Class<? extends Parser> parser,
-		       Trees args,
-		       Configuration conf ) {
-	super(-1,path,ParsedInputFormat.class,conf);
-	this.parser = parser;
-	this.args = args;
+                       Class<? extends Parser> parser,
+                       Trees args,
+                       Configuration conf ) {
+        super(-1,path,ParsedInputFormat.class,conf);
+        this.parser = parser;
+        this.args = args;
     }
 
     public String toString () {
-	try {
-	    String pn = "";
-	    for ( String k: DataSource.parserDirectory.keySet() )
-		if (DataSource.parserDirectory.get(k).equals(parser))
-		    pn = k;
-	    return "Text"+separator+source_num+separator+pn+separator+path
-		   +separator+(new Node("args",args)).toString();
-	} catch (Exception e) {
-	    throw new Error(e);
-	}
+        try {
+            String pn = "";
+            for ( String k: DataSource.parserDirectory.keySet() )
+                if (DataSource.parserDirectory.get(k).equals(parser))
+                    pn = k;
+            return "Text"+separator+source_num+separator+pn+separator+path
+                   +separator+(new Node("args",args)).toString();
+        } catch (Exception e) {
+            throw new Error(e);
+        }
     }
 }