Posted to commits@mrql.apache.org by fe...@apache.org on 2013/09/06 22:57:20 UTC

[02/18] MRQL-16: correct source files, ASF licenses, and POMs for release

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/gen/Trees.java
----------------------------------------------------------------------
diff --git a/src/main/java/gen/Trees.java b/src/main/java/gen/Trees.java
index 79b70c3..86f65ca 100644
--- a/src/main/java/gen/Trees.java
+++ b/src/main/java/gen/Trees.java
@@ -29,9 +29,9 @@ final class TreeIterator implements Iterator<Tree> {
     public boolean hasNext () { return trees.tail != null; }
 
     public Tree next () {
-	Tree res = trees.head;
-	trees = trees.tail;
-	return res;
+        Tree res = trees.head;
+        trees = trees.tail;
+        return res;
     }
 
     public void remove () { trees = trees.tail; }
@@ -44,151 +44,151 @@ final public class Trees implements Iterable<Tree>, Serializable {
     public Trees tail;
 
     public Trees ( Tree head, Trees tail ) {
-	if (tail == null)
-	    throw new Error("Gen: an empty list of nodes must be nil, not null");
-	this.head = head;
-	this.tail = tail;
+        if (tail == null)
+            throw new Error("Gen: an empty list of nodes must be nil, not null");
+        this.head = head;
+        this.tail = tail;
     }
 
     public Trees () {
-	head = null;
-	tail = null;
+        head = null;
+        tail = null;
     }
 
     public final static Trees nil = new Trees();
 
     public Trees ( Tree head ) {
-	this.head = head;
-	tail = nil;
+        this.head = head;
+        tail = nil;
     }
 
     public Tree head () {
-	if (tail == null)
-	   throw new Error("Gen: tried to retrieve the head of an empty list of nodes");
-	return head;
+        if (tail == null)
+           throw new Error("Gen: tried to retrieve the head of an empty list of nodes");
+        return head;
     }
 
     public Trees tail () {
-	if (tail == null)
-	   throw new Error("Gen: tried to retrieve the tail of an empty list of nodes");
-	return tail;
+        if (tail == null)
+           throw new Error("Gen: tried to retrieve the tail of an empty list of nodes");
+        return tail;
     }
 
     public boolean is_empty () {
-	return (tail == null);
+        return (tail == null);
     }
 
     /* number of nodes */
     public int length () {
-	int n = 0;
-	for (Trees r = this; !r.is_empty(); r = r.tail)
-	    n += 1;
-	return n;
+        int n = 0;
+        for (Trees r = this; !r.is_empty(); r = r.tail)
+            n += 1;
+        return n;
     }
 
     /* put a Tree e at the beginning of the nodes */
     public Trees cons ( Tree e ) {
-	return new Trees(e,this);
+        return new Trees(e,this);
     }
 
     /* put a Tree e at the end of the nodes */
     public Trees append ( Tree e ) {
-	if (is_empty())
-	    return new Trees(e);
-	else {
-	    Trees temp = new Trees(e,new Trees(e));
-	    Trees res = temp;
-	    for (Trees r = this; !r.is_empty(); r = r.tail) {
-		temp.tail = temp.tail.cons(r.head);
-		temp = temp.tail;
-	    };
-	    return res.tail;
-	}
+        if (is_empty())
+            return new Trees(e);
+        else {
+            Trees temp = new Trees(e,new Trees(e));
+            Trees res = temp;
+            for (Trees r = this; !r.is_empty(); r = r.tail) {
+                temp.tail = temp.tail.cons(r.head);
+                temp = temp.tail;
+            };
+            return res.tail;
+        }
     }
 
     /* append two lists of nodes */
     public Trees append ( Trees s ) {
-	if (is_empty())
-	   return s;
-	else if (s.is_empty())
-	   return this;
-	else {
-	   Trees temp = s.cons(s.head);
-	   Trees res = temp;
-	   for (Trees r = this; !r.is_empty(); r = r.tail)
-	   {   temp.tail = temp.tail.cons(r.head);
-	       temp = temp.tail;
-	   }
-	   return res.tail;
-	}
+        if (is_empty())
+           return s;
+        else if (s.is_empty())
+           return this;
+        else {
+           Trees temp = s.cons(s.head);
+           Trees res = temp;
+           for (Trees r = this; !r.is_empty(); r = r.tail)
+           {   temp.tail = temp.tail.cons(r.head);
+               temp = temp.tail;
+           }
+           return res.tail;
+        }
     }
 
     /* reverse the order of nodes */
     public Trees reverse () {
-	Trees res = nil;
-	for (Trees r = this; !r.is_empty(); r = r.tail)
-	    res = res.cons(r.head);
-	return res;
+        Trees res = nil;
+        for (Trees r = this; !r.is_empty(); r = r.tail)
+            res = res.cons(r.head);
+        return res;
     }
 
     /* is e one of the nodes? */
     public boolean member ( Tree e ) {
-	for (Trees r = this; !r.is_empty(); r = r.tail)
-	    if (r.head.equals(e))
-		return true;
-	return false;
+        for (Trees r = this; !r.is_empty(); r = r.tail)
+            if (r.head.equals(e))
+                return true;
+        return false;
     }
 
     /* return the nth node */
     public Tree nth ( int n ) {
-	Trees r = this;
-	for (int i = 0; !r.is_empty() && i < n; r = r.tail(), i++)
-	    ;
-	if (r.is_empty())
-	    throw new Error("Gen: tried to retrieve a nonexistent nth element from a list of nodes");
-	else return r.head;
+        Trees r = this;
+        for (int i = 0; !r.is_empty() && i < n; r = r.tail(), i++)
+            ;
+        if (r.is_empty())
+            throw new Error("Gen: tried to retrieve a nonexistent nth element from a list of nodes");
+        else return r.head;
     }
 
     /* deep equality */
     public boolean equals ( Trees s ) {
-	Trees n = this;
-	Trees m = s;
-	for(; n.tail != null && m.tail != null; n = n.tail, m = m.tail )
-	    if (!n.head.equals(m.head))
-		return false;
-	return (m.tail == null) && (n.tail == null);
+        Trees n = this;
+        Trees m = s;
+        for(; n.tail != null && m.tail != null; n = n.tail, m = m.tail )
+            if (!n.head.equals(m.head))
+                return false;
+        return (m.tail == null) && (n.tail == null);
     }
 
     protected int size () {
-	int n = 1;
-	for (Trees r = this; !r.is_empty(); r = r.tail)
-	    n += r.head.size()+1;
-	 return n;
+        int n = 1;
+        for (Trees r = this; !r.is_empty(); r = r.tail)
+            n += r.head.size()+1;
+         return n;
     }
 
     public Iterator<Tree> iterator () { return new TreeIterator(this); }
 
     /* print the nodes */
     public String toString () {
-	if (is_empty())
-	    return "()";
-	String s = "(" + head;
-	for (Trees r = tail; !r.is_empty(); r = r.tail)
-	    s = s + "," + r.head;
-	return s + ")";
+        if (is_empty())
+            return "()";
+        String s = "(" + head;
+        for (Trees r = tail; !r.is_empty(); r = r.tail)
+            s = s + "," + r.head;
+        return s + ")";
     }
 
     /* pretty-print the nodes */
     public String pretty ( int position ) {
-	if (is_empty() || (position+size() <= screen_size))
-	    return toString();
-	String s = "(" + head.pretty(position+1);
-	for (Trees r=tail; !r.is_empty(); r=r.tail) {
-	    s = s + ",\n";
-	    for (int i=0; i<position+1; i++)
-		s = s + " ";
-	    s = s + r.head.pretty(position+1);
-	};
-	return s + ")";
+        if (is_empty() || (position+size() <= screen_size))
+            return toString();
+        String s = "(" + head.pretty(position+1);
+        for (Trees r=tail; !r.is_empty(); r=r.tail) {
+            s = s + ",\n";
+            for (int i=0; i<position+1; i++)
+                s = s + " ";
+            s = s + r.head.pretty(position+1);
+        };
+        return s + ")";
     }
 }

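The Trees class reindented above is a classic cons list: a shared nil sentinel, O(1) cons, and an append that avoids recursion by chaining copied cells off a dummy header cell and returning the header's tail. A minimal standalone sketch of the same structure with String payloads (ConsList and its names are illustrative, not part of the patch):

import java.util.Iterator;
import java.util.NoSuchElementException;

// Cons list with the same shape as gen.Trees: a null tail marks the empty list.
final class ConsList implements Iterable<String> {
    public String head;
    public ConsList tail;

    public static final ConsList nil = new ConsList();

    public ConsList () {}                                  // the empty list
    public ConsList ( String head, ConsList tail ) {
        this.head = head;
        this.tail = tail;
    }

    public boolean isEmpty () { return tail == null; }

    // O(1) prepend: allocate one cell, share the rest
    public ConsList cons ( String e ) { return new ConsList(e, this); }

    // Same dummy-header trick as Trees.append: copy the spine iteratively in
    // front of a terminal cell holding e, then drop the dummy via res.tail.
    public ConsList append ( String e ) {
        if (isEmpty())
            return new ConsList(e, nil);
        ConsList temp = new ConsList(e, new ConsList(e, nil));
        ConsList res = temp;
        for (ConsList r = this; !r.isEmpty(); r = r.tail) {
            temp.tail = new ConsList(r.head, temp.tail);
            temp = temp.tail;
        }
        return res.tail;
    }

    public Iterator<String> iterator () {
        return new Iterator<String>() {
            ConsList cur = ConsList.this;
            public boolean hasNext () { return cur.tail != null; }
            public String next () {
                if (cur.tail == null) throw new NoSuchElementException();
                String h = cur.head;
                cur = cur.tail;
                return h;
            }
            public void remove () { throw new UnsupportedOperationException(); }
        };
    }

    public static void main ( String[] args ) {
        ConsList xs = nil.cons("b").cons("a").append("c");
        for ( String s: xs )
            System.out.println(s);                         // prints a, b, c
    }
}
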
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/gen/VariableLeaf.java
----------------------------------------------------------------------
diff --git a/src/main/java/gen/VariableLeaf.java b/src/main/java/gen/VariableLeaf.java
index 624e8bc..c4c0ed9 100644
--- a/src/main/java/gen/VariableLeaf.java
+++ b/src/main/java/gen/VariableLeaf.java
@@ -24,15 +24,15 @@ final public class VariableLeaf extends Tree {
     public String value;
 
     public VariableLeaf ( String s ) {
-	super();
-	value = Tree.add(s);
+        super();
+        value = Tree.add(s);
     }
 
     final public String value () { return value; }
 
     public boolean equals ( Tree e ) {
-	return (e instanceof VariableLeaf)
-	    && value == ((VariableLeaf) e).value;
+        return (e instanceof VariableLeaf)
+            && value == ((VariableLeaf) e).value;
     }
 
     protected int size () { return value.length(); }
@@ -42,11 +42,11 @@ final public class VariableLeaf extends Tree {
     public String pretty ( int position ) { return value; }
 
     private void writeObject(ObjectOutputStream out) throws IOException {
-	out.defaultWriteObject();
+        out.defaultWriteObject();
     }
 
     private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
-	in.defaultReadObject();
-	value = Tree.add(value);
+        in.defaultReadObject();
+        value = Tree.add(value);
     }
 }

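VariableLeaf stores its name through Tree.add(s), which evidently interns the string: equals compares with == rather than String.equals, and readObject re-interns after deserialization because Java serialization produces a fresh String object. The pattern in isolation, with a hypothetical InternTable standing in for Tree.add:

import java.util.concurrent.ConcurrentHashMap;

// One canonical String per distinct value, so interned strings compare with ==.
final class InternTable {
    private static final ConcurrentHashMap<String,String> table =
        new ConcurrentHashMap<String,String>();

    static String add ( String s ) {
        String prev = table.putIfAbsent(s, s);
        return (prev == null) ? s : prev;
    }

    public static void main ( String[] args ) {
        String a = add(new String("x"));
        String b = add(new String("x"));   // different objects, equal values
        System.out.println(a == b);        // true: both are the canonical "x"
    }
}

Identity comparison is why readObject must call Tree.add(value) again: without re-interning, two equal leaves could compare unequal after a round trip through serialization.
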
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/BinaryInputFormat.java b/src/main/java/spark/BinaryInputFormat.java
index 337e045..ad364bb 100644
--- a/src/main/java/spark/BinaryInputFormat.java
+++ b/src/main/java/spark/BinaryInputFormat.java
@@ -26,28 +26,28 @@ import org.apache.hadoop.mapred.*;
 /** Input format for hadoop sequence files */
 final public class BinaryInputFormat extends MRQLFileInputFormat {
     public static class BinaryInputRecordReader extends SequenceFileRecordReader<MRContainer,MRContainer> {
-	final MRContainer result = new MRContainer();
+        final MRContainer result = new MRContainer();
 
-	public BinaryInputRecordReader ( FileSplit split,
-					 JobConf job ) throws IOException {
-	    super(job,split);
-	}
+        public BinaryInputRecordReader ( FileSplit split,
+                                         JobConf job ) throws IOException {
+            super(job,split);
+        }
 
-	@Override
-	public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
-		boolean b = super.next(key,result);
-		value.set(result.data());
-		return b;
-	}
+        @Override
+        public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+                boolean b = super.next(key,result);
+                value.set(result.data());
+                return b;
+        }
     }
 
     @Override
     public RecordReader<MRContainer,MRContainer>
-	      getRecordReader ( InputSplit split,
-				JobConf job,
-				Reporter reporter ) throws IOException {
-	String path = ((FileSplit)split).getPath().toString();
-	BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,job);
-	return new BinaryInputRecordReader((FileSplit)split,job);
+              getRecordReader ( InputSplit split,
+                                JobConf job,
+                                Reporter reporter ) throws IOException {
+        String path = ((FileSplit)split).getPath().toString();
+        BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,job);
+        return new BinaryInputRecordReader((FileSplit)split,job);
     }
 }

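BinaryInputRecordReader above is a thin wrapper over SequenceFileRecordReader that copies each value into a reusable MRContainer. The same sequence files can be inspected outside an input format with Hadoop's reader; a sketch against the mapred-era API this file targets (SeqFileDump is illustrative, not part of the patch):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SeqFileDump {
    public static void main ( String[] args ) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]);
        FileSystem fs = path.getFileSystem(conf);
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, path, conf);
        try {
            // key/value holders of whatever Writable types the file declares
            Writable key = (Writable)ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable)ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value))    // holders are reused, as in next() above
                System.out.println(key + "\t" + value);
        } finally {
            reader.close();
        }
    }
}
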
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/Evaluator.gen
----------------------------------------------------------------------
diff --git a/src/main/java/spark/Evaluator.gen b/src/main/java/spark/Evaluator.gen
index be7287a..30905b1 100644
--- a/src/main/java/spark/Evaluator.gen
+++ b/src/main/java/spark/Evaluator.gen
@@ -49,94 +49,94 @@ final public class Evaluator extends Interpreter implements Serializable {
 
     /** initialize the Spark evaluator */
     final static void init ( Configuration conf ) throws IOException {
-	Config.spark_mode = true;
-	if (Config.hadoop_mode && Config.local_mode)
-	    spark_context = new JavaSparkContext("local["+Config.nodes+"]",
-						 "MRQL Local Spark Evaluator");
-	else if (Config.hadoop_mode) {
-	    HashMap<String,String> env = new HashMap<String,String>();
-	    data_source_directory = System.getenv("FS_DEFAULT_NAME")+"/"+Plan.new_path(conf);
-	    env.put("mrql.data.source.directory",data_source_directory);
-	    spark_context = new JavaSparkContext(System.getenv("SPARK_MASTER"),
-						 "MRQL Spark Evaluator",
-						 System.getenv("SPARK_HOME"),
-						 new String[]{mrql_jar},
-						 env);
-	    Plan.conf = spark_context.hadoopConfiguration();
-	    FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
-	}
+        Config.spark_mode = true;
+        if (Config.hadoop_mode && Config.local_mode)
+            spark_context = new JavaSparkContext("local["+Config.nodes+"]",
+                                                 "MRQL Local Spark Evaluator");
+        else if (Config.hadoop_mode) {
+            HashMap<String,String> env = new HashMap<String,String>();
+            data_source_directory = System.getenv("FS_DEFAULT_NAME")+"/"+Plan.new_path(conf);
+            env.put("mrql.data.source.directory",data_source_directory);
+            spark_context = new JavaSparkContext(System.getenv("SPARK_MASTER"),
+                                                 "MRQL Spark Evaluator",
+                                                 System.getenv("SPARK_HOME"),
+                                                 new String[]{mrql_jar},
+                                                 env);
+            Plan.conf = spark_context.hadoopConfiguration();
+            FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
+        }
     }
 
     /** shutdown the Spark evaluator */
     final static void shutdown ( Configuration conf ) {
-	spark_context.stop();
-	spark_context = null;
-	System.clearProperty("spark.driver.port");
+        spark_context.stop();
+        spark_context = null;
+        System.clearProperty("spark.driver.port");
     }
 
     final static void initialize_query () {
-	spark_context.clearJars();
-	spark_context.addJar(mrql_jar);
-	Plan.distribute_compiled_arguments(Plan.conf);
-	if (Config.compile_functional_arguments)
-	    spark_context.addJar(Plan.conf.get("mrql.jar.path"));
+        spark_context.clearJars();
+        spark_context.addJar(mrql_jar);
+        Plan.distribute_compiled_arguments(Plan.conf);
+        if (Config.compile_functional_arguments)
+            spark_context.addJar(Plan.conf.get("mrql.jar.path"));
     }
 
     final static Configuration new_configuration () {
-	return new Configuration();
+        return new Configuration();
     }
 
     public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
-	throw new Error("You can only synchronize BSP tasks");
+        throw new Error("You can only synchronize BSP tasks");
     }
 
     public static Bag distribute ( MR_string peerName, Bag s ) {
-	throw new Error("You can only distribute bags among BSP tasks");
+        throw new Error("You can only distribute bags among BSP tasks");
     }
 
     final static Bag collect ( final DataSet x ) throws Exception {
-	return Plan.collect(x);
+        return Plan.collect(x);
     }
 
     final static MRData bsp ( Tree plan, Environment env ) throws Exception {
-	throw new Error("You can not run a BSP task in Spark mode");
+        throw new Error("You can not run a BSP task in Spark mode");
     }
 
     /** used by the master to send parsing details (eg, record types) to workers */
     public static void dump_source_dir () throws IOException {
-	if (Config.local_mode)
-	    return;
-	DataSource.dataSourceDirectory.distribute(Plan.conf);
-	Path path = new Path(data_source_directory);
-	FileSystem fs = path.getFileSystem(Plan.conf);
-	PrintStream ps = new PrintStream(fs.create(path,true));
-	ps.println(Plan.conf.get("mrql.data.source.directory"));
-	ps.close();
+        if (Config.local_mode)
+            return;
+        DataSource.dataSourceDirectory.distribute(Plan.conf);
+        Path path = new Path(data_source_directory);
+        FileSystem fs = path.getFileSystem(Plan.conf);
+        PrintStream ps = new PrintStream(fs.create(path,true));
+        ps.println(Plan.conf.get("mrql.data.source.directory"));
+        ps.close();
     }
 
     /** executed by a worker when reading parsed input (see ParsedInputFormat) */
     public static void load_source_dir () throws IOException {
-	if (Config.local_mode)
-	    return;
-	if (Plan.conf == null)
-	    Plan.conf = new_configuration();
-	// the name of the file that contains the source directory details is passed
-	//   to workers through the HashMap environment in the JavaSparkContext
-	Path path = new Path(System.getenv("mrql.data.source.directory"));
-	FileSystem fs = path.getFileSystem(Plan.conf);
-	BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
-	Plan.conf.set("mrql.data.source.directory",ftp.readLine());
-	DataSource.dataSourceDirectory.read(Plan.conf);
-	ftp.close();
+        if (Config.local_mode)
+            return;
+        if (Plan.conf == null)
+            Plan.conf = new_configuration();
+        // the name of the file that contains the source directory details is passed
+        //   to workers through the HashMap environment in the JavaSparkContext
+        Path path = new Path(System.getenv("mrql.data.source.directory"));
+        FileSystem fs = path.getFileSystem(Plan.conf);
+        BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
+        Plan.conf.set("mrql.data.source.directory",ftp.readLine());
+        DataSource.dataSourceDirectory.read(Plan.conf);
+        ftp.close();
     }
 
     private static Function2<MRData,MRData,MRData> accumulator ( Tree acc_fnc, Environment env ) {
-	final org.apache.mrql.Function f = evalF(acc_fnc,env);
-	return new Function2<MRData,MRData,MRData>() {
-	    public MRData call ( MRData x, MRData y ) {
-		return f.eval(new Tuple(x,y));
-	    }
-	};
+        final org.apache.mrql.Function f = evalF(acc_fnc,env);
+        return new Function2<MRData,MRData,MRData>() {
+            public MRData call ( MRData x, MRData y ) {
+                return f.eval(new Tuple(x,y));
+            }
+        };
     }
 
     /** The Aggregate physical operator
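
accumulator() above turns an evaluated MRQL lambda into a Spark Function2, and the Aggregate operator in the next hunk passes it to JavaRDD.aggregate as both the sequential and the combining operator, with the zero element evaluated once on the driver. Stripped of MRQL types, the call shape looks like this (a minimal sketch against the 0.x-era Java API used throughout this file):

import java.util.Arrays;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

public class AggregateDemo {
    public static void main ( String[] args ) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "AggregateDemo");
        JavaRDD<Integer> nums = sc.parallelize(Arrays.asList(1, 2, 3, 4));
        // one associative operator serves as both seqOp and combOp,
        // which is what accumulator()'s Function2 does for aggregate()
        Function2<Integer,Integer,Integer> plus =
            new Function2<Integer,Integer,Integer>() {
                public Integer call ( Integer x, Integer y ) { return x + y; }
            };
        System.out.println(nums.aggregate(0, plus, plus));   // prints 10
        sc.stop();
    }
}
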
@@ -147,69 +147,69 @@ final public class Evaluator extends Interpreter implements Serializable {
      * @return the aggregation result of type T
      */
     final static MRData aggregate ( Tree acc_fnc,
-				    Tree zero,
-				    Tree plan,
-				    Environment env ) throws Exception {
-	Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env);
-	MRData z = evalE(zero,env);
-	match plan {
-	case AggregateMap(`m,`acc,_,`s):
-	    return evalD(#<cMap(`m,`s)>,env)
-		     .aggregate(z,accumulator(acc,env),f2);
-	case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
-	    if (acc.equals(#<null>))
-		fail;
-	    return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
-		     .aggregate(z,accumulator(acc,env),f2);
-	case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
-	    if (acc.equals(#<null>))
-		fail;
-	    return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
-		     .aggregate(z,accumulator(acc,env),f2);
-	case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
-	    if (acc.equals(#<null>))
-		fail;
-	    return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
-		     .aggregate(z,accumulator(acc,env),f2);
-	case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
-	    if (acc.equals(#<null>))
-		fail;
-	    return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
-		     .aggregate(z,accumulator(acc,env),f2);
-	};
-	throw new Error("Unrecognized aggregation: "+plan);
+                                    Tree zero,
+                                    Tree plan,
+                                    Environment env ) throws Exception {
+        Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env);
+        MRData z = evalE(zero,env);
+        match plan {
+        case AggregateMap(`m,`acc,_,`s):
+            return evalD(#<cMap(`m,`s)>,env)
+                     .aggregate(z,accumulator(acc,env),f2);
+        case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
+            if (acc.equals(#<null>))
+                fail;
+            return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
+                     .aggregate(z,accumulator(acc,env),f2);
+        case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
+            if (acc.equals(#<null>))
+                fail;
+            return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
+                     .aggregate(z,accumulator(acc,env),f2);
+        case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
+            if (acc.equals(#<null>))
+                fail;
+            return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
+                     .aggregate(z,accumulator(acc,env),f2);
+        case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
+            if (acc.equals(#<null>))
+                fail;
+            return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
+                     .aggregate(z,accumulator(acc,env),f2);
+        };
+        throw new Error("Unrecognized aggregation: "+plan);
     }
 
     /** Evaluate a loop a fixed # of times */
     final static Tuple loop ( Tree e, Environment env ) throws Exception {
-	match e {
-	case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
-	    int limit = ((MR_int)evalE(num,env)).get();
-	    MR_rdd[] s = new MR_rdd[vs.length()];
-	    for ( int i = 0; i < vs.length(); i++ )
-		s[i] = new MR_rdd(eval(ss.nth(i),env));
-	    for ( int n = 0; n < limit; n++ ) {
-		Environment nenv = env;
-		for ( int i = 0; i < vs.length(); i ++ )
-		    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
-		for ( int i = 0; i < vs.length(); i ++ )
-		    s[i] = new MR_rdd(eval(bs.nth(i),nenv));
-	    };
-	    return new Tuple(s);
-	};
-	throw new Error("Wrong Loop format");
+        match e {
+        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
+            int limit = ((MR_int)evalE(num,env)).get();
+            MR_rdd[] s = new MR_rdd[vs.length()];
+            for ( int i = 0; i < vs.length(); i++ )
+                s[i] = new MR_rdd(eval(ss.nth(i),env));
+            for ( int n = 0; n < limit; n++ ) {
+                Environment nenv = env;
+                for ( int i = 0; i < vs.length(); i ++ )
+                    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
+                for ( int i = 0; i < vs.length(); i ++ )
+                    s[i] = new MR_rdd(eval(bs.nth(i),nenv));
+            };
+            return new Tuple(s);
+        };
+        throw new Error("Wrong Loop format");
     }
 
     private static Bag bag ( final List<MRData> s ) {
-	final Iterator<MRData> i = s.iterator();
-	return new Bag(new BagIterator() {
-		public MRData next () {
-		    return i.next();
-		}
-		public boolean hasNext () {
-		    return i.hasNext();
-		}
-	    });
+        final Iterator<MRData> i = s.iterator();
+        return new Bag(new BagIterator() {
+                public MRData next () {
+                    return i.next();
+                }
+                public boolean hasNext () {
+                    return i.hasNext();
+                }
+            });
     }
 
     final static TaskContext context = new TaskContext(0,0,0,null);
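
The bag(JavaRDD) reformatted in the next hunk exposes an RDD as one lazy Bag: hasNext drains the current partition's iterator and only then opens the next one via rdd.iterator(ps.get(c++), context). The chaining pattern itself, in plain Java without Spark internals (ConcatIterator is illustrative):

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Lazily concatenates several sources, opening each iterator only on demand,
// with the same loop shape as bag(JavaRDD).hasNext below.
final class ConcatIterator<T> implements Iterator<T> {
    private final List<? extends Iterable<T>> sources;
    private Iterator<T> i = null;
    private int c = 0;

    ConcatIterator ( List<? extends Iterable<T>> sources ) { this.sources = sources; }

    public boolean hasNext () {
        do {
            if (i != null && i.hasNext())
                return true;
            if (c >= sources.size())
                return false;
            i = sources.get(c++).iterator();
        } while (true);
    }

    public T next () {
        if (!hasNext())
            throw new NoSuchElementException();
        return i.next();
    }

    public void remove () { throw new UnsupportedOperationException(); }

    public static void main ( String[] args ) {
        List<List<Integer>> parts = Arrays.asList(Arrays.asList(1, 2),
                                                  Arrays.<Integer>asList(),
                                                  Arrays.asList(3));
        Iterator<Integer> it = new ConcatIterator<Integer>(parts);
        while (it.hasNext())
            System.out.print(it.next() + " ");             // 1 2 3
    }
}
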
@@ -219,32 +219,32 @@ final public class Evaluator extends Interpreter implements Serializable {
      * @return a lazy bag that contains all RDD elements
      */
     public static Bag bag ( final JavaRDD<MRData> rdd ) {
-	final JavaRDD<MRData> rd = rdd.cache();
-	rd.count();  // force the evaluation of all pending Spark operations
-	final List<Partition> ps = rd.splits();
-	return new Bag(new BagIterator() {
-		Iterator<MRData> i = null;
-		int c = 0;
-		public MRData next () {
-		    return i.next();
-		}
-		public boolean hasNext () {
-		    do {
-			if (i != null && i.hasNext()) 
-			    return true;
-			if (c >= ps.size())
-			    return false;
-			i = rdd.iterator(ps.get(c++),context);
-		    } while (true);
-		}
-	    });
+        final JavaRDD<MRData> rd = rdd.cache();
+        rd.count();  // force the evaluation of all pending Spark operations
+        final List<Partition> ps = rd.splits();
+        return new Bag(new BagIterator() {
+                Iterator<MRData> i = null;
+                int c = 0;
+                public MRData next () {
+                    return i.next();
+                }
+                public boolean hasNext () {
+                    do {
+                        if (i != null && i.hasNext()) 
+                            return true;
+                        if (c >= ps.size())
+                            return false;
+                        i = rdd.iterator(ps.get(c++),context);
+                    } while (true);
+                }
+            });
     }
 
     private final static Function<MRData,MRData> get_first
-	= new Function<MRData,MRData>() {
-	           public MRData call ( MRData value ) {
-		       return ((Tuple)value).first();
-		   };
+        = new Function<MRData,MRData>() {
+                   public MRData call ( MRData value ) {
+                       return ((Tuple)value).first();
+                   };
     };
 
     /** Evaluate an MRQL physical plan using Spark
@@ -254,23 +254,23 @@ final public class Evaluator extends Interpreter implements Serializable {
      * @return a DataSet
      */
     final static DataSet eval ( final Tree e,
-				final Environment env,
-				final String counter ) {
-	JavaRDD<MRData> rd = eval(e,env);
-	int count = 0;
-	if (!counter.equals("-")) {
-	    final Accumulator<Integer> c = spark_context.intAccumulator(0);
-	    rd = rd.cache();
-	    rd.foreach(new VoidFunction<MRData>() {
-		    public void call ( MRData value ) {
-			if (((MR_bool)((Tuple)value).second()).get())
-			    c.add(1);    // count the true results (the results that need another loop step)
-		    }
-		});
-	    count = c.value();
-	    rd = rd.map(get_first);
-	};
-	return new DataSet(new RDDDataSource(rd),count,0);
+                                final Environment env,
+                                final String counter ) {
+        JavaRDD<MRData> rd = eval(e,env);
+        int count = 0;
+        if (!counter.equals("-")) {
+            final Accumulator<Integer> c = spark_context.intAccumulator(0);
+            rd = rd.cache();
+            rd.foreach(new VoidFunction<MRData>() {
+                    public void call ( MRData value ) {
+                        if (((MR_bool)((Tuple)value).second()).get())
+                            c.add(1);    // count the true results (the results that need another loop step)
+                    }
+                });
+            count = c.value();
+            rd = rd.map(get_first);
+        };
+        return new DataSet(new RDDDataSource(rd),count,0);
     }
 
     /** Evaluate an MRQL physical plan using Spark and print tracing info
@@ -279,136 +279,136 @@ final public class Evaluator extends Interpreter implements Serializable {
      * @return a Spark RDD
      */
     final static JavaRDD<MRData> eval ( final Tree e, final Environment env ) {
-	if (Config.trace_execution) {
-	    tab_count += 3;
-	    System.out.println(tabs(tab_count)+print_query(e));
-	};
-	final JavaRDD<MRData> res = evalD(e,env);
-	if (Config.trace_execution) 
-	    try {
-		System.out.println(tabs(tab_count)+"-> "+res.collect());
-		tab_count -= 3;
-	    } catch (Exception ex) {
-		throw new Error("Cannot collect the operator output: "+e);
-	    };
-	return res;
+        if (Config.trace_execution) {
+            tab_count += 3;
+            System.out.println(tabs(tab_count)+print_query(e));
+        };
+        final JavaRDD<MRData> res = evalD(e,env);
+        if (Config.trace_execution) 
+            try {
+                System.out.println(tabs(tab_count)+"-> "+res.collect());
+                tab_count -= 3;
+            } catch (Exception ex) {
+                throw new Error("Cannot collect the operator output: "+e);
+            };
+        return res;
     }
 
     /* convert an MRQL lambda to a Spark Function */
     private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
-	final org.apache.mrql.Function f = evalF(fnc,env);
-	return new FlatMapFunction<MRData,MRData>() {
-	    public Iterable<MRData> call ( MRData value ) {
-		return (Bag)f.eval(value);
-	    }
-	};
+        final org.apache.mrql.Function f = evalF(fnc,env);
+        return new FlatMapFunction<MRData,MRData>() {
+            public Iterable<MRData> call ( MRData value ) {
+                return (Bag)f.eval(value);
+            }
+        };
     }
 
     /* group-by s and then reduce by fnc; if o is true, sort after group-by */
     private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) {
-	match o {
-	case true:   // the result must be sorted
-	    return s.groupBy(get_first)
-		.sortByKey()
-		.map(new Function<Tuple2<MRData,List<MRData>>,MRData>() {
-			public MRData call ( Tuple2<MRData,List<MRData>> value ) {
-			    return new Tuple(value._1,bag(value._2));
-			}
-		    })
-		.flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
-			public MRData call ( MRData value ) {
-			    return ((Tuple)value).second();
-			}
-		    });
-	};
-	return s.groupBy(get_first)
-	    .map(new Function<Tuple2<MRData,List<MRData>>,MRData> () {
-		    public MRData call ( Tuple2<MRData,List<MRData>> value ) {
-			final Iterator<MRData> i = value._2.iterator();
-			return new Tuple(value._1,new Bag(new BagIterator() {
-				public MRData next () {
-				    return ((Tuple)i.next()).second();
-				}
-				public boolean hasNext () {
-				    return i.hasNext();
-				}
-			    }));
-		    }
-		})
-	    .flatMap(cmap_fnc(fnc,env));
+        match o {
+        case true:   // the result must be sorted
+            return s.groupBy(get_first)
+                .sortByKey()
+                .map(new Function<Tuple2<MRData,List<MRData>>,MRData>() {
+                        public MRData call ( Tuple2<MRData,List<MRData>> value ) {
+                            return new Tuple(value._1,bag(value._2));
+                        }
+                    })
+                .flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
+                        public MRData call ( MRData value ) {
+                            return ((Tuple)value).second();
+                        }
+                    });
+        };
+        return s.groupBy(get_first)
+            .map(new Function<Tuple2<MRData,List<MRData>>,MRData> () {
+                    public MRData call ( Tuple2<MRData,List<MRData>> value ) {
+                        final Iterator<MRData> i = value._2.iterator();
+                        return new Tuple(value._1,new Bag(new BagIterator() {
+                                public MRData next () {
+                                    return ((Tuple)i.next()).second();
+                                }
+                                public boolean hasNext () {
+                                    return i.hasNext();
+                                }
+                            }));
+                    }
+                })
+            .flatMap(cmap_fnc(fnc,env));
     }
 
     private static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
-	final Environment env = Interpreter.global_env;
-	return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
-		// need to pass the global bindings (the in-memory repeat vars) to workers
-		boolean first = true;
-		public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
-		    if (first) {
-			// need to pass the global bindings (the in-memory repeat vars) to workers
-			Interpreter.set_global_bindings(env);
-			first = false;
-		    };
-		    return value._2.data();
-		}
-	    });
+        final Environment env = Interpreter.global_env;
+        return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
+                // need to pass the global bindings (the in-memory repeat vars) to workers
+                boolean first = true;
+                public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
+                    if (first) {
+                        // need to pass the global bindings (the in-memory repeat vars) to workers
+                        Interpreter.set_global_bindings(env);
+                        first = false;
+                    };
+                    return value._2.data();
+                }
+            });
     }
 
     private static Iterable<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
-	return new Iterable<Tuple2<MRData,MRData>>() {
-	    public Iterator<Tuple2<MRData,MRData>> iterator() {
-		return new Iterator<Tuple2<MRData,MRData>> () {
-		    public Tuple2<MRData,MRData> next () {
-			Tuple data = (Tuple)i.next();
-			return new Tuple2<MRData,MRData>(data.first(),data.second());
-		    }
-		    public boolean hasNext () {
-			return i.hasNext();
-		    }
-		    public void remove () {}
-		};
-	    }
-	};
+        return new Iterable<Tuple2<MRData,MRData>>() {
+            public Iterator<Tuple2<MRData,MRData>> iterator() {
+                return new Iterator<Tuple2<MRData,MRData>> () {
+                    public Tuple2<MRData,MRData> next () {
+                        Tuple data = (Tuple)i.next();
+                        return new Tuple2<MRData,MRData>(data.first(),data.second());
+                    }
+                    public boolean hasNext () {
+                        return i.hasNext();
+                    }
+                    public void remove () {}
+                };
+            }
+        };
     }
 
     private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
-	return new FlatMapFunction<Iterator<MRData>,MRData>() {
-	          public Iterable<MRData> call ( final Iterator<MRData> i ) {
-		      return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
-			      public MRData eval ( MRData x ) {
-				  final MRData key = ((Tuple)x).first();
-				  final Iterator<MRData> it = ((Bag)f.eval(x)).iterator();
-				  return new Bag(new BagIterator() {
-					  public MRData next () {
-					      return new Tuple(key,it.next());
-					  }
-					  public boolean hasNext () {
-					      return it.hasNext();
-					  }
-				      });
-			      }},
-			  MapReduceAlgebra.groupBy(new Bag(new BagIterator() {
-				  public MRData next () {
-				      return i.next();
-				  }
-				  public boolean hasNext () {
-				      return i.hasNext();
-				  }
-			      })));
-		  }
-	};
+        return new FlatMapFunction<Iterator<MRData>,MRData>() {
+                  public Iterable<MRData> call ( final Iterator<MRData> i ) {
+                      return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
+                              public MRData eval ( MRData x ) {
+                                  final MRData key = ((Tuple)x).first();
+                                  final Iterator<MRData> it = ((Bag)f.eval(x)).iterator();
+                                  return new Bag(new BagIterator() {
+                                          public MRData next () {
+                                              return new Tuple(key,it.next());
+                                          }
+                                          public boolean hasNext () {
+                                              return it.hasNext();
+                                          }
+                                      });
+                              }},
+                          MapReduceAlgebra.groupBy(new Bag(new BagIterator() {
+                                  public MRData next () {
+                                      return i.next();
+                                  }
+                                  public boolean hasNext () {
+                                      return i.hasNext();
+                                  }
+                              })));
+                  }
+        };
     }
 
     private static Hashtable<MRData,Bag> make_built_table ( List<Tuple2<MRData,MRData>> values ) {
-	Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
-	for ( Tuple2<MRData,MRData> t: values ) {
-	    Bag entries = built_table.get(t._1);
-	    built_table.put(t._1,
-			    (entries == null)
-			    ? (new Bag(t._2))
-			    : entries.add_element(t._2));
-	};
-	return built_table;
+        Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
+        for ( Tuple2<MRData,MRData> t: values ) {
+            Bag entries = built_table.get(t._1);
+            built_table.put(t._1,
+                            (entries == null)
+                            ? (new Bag(t._2))
+                            : entries.add_element(t._2));
+        };
+        return built_table;
     }
 
     /** Evaluate MRQL physical operators using Spark
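
make_built_table above is the build side of the MapJoin case in the hunk below: the smaller input is collected on the driver, broadcast to every worker, and loaded into a hash table keyed on the join key, so the probe side streams past it with no shuffle. The join's skeleton, reduced to its essentials (BroadcastJoinDemo and its names are illustrative, not the patch's code):

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.broadcast.Broadcast;
import scala.Tuple2;

public class BroadcastJoinDemo {
    static HashMap<Integer,String> toTable ( List<Tuple2<Integer,String>> ts ) {
        HashMap<Integer,String> m = new HashMap<Integer,String>();
        for ( Tuple2<Integer,String> t: ts )
            m.put(t._1, t._2);
        return m;
    }

    public static void main ( String[] args ) {
        JavaSparkContext sc = new JavaSparkContext("local[2]", "BroadcastJoinDemo");
        // build side: collect the small input once and broadcast it
        final Broadcast<List<Tuple2<Integer,String>>> small =
            sc.broadcast(Arrays.asList(new Tuple2<Integer,String>(1, "a"),
                                       new Tuple2<Integer,String>(2, "b")));
        JavaRDD<Integer> big = sc.parallelize(Arrays.asList(1, 2, 3));
        JavaRDD<String> joined = big.map(new Function<Integer,String>() {
                // hash table built from the broadcast value, as MapJoin's built_table field is
                final HashMap<Integer,String> table = toTable(small.value());
                public String call ( Integer k ) {
                    String v = table.get(k);
                    return k + ":" + (v == null ? "<none>" : v);
                }
            });
        System.out.println(joined.collect());    // [1:a, 2:b, 3:<none>]
        sc.stop();
    }
}
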
@@ -417,272 +417,272 @@ final public class Evaluator extends Interpreter implements Serializable {
      * @return a Spark RDD
      */
     final static JavaRDD<MRData> evalD ( final Tree e, final Environment env ) {
-	try {
-	    match e {
-	    case MapAggregateReduce(`m,`r,null,_,`s,`o):
-		return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
-	    case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
-		return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
-	    case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
-		return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
-	    case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
-		return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
-	    case cMap(`f,`s):
-		return eval(s,env).flatMap(cmap_fnc(f,env));
-	    case MapReduce(`m,`r,`s,`o):
-		return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
-	    case MapCombineReduce(`m,`c,`r,`s,`o):
-		return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
-			       .mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
-	    case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
-		return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
-	    case CrossProduct(`mx,`my,`r,`x,`y):
-		final org.apache.mrql.Function fr = evalF(r,env);
-		return eval(x,env)
-		    .flatMap(cmap_fnc(mx,env))
-		    .cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
-		    .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
-			    public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
-				return (Bag)fr.eval(new Tuple(value._1,value._2));
-			    }
-			});
-	    case MapReduce2(`mx,`my,`r,`x,`y,`o):
-		final org.apache.mrql.Function fx = evalF(mx,env);
-		final org.apache.mrql.Function fy = evalF(my,env);
-		final org.apache.mrql.Function fr = evalF(r,env);
-		JavaPairRDD<MRData,MRData> xs
-		    = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
-			    public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
-				return joinIterator(((Bag)fx.eval(value)).iterator());
-			    }
-			});
-		JavaPairRDD<MRData,MRData> ys
-		    = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
-			    public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
-				return joinIterator(((Bag)fy.eval(value)).iterator());
-			    }
-			});
-		return xs.cogroup(ys)
-		    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
-			    public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
-				return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
-			    }
-			});
-	    case GroupByJoin(`kx,`ky,`gx,`gy,`mp,`c,`r,`x,`y,`o):
-		final int n = 10;
-		final int m = 10;
-		final org.apache.mrql.Function fkx = evalF(kx,env);
-		final org.apache.mrql.Function fky = evalF(ky,env);
-		final org.apache.mrql.Function fgx = evalF(gx,env);
-		final org.apache.mrql.Function fgy = evalF(gy,env);
-		final org.apache.mrql.Function fm = evalF(mp,env);
-		final org.apache.mrql.Function fc = evalF(c,env);
-		final org.apache.mrql.Function fr = evalF(r,env);
-		JavaPairRDD<MRData,MRData> xs
-		    = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
-			    public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
-				return new Iterable<Tuple2<MRData,MRData>>() {
-				    public Iterator<Tuple2<MRData,MRData>> iterator() {
-					return new Iterator<Tuple2<MRData,MRData>>() {
-					    int i = 0;
-					    public Tuple2<MRData,MRData> next () {
-						MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
-						i++;
-						return new Tuple2<MRData,MRData>(key,new Tuple(fkx.eval(value),value));
-					    }
-					    public boolean hasNext () {
-						return i < n;
-					    }
-					    public void remove () {}
-					};
-				    }
-				};
-			    }
-			});
-		JavaPairRDD<MRData,MRData> ys
-		    = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
-			    public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
-				return new Iterable<Tuple2<MRData,MRData>>() {
-				    public Iterator<Tuple2<MRData,MRData>> iterator() {
-					return new Iterator<Tuple2<MRData,MRData>>() {
-					    int i = 0;
-					    public Tuple2<MRData,MRData> next () {
-						MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+i);
-						i++;
-						return new Tuple2<MRData,MRData>(key,new Tuple(fky.eval(value),value));
-					    }
-					    public boolean hasNext () {
-						return i < m;
-					    }
-					    public void remove () {}
-					};
-				    }
-				};
-			    }
-			});
-		return xs.cogroup(ys)
-		    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
-			    public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
-				final Iterator<MRData> i = (MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fm,fc,fr,
-									bag(value._2._1),bag(value._2._2))).iterator();
-				return new Iterable<MRData>() {
-				    public Iterator<MRData> iterator() {
-					return new Iterator<MRData>() {
-					    public MRData next () {
-						return i.next();
-					    }
-					    public boolean hasNext () {
-						return i.hasNext();
-					    }
-					    public void remove () {}
-					};
-				    }
-				};
-			    }
-			});
-	    case MapJoin(`mx,`my,`r,`x,`y):
-		final org.apache.mrql.Function fx = evalF(mx,env);
-		final org.apache.mrql.Function fy = evalF(my,env);
-		final org.apache.mrql.Function fr = evalF(r,env);
-		final Broadcast<List<Tuple2<MRData,MRData>>> ys
-		    = spark_context.broadcast(eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
-				public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
-				    return joinIterator(((Bag)fy.eval(value)).iterator());
-				}
-			    }).collect());
-		return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
-			final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
-			public Iterable<MRData> call ( MRData value ) {
-			    final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
-			    return new Iterable<MRData>() {
-				public Iterator<MRData> iterator() {
-				    return new Iterator<MRData>() {
-					Tuple p;
-					Iterator<MRData> ix = null;
-					public MRData next () {
-					    return ix.next();
-					}
-					public boolean hasNext () {
-					    if (ix != null && ix.hasNext())
-						return true;
-					    while (i.hasNext()) {
-						p = (Tuple)i.next();
-						MRData pd = built_table.get(p.first());
-						if (pd == null)
-						    continue;
-						Bag bb = ((Bag)fr.eval(new Tuple(p.second(),pd)));
-						ix = bb.iterator();
-						if (ix.hasNext())
-						    return true;
-					    };
-					    return false;
-					}
-					public void remove () {}
-				    };
-				}
-			    };
-			}
-		    });
-	    case BinarySource(`file,_):
-		String path = ((MR_string)evalE(file,env)).get();
-		new BinaryDataSource(path,Plan.conf);
-		return containerData(spark_context.sequenceFile(file.stringValue(),
-								MRContainer.class,MRContainer.class,
-								Config.nodes));
-	    case ParsedSource(`parser,`file,...args):
-		String path = ((MR_string)evalE(file,env)).get();
-		Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
-		if (p == null)
-		    throw new Error("Unknown parser: "+parser);
-		new ParsedDataSource(path,p,args,Plan.conf);
-		dump_source_dir();
-		return containerData(spark_context.hadoopFile(path,ParsedInputFormat.class,
-							      MRContainer.class,MRContainer.class,
-							      Config.nodes));
-	    case Merge(`x,`y):
-		return eval(x,env).union(eval(y,env));
-	    case Repeat(lambda(`v,`b),`s,`n):
-		int max_num = ((MR_int)evalE(n,env)).get();
-		JavaRDD<MRData> rd;
-		JavaRDD<MRData> res = eval(s,env).cache();
-		int i = 0;
-		boolean cont = true;
-		do {
-		    rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
-		    res = rd.map(get_first).cache();
-		    Integer true_results
-			= rd.aggregate(new Integer(0),
-				       new Function2<Integer,MRData,Integer>() {
-					   public Integer call ( Integer x, MRData y ) {
-					       return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x;
-					   }
-				       },
-				       new Function2<Integer,Integer,Integer>() {
-					   public Integer call ( Integer x, Integer y ) { return x+y; }
-				       });
-		    i++;
-		    cont = true_results > 0 && i <= max_num;
-		    System.err.println("*** Repeat #"+i+": "+true_results+" true results");
-		} while (cont);
-		return res;
-	    case Closure(lambda(`v,`b),`s,`m):
-		int max_num = ((MR_int)evalE(m,env)).get();
-		JavaRDD<MRData> res = eval(s,env).cache();
-		long n = 0;
-		long old = 0;
-		int i = 0;
-		boolean cont = true;
-		do {
-		    res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
-		    old = n;
-		    n = res.count();
-		    i++;
-		    System.err.println("*** Repeat #"+i+": "+(old-n)+" new records");
-		} while (old < n && i <= max_num);
-		return res;
-	    case Generator(`min,`max,`size):
-		DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
-					    ((MR_long)evalE(max,env)).get(),
-					    ((MR_long)evalE(size,env)).get());
-		JavaRDD<MRData> rd = null;
-		for ( DataSource d: ds.source )
-		    if (rd == null)
-			rd = containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
-								    MRContainer.class,MRContainer.class,1));
-		    else rd = rd.union(containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
-									      MRContainer.class,MRContainer.class,1)));
-		return rd;
-	    case let(`v,`u,`body):
-		return eval(body,new Environment(v.toString(),evalE(u,env),env));
-	    case Let(`v,`u,`body):
-		return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env)),env));
-	    case If(`c,`x,`y):
-		if (((MR_bool)evalE(c,env)).get())
-		    return eval(x,env);
-		else return eval(y,env);
-	    case `v:
-		if (!v.is_variable())
-		    fail;
-		MRData x = variable_lookup(v.toString(),env);
-		if (x != null)
-		    if (x instanceof MR_rdd)
-			return ((MR_rdd)x).rdd();
-		x = variable_lookup(v.toString(),global_env);
-		if (x != null)
-		    if (x instanceof MR_rdd)
-			return ((MR_rdd)x).rdd();
-		throw new Error("Variable "+v+" is not bound");
-	    };
-	    throw new Error("Cannot evaluate the Spark plan: "+e);
-	} catch (Error msg) {
-	    if (!Config.trace)
-		throw new Error(msg.getMessage());
-	    System.err.println(msg.getMessage());
-	    throw new Error("Evaluation error in: "+print_query(e));
-	} catch (Exception ex) {
-	    System.err.println(ex.getMessage());
-	    ex.printStackTrace();
-	    throw new Error("Evaluation error in: "+print_query(e));
-	}
+        try {
+            match e {
+            case MapAggregateReduce(`m,`r,null,_,`s,`o):
+                return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
+            case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
+                return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
+            case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
+                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
+            case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
+                return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
+            case cMap(`f,`s):
+                return eval(s,env).flatMap(cmap_fnc(f,env));
+            case MapReduce(`m,`r,`s,`o):
+                return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
+            case MapCombineReduce(`m,`c,`r,`s,`o):
+                return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
+                               .mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
+            case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
+                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
+            case CrossProduct(`mx,`my,`r,`x,`y):
+                final org.apache.mrql.Function fr = evalF(r,env);
+                return eval(x,env)
+                    .flatMap(cmap_fnc(mx,env))
+                    .cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
+                    .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
+                            public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
+                                return (Bag)fr.eval(new Tuple(value._1,value._2));
+                            }
+                        });
+            case MapReduce2(`mx,`my,`r,`x,`y,`o):
+                final org.apache.mrql.Function fx = evalF(mx,env);
+                final org.apache.mrql.Function fy = evalF(my,env);
+                final org.apache.mrql.Function fr = evalF(r,env);
+                JavaPairRDD<MRData,MRData> xs
+                    = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+                                return joinIterator(((Bag)fx.eval(value)).iterator());
+                            }
+                        });
+                JavaPairRDD<MRData,MRData> ys
+                    = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+                                return joinIterator(((Bag)fy.eval(value)).iterator());
+                            }
+                        });
+                return xs.cogroup(ys)
+                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
+                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
+                                return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
+                            }
+                        });
+            case GroupByJoin(`kx,`ky,`gx,`gy,`mp,`c,`r,`x,`y,`o):
+                final int n = 10;
+                final int m = 10;
+                final org.apache.mrql.Function fkx = evalF(kx,env);
+                final org.apache.mrql.Function fky = evalF(ky,env);
+                final org.apache.mrql.Function fgx = evalF(gx,env);
+                final org.apache.mrql.Function fgy = evalF(gy,env);
+                final org.apache.mrql.Function fm = evalF(mp,env);
+                final org.apache.mrql.Function fc = evalF(c,env);
+                final org.apache.mrql.Function fr = evalF(r,env);
+                JavaPairRDD<MRData,MRData> xs
+                    = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
+                                return new Iterable<Tuple2<MRData,MRData>>() {
+                                    public Iterator<Tuple2<MRData,MRData>> iterator() {
+                                        return new Iterator<Tuple2<MRData,MRData>>() {
+                                            int i = 0;
+                                            public Tuple2<MRData,MRData> next () {
+                                                MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
+                                                i++;
+                                                return new Tuple2<MRData,MRData>(key,new Tuple(fkx.eval(value),value));
+                                            }
+                                            public boolean hasNext () {
+                                                return i < n;
+                                            }
+                                            public void remove () {}
+                                        };
+                                    }
+                                };
+                            }
+                        });
+                JavaPairRDD<MRData,MRData> ys
+                    = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
+                                return new Iterable<Tuple2<MRData,MRData>>() {
+                                    public Iterator<Tuple2<MRData,MRData>> iterator() {
+                                        return new Iterator<Tuple2<MRData,MRData>>() {
+                                            int i = 0;
+                                            public Tuple2<MRData,MRData> next () {
+                                                MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+i);
+                                                i++;
+                                                return new Tuple2<MRData,MRData>(key,new Tuple(fky.eval(value),value));
+                                            }
+                                            public boolean hasNext () {
+                                                return i < m;
+                                            }
+                                            public void remove () {}
+                                        };
+                                    }
+                                };
+                            }
+                        });
+                return xs.cogroup(ys)
+                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
+                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
+                                final Iterator<MRData> i = (MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fm,fc,fr,
+                                                                        bag(value._2._1),bag(value._2._2))).iterator();
+                                return new Iterable<MRData>() {
+                                    public Iterator<MRData> iterator() {
+                                        return new Iterator<MRData>() {
+                                            public MRData next () {
+                                                return i.next();
+                                            }
+                                            public boolean hasNext () {
+                                                return i.hasNext();
+                                            }
+                                            public void remove () {}
+                                        };
+                                    }
+                                };
+                            }
+                        });
+            case MapJoin(`mx,`my,`r,`x,`y):
+                final org.apache.mrql.Function fx = evalF(mx,env);
+                final org.apache.mrql.Function fy = evalF(my,env);
+                final org.apache.mrql.Function fr = evalF(r,env);
+                final Broadcast<List<Tuple2<MRData,MRData>>> ys
+                    = spark_context.broadcast(eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                                public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+                                    return joinIterator(((Bag)fy.eval(value)).iterator());
+                                }
+                            }).collect());
+                return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
+                        final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
+                        public Iterable<MRData> call ( MRData value ) {
+                            final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
+                            return new Iterable<MRData>() {
+                                public Iterator<MRData> iterator() {
+                                    return new Iterator<MRData>() {
+                                        Tuple p;
+                                        Iterator<MRData> ix = null;
+                                        public MRData next () {
+                                            return ix.next();
+                                        }
+                                        public boolean hasNext () {
+                                            if (ix != null && ix.hasNext())
+                                                return true;
+                                            while (i.hasNext()) {
+                                                p = (Tuple)i.next();
+                                                MRData pd = built_table.get(p.first());
+                                                if (pd == null)
+                                                    continue;
+                                                Bag bb = ((Bag)fr.eval(new Tuple(p.second(),pd)));
+                                                ix = bb.iterator();
+                                                if (ix.hasNext())
+                                                    return true;
+                                            };
+                                            return false;
+                                        }
+                                        public void remove () {}
+                                    };
+                                }
+                            };
+                        }
+                    });
+            case BinarySource(`file,_):
+                String path = ((MR_string)evalE(file,env)).get();
+                new BinaryDataSource(path,Plan.conf);
+                return containerData(spark_context.sequenceFile(path,
+                                                                MRContainer.class,MRContainer.class,
+                                                                Config.nodes));
+            case ParsedSource(`parser,`file,...args):
+                String path = ((MR_string)evalE(file,env)).get();
+                Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+                if (p == null)
+                    throw new Error("Unknown parser: "+parser);
+                new ParsedDataSource(path,p,args,Plan.conf);
+                dump_source_dir();
+                return containerData(spark_context.hadoopFile(path,ParsedInputFormat.class,
+                                                              MRContainer.class,MRContainer.class,
+                                                              Config.nodes));
+            case Merge(`x,`y):
+                return eval(x,env).union(eval(y,env));
+            case Repeat(lambda(`v,`b),`s,`n):
+                int max_num = ((MR_int)evalE(n,env)).get();
+                JavaRDD<MRData> rd;
+                JavaRDD<MRData> res = eval(s,env).cache();
+                int i = 0;
+                boolean cont = true;
+                do {
+                    rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
+                    res = rd.map(get_first).cache();
+                    Integer true_results
+                        = rd.aggregate(new Integer(0),
+                                       new Function2<Integer,MRData,Integer>() {
+                                           public Integer call ( Integer x, MRData y ) {
+                                               return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x;
+                                           }
+                                       },
+                                       new Function2<Integer,Integer,Integer>() {
+                                           public Integer call ( Integer x, Integer y ) { return x+y; }
+                                       });
+                    i++;
+                    cont = true_results > 0 && i <= max_num;
+                    System.err.println("*** Repeat #"+i+": "+true_results+" true results");
+                } while (cont);
+                return res;
+            case Closure(lambda(`v,`b),`s,`m):
+                int max_num = ((MR_int)evalE(m,env)).get();
+                JavaRDD<MRData> res = eval(s,env).cache();
+                long n = 0;
+                long old = 0;
+                int i = 0;
+                boolean cont = true;
+                do {
+                    res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
+                    old = n;
+                    n = res.count();
+                    i++;
+                    System.err.println("*** Repeat #"+i+": "+(old-n)+" new records");
+                } while (old < n && i <= max_num);
+                return res;
+            case Generator(`min,`max,`size):
+                DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
+                                            ((MR_long)evalE(max,env)).get(),
+                                            ((MR_long)evalE(size,env)).get());
+                JavaRDD<MRData> rd = null;
+                for ( DataSource d: ds.source )
+                    if (rd == null)
+                        rd = containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
+                                                                    MRContainer.class,MRContainer.class,1));
+                    else rd = rd.union(containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
+                                                                              MRContainer.class,MRContainer.class,1)));
+                return rd;
+            case let(`v,`u,`body):
+                return eval(body,new Environment(v.toString(),evalE(u,env),env));
+            case Let(`v,`u,`body):
+                return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env)),env));
+            case If(`c,`x,`y):
+                if (((MR_bool)evalE(c,env)).get())
+                    return eval(x,env);
+                else return eval(y,env);
+            case `v:
+                if (!v.is_variable())
+                    fail;
+                MRData x = variable_lookup(v.toString(),env);
+                if (x != null)
+                    if (x instanceof MR_rdd)
+                        return ((MR_rdd)x).rdd();
+                x = variable_lookup(v.toString(),global_env);
+                if (x != null)
+                    if (x instanceof MR_rdd)
+                        return ((MR_rdd)x).rdd();
+                throw new Error("Variable "+v+" is not bound");
+            };
+            throw new Error("Cannot evaluate the Spark plan: "+e);
+        } catch (Error msg) {
+            if (!Config.trace)
+                throw new Error(msg.getMessage());
+            System.err.println(msg.getMessage());
+            throw new Error("Evaluation error in: "+print_query(e));
+        } catch (Exception ex) {
+            System.err.println(ex.getMessage());
+            ex.printStackTrace();
+            throw new Error("Evaluation error in: "+print_query(e));
+        }
     }
 }

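A note on the MapJoin case above: it is a broadcast hash join. The right
input is evaluated, collected, and broadcast to every worker, where
make_built_table indexes it by join key; each record of the left input then
probes that table through a lazy Iterator. A minimal, self-contained sketch
of the same build/probe pattern is below (the names are illustrative, not
the MRQL API):

    import java.util.*;

    public class HashJoinSketch {
        /* build phase: index the small (broadcast) side by join key */
        static <K,W> Map<K,List<W>> build ( List<Map.Entry<K,W>> small ) {
            Map<K,List<W>> table = new HashMap<K,List<W>>();
            for ( Map.Entry<K,W> e: small ) {
                List<W> bucket = table.get(e.getKey());
                if (bucket == null) {
                    bucket = new ArrayList<W>();
                    table.put(e.getKey(),bucket);
                };
                bucket.add(e.getValue());
            };
            return table;
        }

        /* probe phase: stream the large side; emit one (v,w) pair per match */
        static <K,V,W> List<Object[]> probe ( Map<K,List<W>> table,
                                              List<Map.Entry<K,V>> large ) {
            List<Object[]> out = new ArrayList<Object[]>();
            for ( Map.Entry<K,V> e: large ) {
                List<W> matches = table.get(e.getKey());
                if (matches == null)    // no entry in the built table: skip,
                    continue;           // like the pd == null test in hasNext()
                for ( W w: matches )
                    out.add(new Object[] { e.getValue(), w });
            };
            return out;
        }
    }

The build side is hashed once per worker, which is why the plan broadcasts
the smaller input: the cost is one in-memory table per worker rather than a
shuffle of both inputs.
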
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/GeneratorInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/GeneratorInputFormat.java b/src/main/java/spark/GeneratorInputFormat.java
index 61d7c34..c9e6066 100644
--- a/src/main/java/spark/GeneratorInputFormat.java
+++ b/src/main/java/spark/GeneratorInputFormat.java
@@ -29,52 +29,52 @@ import org.apache.hadoop.mapred.*;
  *  an (offset,size) pair that generates the half-open range of values [offset,offset+size) */
 final public class GeneratorInputFormat extends MRQLFileInputFormat {
     public static class GeneratorRecordReader implements RecordReader<MRContainer,MRContainer> {
-	final long offset;
-	final long size;
-	long index;
-	SequenceFile.Reader reader;
+        final long offset;
+        final long size;
+        long index;
+        SequenceFile.Reader reader;
 
-	public GeneratorRecordReader ( FileSplit split,
-				       Configuration conf ) throws IOException {
-	    Path path = split.getPath();
-	    FileSystem fs = path.getFileSystem(conf);
-	    reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
-	    MRContainer key = new MRContainer();
-	    MRContainer value = new MRContainer();
-	    reader.next(key,value);
-	    offset = ((MR_long)((Tuple)(value.data())).first()).get();
-	    size = ((MR_long)((Tuple)(value.data())).second()).get();
-	    index = -1;
-	}
+        public GeneratorRecordReader ( FileSplit split,
+                                       Configuration conf ) throws IOException {
+            Path path = split.getPath();
+            FileSystem fs = path.getFileSystem(conf);
+            reader = new SequenceFile.Reader(fs,path,conf);
+            MRContainer key = new MRContainer();
+            MRContainer value = new MRContainer();
+            reader.next(key,value);
+            offset = ((MR_long)((Tuple)(value.data())).first()).get();
+            size = ((MR_long)((Tuple)(value.data())).second()).get();
+            index = -1;
+        }
 
-	public MRContainer createKey () {
-	    return new MRContainer(null);
-	}
+        public MRContainer createKey () {
+            return new MRContainer(null);
+        }
 
-	public MRContainer createValue () {
-	    return new MRContainer(null);
-	}
+        public MRContainer createValue () {
+            return new MRContainer(null);
+        }
 
-	public boolean next ( MRContainer key, MRContainer value ) throws IOException {
-	    index++;
-	    value.set(new MR_long(offset+index));
-	    key.set(new MR_long(index));
-	    return index < size;
-	}
+        public boolean next ( MRContainer key, MRContainer value ) throws IOException {
+            index++;
+            value.set(new MR_long(offset+index));
+            key.set(new MR_long(index));
+            return index < size;
+        }
 
-	public long getPos () throws IOException { return index; }
+        public long getPos () throws IOException { return index; }
 
-	public void close () throws IOException { reader.close(); }
+        public void close () throws IOException { reader.close(); }
 
-	public float getProgress () throws IOException {
-	    return index / (float)size;
-	}
+        public float getProgress () throws IOException {
+            return index / (float)size;
+        }
     }
 
     public RecordReader<MRContainer,MRContainer>
-	      getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException {
-	return (RecordReader<MRContainer,MRContainer>)
-	              new GeneratorRecordReader((FileSplit)split,job);
+              getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException {
+        return (RecordReader<MRContainer,MRContainer>)
+                      new GeneratorRecordReader((FileSplit)split,job);
     }
 
     /** Insert all results from the generators stored in path into a Bag.
@@ -83,31 +83,31 @@ final public class GeneratorInputFormat extends MRQLFileInputFormat {
      * @return a Bag that contains all data
      */
     public Bag materialize ( final Path path ) throws IOException {
-	Configuration conf = Plan.conf;
-	FileSystem fs = path.getFileSystem(conf);
-	final SequenceFile.Reader reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
-	final MRContainer key = new MRContainer();
-	final MRContainer value = new MRContainer();
-	return new Bag(new BagIterator () {
-		long offset = 0;
-		long size = 0;
-		long i = 0;
-		public boolean hasNext () {
-		    if (++i >= offset+size)
-			try {
-			    if (!reader.next(key,value))
-				return false;
-			    offset = ((MR_long)((Tuple)(value.data())).first()).get();
-			    size = ((MR_long)((Tuple)(value.data())).second()).get();
-			    i = offset;
-			} catch (IOException e) {
-			    throw new Error("Cannot collect values from a generator");
-			};
-		    return true;
-		}
-		public MRData next () {
-		    return new MR_long(i);
-		}
-	    });
+        Configuration conf = Plan.conf;
+        FileSystem fs = path.getFileSystem(conf);
+        final SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);
+        final MRContainer key = new MRContainer();
+        final MRContainer value = new MRContainer();
+        return new Bag(new BagIterator () {
+                long offset = 0;
+                long size = 0;
+                long i = 0;
+                public boolean hasNext () {
+                    if (++i >= offset+size)
+                        try {
+                            if (!reader.next(key,value))
+                                return false;
+                            offset = ((MR_long)((Tuple)(value.data())).first()).get();
+                            size = ((MR_long)((Tuple)(value.data())).second()).get();
+                            i = offset;
+                        } catch (IOException e) {
+                            throw new Error("Cannot collect values from a generator");
+                        };
+                    return true;
+                }
+                public MRData next () {
+                    return new MR_long(i);
+                }
+            });
     }
 }

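For reference, the value sequence that one GeneratorRecordReader emits: index
starts at -1 and next() pre-increments it before testing index < size, so a
split holding (offset,size) yields exactly size values, offset through
offset+size-1. A stand-alone sketch of that loop (illustrative only):

    public class GeneratorSketch {
        public static void main ( String[] args ) {
            long offset = 100, size = 3;
            long index = -1;                        // as in the record reader
            while (true) {
                index++;                            // next() pre-increments
                if (!(index < size))                // next() returns index < size
                    break;
                System.out.println(offset+index);   // prints 100, 101, 102
            }
        }
    }
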
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/MRQLFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/MRQLFileInputFormat.java b/src/main/java/spark/MRQLFileInputFormat.java
index ca26e43..73b0910 100644
--- a/src/main/java/spark/MRQLFileInputFormat.java
+++ b/src/main/java/spark/MRQLFileInputFormat.java
@@ -30,39 +30,39 @@ abstract public class MRQLFileInputFormat extends FileInputFormat<MRContainer,MR
 
     /** record reader for spark */
     abstract public RecordReader<MRContainer,MRContainer>
-	getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException;
+        getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException;
 
     /** materialize the input file into a memory Bag */
     public Bag materialize ( final Path file ) throws Exception {
-	final JobConf job = new JobConf(Plan.conf,MRQLFileInputFormat.class);
-	setInputPaths(job,file);
-	final InputSplit[] splits = getSplits(job,1);
-	final Reporter reporter = null;
-	final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job,reporter);
-	return new Bag(new BagIterator () {
-		RecordReader<MRContainer,MRContainer> reader = rd;
-		MRContainer key = reader.createKey();
-		MRContainer value = reader.createKey();
-		int i = 0;
-		public boolean hasNext () {
-		    try {
-			if (reader.next(key,value))
-			    return true;
-			do {
-			    if (++i >= splits.length)
-				return false;
-			    reader.close();
-			    reader = getRecordReader(splits[i],job,reporter);
-			} while (!reader.next(key,value));
-			return true;
-		    } catch (IOException e) {
-			throw new Error("Cannot collect values from an intermediate result");
-		    }
-		}
-		public MRData next () {
-		    return value.data();
-		}
-	    });
+        final JobConf job = new JobConf(Plan.conf,MRQLFileInputFormat.class);
+        setInputPaths(job,file);
+        final InputSplit[] splits = getSplits(job,1);
+        final Reporter reporter = null;
+        final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job,reporter);
+        return new Bag(new BagIterator () {
+                RecordReader<MRContainer,MRContainer> reader = rd;
+                MRContainer key = reader.createKey();
+                MRContainer value = reader.createKey();
+                int i = 0;
+                public boolean hasNext () {
+                    try {
+                        if (reader.next(key,value))
+                            return true;
+                        do {
+                            if (++i >= splits.length)
+                                return false;
+                            reader.close();
+                            reader = getRecordReader(splits[i],job,reporter);
+                        } while (!reader.next(key,value));
+                        return true;
+                    } catch (IOException e) {
+                        throw new Error("Cannot collect values from an intermediate result");
+                    }
+                }
+                public MRData next () {
+                    return value.data();
+                }
+            });
     }
 
     /** materialize the entire dataset into a Bag
@@ -71,13 +71,13 @@ abstract public class MRQLFileInputFormat extends FileInputFormat<MRContainer,MR
      * @return the Bag that contains the collected values
      */
     public final static Bag collect ( final DataSet x, boolean strip ) throws Exception {
-	Bag res = new Bag();
-	for ( DataSource s: x.source )
-	    if (s instanceof RDDDataSource)
-		res = res.union(Evaluator.bag(((RDDDataSource)s).rdd));
-	    else if (s.to_be_merged)
-		res = res.union(Plan.merge(s));
-	    else res = res.union(s.inputFormat.newInstance().materialize(new Path(s.path)));
-	return res;
+        Bag res = new Bag();
+        for ( DataSource s: x.source )
+            if (s instanceof RDDDataSource)
+                res = res.union(Evaluator.bag(((RDDDataSource)s).rdd));
+            else if (s.to_be_merged)
+                res = res.union(Plan.merge(s));
+            else res = res.union(s.inputFormat.newInstance().materialize(new Path(s.path)));
+        return res;
     }
 }

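materialize() above stitches several per-split record readers into a single
lazy Bag iterator, opening the next split only when the current one is
exhausted. A minimal generic sketch of that split-chaining pattern (the class
below is illustrative, not part of MRQL):

    import java.util.*;

    public class ChainedIterator<T> implements Iterator<T> {
        private final Iterator<Iterator<T>> splits;
        private Iterator<T> current = Collections.<T>emptyList().iterator();

        public ChainedIterator ( List<Iterator<T>> parts ) {
            splits = parts.iterator();
        }

        public boolean hasNext () {
            while (!current.hasNext()) {    // current split is exhausted:
                if (!splits.hasNext())      // no splits left -> end of data
                    return false;
                current = splits.next();    // open the next split
            };
            return true;
        }

        public T next () {
            if (!hasNext())
                throw new NoSuchElementException();
            return current.next();
        }

        public void remove () { throw new UnsupportedOperationException(); }
    }
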
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/MR_rdd.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/MR_rdd.java b/src/main/java/spark/MR_rdd.java
index e2739df..4b3eff5 100644
--- a/src/main/java/spark/MR_rdd.java
+++ b/src/main/java/spark/MR_rdd.java
@@ -37,18 +37,18 @@ final public class MR_rdd extends MRData {
     public JavaRDD<MRData> rdd () { return rdd; }
 
     final public void write ( DataOutput out ) throws IOException {
-	throw new Error("RDDs are not serializable");
+        throw new Error("RDDs are not serializable");
     }
 
     public void readFields ( DataInput in ) throws IOException {
-	throw new Error("RDDs are not serializable");
+        throw new Error("RDDs are not serializable");
     }
 
     public int compareTo ( MRData x ) {
-	throw new Error("RDDs cannot be compared");
+        throw new Error("RDDs cannot be compared");
     }
 
     public boolean equals ( Object x ) {
-	throw new Error("RDDs cannot be compared");
+        throw new Error("RDDs cannot be compared");
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/ParsedInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/ParsedInputFormat.java b/src/main/java/spark/ParsedInputFormat.java
index 016c3e1..8ca7db6 100644
--- a/src/main/java/spark/ParsedInputFormat.java
+++ b/src/main/java/spark/ParsedInputFormat.java
@@ -29,69 +29,69 @@ import org.apache.hadoop.conf.Configuration;
 /** A FileInputFormat for text files (CSV, XML, JSON, ...) */
 final public class ParsedInputFormat extends MRQLFileInputFormat {
     public static class ParsedRecordReader implements RecordReader<MRContainer,MRContainer> {
-	final FSDataInputStream fsin;
-	final long start;
-	final long end;
-	Iterator<MRData> result;
-	Parser parser;
+        final FSDataInputStream fsin;
+        final long start;
+        final long end;
+        Iterator<MRData> result;
+        Parser parser;
 
-	public ParsedRecordReader ( FileSplit split,
-				    Configuration conf,
-				    Class<? extends Parser> parser_class,
-				    Trees args ) throws IOException {
-	    start = split.getStart();
-	    end = start + split.getLength();
-	    Path file = split.getPath();
-	    FileSystem fs = file.getFileSystem(conf);
-	    fsin = fs.open(split.getPath());
-	    try {
-		parser = parser_class.newInstance();
-	    } catch (Exception ex) {
-		throw new Error("Unrecognized parser:"+parser_class);
-	    };
-	    parser.initialize(args);
-	    parser.open(fsin,start,end);
-	    result = null;
-	}
+        public ParsedRecordReader ( FileSplit split,
+                                    Configuration conf,
+                                    Class<? extends Parser> parser_class,
+                                    Trees args ) throws IOException {
+            start = split.getStart();
+            end = start + split.getLength();
+            Path file = split.getPath();
+            FileSystem fs = file.getFileSystem(conf);
+            fsin = fs.open(split.getPath());
+            try {
+                parser = parser_class.newInstance();
+            } catch (Exception ex) {
+                throw new Error("Unrecognized parser:"+parser_class);
+            };
+            parser.initialize(args);
+            parser.open(fsin,start,end);
+            result = null;
+        }
 
         public MRContainer createKey () {
-	    return new MRContainer();
-	}
+            return new MRContainer();
+        }
 
-	public MRContainer createValue () {
-	    return new MRContainer();
-	}
+        public MRContainer createValue () {
+            return new MRContainer();
+        }
 
-	public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
-	    while (result == null || !result.hasNext()) {
-		String s = parser.slice();
-		if (s == null)
-		    return false;
-		result = parser.parse(s).iterator();
-	    };
-	    value.set((MRData)result.next());
-	    key.set(new MR_long(fsin.getPos()));
-	    return true;
-	}
+        public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+            while (result == null || !result.hasNext()) {
+                String s = parser.slice();
+                if (s == null)
+                    return false;
+                result = parser.parse(s).iterator();
+            };
+            value.set((MRData)result.next());
+            key.set(new MR_long(fsin.getPos()));
+            return true;
+        }
 
-	public synchronized long getPos () throws IOException { return fsin.getPos(); }
+        public synchronized long getPos () throws IOException { return fsin.getPos(); }
 
-	public synchronized void close () throws IOException { fsin.close(); }
+        public synchronized void close () throws IOException { fsin.close(); }
 
-	public float getProgress () throws IOException {
-	    if (end == start)
-		return 0.0f;
-	    else return Math.min(1.0f, (getPos() - start) / (float)(end - start));
-	}
+        public float getProgress () throws IOException {
+            if (end == start)
+                return 0.0f;
+            else return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+        }
     }
 
     public RecordReader<MRContainer,MRContainer>
-	      getRecordReader ( InputSplit split,
-				JobConf job,
-				Reporter reporter ) throws IOException {
-	Evaluator.load_source_dir();  // load the parsed source parameters from a file
-	String path = ((FileSplit)split).getPath().toString();
-	ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
-	return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
+              getRecordReader ( InputSplit split,
+                                JobConf job,
+                                Reporter reporter ) throws IOException {
+        Evaluator.load_source_dir();  // load the parsed source parameters from a file
+        String path = ((FileSplit)split).getPath().toString();
+        ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
+        return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
     }
 }

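The heart of ParsedRecordReader is the slice-then-parse loop in next(): it
keeps pulling raw slices from the parser until one of them yields at least
one record, since parse() may legitimately return an empty result for a
slice. A minimal sketch of that loop against a stand-in parser interface
(not the MRQL Parser class):

    import java.util.*;

    interface SliceParser<T> {
        String slice ();             // next raw chunk of input, or null at end
        List<T> parse ( String s );  // may return zero, one, or many records
    }

    class ParsedStream<T> {
        private final SliceParser<T> parser;
        private Iterator<T> result = null;

        ParsedStream ( SliceParser<T> parser ) { this.parser = parser; }

        /* the next parsed record, or null when the input is exhausted */
        T next () {
            while (result == null || !result.hasNext()) {
                String s = parser.slice();
                if (s == null)       // no more slices: end of stream
                    return null;
                result = parser.parse(s).iterator();
            };
            return result.next();
        }
    }
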
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/RDDDataSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/RDDDataSource.java b/src/main/java/spark/RDDDataSource.java
index 2e7ef63..609c753 100644
--- a/src/main/java/spark/RDDDataSource.java
+++ b/src/main/java/spark/RDDDataSource.java
@@ -25,12 +25,12 @@ final public class RDDDataSource extends DataSource {
     JavaRDD<MRData> rdd;
 
     RDDDataSource ( JavaRDD<MRData> rdd ) {
-	super();
-	this.rdd = rdd;
+        super();
+        this.rdd = rdd;
     }
 
     @Override
     public long size ( Configuration conf ) {
-	return rdd.count();
+        return rdd.count();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/core_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/core_1.mrql b/tests/queries/core_1.mrql
index 50794d7..dd221d3 100644
--- a/tests/queries/core_1.mrql
+++ b/tests/queries/core_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 [1,2,3,4][2];
 
 [1..1000][10:20];

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/distinct_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/distinct_1.mrql b/tests/queries/distinct_1.mrql
index 4118729..46a19c9 100644
--- a/tests/queries/distinct_1.mrql
+++ b/tests/queries/distinct_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 C = source(line,"tests/data/customer.tbl","|",type(<CUSTKEY:int,NAME:string>));
 O = source(line,"tests/data/orders.tbl","|",type(<ORDERKEY:string,CUSTKEY:int,ORDERSTATUS:any,TOTALPRICE:float>));
 

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/factorization_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/factorization_1.mrql b/tests/queries/factorization_1.mrql
index 13b86d2..e46abd3 100644
--- a/tests/queries/factorization_1.mrql
+++ b/tests/queries/factorization_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 Mmatrix = source(line,"tests/data/Xmatrix.txt",",",type( (double,long,long) ));
 Hmatrix = source(line,"tests/data/Ymatrix.txt",",",type( (double,long,long) ));
 Wmatrix = source(line,"tests/data/Ymatrix.txt",",",type( (double,long,long) ));

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/group_by_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/group_by_1.mrql b/tests/queries/group_by_1.mrql
index 8210e02..119c260 100644
--- a/tests/queries/group_by_1.mrql
+++ b/tests/queries/group_by_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 C = source(line,"tests/data/customer.tbl","|",type(<CUSTKEY:int,NAME:string>));
 O = source(line,"tests/data/orders.tbl","|",type(<ORDERKEY:string,CUSTKEY:int,ORDERSTATUS:any,TOTALPRICE:float>));
 

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/group_by_having_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/group_by_having_1.mrql b/tests/queries/group_by_having_1.mrql
index 07ba883..31f6fa4 100644
--- a/tests/queries/group_by_having_1.mrql
+++ b/tests/queries/group_by_having_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 C = source(line,"tests/data/customer.tbl","|",type(<CUSTKEY:int,NAME:string>));
 O = source(line,"tests/data/orders.tbl","|",type(<ORDERKEY:string,CUSTKEY:int,ORDERSTATUS:any,TOTALPRICE:float>));
 

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/group_by_order_by_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/group_by_order_by_1.mrql b/tests/queries/group_by_order_by_1.mrql
index 0652c84..e72b463 100644
--- a/tests/queries/group_by_order_by_1.mrql
+++ b/tests/queries/group_by_order_by_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 C = source(line,"tests/data/customer.tbl","|",type(<CUSTKEY:int,NAME:string>));
 O = source(line,"tests/data/orders.tbl","|",type(<ORDERKEY:string,CUSTKEY:int,ORDERSTATUS:any,TOTALPRICE:float>));