You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2013/09/06 22:57:30 UTC

[12/18] MRQL-16: correct source files, ASF licenses, and POMs for release

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/Interpreter.gen
----------------------------------------------------------------------
diff --git a/src/main/java/core/Interpreter.gen b/src/main/java/core/Interpreter.gen
index 1799c08..5cc50ce 100644
--- a/src/main/java/core/Interpreter.gen
+++ b/src/main/java/core/Interpreter.gen
@@ -31,122 +31,122 @@ public class Interpreter extends TypeInference {
 
     /** retrieve variable binding */
     public final static MRData variable_lookup ( final String v, final Environment environment ) {
-	for ( Environment env = environment; env != null; env = env.next ) {
-	    if (v.equals(env.name))
-		return env.value;
-	};
-	return null;
+        for ( Environment env = environment; env != null; env = env.next ) {
+            if (v.equals(env.name))
+                return env.value;
+        };
+        return null;
     }
 
     /** insert a new global variable binding */
     public final static void new_global_binding ( final String var, final MRData value ) {
-	if (value instanceof Bag)
-	    ((Bag)value).materialize();
-	global_env = new Environment(var,value,global_env);
+        if (value instanceof Bag)
+            ((Bag)value).materialize();
+        global_env = new Environment(var,value,global_env);
     }
 
     /** remove a global variable binding */
     public static void remove_global_binding ( String v ) {
-	if (global_env == null)
-	    return;
-	for ( Environment env = global_env; env.next != null; env = env.next )
-	    if (v.equals(env.next.name))
-		env.next = env.next.next;
-	if (global_env.name == v)
-	    global_env = global_env.next;
+        if (global_env == null)
+            return;
+        for ( Environment env = global_env; env.next != null; env = env.next )
+            if (v.equals(env.next.name))
+                env.next = env.next.next;
+        if (global_env.name == v)
+            global_env = global_env.next;
     }
 
     /** retrieve a global variable binding */
     public static MRData lookup_global_binding ( String v ) {
-	for ( Environment env = global_env; env != null; env = env.next )
-	    if (v.equals(env.name))
-		return env.value;
-	return null;
+        for ( Environment env = global_env; env != null; env = env.next )
+            if (v.equals(env.name))
+                return env.value;
+        return null;
     }
 
     public static void set_global_bindings ( Environment env ) {
-	global_env = env;
+        global_env = env;
     }
 
     final static int coerce_method = ClassImporter.find_method_number("coerce",#[any,int]);
 
     /** untyped reify: not type-correct but will not crash the run-time system */
     private final static Tree reify ( final MRData x ) {
-	if (x instanceof Bag) {
-	    Bag b = (Bag)x;
-	    Trees as = #[];
-	    for ( MRData e: b)
-		as = as.append(reify(e));
-	    return #<list(...as)>;
-	} else if (x instanceof Tuple) {
-	    Tuple t = (Tuple)x;
-	    Trees as = #[];
-	    for ( short i = 0; i < t.size(); i++ )
-		as = as.append(reify(t.get(i)));
-	    return #<tuple(...as)>;
-	} else if (x instanceof MR_string)
-	    return new StringLeaf(((MR_string)x).get());
-	else if (x instanceof MR_short)
-	    return #<callM(coerce,`coerce_method,`(((MR_short)x).get()),`(MRContainer.SHORT))>;
-	else if (x instanceof MR_int)
-	    return #<`(((MR_int)x).get())>;
-	else if (x instanceof MR_long)
-	    return #<callM(coerce,`coerce_method,`((int)((MR_long)x).get()),`(MRContainer.LONG))>;
-	else if (x instanceof MR_float)
-	    return #<`(((MR_float)x).get())>;
-	else if (x instanceof MR_double)
-	    return #<callM(coerce,`coerce_method,`((float)(((MR_double)x).get())),`(MRContainer.DOUBLE))>;
-	throw new Error("wrong MRData: "+x);
+        if (x instanceof Bag) {
+            Bag b = (Bag)x;
+            Trees as = #[];
+            for ( MRData e: b)
+                as = as.append(reify(e));
+            return #<list(...as)>;
+        } else if (x instanceof Tuple) {
+            Tuple t = (Tuple)x;
+            Trees as = #[];
+            for ( short i = 0; i < t.size(); i++ )
+                as = as.append(reify(t.get(i)));
+            return #<tuple(...as)>;
+        } else if (x instanceof MR_string)
+            return new StringLeaf(((MR_string)x).get());
+        else if (x instanceof MR_short)
+            return #<callM(coerce,`coerce_method,`(((MR_short)x).get()),`(MRContainer.SHORT))>;
+        else if (x instanceof MR_int)
+            return #<`(((MR_int)x).get())>;
+        else if (x instanceof MR_long)
+            return #<callM(coerce,`coerce_method,`((int)((MR_long)x).get()),`(MRContainer.LONG))>;
+        else if (x instanceof MR_float)
+            return #<`(((MR_float)x).get())>;
+        else if (x instanceof MR_double)
+            return #<callM(coerce,`coerce_method,`((float)(((MR_double)x).get())),`(MRContainer.DOUBLE))>;
+        throw new Error("wrong MRData: "+x);
     }
 
     /** evaluate an MRQL function in memory */
     private final static Function evalf ( final String v,
-				  final Tree body,
-				  final Environment env ) {
-	return new Function() {
-	    final public MRData eval ( final MRData x ) {
-		return evalE(body,new Environment(v,x,env));
-	    }
-	};
+                                  final Tree body,
+                                  final Environment env ) {
+        return new Function() {
+            final public MRData eval ( final MRData x ) {
+                return evalE(body,new Environment(v,x,env));
+            }
+        };
     }
 
     /** evaluate an MRQL function in memory */
     public final static Function evalF ( Tree fnc, Environment env ) {
-	match fnc {
-	case compiled(`ln,`lm,...vars):
-	    try {
-		return Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString());
-	    } catch (Exception ex) {
-		System.err.println("*** Unable to retrieve the compiled lambda: "+fnc);
-		return ((Lambda) evalE(lm)).lambda();
-	    }
-	case lambda(`v,`b):
-	    return evalf(v.toString(),b,env);
-	case function(tuple(...params),`tp,`body):
-	    String[] as = new String[params.length()];
-	    int i = 0;
-	    for ( Tree param: params )
-		match param {
-		case `bind(`v,_):
-		    as[i++] = v.toString();
-		};
-	    return evalT(as,body,env);
-	};
-	throw new Error("Ill-formed lambda: "+fnc);
+        match fnc {
+        case compiled(`ln,`lm,...vars):
+            try {
+                return Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString());
+            } catch (Exception ex) {
+                System.err.println("*** Unable to retrieve the compiled lambda: "+fnc);
+                return ((Lambda) evalE(lm)).lambda();
+            }
+        case lambda(`v,`b):
+            return evalf(v.toString(),b,env);
+        case function(tuple(...params),`tp,`body):
+            String[] as = new String[params.length()];
+            int i = 0;
+            for ( Tree param: params )
+                match param {
+                case `bind(`v,_):
+                    as[i++] = v.toString();
+                };
+            return evalT(as,body,env);
+        };
+        throw new Error("Ill-formed lambda: "+fnc);
     }
 
     /** evaluate an MRQL function in memory */
     private final static Function evalT ( final String[] params,
-					  final Tree body,
-					  final Environment env ) {
-	return new Function() {
-	    final public MRData eval ( final MRData x ) {
-		Environment new_env = env;
-		for ( int i = 0; i < params.length; i++ )
-		    new_env = new Environment(params[i],((Tuple)x).get(i),new_env);
-		return evalE(body,new_env);
-	    }
-	};
+                                          final Tree body,
+                                          final Environment env ) {
+        return new Function() {
+            final public MRData eval ( final MRData x ) {
+                Environment new_env = env;
+                for ( int i = 0; i < params.length; i++ )
+                    new_env = new Environment(params[i],((Tuple)x).get(i),new_env);
+                return evalE(body,new_env);
+            }
+        };
     }
 
     final static String true_name = #<true>.toString();
@@ -159,573 +159,573 @@ public class Interpreter extends TypeInference {
     static int tab_count = -3;
 
     public static String tabs ( int n ) {
-	StringBuffer b = new StringBuffer();
-	for ( int i = 0; i < n; i++)
-	    b.append(' ');
-	return b.toString();
+        StringBuffer b = new StringBuffer();
+        for ( int i = 0; i < n; i++)
+            b.append(' ');
+        return b.toString();
     }
 
     /** evaluate an MRQL expression in memory and print tracing info */
     final static MRData evalE ( final Tree e, final Environment env ) {
-	if (Config.trace_exp_execution) {
-	    tab_count += 3;
-	    System.out.println(tabs(tab_count)+print_query(e));
-	};
-	MRData res = evalEE(e,env);
-	if (Config.trace_exp_execution) {
-	    System.out.println(tabs(tab_count)+"-> "+res);
-	    tab_count -= 3;
-	};
-	return res;
+        if (Config.trace_exp_execution) {
+            tab_count += 3;
+            System.out.println(tabs(tab_count)+print_query(e));
+        };
+        MRData res = evalEE(e,env);
+        if (Config.trace_exp_execution) {
+            System.out.println(tabs(tab_count)+"-> "+res);
+            tab_count -= 3;
+        };
+        return res;
     }
 
     /** evaluate an MRQL expression in memory */
     private final static MRData evalEE ( final Tree e, final Environment env ) {
-	try {
-	    if (e.is_variable()) {
-		String v = e.toString();
-		if (v == true_name)
-		    return true_value;
-		else if (v == false_name)
-		    return false_value;
-		else if (v == null_name)
-		    return null_value;
-		MRData x = variable_lookup(v,env);
-		if (x != null)
-		    return x;
-		x = lookup_global_binding(v);
-		if (x == null)
-		    throw new Error("Variable "+v+" is not bound");
-		return x;
-	    } else if (e.is_long())
-	        return new MR_int((int)e.longValue());
-	    else if (e.is_double())
-	        return new MR_float((float)e.doubleValue());
-	    else if (e.is_string())
-		return new MR_string(e.stringValue());
+        try {
+            if (e.is_variable()) {
+                String v = e.toString();
+                if (v == true_name)
+                    return true_value;
+                else if (v == false_name)
+                    return false_value;
+                else if (v == null_name)
+                    return null_value;
+                MRData x = variable_lookup(v,env);
+                if (x != null)
+                    return x;
+                x = lookup_global_binding(v);
+                if (x == null)
+                    throw new Error("Variable "+v+" is not bound");
+                return x;
+            } else if (e.is_long())
+                return new MR_int((int)e.longValue());
+            else if (e.is_double())
+                return new MR_float((float)e.doubleValue());
+            else if (e.is_string())
+                return new MR_string(e.stringValue());
         match e {
-	case callM(and,_,`x,`y):  // lazy
-	    return (((MR_bool)evalE(x,env)).get()) ? evalE(y,env) : false_value;
-	case callM(or,_,`x,`y):
-	    return (((MR_bool)evalE(x,env)).get()) ? true_value : evalE(y,env);
-	case callM(`f,`n,...args):   // internal function call
-	    MRData[] as = new MRData[args.length()];
-	    for ( int i = 0; i < args.length(); i++ )
-		as[i] = evalE(args.nth(i),env);
-	    return ClassImporter.call((int)n.longValue(),as);
-	case compiled(`ln,_,...vars):
-	    return new Lambda(Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString()));
-	case lambda(`v,`body):
-	    return new Lambda(evalf(v.toString(),body,env));
-	case nth(`x,`n):
-	    return ((Tuple)evalE(x,env)).get((int)n.longValue());
-	case setNth(`x,`n,`v,`ret):
-	    return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
-	case materialize(`u):
-	    return MapReduceAlgebra.materialize(evalE(u,env));
-	case let(`v,`u,`body):
-	    MRData x = evalE(u,env);
-	    if (x instanceof Bag)
-		((Bag)x).materialize();
-	    return evalE(body,new Environment(v.toString(),x,env));
-	case cmap(`f,`s):
-	    return MapReduceAlgebra.cmap(evalF(f,env),(Bag)evalE(s,env));
-	case filter(`p,`m,`s):
-	    return MapReduceAlgebra.filter(evalF(p,env),evalF(m,env),(Bag)evalE(s,env));
-	case map(`m,`s):
-	    return MapReduceAlgebra.map(evalF(m,env),(Bag)evalE(s,env));
-	case repeat(lambda(`v,`b),`s,`n):
-	    final String nm = v.toString();
-	    final Tree body = b;
-	    if (Config.hadoop_mode) {
-		Function loop_fnc = new Function () {
-			public MRData eval ( MRData s ) {
-			    new_global_binding(nm,s);
-			    MRData val = new MR_dataset(Evaluator.eval(body,new Environment(nm,s,env),nm));
-			    return val;
-			}; };
-		return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
-	    } else {
-		Function loop_fnc = new Function () {
-			public MRData eval ( MRData s ) {
-			    new_global_binding(nm,s);
-			    return evalM(body,new Environment(nm,s,env));
-			}; };
-		return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
-	    }
-	case repeat(`lm,`s,`n):
-	    return MapReduceAlgebra.repeat(evalF(lm,env),(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
-	case range(`min,`max):
-	    return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
-					      ((MR_long)evalE(max,env)).get());
-	case call(`f,...args):
-	    Tuple t = new Tuple(args.length());
-	    int i = 0;
-	    for ( Tree a: args )
-		t.set(i++,evalE(a,env));
-	    return evalF(f,env).eval(t);
-	case tuple(`x,`y):
-	    return new Tuple(evalE(x,env),evalE(y,env));
-	case tuple(`x,`y,`z):
-	    return new Tuple(evalE(x,env),evalE(y,env),evalE(z,env));
-	case tuple(...el):
-	    Tuple t = new Tuple(el.length());
-	    int i = 0;
-	    for ( Tree a: el )
-		t.set(i++,evalE(a,env));
-	    return t;
-	case tagged_union(`n,`u):
-	    return new Union((byte)n.longValue(),evalE(u,env));
-	case union_value(`x):
-	    return ((Union)evalE(x,env)).value();
-	case union_tag(`x):
-	    return new MR_int(((Union)evalE(x,env)).tag());
-	// used for shortcutting sync in bsp supersteps
-	case BAG():
-	    return SystemFunctions.bsp_empty_bag;
-	case TRUE():
-	    return SystemFunctions.bsp_true_value;
-	case FALSE():
-	    return SystemFunctions.bsp_false_value;
-	case `T(...el):
-	    if (!is_collection(T))
-		fail;
-	    if (el.is_empty())
-		return new Bag();
-	    Bag b = new Bag(el.length());
-	    for ( Tree a: el )
-		b.add(evalE(a,env));
-	    return b;
-	case if(`c,`x,`y):
-	    if (((MR_bool)evalE(c,env)).get())
-		return evalE(x,env);
-	    else return evalE(y,env);
+        case callM(and,_,`x,`y):  // lazy
+            return (((MR_bool)evalE(x,env)).get()) ? evalE(y,env) : false_value;
+        case callM(or,_,`x,`y):
+            return (((MR_bool)evalE(x,env)).get()) ? true_value : evalE(y,env);
+        case callM(`f,`n,...args):   // internal function call
+            MRData[] as = new MRData[args.length()];
+            for ( int i = 0; i < args.length(); i++ )
+                as[i] = evalE(args.nth(i),env);
+            return ClassImporter.call((int)n.longValue(),as);
+        case compiled(`ln,_,...vars):
+            return new Lambda(Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString()));
+        case lambda(`v,`body):
+            return new Lambda(evalf(v.toString(),body,env));
+        case nth(`x,`n):
+            return ((Tuple)evalE(x,env)).get((int)n.longValue());
+        case setNth(`x,`n,`v,`ret):
+            return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
+        case materialize(`u):
+            return MapReduceAlgebra.materialize(evalE(u,env));
+        case let(`v,`u,`body):
+            MRData x = evalE(u,env);
+            if (x instanceof Bag)
+                ((Bag)x).materialize();
+            return evalE(body,new Environment(v.toString(),x,env));
+        case cmap(`f,`s):
+            return MapReduceAlgebra.cmap(evalF(f,env),(Bag)evalE(s,env));
+        case filter(`p,`m,`s):
+            return MapReduceAlgebra.filter(evalF(p,env),evalF(m,env),(Bag)evalE(s,env));
+        case map(`m,`s):
+            return MapReduceAlgebra.map(evalF(m,env),(Bag)evalE(s,env));
+        case repeat(lambda(`v,`b),`s,`n):
+            final String nm = v.toString();
+            final Tree body = b;
+            if (Config.hadoop_mode) {
+                Function loop_fnc = new Function () {
+                        public MRData eval ( MRData s ) {
+                            new_global_binding(nm,s);
+                            MRData val = new MR_dataset(Evaluator.eval(body,new Environment(nm,s,env),nm));
+                            return val;
+                        }; };
+                return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+            } else {
+                Function loop_fnc = new Function () {
+                        public MRData eval ( MRData s ) {
+                            new_global_binding(nm,s);
+                            return evalM(body,new Environment(nm,s,env));
+                        }; };
+                return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+            }
+        case repeat(`lm,`s,`n):
+            return MapReduceAlgebra.repeat(evalF(lm,env),(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+        case range(`min,`max):
+            return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
+                                              ((MR_long)evalE(max,env)).get());
+        case call(`f,...args):
+            Tuple t = new Tuple(args.length());
+            int i = 0;
+            for ( Tree a: args )
+                t.set(i++,evalE(a,env));
+            return evalF(f,env).eval(t);
+        case tuple(`x,`y):
+            return new Tuple(evalE(x,env),evalE(y,env));
+        case tuple(`x,`y,`z):
+            return new Tuple(evalE(x,env),evalE(y,env),evalE(z,env));
+        case tuple(...el):
+            Tuple t = new Tuple(el.length());
+            int i = 0;
+            for ( Tree a: el )
+                t.set(i++,evalE(a,env));
+            return t;
+        case tagged_union(`n,`u):
+            return new Union((byte)n.longValue(),evalE(u,env));
+        case union_value(`x):
+            return ((Union)evalE(x,env)).value();
+        case union_tag(`x):
+            return new MR_int(((Union)evalE(x,env)).tag());
+        // used for shortcutting sync in bsp supersteps
+        case BAG():
+            return SystemFunctions.bsp_empty_bag;
+        case TRUE():
+            return SystemFunctions.bsp_true_value;
+        case FALSE():
+            return SystemFunctions.bsp_false_value;
+        case `T(...el):
+            if (!is_collection(T))
+                fail;
+            if (el.is_empty())
+                return new Bag();
+            Bag b = new Bag(el.length());
+            for ( Tree a: el )
+                b.add(evalE(a,env));
+            return b;
+        case if(`c,`x,`y):
+            if (((MR_bool)evalE(c,env)).get())
+                return evalE(x,env);
+            else return evalE(y,env);
         case Collect(`s):
-	    try {
-		if (Config.hadoop_mode)
-		    return Plan.collect(Evaluator.eval(s,env,"-"));
-		Bag b = evalS(s,env);
-		b.materialize();
-		return b;
-	    } catch (Exception ex) { throw new Error(ex); }
-	case dataset_size(`x):
-	    return new MR_long(Plan.size(Evaluator.eval(x,env,"-")) / (1024*1024));
-	case synchronize(`peer,`b):
-	    return Evaluator.synchronize(((MR_string)evalE(peer,env)),(MR_bool)evalE(b,env));
-	case distribute(`peer,`s):
-	    return Evaluator.distribute(((MR_string)evalE(peer,env)),(Bag)evalE(s,env));
-	case mapReduce(`m,`r,`s,_):
-	    return MapReduceAlgebra.mapReduce(evalF(m,env),
-					      evalF(r,env),
-					      (Bag)evalE(s,env));
-	case mapReduce2(`mx,`my,`r,`x,`y,_):
-	    return MapReduceAlgebra.mapReduce2(
-				evalF(mx,env),
-				evalF(my,env),
-				evalF(r,env),
-				(Bag)evalE(x,env),
-				(Bag)evalE(y,env));
-	case mapJoin(`kx,`ky,`r,`x,`y):
-	    return MapReduceAlgebra.mapJoin(
-				evalF(kx,env),
-				evalF(ky,env),
-				evalF(r,env),
-				(Bag)evalE(x,env),
-				(Bag)evalE(y,env));
-	case crossProduct(`mx,`my,`r,`x,`y):
-	    return MapReduceAlgebra.crossProduct(
-			      evalF(mx,env),
-			      evalF(my,env),
-			      evalF(r,env),
-			      (Bag)evalE(x,env),
-			      (Bag)evalE(y,env));
-	case groupBy(`s):
-	    return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
-	case orderBy(`s):
-	    return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
-	case index(`x,`n):
-	    MRData xv = evalE(x,env);
-	    MRData nv = evalE(n,env);
-	    if (xv instanceof MR_dataset)
-		xv = Plan.collect(((MR_dataset)xv).dataset());
-	    Bag b = (Bag)xv;
-	    int k = (int)((MR_int)nv).get();
-	    return b.get(k);
-	case range(`x,`i,`j):
-	    MRData xv = evalE(x,env);
-	    MRData ni = evalE(i,env);
-	    MRData nj = evalE(j,env);
-	    if (xv instanceof MR_dataset)
-		xv = Plan.collect(((MR_dataset)xv).dataset());
-	    Bag b = (Bag)xv;
-	    int ki = (int)((MR_int)ni).get();
-	    int kj = (int)((MR_int)nj).get();
-	    Iterator<MRData> it = b.iterator();
-	    Bag s = new Bag();
-	    for ( int n = 0; it.hasNext() && n < ki; n++ )
-		it.next();
-	    for ( int n = ki; it.hasNext() && n <= kj; n++ )
-		s.add(it.next());
-	    return s;
-	case map_index(`x,`key):
-	    MRData xv = evalE(x,env);
-	    MRData nk = evalE(key,env);
-	    if (xv instanceof MR_dataset)
-		xv = Plan.collect(((MR_dataset)xv).dataset());
-	    return ((Bag)xv).map_find(nk);
-	case aggregate(`acc,`zero,`s):
-	    return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
-					      (Bag)evalE(s,env));
-	case Aggregate(`acc,`zero,`s):
-	    if (Config.hadoop_mode)
-		return Evaluator.aggregate(closure(acc,env),zero,s,env);
-	    else return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),evalM(s,env));
-	case BSP(tuple(...ns),`superstep,`state,`o,...as):
-	    if (Config.hadoop_mode)
-		return Evaluator.bsp(e,env);
-	    Bag[] ds = new Bag[as.length()];
-	    for ( int i = 0; i < ds.length; i++ )
-		ds[i] = evalM(as.nth(i),env);
-	    int[] nn = new int[ns.length()];
-	    for ( int i = 0; i < ns.length(); i++ )
-		nn[i] = (int)((LongLeaf)ns.nth(i)).value();
-	    return MapReduceAlgebra.BSP(nn,
-					evalF(superstep,env),
-					evalE(state,env),
-					o.equals(#<true>),
-					ds);
-	case BSP(`n,`superstep,`state,`o,...as):
-	    if (Config.hadoop_mode)
-		return Evaluator.bsp(e,env);
-	    Bag[] ds = new Bag[as.length()];
-	    for ( int i = 0; i < ds.length; i++ )
-		ds[i] = evalM(as.nth(i),env);
-	    return MapReduceAlgebra.BSP(new int[]{(int)((LongLeaf)n).value()},
-					evalF(superstep,env),
-					evalE(state,env),
-					o.equals(#<true>),
-					ds);
-	case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
-	    if (Config.hadoop_mode)
-		return Evaluator.loop(e,env);
-	    int limit = ((MR_int)evalE(num,env)).get();
-	    Bag[] s = new Bag[vs.length()];
-	    for ( int i = 0; i < vs.length(); i++ )
-		s[i] = evalM(ss.nth(i),env);
-	    for ( int n = 0; n < limit; n++ ) {
-		Environment nenv = env;
-		for ( int i = 0; i < vs.length(); i ++ ) {
-		    s[i].materialize();
-		    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
-		};
-		for ( int i = 0; i < vs.length(); i ++ )
-		    s[i] = (Bag)evalM(bs.nth(i),nenv);
-	    };
-	    return new Tuple(s);
-	case function(tuple(...params),`tp,`body):
-	    String[] as = new String[params.length()];
-	    int i = 0;
-	    for ( Tree param: params )
-		match param {
-		case `bind(`v,_):
-		    as[i++] = v.toString();
-		};
-	    return new Lambda(evalT(as,body,env));
-	case typed(`x,_):
-	    return evalE(x,env);
-	case apply(`f,`arg):
-	    if (!f.is_variable())
-		return evalF(f,env).eval(evalE(arg,env));
-	    MRData fnc = lookup_global_binding(f.toString());
-	    if (fnc == null) {
-		String s = Plan.conf.get("mrql.global."+f);
-		if (s != null)
-		    try {
-			Tree ft = Tree.parse(s);
-			TopLevel.store(f.toString(),ft);
-			fnc = evalE(ft,env);
-			new_global_binding(f.toString(),fnc);
-		    } catch (Exception ex) {
-			throw new Error(ex);
-		    }
-	    };
-	    MRData t = evalE(arg,env);
-	    if (!(t instanceof Tuple))
-		throw new Error("Expected a tuple in function application: "+t);
-	    return ((Lambda)fnc).lambda().eval(t);
-	case trace(`x):
-	    MRData z = evalE(x,env);
-	    System.err.println("*** "+x+": "+z);
-	    return z;
-	case BinarySource(...,`file,`tp):
-	    if (Config.hadoop_mode)
-		if (collection_type(tp))
-		    return Plan.collect(Plan.binarySource(file.stringValue()));
-		else return Plan.collect(Plan.binarySource(file.stringValue())).get(0);
-	    else return MapReduceAlgebra.read_binary(file.stringValue());
-	case _:
-	    try {
-		if (Config.hadoop_mode)
-		    return new MR_dataset(Evaluator.eval(e,env,"-"));
-		else return evalS(e,env);
-	    } catch (Exception ex) { throw new Error(ex); }
+            try {
+                if (Config.hadoop_mode)
+                    return Plan.collect(Evaluator.eval(s,env,"-"));
+                Bag b = evalS(s,env);
+                b.materialize();
+                return b;
+            } catch (Exception ex) { throw new Error(ex); }
+        case dataset_size(`x):
+            return new MR_long(Plan.size(Evaluator.eval(x,env,"-")) / (1024*1024));
+        case synchronize(`peer,`b):
+            return Evaluator.synchronize(((MR_string)evalE(peer,env)),(MR_bool)evalE(b,env));
+        case distribute(`peer,`s):
+            return Evaluator.distribute(((MR_string)evalE(peer,env)),(Bag)evalE(s,env));
+        case mapReduce(`m,`r,`s,_):
+            return MapReduceAlgebra.mapReduce(evalF(m,env),
+                                              evalF(r,env),
+                                              (Bag)evalE(s,env));
+        case mapReduce2(`mx,`my,`r,`x,`y,_):
+            return MapReduceAlgebra.mapReduce2(
+                                evalF(mx,env),
+                                evalF(my,env),
+                                evalF(r,env),
+                                (Bag)evalE(x,env),
+                                (Bag)evalE(y,env));
+        case mapJoin(`kx,`ky,`r,`x,`y):
+            return MapReduceAlgebra.mapJoin(
+                                evalF(kx,env),
+                                evalF(ky,env),
+                                evalF(r,env),
+                                (Bag)evalE(x,env),
+                                (Bag)evalE(y,env));
+        case crossProduct(`mx,`my,`r,`x,`y):
+            return MapReduceAlgebra.crossProduct(
+                              evalF(mx,env),
+                              evalF(my,env),
+                              evalF(r,env),
+                              (Bag)evalE(x,env),
+                              (Bag)evalE(y,env));
+        case groupBy(`s):
+            return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
+        case orderBy(`s):
+            return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
+        case index(`x,`n):
+            MRData xv = evalE(x,env);
+            MRData nv = evalE(n,env);
+            if (xv instanceof MR_dataset)
+                xv = Plan.collect(((MR_dataset)xv).dataset());
+            Bag b = (Bag)xv;
+            int k = (int)((MR_int)nv).get();
+            return b.get(k);
+        case range(`x,`i,`j):
+            MRData xv = evalE(x,env);
+            MRData ni = evalE(i,env);
+            MRData nj = evalE(j,env);
+            if (xv instanceof MR_dataset)
+                xv = Plan.collect(((MR_dataset)xv).dataset());
+            Bag b = (Bag)xv;
+            int ki = (int)((MR_int)ni).get();
+            int kj = (int)((MR_int)nj).get();
+            Iterator<MRData> it = b.iterator();
+            Bag s = new Bag();
+            for ( int n = 0; it.hasNext() && n < ki; n++ )
+                it.next();
+            for ( int n = ki; it.hasNext() && n <= kj; n++ )
+                s.add(it.next());
+            return s;
+        case map_index(`x,`key):
+            MRData xv = evalE(x,env);
+            MRData nk = evalE(key,env);
+            if (xv instanceof MR_dataset)
+                xv = Plan.collect(((MR_dataset)xv).dataset());
+            return ((Bag)xv).map_find(nk);
+        case aggregate(`acc,`zero,`s):
+            return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                              (Bag)evalE(s,env));
+        case Aggregate(`acc,`zero,`s):
+            if (Config.hadoop_mode)
+                return Evaluator.aggregate(closure(acc,env),zero,s,env);
+            else return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),evalM(s,env));
+        case BSP(tuple(...ns),`superstep,`state,`o,...as):
+            if (Config.hadoop_mode)
+                return Evaluator.bsp(e,env);
+            Bag[] ds = new Bag[as.length()];
+            for ( int i = 0; i < ds.length; i++ )
+                ds[i] = evalM(as.nth(i),env);
+            int[] nn = new int[ns.length()];
+            for ( int i = 0; i < ns.length(); i++ )
+                nn[i] = (int)((LongLeaf)ns.nth(i)).value();
+            return MapReduceAlgebra.BSP(nn,
+                                        evalF(superstep,env),
+                                        evalE(state,env),
+                                        o.equals(#<true>),
+                                        ds);
+        case BSP(`n,`superstep,`state,`o,...as):
+            if (Config.hadoop_mode)
+                return Evaluator.bsp(e,env);
+            Bag[] ds = new Bag[as.length()];
+            for ( int i = 0; i < ds.length; i++ )
+                ds[i] = evalM(as.nth(i),env);
+            return MapReduceAlgebra.BSP(new int[]{(int)((LongLeaf)n).value()},
+                                        evalF(superstep,env),
+                                        evalE(state,env),
+                                        o.equals(#<true>),
+                                        ds);
+        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
+            if (Config.hadoop_mode)
+                return Evaluator.loop(e,env);
+            int limit = ((MR_int)evalE(num,env)).get();
+            Bag[] s = new Bag[vs.length()];
+            for ( int i = 0; i < vs.length(); i++ )
+                s[i] = evalM(ss.nth(i),env);
+            for ( int n = 0; n < limit; n++ ) {
+                Environment nenv = env;
+                for ( int i = 0; i < vs.length(); i ++ ) {
+                    s[i].materialize();
+                    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
+                };
+                for ( int i = 0; i < vs.length(); i ++ )
+                    s[i] = (Bag)evalM(bs.nth(i),nenv);
+            };
+            return new Tuple(s);
+        case function(tuple(...params),`tp,`body):
+            String[] as = new String[params.length()];
+            int i = 0;
+            for ( Tree param: params )
+                match param {
+                case `bind(`v,_):
+                    as[i++] = v.toString();
+                };
+            return new Lambda(evalT(as,body,env));
+        case typed(`x,_):
+            return evalE(x,env);
+        case apply(`f,`arg):
+            if (!f.is_variable())
+                return evalF(f,env).eval(evalE(arg,env));
+            MRData fnc = lookup_global_binding(f.toString());
+            if (fnc == null) {
+                String s = Plan.conf.get("mrql.global."+f);
+                if (s != null)
+                    try {
+                        Tree ft = Tree.parse(s);
+                        TopLevel.store(f.toString(),ft);
+                        fnc = evalE(ft,env);
+                        new_global_binding(f.toString(),fnc);
+                    } catch (Exception ex) {
+                        throw new Error(ex);
+                    }
+            };
+            MRData t = evalE(arg,env);
+            if (!(t instanceof Tuple))
+                throw new Error("Expected a tuple in function application: "+t);
+            return ((Lambda)fnc).lambda().eval(t);
+        case trace(`x):
+            MRData z = evalE(x,env);
+            System.err.println("*** "+x+": "+z);
+            return z;
+        case BinarySource(...,`file,`tp):
+            if (Config.hadoop_mode)
+                if (collection_type(tp))
+                    return Plan.collect(Plan.binarySource(file.stringValue()));
+                else return Plan.collect(Plan.binarySource(file.stringValue())).get(0);
+            else return MapReduceAlgebra.read_binary(file.stringValue());
+        case _:
+            try {
+                if (Config.hadoop_mode)
+                    return new MR_dataset(Evaluator.eval(e,env,"-"));
+                else return evalS(e,env);
+            } catch (Exception ex) { throw new Error(ex); }
         };
-	throw new Error("Cannot evaluate the expression: "+e);
-	} catch (Error msg) {
-	    if (!Config.trace)
-		throw new Error(msg.getMessage());
-	    System.err.println(msg.getMessage());
-	    msg.printStackTrace();
-	    throw new Error("Evaluation error in: "+print_query(e));
-	} catch (Exception ex) {
-	    if (Config.trace) {
-		System.err.println(ex.getMessage());
-		ex.printStackTrace();
-	    }
-	    throw new Error("Evaluation error in: "+print_query(e));
-	}
+        throw new Error("Cannot evaluate the expression: "+e);
+        } catch (Error msg) {
+            if (!Config.trace)
+                throw new Error(msg.getMessage());
+            System.err.println(msg.getMessage());
+            msg.printStackTrace();
+            throw new Error("Evaluation error in: "+print_query(e));
+        } catch (Exception ex) {
+            if (Config.trace) {
+                System.err.println(ex.getMessage());
+                ex.printStackTrace();
+            }
+            throw new Error("Evaluation error in: "+print_query(e));
+        }
     }
 
     /** evaluate an MRQL expression in memory */
     final static MRData evalE ( final Tree e ) {
-	return evalE(e,null);
+        return evalE(e,null);
     }
 
     /** evaluate MRQL physical operators in memory (returns a Bag) */
     final static Bag evalS ( final Tree e, final Environment env ) {
-	return evalM(e,env);
+        return evalM(e,env);
     }
 
     /** evaluate MRQL physical operators in memory (returns a Bag) */
     final static Bag evalM ( final Tree e, final Environment env ) {
-	if (Config.trace_execution) {
-	    tab_count += 3;
-	    System.out.println(tabs(tab_count)+print_query(e));
-	};
-	Bag res = evalMM(e,env);
-	if (Config.trace_execution) {
-	    System.out.println(tabs(tab_count)+"-> "+res);
-	    tab_count -= 3;
-	};
-	return res;
+        if (Config.trace_execution) {
+            tab_count += 3;
+            System.out.println(tabs(tab_count)+print_query(e));
+        };
+        Bag res = evalMM(e,env);
+        if (Config.trace_execution) {
+            System.out.println(tabs(tab_count)+"-> "+res);
+            tab_count -= 3;
+        };
+        return res;
     }
 
     /** evaluate MRQL physical operators in memory (returns a Bag) */
     final static Bag evalMM ( final Tree e, final Environment env ) {
-	try {
-	    match e {
-	    case cMap(`f,`s):
-		return MapReduceAlgebra.cmap(evalF(f,env),evalM(s,env));
-	    case AggregateMap(`f,`acc,`zero,`s):
-		return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
-							  evalM(#<cMap(`f,`s)>,env)));
-	    case MapReduce(`m,`r,`s,_):
-		return MapReduceAlgebra.mapReduce(
-				   evalF(m,env),
-				   evalF(r,env),
-				   evalM(s,env));
-	    case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
-		return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
-							  evalM(#<MapReduce(`m,`r,`s,false)>,env)));
-	    case MapCombineReduce(`m,`c,`r,`s,_):
-		return MapReduceAlgebra.mapReduce(
-				   evalF(m,env),
-				   evalF(r,env),
-				   evalM(s,env));
-	    case MapReduce2(`mx,`my,`r,`x,`y,_):
-		return MapReduceAlgebra.mapReduce2(
-				evalF(mx,env),
-				evalF(my,env),
-				evalF(r,env),
-				evalM(x,env),
-				evalM(y,env));
-	    case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,_):
-		return MapReduceAlgebra.mapReduce2(
-				evalF(mx,env),
-				evalF(my,env),
-				evalF(r,env),
-				evalM(x,env),
-				evalM(y,env));
-	    case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
-		return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
-							  evalM(#< MapReduce2(`mx,`my,`r,`x,`y,false)>,env)));
-	    case MapJoin(`kx,`ky,`r,`x,`y):
-		return MapReduceAlgebra.mapJoin(
-				evalF(kx,env),
-				evalF(ky,env),
-				evalF(r,env),
-				evalM(x,env),
-				evalM(y,env));
-	    case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
-		return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
-							  evalM(#<MapJoin(`kx,`ky,`r,`x,`y)>,env)));
-	    case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
-		return MapReduceAlgebra.groupByJoin(
-				evalF(kx,env),
-				evalF(ky,env),
-				evalF(gx,env),
-				evalF(gy,env),
-				evalF(m,env),
-				evalF(c,env),
-				evalF(r,env),
-				evalM(x,env),
-				evalM(y,env));
-	    case CrossProduct(`mx,`my,`r,`x,`y):
-		return MapReduceAlgebra.crossProduct(
-				evalF(mx,env),
-				evalF(my,env),
-				evalF(r,env),
-				evalM(x,env),
-				evalM(y,env));
-	    case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
-		return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
-							  evalM(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)));
-	    case BinarySource(`file,_):
-		return (Bag)MapReduceAlgebra.read_binary(file.stringValue());
-	    case BSPSource(`n,BinarySource(`file,_)):
-		return (Bag)MapReduceAlgebra.read_binary((int)((LongLeaf)n).value(),
-							 file.stringValue());
-	    case BSPSource(`n,ParsedSource(`parser,`file,...args)):
-		if (!(n instanceof LongLeaf))
-		    fail;
-		Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
-		if (p == null)
-		    throw new Error("Unknown parser: "+parser);
-		return MapReduceAlgebra.parsedSource((int)(((LongLeaf)n).value()),p,
-						     ((MR_string)evalE(file,env)).get(),args);
-	    case ParsedSource(`parser,`file,...args):
-		Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
-		if (p == null)
-		    throw new Error("Unknown parser: "+parser);
-		return MapReduceAlgebra.parsedSource(p,((MR_string)evalE(file,env)).get(),args);
-	    case Merge(`x,`y):
-		return evalM(x,env).union(evalM(y,env));
-	    case Repeat(lambda(`v,`b),`s,`n):
-		final String vs = v.toString();
-		final Tree body = b;
-		Function loop = new Function() {
-			final public MRData eval ( final MRData x ) {
-			    return evalM(body,new Environment(vs,x,env));
-			}
-		    };
-		return MapReduceAlgebra.repeat(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
-	    case Closure(lambda(`v,`b),`s,`n):
-		final String vs = v.toString();
-		final Tree body = b;
-		Function loop = new Function() {
-			final public MRData eval ( final MRData x ) {
-			    return evalM(body,new Environment(vs,x,env));
-			}
-		    };
-		return MapReduceAlgebra.closure(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
-	    case Generator(`min,`max,`size):
-		return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
-						  ((MR_long)evalE(max,env)).get());
-	    case BSPSource(`n,Generator(`min,`max,`size)):
-		return MapReduceAlgebra.generator((int)((LongLeaf)n).value(),
-						  ((MR_long)evalE(min,env)).get(),
-						  ((MR_long)evalE(max,env)).get());
-	    case Dump(`s):
-		Bag bs = (Bag)evalE(s,env);
-		final Iterator<MRData> iter = bs.iterator();
-		return new Bag(new BagIterator() {
-			public boolean hasNext () {
-			    return iter.hasNext();
-			}
-			public MRData next () {
-			    return new Tuple(new MR_int(0),iter.next());
-			}
-		    });
-	    case let(`v,`u,`body):
-		return evalM(body,new Environment(v.toString(),evalE(u,env),env));
-	    case apply(`f,`arg):
-		if (!f.is_variable())
-		    return (Bag)evalF(f,env).eval(evalE(arg));
-		MRData fnc = lookup_global_binding(f.toString());
-		if (fnc == null)
-		    throw new Error("Unknown function: "+f);
-		MRData t = evalE(arg,env);
-		if (!(t instanceof Tuple))
-		    throw new Error("Expected a tuple in function application: "+t);
-		return (Bag)((Lambda)fnc).lambda().eval(t);
-	    case BSPSource(`n,`s):
-		final MR_int i = new MR_int((int)((LongLeaf)n).value());
-		Bag bs = (Bag)evalE(s,env);
-		final Iterator<MRData> iter = bs.iterator();
-		return new Bag(new BagIterator() {
-			public boolean hasNext () {
-			    return iter.hasNext();
-			}
-			public MRData next () {
-			    return new Tuple(i,iter.next());
-			}
-		    });
-	    case BSP(...):
-		MRData res = evalE(e,env);
-		if (res instanceof Bag)
-		    return (Bag)res;
-		else return new Bag(res);
-	    case `v:
-		if (!v.is_variable())
-		    fail;
-		MRData x = variable_lookup(v.toString(),env);
-		if (x != null)
-		    return (Bag)x;
-		x = lookup_global_binding(v.toString());
-		if (x != null)
-		    return (Bag)x;
-		throw new Error("Variable "+v+" is not bound");
-	    };
-	    throw new Error("Cannot evaluate the plan: "+e);
-	} catch (Error msg) {
-	    if (!Config.trace)
-		throw new Error(msg.getMessage());
-	    System.err.println(msg.getMessage());
-	    msg.printStackTrace();
-	    throw new Error("Evaluation error in: "+print_query(e));
-	} catch (Exception ex) {
-	    if (Config.trace)
-		ex.printStackTrace();
-	    throw new Error("Evaluation error in: "+print_query(e));
-	}
+        try {
+            match e {
+            case cMap(`f,`s):
+                return MapReduceAlgebra.cmap(evalF(f,env),evalM(s,env));
+            case AggregateMap(`f,`acc,`zero,`s):
+                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                                          evalM(#<cMap(`f,`s)>,env)));
+            case MapReduce(`m,`r,`s,_):
+                return MapReduceAlgebra.mapReduce(
+                                   evalF(m,env),
+                                   evalF(r,env),
+                                   evalM(s,env));
+            case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
+                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                                          evalM(#<MapReduce(`m,`r,`s,false)>,env)));
+            case MapCombineReduce(`m,`c,`r,`s,_):
+                return MapReduceAlgebra.mapReduce(
+                                   evalF(m,env),
+                                   evalF(r,env),
+                                   evalM(s,env));
+            case MapReduce2(`mx,`my,`r,`x,`y,_):
+                return MapReduceAlgebra.mapReduce2(
+                                evalF(mx,env),
+                                evalF(my,env),
+                                evalF(r,env),
+                                evalM(x,env),
+                                evalM(y,env));
+            case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,_):
+                return MapReduceAlgebra.mapReduce2(
+                                evalF(mx,env),
+                                evalF(my,env),
+                                evalF(r,env),
+                                evalM(x,env),
+                                evalM(y,env));
+            case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
+                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                                          evalM(#< MapReduce2(`mx,`my,`r,`x,`y,false)>,env)));
+            case MapJoin(`kx,`ky,`r,`x,`y):
+                return MapReduceAlgebra.mapJoin(
+                                evalF(kx,env),
+                                evalF(ky,env),
+                                evalF(r,env),
+                                evalM(x,env),
+                                evalM(y,env));
+            case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
+                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                                          evalM(#<MapJoin(`kx,`ky,`r,`x,`y)>,env)));
+            case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
+                return MapReduceAlgebra.groupByJoin(
+                                evalF(kx,env),
+                                evalF(ky,env),
+                                evalF(gx,env),
+                                evalF(gy,env),
+                                evalF(m,env),
+                                evalF(c,env),
+                                evalF(r,env),
+                                evalM(x,env),
+                                evalM(y,env));
+            case CrossProduct(`mx,`my,`r,`x,`y):
+                return MapReduceAlgebra.crossProduct(
+                                evalF(mx,env),
+                                evalF(my,env),
+                                evalF(r,env),
+                                evalM(x,env),
+                                evalM(y,env));
+            case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
+                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                                          evalM(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)));
+            case BinarySource(`file,_):
+                return (Bag)MapReduceAlgebra.read_binary(file.stringValue());
+            case BSPSource(`n,BinarySource(`file,_)):
+                return (Bag)MapReduceAlgebra.read_binary((int)((LongLeaf)n).value(),
+                                                         file.stringValue());
+            case BSPSource(`n,ParsedSource(`parser,`file,...args)):
+                if (!(n instanceof LongLeaf))
+                    fail;
+                Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
+                if (p == null)
+                    throw new Error("Unknown parser: "+parser);
+                return MapReduceAlgebra.parsedSource((int)(((LongLeaf)n).value()),p,
+                                                     ((MR_string)evalE(file,env)).get(),args);
+            case ParsedSource(`parser,`file,...args):
+                Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
+                if (p == null)
+                    throw new Error("Unknown parser: "+parser);
+                return MapReduceAlgebra.parsedSource(p,((MR_string)evalE(file,env)).get(),args);
+            case Merge(`x,`y):
+                return evalM(x,env).union(evalM(y,env));
+            case Repeat(lambda(`v,`b),`s,`n):
+                final String vs = v.toString();
+                final Tree body = b;
+                Function loop = new Function() {
+                        final public MRData eval ( final MRData x ) {
+                            return evalM(body,new Environment(vs,x,env));
+                        }
+                    };
+                return MapReduceAlgebra.repeat(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
+            case Closure(lambda(`v,`b),`s,`n):
+                final String vs = v.toString();
+                final Tree body = b;
+                Function loop = new Function() {
+                        final public MRData eval ( final MRData x ) {
+                            return evalM(body,new Environment(vs,x,env));
+                        }
+                    };
+                return MapReduceAlgebra.closure(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
+            case Generator(`min,`max,`size):
+                return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
+                                                  ((MR_long)evalE(max,env)).get());
+            case BSPSource(`n,Generator(`min,`max,`size)):
+                return MapReduceAlgebra.generator((int)((LongLeaf)n).value(),
+                                                  ((MR_long)evalE(min,env)).get(),
+                                                  ((MR_long)evalE(max,env)).get());
+            case Dump(`s):
+                Bag bs = (Bag)evalE(s,env);
+                final Iterator<MRData> iter = bs.iterator();
+                return new Bag(new BagIterator() {
+                        public boolean hasNext () {
+                            return iter.hasNext();
+                        }
+                        public MRData next () {
+                            return new Tuple(new MR_int(0),iter.next());
+                        }
+                    });
+            case let(`v,`u,`body):
+                return evalM(body,new Environment(v.toString(),evalE(u,env),env));
+            case apply(`f,`arg):
+                if (!f.is_variable())
+                    return (Bag)evalF(f,env).eval(evalE(arg));
+                MRData fnc = lookup_global_binding(f.toString());
+                if (fnc == null)
+                    throw new Error("Unknown function: "+f);
+                MRData t = evalE(arg,env);
+                if (!(t instanceof Tuple))
+                    throw new Error("Expected a tuple in function application: "+t);
+                return (Bag)((Lambda)fnc).lambda().eval(t);
+            case BSPSource(`n,`s):
+                final MR_int i = new MR_int((int)((LongLeaf)n).value());
+                Bag bs = (Bag)evalE(s,env);
+                final Iterator<MRData> iter = bs.iterator();
+                return new Bag(new BagIterator() {
+                        public boolean hasNext () {
+                            return iter.hasNext();
+                        }
+                        public MRData next () {
+                            return new Tuple(i,iter.next());
+                        }
+                    });
+            case BSP(...):
+                MRData res = evalE(e,env);
+                if (res instanceof Bag)
+                    return (Bag)res;
+                else return new Bag(res);
+            case `v:
+                if (!v.is_variable())
+                    fail;
+                MRData x = variable_lookup(v.toString(),env);
+                if (x != null)
+                    return (Bag)x;
+                x = lookup_global_binding(v.toString());
+                if (x != null)
+                    return (Bag)x;
+                throw new Error("Variable "+v+" is not bound");
+            };
+            throw new Error("Cannot evaluate the plan: "+e);
+        } catch (Error msg) {
+            if (!Config.trace)
+                throw new Error(msg.getMessage());
+            System.err.println(msg.getMessage());
+            msg.printStackTrace();
+            throw new Error("Evaluation error in: "+print_query(e));
+        } catch (Exception ex) {
+            if (Config.trace)
+                ex.printStackTrace();
+            throw new Error("Evaluation error in: "+print_query(e));
+        }
     }
 
     /** replace all non-free variables with their reified values */
     private final static Tree closure ( Tree e, Environment env, Trees local_vars ) {
-	match e {
-	case lambda(`x,`b):
-	    return #<lambda(`x,`(closure(b,env,local_vars.cons(x))))>;
- 	case apply(`f,...as):
-	    Trees bs = #[];
-	    for (Tree a: as)
-		bs = bs.append(closure(a,env,local_vars));
-	    return #<apply(`f,...bs)>;
-	case `f(...as):
-	    Trees bs = #[];
-	    for (Tree a: as)
-		bs = bs.append(closure(a,env,local_vars));
-	    return #<`f(...bs)>;
-	case null: return null;
-	case `v:
-	    if (!v.is_variable())
-		fail;
-	    if (local_vars.member(v))
-		fail;
-	    MRData x = variable_lookup(v.toString(),env);
-	    if (x != null)
-		if (!(x instanceof MR_dataset))
-		    return reify(x);
-	    x = lookup_global_binding(v.toString());
-	    if (x != null)
-		if (!(x instanceof MR_dataset))
-		    return reify(x);
-	};
-	return e;
+        match e {
+        case lambda(`x,`b):
+            return #<lambda(`x,`(closure(b,env,local_vars.cons(x))))>;
+        case apply(`f,...as):
+            Trees bs = #[];
+            for (Tree a: as)
+                bs = bs.append(closure(a,env,local_vars));
+            return #<apply(`f,...bs)>;
+        case `f(...as):
+            Trees bs = #[];
+            for (Tree a: as)
+                bs = bs.append(closure(a,env,local_vars));
+            return #<`f(...bs)>;
+        case null: return null;
+        case `v:
+            if (!v.is_variable())
+                fail;
+            if (local_vars.member(v))
+                fail;
+            MRData x = variable_lookup(v.toString(),env);
+            if (x != null)
+                if (!(x instanceof MR_dataset))
+                    return reify(x);
+            x = lookup_global_binding(v.toString());
+            if (x != null)
+                if (!(x instanceof MR_dataset))
+                    return reify(x);
+        };
+        return e;
     }
 
     /** replace all non-free variables with their reified values */
     final static Tree closure ( Tree e, Environment env ) {
-	return closure(e,env,#[]);
+        return closure(e,env,#[]);
     }
 
     static Tree query_type;
@@ -734,83 +734,83 @@ public class Interpreter extends TypeInference {
 
     /** translate an MRQL expression e into a physical plan */
     final static Tree translate_expression ( Tree e ) {
-	try {
-	    if (Config.trace)
-		System.out.println("Query at line "+Main.parser.line_pos()+": "+print_query(e));
-	    Tree qt = TypeInference.type_inference(e);
-	    if (!Config.quiet_execution)
-		System.out.println("Query type: "+print_type(qt));
-	    query_type = qt;
-	    Tree ne = Normalization.remove_groupby(e);
-	    if (Config.trace)
-		System.out.println("After removing group-by:\n"+ne.pretty(0));
-	    ne = Simplification.rename(ne);
-	    if (Config.trace)
-		System.out.println("After renaming variables:\n"+ne.pretty(0));
-	    ne = Simplification.rename(Normalization.normalize_all(ne));
-	    if (Config.trace)
-		System.out.println("Normalized query:\n"+ne.pretty(0));
-	    type_inference(ne);
-	    ne = QueryPlan.best_plan(ne);
-	    if (Config.trace)
-		System.out.println("Best plan:\n"+ne.pretty(0));
-	    ne = Simplification.rename(Translator.translate_select(ne));
-	    if (Config.trace)
-		System.out.println("After removing select-queries:\n"+ne.pretty(0));
-	    type_inference(ne);
-	    ne = Simplification.simplify_all(ne);
-	    if (Config.trace)
-		System.out.println("Algebra expression:\n"+ne.pretty(0));
-	    Tree pt = type_inference(ne);
-	    if (Config.trace)
-		System.out.println("Algebraic type: "+print_type(pt));
-	    ne = AlgebraicOptimization.translate_all(ne);
-	    if (Config.trace)
-		System.out.println("Translated expression:\n"+ne.pretty(0));
-	    Tree et = TypeInference.type_inference(ne);
-	    is_dataset = PlanGeneration.is_dataset_expr(ne);
-	    if (Config.trace)
-		System.out.println("Physical plan type: "+print_type(et));
-	    repeat_variables = #[];
-	    ne = Simplification.simplify_all(ne);
-	    Tree plan = PlanGeneration.makePlan(ne);
-	    if (Config.bsp_mode) {
-		BSPTranslator.reset();
-		if (Config.trace)
-		    System.out.println("Physical plan:\n"+plan.pretty(0));
-		plan = Materialization.materialize_terms(BSPTranslator.constructBSPplan(plan));
-		if (Config.trace)
-		    System.out.println("BSP plan:\n"+plan.pretty(0));
-		else {
-		    String splan = print_plan(plan,0,false);
-		    if (!splan.equals("") && !Config.quiet_execution)
-			System.out.println("BSP plan:\n"+splan);
-		}
-	    } else {
-		if (Config.hadoop_mode)
-		    plan = PlanGeneration.physical_plan(plan);
-		plan = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(plan));
-		if (Config.trace)
-		    System.out.println("Physical plan:\n"+plan.pretty(0));
-		else {
-		    String splan = print_plan(plan,0,false);
-		    if (!splan.equals("") && !Config.quiet_execution)
-			System.out.println("Physical plan:\n"+splan);
-		}
-	    };
-	    if (Config.compile_functional_arguments)
-		plan = Compiler.compile(plan);
-	    return plan;
-	} catch (Error x) {
-	    if (Config.testing)
-		throw new Error(x);
-	    if (!Config.trace && x.toString().endsWith("Type Error"))
-		return null;
-	    if (x.getMessage() != null) // system error
-		System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
-	    if (Config.trace)
-		x.printStackTrace(System.err);
-	    return null;
-	}
+        try {
+            if (Config.trace)
+                System.out.println("Query at line "+Main.parser.line_pos()+": "+print_query(e));
+            Tree qt = TypeInference.type_inference(e);
+            if (!Config.quiet_execution)
+                System.out.println("Query type: "+print_type(qt));
+            query_type = qt;
+            Tree ne = Normalization.remove_groupby(e);
+            if (Config.trace)
+                System.out.println("After removing group-by:\n"+ne.pretty(0));
+            ne = Simplification.rename(ne);
+            if (Config.trace)
+                System.out.println("After renaming variables:\n"+ne.pretty(0));
+            ne = Simplification.rename(Normalization.normalize_all(ne));
+            if (Config.trace)
+                System.out.println("Normalized query:\n"+ne.pretty(0));
+            type_inference(ne);
+            ne = QueryPlan.best_plan(ne);
+            if (Config.trace)
+                System.out.println("Best plan:\n"+ne.pretty(0));
+            ne = Simplification.rename(Translator.translate_select(ne));
+            if (Config.trace)
+                System.out.println("After removing select-queries:\n"+ne.pretty(0));
+            type_inference(ne);
+            ne = Simplification.simplify_all(ne);
+            if (Config.trace)
+                System.out.println("Algebra expression:\n"+ne.pretty(0));
+            Tree pt = type_inference(ne);
+            if (Config.trace)
+                System.out.println("Algebraic type: "+print_type(pt));
+            ne = AlgebraicOptimization.translate_all(ne);
+            if (Config.trace)
+                System.out.println("Translated expression:\n"+ne.pretty(0));
+            Tree et = TypeInference.type_inference(ne);
+            is_dataset = PlanGeneration.is_dataset_expr(ne);
+            if (Config.trace)
+                System.out.println("Physical plan type: "+print_type(et));
+            repeat_variables = #[];
+            ne = Simplification.simplify_all(ne);
+            Tree plan = PlanGeneration.makePlan(ne);
+            if (Config.bsp_mode) {
+                BSPTranslator.reset();
+                if (Config.trace)
+                    System.out.println("Physical plan:\n"+plan.pretty(0));
+                plan = Materialization.materialize_terms(BSPTranslator.constructBSPplan(plan));
+                if (Config.trace)
+                    System.out.println("BSP plan:\n"+plan.pretty(0));
+                else {
+                    String splan = print_plan(plan,0,false);
+                    if (!splan.equals("") && !Config.quiet_execution)
+                        System.out.println("BSP plan:\n"+splan);
+                }
+            } else {
+                if (Config.hadoop_mode)
+                    plan = PlanGeneration.physical_plan(plan);
+                plan = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(plan));
+                if (Config.trace)
+                    System.out.println("Physical plan:\n"+plan.pretty(0));
+                else {
+                    String splan = print_plan(plan,0,false);
+                    if (!splan.equals("") && !Config.quiet_execution)
+                        System.out.println("Physical plan:\n"+splan);
+                }
+            };
+            if (Config.compile_functional_arguments)
+                plan = Compiler.compile(plan);
+            return plan;
+        } catch (Error x) {
+            if (Config.testing)
+                throw new Error(x);
+            if (!Config.trace && x.toString().endsWith("Type Error"))
+                return null;
+            if (x.getMessage() != null) // system error
+                System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
+            if (Config.trace)
+                x.printStackTrace(System.err);
+            return null;
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/Inv.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/Inv.java b/src/main/java/core/Inv.java
index 78253c1..fade0db 100644
--- a/src/main/java/core/Inv.java
+++ b/src/main/java/core/Inv.java
@@ -32,31 +32,31 @@ final public class Inv extends MRData {
     public MRData value () { return value; }
 
     final public void write ( DataOutput out ) throws IOException {
-	out.writeByte(MRContainer.INV);
-	value.write(out);
+        out.writeByte(MRContainer.INV);
+        value.write(out);
     }
 
     final public static Inv read ( DataInput in ) throws IOException {
-	return new Inv(MRContainer.read(in));
+        return new Inv(MRContainer.read(in));
     }
 
     public void readFields ( DataInput in ) throws IOException {
-	value.readFields(in);
+        value.readFields(in);
     }
 
     public int compareTo ( MRData x ) {
-	assert(x instanceof Inv);
-	return -value.compareTo(((Inv)x).value);
+        assert(x instanceof Inv);
+        return -value.compareTo(((Inv)x).value);
     }
 
     final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
-	int n = MRContainer.compare(x,xs,xl,y,ys,yl,size);
-	size[0] += 1;
-	return -n;
+        int n = MRContainer.compare(x,xs,xl,y,ys,yl,size);
+        size[0] += 1;
+        return -n;
     }
 
     public boolean equals ( Object x ) {
-	return value.equals(((Inv)x).value);
+        return value.equals(((Inv)x).value);
     }
 
     public int hashCode () { return value.hashCode(); }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/JsonParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/JsonParser.java b/src/main/java/core/JsonParser.java
index c87bcc9..b5376dd 100644
--- a/src/main/java/core/JsonParser.java
+++ b/src/main/java/core/JsonParser.java
@@ -31,60 +31,60 @@ public class JsonParser implements Parser {
     JsonSplitter splitter;
 
     public void initialize ( Trees args ) {
-	try {
-	    if (args.length() > 0) {
-		if (!(args.nth(0) instanceof Node)
-		    || !(((Node)args.nth(0)).name().equals("list")
-			 || ((Node)args.nth(0)).name().equals("bag")))
-		    throw new Error("Expected a bag of synchronization tagnames in JSON source: "+args.nth(0));
-		Trees ts = ((Node)args.nth(0)).children();
-		if (ts.length() == 0)
-		    throw new Error("Expected at least one synchronization tagname in JSON source: "+ts);
-		tags = new String[ts.length()];
-		for ( int i = 0; i < tags.length; i++ )
-		    if (ts.nth(i) instanceof StringLeaf)
-			tags[i] = ((StringLeaf)(ts.nth(i))).value();
-		    else throw new Error("Expected a synchronization tagname in JSON source: "+ts.nth(i));
-	    }
-	} catch (Exception e) {
-	    throw new Error(e);
-	}
+        try {
+            if (args.length() > 0) {
+                if (!(args.nth(0) instanceof Node)
+                    || !(((Node)args.nth(0)).name().equals("list")
+                         || ((Node)args.nth(0)).name().equals("bag")))
+                    throw new Error("Expected a bag of synchronization tagnames in JSON source: "+args.nth(0));
+                Trees ts = ((Node)args.nth(0)).children();
+                if (ts.length() == 0)
+                    throw new Error("Expected at least one synchronization tagname in JSON source: "+ts);
+                tags = new String[ts.length()];
+                for ( int i = 0; i < tags.length; i++ )
+                    if (ts.nth(i) instanceof StringLeaf)
+                        tags[i] = ((StringLeaf)(ts.nth(i))).value();
+                    else throw new Error("Expected a synchronization tagname in JSON source: "+ts.nth(i));
+            }
+        } catch (Exception e) {
+            throw new Error(e);
+        }
     }
 
     public void open ( String file ) {
-	try {
-	    splitter = new JsonSplitter(tags,file,new DataOutputBuffer());
-	} catch (Exception e) {
-	    throw new Error(e);
-	}
+        try {
+            splitter = new JsonSplitter(tags,file,new DataOutputBuffer());
+        } catch (Exception e) {
+            throw new Error(e);
+        }
     }
 
     public void open ( FSDataInputStream fsin, long start, long end ) {
-	try {
-	    splitter = new JsonSplitter(tags,fsin,start,end,new DataOutputBuffer());
-	} catch (Exception e) {
-	    throw new Error(e);
-	}
+        try {
+            splitter = new JsonSplitter(tags,fsin,start,end,new DataOutputBuffer());
+        } catch (Exception e) {
+            throw new Error(e);
+        }
     }
 
     public Tree type () { return new VariableLeaf("JSON"); }
 
     public String slice () {
-	if (splitter.hasNext()) {
-	    DataOutputBuffer b = splitter.next();
-	    return new String(b.getData(),0,b.getLength());
-	} else return null;
+        if (splitter.hasNext()) {
+            DataOutputBuffer b = splitter.next();
+            return new String(b.getData(),0,b.getLength());
+        } else return null;
     }
 
     public Bag parse ( String s ) {
-	try {
-	    JSONLex scanner = new JSONLex(new StringReader(s));
-	    JSONParser parser = new JSONParser(scanner);
-	    parser.parse();
-	    return new Bag(parser.top_level);
-	} catch (Exception e) {
-	    System.err.println(e);
-	    return new Bag();
-	}
+        try {
+            JSONLex scanner = new JSONLex(new StringReader(s));
+            JSONParser parser = new JSONParser(scanner);
+            parser.parse();
+            return new Bag(parser.top_level);
+        } catch (Exception e) {
+            System.err.println(e);
+            return new Bag();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/JsonSplitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/JsonSplitter.java b/src/main/java/core/JsonSplitter.java
index 5e158f0..0e76390 100644
--- a/src/main/java/core/JsonSplitter.java
+++ b/src/main/java/core/JsonSplitter.java
@@ -37,100 +37,100 @@ final public class JsonSplitter implements Iterator<DataOutputBuffer> {
     final DataOutputBuffer buffer;
 
     JsonSplitter ( String[] tags, FSDataInputStream fsin, long start, long end,
-		  DataOutputBuffer buffer ) {
-	in_memory = false;
-	this.tags = tags;
-	this.fsin = fsin;
-	this.start = start;
-	this.end = end;
-	this.buffer = buffer;
-	scanner = new JSONLex(fsin);
-	try {
-	    fsin.seek(start);
-	} catch ( IOException e ) {
-	    System.err.println("*** Cannot parse the data split: "+fsin);
-	}
+                  DataOutputBuffer buffer ) {
+        in_memory = false;
+        this.tags = tags;
+        this.fsin = fsin;
+        this.start = start;
+        this.end = end;
+        this.buffer = buffer;
+        scanner = new JSONLex(fsin);
+        try {
+            fsin.seek(start);
+        } catch ( IOException e ) {
+            System.err.println("*** Cannot parse the data split: "+fsin);
+        }
     }
 
     JsonSplitter ( String[] tags, String file, DataOutputBuffer buffer ) {
-	in_memory = true;
-	try {
-	    in = new FileInputStream(file);
-	} catch ( Exception e ) {
-	    throw new Error("Cannot open the file: "+file);
-	};
-	this.tags = tags;
-	this.buffer = buffer;
-	scanner = new JSONLex(in);
+        in_memory = true;
+        try {
+            in = new FileInputStream(file);
+        } catch ( Exception e ) {
+            throw new Error("Cannot open the file: "+file);
+        };
+        this.tags = tags;
+        this.buffer = buffer;
+        scanner = new JSONLex(in);
     }
 
     public boolean hasNext () {
-	try {
-	    if (in_memory || fsin.getPos() < end)
-		if (skip())
-		    return store();
-	    return false;
-	} catch (Exception e) {
-	    System.err.println(e);
-	    return false;
-	}
+        try {
+            if (in_memory || fsin.getPos() < end)
+                if (skip())
+                    return store();
+            return false;
+        } catch (Exception e) {
+            System.err.println(e);
+            return false;
+        }
     }
 
     public DataOutputBuffer next () {
-	return buffer;
+        return buffer;
     }
 
     public void remove () { }
 
     boolean is_start_tag ( String tagname ) {
-	if (tags == null)
-	    return true;
-	for (String tag: tags)
-	    if (tag.contentEquals(tagname))
-		return true;
-	return false;
+        if (tags == null)
+            return true;
+        for (String tag: tags)
+            if (tag.contentEquals(tagname))
+                return true;
+        return false;
     }
 
     /** skip until the beginning of a split element */
     boolean skip () throws IOException {
-	while (true) {
-	    Symbol s = scanner.next_token();
-	    if (s.sym == jsym.EOF || (!in_memory && fsin.getPos() >= end))
-		return false;
-	    if (s.sym == jsym.STRING && is_start_tag((String)s.value)) {
-		String tag = (String)s.value;
-		if (scanner.next_token().sym == jsym.COLON) {
-		    buffer.reset();
-		    buffer.write('{');
-		    buffer.write('\"');
-		    for ( int i = 0; i < tag.length(); i++ )
-			buffer.write(tag.charAt(i));
-		    buffer.write('\"');
-		    buffer.write(':');
-		    return true;
-		}
-	    }
-	}
+        while (true) {
+            Symbol s = scanner.next_token();
+            if (s.sym == jsym.EOF || (!in_memory && fsin.getPos() >= end))
+                return false;
+            if (s.sym == jsym.STRING && is_start_tag((String)s.value)) {
+                String tag = (String)s.value;
+                if (scanner.next_token().sym == jsym.COLON) {
+                    buffer.reset();
+                    buffer.write('{');
+                    buffer.write('\"');
+                    for ( int i = 0; i < tag.length(); i++ )
+                        buffer.write(tag.charAt(i));
+                    buffer.write('\"');
+                    buffer.write(':');
+                    return true;
+                }
+            }
+        }
     }
 
     /** store one split element into the buffer; may cross split boundaries */
     boolean store () throws IOException {
-	int nest = 0;
-	while (true) {
-	    Symbol s = scanner.next_token();
-	    if (s.sym == jsym.EOF)
-		return false;
-	    if (s.sym == jsym.O_BEGIN || s.sym == jsym.A_BEGIN)
-		nest++;
-	    else if (s.sym == jsym.O_END || s.sym == jsym.A_END)
-		nest--;
-	    String text = scanner.text();
-	    for ( int i = 0; i < text.length(); i++ )
-		buffer.write(text.charAt(i));
-	    if (nest == 0) {
-		buffer.write('}');
-		return true;
-	    }
-	}
+        int nest = 0;
+        while (true) {
+            Symbol s = scanner.next_token();
+            if (s.sym == jsym.EOF)
+                return false;
+            if (s.sym == jsym.O_BEGIN || s.sym == jsym.A_BEGIN)
+                nest++;
+            else if (s.sym == jsym.O_END || s.sym == jsym.A_END)
+                nest--;
+            String text = scanner.text();
+            for ( int i = 0; i < text.length(); i++ )
+                buffer.write(text.charAt(i));
+            if (nest == 0) {
+                buffer.write('}');
+                return true;
+            }
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/Lambda.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/Lambda.java b/src/main/java/core/Lambda.java
index 6ec8472..d0cfd7a 100644
--- a/src/main/java/core/Lambda.java
+++ b/src/main/java/core/Lambda.java
@@ -35,18 +35,18 @@ final public class Lambda extends MRData {
     public Function lambda () { return lambda; }
 
     final public void write ( DataOutput out ) throws IOException {
-	throw new Error("Functions are not serializable");
+        throw new Error("Functions are not serializable");
     }
 
     public void readFields ( DataInput in ) throws IOException {
-	throw new Error("Functions are not serializable");
+        throw new Error("Functions are not serializable");
     }
 
     public int compareTo ( MRData x ) {
-	throw new Error("Functions cannot be compared");
+        throw new Error("Functions cannot be compared");
     }
 
     public boolean equals ( Object x ) {
-	throw new Error("Functions cannot be compared");
+        throw new Error("Functions cannot be compared");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/LineParser.gen
----------------------------------------------------------------------
diff --git a/src/main/java/core/LineParser.gen b/src/main/java/core/LineParser.gen
index 5160002..f4af146 100644
--- a/src/main/java/core/LineParser.gen
+++ b/src/main/java/core/LineParser.gen
@@ -42,188 +42,188 @@ final public class LineParser implements Parser {
     int type_length;
 
     static byte[] relational_record ( Tree tp ) {
-	match tp {
-	case record(...al):
-	    Trees attrs = #[];
-	    byte[] types = new byte[al.length()];
-	    for ( int i = 0; i < types.length; i++ )
-		match al.nth(i) {
-		case bind(`v,any):
-		    types[i] = -1;
-		    if (attrs.member(v))
-			TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
-		    attrs = attrs.append(v);
-		case bind(`v,`t):
-		    if (!t.is_variable())
-			fail;
-		    types[i] = MRContainer.type_code(t.toString());
-		    if (attrs.member(v))
-			TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
-		    attrs = attrs.append(v);
-		    if (!MRContainer.basic_type(types[i]))
-			TypeInference.error("Expected a basic type for a relational attribute: "+t);
-		case `t: TypeInference.error("Expected a basic type for a relational attribute: "
-					     +TypeInference.print_type(t));
-		};
-	    return types;
-	case tuple(...al):
-	    byte[] types = new byte[al.length()];
-	    for ( int i = 0; i < types.length; i++ )
-		match al.nth(i) {
-		case any:
-		    types[i] = -1;
-		case `t:
-		    if (!t.is_variable())
-			fail;
-		    types[i] = MRContainer.type_code(t.toString());
-		    if (!MRContainer.basic_type(types[i]))
-			TypeInference.error("Expected a basic type for a relational attribute: "+t);
-		case `t: TypeInference.error("Expected a basic type for a relational attribute: "
-					     +TypeInference.print_type(t));
-		};
-	    return types;
-	};
-	TypeInference.error("Expected a relational record or a tuple type: "
-			    +TypeInference.print_type(tp));
-	return null;
+        match tp {
+        case record(...al):
+            Trees attrs = #[];
+            byte[] types = new byte[al.length()];
+            for ( int i = 0; i < types.length; i++ )
+                match al.nth(i) {
+                case bind(`v,any):
+                    types[i] = -1;
+                    if (attrs.member(v))
+                        TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
+                    attrs = attrs.append(v);
+                case bind(`v,`t):
+                    if (!t.is_variable())
+                        fail;
+                    types[i] = MRContainer.type_code(t.toString());
+                    if (attrs.member(v))
+                        TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
+                    attrs = attrs.append(v);
+                    if (!MRContainer.basic_type(types[i]))
+                        TypeInference.error("Expected a basic type for a relational attribute: "+t);
+                case `t: TypeInference.error("Expected a basic type for a relational attribute: "
+                                             +TypeInference.print_type(t));
+                };
+            return types;
+        case tuple(...al):
+            byte[] types = new byte[al.length()];
+            for ( int i = 0; i < types.length; i++ )
+                match al.nth(i) {
+                case any:
+                    types[i] = -1;
+                case `t:
+                    if (!t.is_variable())
+                        fail;
+                    types[i] = MRContainer.type_code(t.toString());
+                    if (!MRContainer.basic_type(types[i]))
+                        TypeInference.error("Expected a basic type for a relational attribute: "+t);
+                case `t: TypeInference.error("Expected a basic type for a relational attribute: "
+                                             +TypeInference.print_type(t));
+                };
+            return types;
+        };
+        TypeInference.error("Expected a relational record or a tuple type: "
+                            +TypeInference.print_type(tp));
+        return null;
     }
 
     static Tree relational_record_type ( Tree tp ) {
-	match tp {
-	case record(...al):
-	    Trees ts = #[];
-	    for ( Tree a: al )
-		match a {
-		case bind(_,any): ;
-		case `t: ts = ts.append(t);
-		};
-	    return #<record(...ts)>;
-	case tuple(...al):
-	    Trees ts = #[];
-	    for ( Tree a: al )
-		if (!a.equals(#<any>))
-		    ts = ts.append(a);
-	    return #<tuple(...ts)>;
-	};
-	TypeInference.error("Expected a relational record type: "
-			    +TypeInference.print_type(tp));
-	return null;
+        match tp {
+        case record(...al):
+            Trees ts = #[];
+            for ( Tree a: al )
+                match a {
+                case bind(_,any): ;
+                case `t: ts = ts.append(t);
+                };
+            return #<record(...ts)>;
+        case tuple(...al):
+            Trees ts = #[];
+            for ( Tree a: al )
+                if (!a.equals(#<any>))
+                    ts = ts.append(a);
+            return #<tuple(...ts)>;
+        };
+        TypeInference.error("Expected a relational record type: "
+                            +TypeInference.print_type(tp));
+        return null;
     }
 
     public Tree type () {
-	return relational_record_type(type);
+        return relational_record_type(type);
     }
 
     public void initialize ( Trees args ) {
-	if (Config.hadoop_mode && Plan.conf == null)
-	    Plan.conf = Evaluator.new_configuration();
-	if (args.length() != 2)
-	    throw new Error("The line parser must have two arguments: "+args);
-	if (!(args.nth(0) instanceof StringLeaf))
-	    throw new Error("Expected a delimiter: "+args.nth(0));
-	delimiter = ((StringLeaf)args.nth(0)).value();
-	if (delimiter.length() == 0)
-	    throw new Error("Expected a delimiter with at least one character: "+delimiter);
-	type = ((Node)args.nth(1)).children().nth(0);
-	types = relational_record(type);
-	type_length = 0;
-	for ( int i = 0; i < types.length; i++ )
-	    if (types[i] >= 0)
-		type_length++;
-	if (type_length < 1)
-	    TypeInference.error("A relational record type must have at least one component: "
-				+TypeInference.print_type(type));
+        if (Config.hadoop_mode && Plan.conf == null)
+            Plan.conf = Evaluator.new_configuration();
+        if (args.length() != 2)
+            throw new Error("The line parser must have two arguments: "+args);
+        if (!(args.nth(0) instanceof StringLeaf))
+            throw new Error("Expected a delimiter: "+args.nth(0));
+        delimiter = ((StringLeaf)args.nth(0)).value();
+        if (delimiter.length() == 0)
+            throw new Error("Expected a delimiter with at least one character: "+delimiter);
+        type = ((Node)args.nth(1)).children().nth(0);
+        types = relational_record(type);
+        type_length = 0;
+        for ( int i = 0; i < types.length; i++ )
+            if (types[i] >= 0)
+                type_length++;
+        if (type_length < 1)
+            TypeInference.error("A relational record type must have at least one component: "
+                                +TypeInference.print_type(type));
     }
 
     public void open ( String file ) {
-	in_memory = true;
-	try {
-	    buffered_in = new BufferedReader(new InputStreamReader(new FileInputStream(file)),
-					     10000);
-	} catch ( Exception e ) {
-	    throw new Error("Cannot open the file: "+file);
-	}
+        in_memory = true;
+        try {
+            buffered_in = new BufferedReader(new InputStreamReader(new FileInputStream(file)),
+                                             10000);
+        } catch ( Exception e ) {
+            throw new Error("Cannot open the file: "+file);
+        }
     }
 
     public void open ( FSDataInputStream fsin, long fstart, long fend ) {
-	in_memory = false;
-	this.fsin = fsin;
-	start = fstart;
-	end = fend;
-	line = new Text();
-	try {
-	    if (start != 0) {  // for all but the first data split, skip the first record
-		--start;
-		fsin.seek(start);
-		in = new LineReader(fsin,Plan.conf);
-		start += in.readLine(new Text(),0,(int) Math.min(Integer.MAX_VALUE,end-start));
-	    } else in = new LineReader(fsin,Plan.conf);
-	    pos = start;
-	} catch ( IOException e ) {
-	    System.err.println("*** Cannot parse the data split: "+fsin);
-	    this.start = end;
-	}
+        in_memory = false;
+        this.fsin = fsin;
+        start = fstart;
+        end = fend;
+        line = new Text();
+        try {
+            if (start != 0) {  // for all but the first data split, skip the first record
+                --start;
+                fsin.seek(start);
+                in = new LineReader(fsin,Plan.conf);
+                start += in.readLine(new Text(),0,(int) Math.min(Integer.MAX_VALUE,end-start));
+            } else in = new LineReader(fsin,Plan.conf);
+            pos = start;
+        } catch ( IOException e ) {
+            System.err.println("*** Cannot parse the data split: "+fsin);
+            this.start = end;
+        }
     }
 
     public String slice () {
-	try {
-	    if (in_memory)
-		return buffered_in.readLine();
-	    while (pos < end) {
-		int newSize = in.readLine(line,maxLineLength,
-					  Math.max((int)Math.min(Integer.MAX_VALUE,end-pos),
-						   maxLineLength));
-		if (newSize == 0)
-		    return null;
-		pos += newSize;
-		if (newSize < maxLineLength)
-		    return line.toString();
-	    };
-	    return null;
-	} catch ( Exception e ) {
-	    System.err.println("*** Cannot slice the text: "+e);
-	    return "";
-	}
+        try {
+            if (in_memory)
+                return buffered_in.readLine();
+            while (pos < end) {
+                int newSize = in.readLine(line,maxLineLength,
+                                          Math.max((int)Math.min(Integer.MAX_VALUE,end-pos),
+                                                   maxLineLength));
+                if (newSize == 0)
+                    return null;
+                pos += newSize;
+                if (newSize < maxLineLength)
+                    return line.toString();
+            };
+            return null;
+        } catch ( Exception e ) {
+            System.err.println("*** Cannot slice the text: "+e);
+            return "";
+        }
     }
 
     private static MRData parse_value ( String text, byte type ) {
-	switch (type) {
-	case MRContainer.BYTE: return new MR_byte(Byte.parseByte(text));
-	case MRContainer.SHORT: return new MR_short(Short.parseShort(text));
-	case MRContainer.INT: return new MR_int(Integer.parseInt(text));
-	case MRContainer.LONG: return new MR_long(Long.parseLong(text));
-	case MRContainer.FLOAT: return new MR_float(Float.parseFloat(text));
-	case MRContainer.DOUBLE: return new MR_double(Double.parseDouble(text));
-	case MRContainer.CHAR: return new MR_char(text.charAt(0));
-	case MRContainer.STRING: return new MR_string(text);
-	};
-	System.err.println("*** Cannot parse the type "+MRContainer.type_names[type]+" in '"+text+"'");
-	return null;
+        switch (type) {
+        case MRContainer.BYTE: return new MR_byte(Byte.parseByte(text));
+        case MRContainer.SHORT: return new MR_short(Short.parseShort(text));
+        case MRContainer.INT: return new MR_int(Integer.parseInt(text));
+        case MRContainer.LONG: return new MR_long(Long.parseLong(text));
+        case MRContainer.FLOAT: return new MR_float(Float.parseFloat(text));
+        case MRContainer.DOUBLE: return new MR_double(Double.parseDouble(text));
+        case MRContainer.CHAR: return new MR_char(text.charAt(0));
+        case MRContainer.STRING: return new MR_string(text);
+        };
+        System.err.println("*** Cannot parse the type "+MRContainer.type_names[type]+" in '"+text+"'");
+        return null;
     }
 
     public Bag parse ( String line ) {
         try {
-	    if (line == null)
-		return new Bag();
+            if (line == null)
+                return new Bag();
             Tuple t = new Tuple(type_length);
-	    int loc = 0;
-	    int j = 0;
+            int loc = 0;
+            int j = 0;
             for ( int i = 0; i < types.length; i++ ) {
-		int k = line.indexOf(delimiter,loc);
-		if (types[i] >= 0) {
-		    String s = (k > 0) ? line.substring(loc,k) : line.substring(loc);
-		    MRData v = parse_value(s,types[i]);
-		    if (v == null)
-			return new Bag();
-		    t.set(j++,v);
-		};
-		loc = k+delimiter.length();
-		if (k < 0 && i+1 < types.length) {
-		    System.err.println("*** Incomplete parsed text line: "+line);
-		    return new Bag();
-		}
-	    };
-	    return new Bag(t);
+                int k = line.indexOf(delimiter,loc);
+                if (types[i] >= 0) {
+                    String s = (k > 0) ? line.substring(loc,k) : line.substring(loc);
+                    MRData v = parse_value(s,types[i]);
+                    if (v == null)
+                        return new Bag();
+                    t.set(j++,v);
+                };
+                loc = k+delimiter.length();
+                if (k < 0 && i+1 < types.length) {
+                    System.err.println("*** Incomplete parsed text line: "+line);
+                    return new Bag();
+                }
+            };
+            return new Bag(t);
         } catch ( Exception e ) {
             System.err.println("*** Cannot parse the text line: "+line);
             return new Bag();