Posted to commits@mrql.apache.org by fe...@apache.org on 2013/10/01 03:29:29 UTC

[2/5] MRQL-19: Fix license issues to prepare a new release candidate

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/Evaluator.gen
----------------------------------------------------------------------
diff --git a/src/main/java/spark/Evaluator.gen b/src/main/java/spark/Evaluator.gen
deleted file mode 100644
index 30905b1..0000000
--- a/src/main/java/spark/Evaluator.gen
+++ /dev/null
@@ -1,688 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java_cup.runtime.*;
-import org.apache.mrql.gen.*;
-import java.util.*;
-import java.io.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.FileInputFormat;
-import scala.Tuple2;
-import spark.TaskContext;
-import spark.Partition;
-import spark.Accumulator;
-import spark.broadcast.Broadcast;
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.Function2;
-import spark.api.java.function.PairFunction;
-import spark.api.java.function.FlatMapFunction;
-import spark.api.java.function.PairFlatMapFunction;
-import spark.api.java.function.Function;
-import spark.api.java.function.VoidFunction;
-
-
-/** Evaluates physical plans in Spark mode */
-final public class Evaluator extends Interpreter implements Serializable {
-    static JavaSparkContext spark_context;
-    final static String mrql_jar = Evaluator.class.getProtectionDomain().getCodeSource().getLocation().toString();
-    static String data_source_directory;
-
-    /** initialize the Spark evaluator */
-    final static void init ( Configuration conf ) throws IOException {
-        Config.spark_mode = true;
-        if (Config.hadoop_mode && Config.local_mode)
-            spark_context = new JavaSparkContext("local["+Config.nodes+"]",
-                                                 "MRQL Local Spark Evaluator");
-        else if (Config.hadoop_mode) {
-            HashMap<String,String> env = new HashMap<String,String>();
-            data_source_directory = System.getenv("FS_DEFAULT_NAME")+"/"+Plan.new_path(conf);
-            env.put("mrql.data.source.directory",data_source_directory);
-            spark_context = new JavaSparkContext(System.getenv("SPARK_MASTER"),
-                                                 "MRQL Spark Evaluator",
-                                                 System.getenv("SPARK_HOME"),
-                                                 new String[]{mrql_jar},
-                                                 env);
-            Plan.conf = spark_context.hadoopConfiguration();
-            FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
-        }
-    }
-
-    /** shutdown the Spark evaluator */
-    final static void shutdown ( Configuration conf ) {
-        spark_context.stop();
-        spark_context = null;
-        System.clearProperty("spark.driver.port");
-    }
-
-    final static void initialize_query () {
-        spark_context.clearJars();
-        spark_context.addJar(mrql_jar);
-        Plan.distribute_compiled_arguments(Plan.conf);
-        if (Config.compile_functional_arguments)
-            spark_context.addJar(Plan.conf.get("mrql.jar.path"));
-    }
-
-    final static Configuration new_configuration () {
-        return new Configuration();
-    }
-
-    public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
-        throw new Error("You can only synchronize BSP tasks");
-    }
-
-    public static Bag distribute ( MR_string peerName, Bag s ) {
-        throw new Error("You can only distribute bags among BSP tasks");
-    }
-
-    final static Bag collect ( final DataSet x ) throws Exception {
-        return Plan.collect(x);
-    }
-
-    final static MRData bsp ( Tree plan, Environment env ) throws Exception {
-        throw new Error("You can not run a BSP task in Spark mode");
-    }
-
-    /** used by the master to send parsing details (e.g., record types) to workers */
-    public static void dump_source_dir () throws IOException {
-        if (Config.local_mode)
-            return;
-        DataSource.dataSourceDirectory.distribute(Plan.conf);
-        Path path = new Path(data_source_directory);
-        FileSystem fs = path.getFileSystem(Plan.conf);
-        PrintStream ps = new PrintStream(fs.create(path,true));
-        ps.println(Plan.conf.get("mrql.data.source.directory"));
-        ps.close();
-    }
-
-    /** executed by a worker when reading parsed input (see ParsedInputFormat) */
-    public static void load_source_dir () throws IOException {
-        if (Config.local_mode)
-            return;
-        if (Plan.conf == null)
-            Plan.conf = new_configuration();
-        // the name of the file that contains the source directory details is passed
-        //   to workers through the HashMap environment in the JavaSparkContext
-        Path path = new Path(System.getenv("mrql.data.source.directory"));
-        FileSystem fs = path.getFileSystem(Plan.conf);
-        BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
-        Plan.conf.set("mrql.data.source.directory",ftp.readLine());
-        DataSource.dataSourceDirectory.read(Plan.conf);
-        ftp.close();
-    }
-
-    private static Function2<MRData,MRData,MRData> accumulator ( Tree acc_fnc, Environment env ) {
-        final org.apache.mrql.Function f = evalF(acc_fnc,env);
-        return new Function2<MRData,MRData,MRData>() {
-            public MRData call ( MRData x, MRData y ) {
-                return f.eval(new Tuple(x,y));
-            }
-        };
-    }
-
-    /** The Aggregate physical operator
-     * @param acc_fnc  the accumulator function from (T,T) to T
-     * @param zero  the zero element of type T
-     * @param plan the plan that constructs the dataset that contains the bag of values {T}
-     * @param env contains bindings from variables to values (MRData)
-     * @return the aggregation result of type T
-     */
-    final static MRData aggregate ( Tree acc_fnc,
-                                    Tree zero,
-                                    Tree plan,
-                                    Environment env ) throws Exception {
-        Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env);
-        MRData z = evalE(zero,env);
-        match plan {
-        case AggregateMap(`m,`acc,_,`s):
-            return evalD(#<cMap(`m,`s)>,env)
-                     .aggregate(z,accumulator(acc,env),f2);
-        case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
-            if (acc.equals(#<null>))
-                fail;
-            return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
-                     .aggregate(z,accumulator(acc,env),f2);
-        case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
-            if (acc.equals(#<null>))
-                fail;
-            return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
-                     .aggregate(z,accumulator(acc,env),f2);
-        case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
-            if (acc.equals(#<null>))
-                fail;
-            return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
-                     .aggregate(z,accumulator(acc,env),f2);
-        case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
-            if (acc.equals(#<null>))
-                fail;
-            return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
-                     .aggregate(z,accumulator(acc,env),f2);
-        };
-        throw new Error("Unrecognized aggregation: "+plan);
-    }
-
-    /** Evaluate a loop a fixed # of times */
-    final static Tuple loop ( Tree e, Environment env ) throws Exception {
-        match e {
-        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
-            int limit = ((MR_int)evalE(num,env)).get();
-            MR_rdd[] s = new MR_rdd[vs.length()];
-            for ( int i = 0; i < vs.length(); i++ )
-                s[i] = new MR_rdd(eval(ss.nth(i),env));
-            for ( int n = 0; n < limit; n++ ) {
-                Environment nenv = env;
-                for ( int i = 0; i < vs.length(); i ++ )
-                    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
-                for ( int i = 0; i < vs.length(); i ++ )
-                    s[i] = new MR_rdd(eval(bs.nth(i),nenv));
-            };
-            return new Tuple(s);
-        };
-        throw new Error("Wrong Loop format");
-    }
-
-    private static Bag bag ( final List<MRData> s ) {
-        final Iterator<MRData> i = s.iterator();
-        return new Bag(new BagIterator() {
-                public MRData next () {
-                    return i.next();
-                }
-                public boolean hasNext () {
-                    return i.hasNext();
-                }
-            });
-    }
-
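-    // a dummy TaskContext, used only to iterate over the partitions of a cached RDD at the master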
-    final static TaskContext context = new TaskContext(0,0,0,null);
-
-    /** Convert a Spark RDD into a lazy bag
-     * @param rdd the Spark RDD
-     * @return a lazy bag that contains all RDD elements
-     */
-    public static Bag bag ( final JavaRDD<MRData> rdd ) {
-        final JavaRDD<MRData> rd = rdd.cache();
-        rd.count();  // force the evaluation of all pending Spark operations
-        final List<Partition> ps = rd.splits();
-        return new Bag(new BagIterator() {
-                Iterator<MRData> i = null;
-                int c = 0;
-                public MRData next () {
-                    return i.next();
-                }
-                public boolean hasNext () {
-                    do {
-                        if (i != null && i.hasNext()) 
-                            return true;
-                        if (c >= ps.size())
-                            return false;
-                        i = rdd.iterator(ps.get(c++),context);
-                    } while (true);
-                }
-            });
-    }
-
-    private final static Function<MRData,MRData> get_first
-        = new Function<MRData,MRData>() {
-                   public MRData call ( MRData value ) {
-                       return ((Tuple)value).first();
-                   };
-    };
-
-    /** Evaluate an MRQL physical plan using Spark
-     * @param e the physical plan
-     * @param env contains bindings from variables to values (MRData)
-     * @param counter the name of the counter used in loops
-     * @return a DataSet
-     */
-    final static DataSet eval ( final Tree e,
-                                final Environment env,
-                                final String counter ) {
-        JavaRDD<MRData> rd = eval(e,env);
-        int count = 0;
-        if (!counter.equals("-")) {
-            final Accumulator<Integer> c = spark_context.intAccumulator(0);
-            rd = rd.cache();
-            rd.foreach(new VoidFunction<MRData>() {
-                    public void call ( MRData value ) {
-                        if (((MR_bool)((Tuple)value).second()).get())
-                            c.add(1);    // count the true results (the results that need another loop step)
-                    }
-                });
-            count = c.value();
-            rd = rd.map(get_first);
-        };
-        return new DataSet(new RDDDataSource(rd),count,0);
-    }
-
-    /** Evaluate an MRQL physical plan using Spark and print tracing info
-     * @param e the physical plan
-     * @param env contains bindings from variables to values (MRData)
-     * @return a Spark RDD
-     */
-    final static JavaRDD<MRData> eval ( final Tree e, final Environment env ) {
-        if (Config.trace_execution) {
-            tab_count += 3;
-            System.out.println(tabs(tab_count)+print_query(e));
-        };
-        final JavaRDD<MRData> res = evalD(e,env);
-        if (Config.trace_execution) 
-            try {
-                System.out.println(tabs(tab_count)+"-> "+res.collect());
-                tab_count -= 3;
-            } catch (Exception ex) {
-                throw new Error("Cannot collect the operator output: "+e);
-            };
-        return res;
-    }
-
-    /* convert an MRQL lambda to a Spark Function */
-    private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
-        final org.apache.mrql.Function f = evalF(fnc,env);
-        return new FlatMapFunction<MRData,MRData>() {
-            public Iterable<MRData> call ( MRData value ) {
-                return (Bag)f.eval(value);
-            }
-        };
-    }
-
-    /* group-by s and then reduce by fnc; if o is true, sort after group-by */
-    private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) {
-        match o {
-        case true:   // the result must be sorted
-            return s.groupBy(get_first)
-                .sortByKey()
-                .map(new Function<Tuple2<MRData,List<MRData>>,MRData>() {
-                        public MRData call ( Tuple2<MRData,List<MRData>> value ) {
-                            return new Tuple(value._1,bag(value._2));
-                        }
-                    })
-                .flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
-                        public MRData call ( MRData value ) {
-                            return ((Tuple)value).second();
-                        }
-                    });
-        };
-        return s.groupBy(get_first)
-            .map(new Function<Tuple2<MRData,List<MRData>>,MRData> () {
-                    public MRData call ( Tuple2<MRData,List<MRData>> value ) {
-                        final Iterator<MRData> i = value._2.iterator();
-                        return new Tuple(value._1,new Bag(new BagIterator() {
-                                public MRData next () {
-                                    return ((Tuple)i.next()).second();
-                                }
-                                public boolean hasNext () {
-                                    return i.hasNext();
-                                }
-                            }));
-                    }
-                })
-            .flatMap(cmap_fnc(fnc,env));
-    }
-
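-    /* convert a Hadoop (key,value) RDD into an RDD of the MRQL values, passing the global bindings to the workers */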
-    private static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
-        final Environment env = Interpreter.global_env;
-        return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
-                // need to pass the global bindings (the in-memory repeat vars) to workers
-                boolean first = true;
-                public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
-                    if (first) {
-                        // need to pass the global bindings (the in-memory repeat vars) to workers
-                        Interpreter.set_global_bindings(env);
-                        first = false;
-                    };
-                    return value._2.data();
-                }
-            });
-    }
-
-    private static Iterable<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
-        return new Iterable<Tuple2<MRData,MRData>>() {
-            public Iterator<Tuple2<MRData,MRData>> iterator() {
-                return new Iterator<Tuple2<MRData,MRData>> () {
-                    public Tuple2<MRData,MRData> next () {
-                        Tuple data = (Tuple)i.next();
-                        return new Tuple2<MRData,MRData>(data.first(),data.second());
-                    }
-                    public boolean hasNext () {
-                        return i.hasNext();
-                    }
-                    public void remove () {}
-                };
-            }
-        };
-    }
-
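-    /* per-partition combiner: group the partition elements by key in memory and apply the combine function f to each group */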
-    private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
-        return new FlatMapFunction<Iterator<MRData>,MRData>() {
-                  public Iterable<MRData> call ( final Iterator<MRData> i ) {
-                      return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
-                              public MRData eval ( MRData x ) {
-                                  final MRData key = ((Tuple)x).first();
-                                  final Iterator<MRData> it = ((Bag)f.eval(x)).iterator();
-                                  return new Bag(new BagIterator() {
-                                          public MRData next () {
-                                              return new Tuple(key,it.next());
-                                          }
-                                          public boolean hasNext () {
-                                              return it.hasNext();
-                                          }
-                                      });
-                              }},
-                          MapReduceAlgebra.groupBy(new Bag(new BagIterator() {
-                                  public MRData next () {
-                                      return i.next();
-                                  }
-                                  public boolean hasNext () {
-                                      return i.hasNext();
-                                  }
-                              })));
-                  }
-        };
-    }
-
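-    /* build the in-memory hash table of a map-side (broadcast) join from a list of (key,value) pairs */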
-    private static Hashtable<MRData,Bag> make_built_table ( List<Tuple2<MRData,MRData>> values ) {
-        Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
-        for ( Tuple2<MRData,MRData> t: values ) {
-            Bag entries = built_table.get(t._1);
-            built_table.put(t._1,
-                            (entries == null)
-                            ? (new Bag(t._2))
-                            : entries.add_element(t._2));
-        };
-        return built_table;
-    }
-
-    /** Evaluate MRQL physical operators using Spark
-     * @param e the physical plan
-     * @param env contains bindings from variables to values (MRData)
-     * @return a Spark RDD
-     */
-    final static JavaRDD<MRData> evalD ( final Tree e, final Environment env ) {
-        try {
-            match e {
-            case MapAggregateReduce(`m,`r,null,_,`s,`o):
-                return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
-            case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
-                return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
-            case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
-                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
-            case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
-                return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
-            case cMap(`f,`s):
-                return eval(s,env).flatMap(cmap_fnc(f,env));
-            case MapReduce(`m,`r,`s,`o):
-                return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
-            case MapCombineReduce(`m,`c,`r,`s,`o):
-                return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
-                               .mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
-            case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
-                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
-            case CrossProduct(`mx,`my,`r,`x,`y):
-                final org.apache.mrql.Function fr = evalF(r,env);
-                return eval(x,env)
-                    .flatMap(cmap_fnc(mx,env))
-                    .cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
-                    .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
-                            public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
-                                return (Bag)fr.eval(new Tuple(value._1,value._2));
-                            }
-                        });
-            case MapReduce2(`mx,`my,`r,`x,`y,`o):
-                final org.apache.mrql.Function fx = evalF(mx,env);
-                final org.apache.mrql.Function fy = evalF(my,env);
-                final org.apache.mrql.Function fr = evalF(r,env);
-                JavaPairRDD<MRData,MRData> xs
-                    = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
-                                return joinIterator(((Bag)fx.eval(value)).iterator());
-                            }
-                        });
-                JavaPairRDD<MRData,MRData> ys
-                    = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
-                                return joinIterator(((Bag)fy.eval(value)).iterator());
-                            }
-                        });
-                return xs.cogroup(ys)
-                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
-                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
-                                return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
-                            }
-                        });
-            case GroupByJoin(`kx,`ky,`gx,`gy,`mp,`c,`r,`x,`y,`o):
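-                // fragment-replicate (grid) join over an n*m partition grid:
-                //   each x element is replicated n times and each y element m times,
-                //   so any two matching groups meet in exactly one partition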
-                final int n = 10;
-                final int m = 10;
-                final org.apache.mrql.Function fkx = evalF(kx,env);
-                final org.apache.mrql.Function fky = evalF(ky,env);
-                final org.apache.mrql.Function fgx = evalF(gx,env);
-                final org.apache.mrql.Function fgy = evalF(gy,env);
-                final org.apache.mrql.Function fm = evalF(mp,env);
-                final org.apache.mrql.Function fc = evalF(c,env);
-                final org.apache.mrql.Function fr = evalF(r,env);
-                JavaPairRDD<MRData,MRData> xs
-                    = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
-                                return new Iterable<Tuple2<MRData,MRData>>() {
-                                    public Iterator<Tuple2<MRData,MRData>> iterator() {
-                                        return new Iterator<Tuple2<MRData,MRData>>() {
-                                            int i = 0;
-                                            public Tuple2<MRData,MRData> next () {
-                                                MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
-                                                i++;
-                                                return new Tuple2<MRData,MRData>(key,new Tuple(fkx.eval(value),value));
-                                            }
-                                            public boolean hasNext () {
-                                                return i < n;
-                                            }
-                                            public void remove () {}
-                                        };
-                                    }
-                                };
-                            }
-                        });
-                JavaPairRDD<MRData,MRData> ys
-                    = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                            public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
-                                return new Iterable<Tuple2<MRData,MRData>>() {
-                                    public Iterator<Tuple2<MRData,MRData>> iterator() {
-                                        return new Iterator<Tuple2<MRData,MRData>>() {
-                                            int i = 0;
-                                            public Tuple2<MRData,MRData> next () {
-                                                MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+i);
-                                                i++;
-                                                return new Tuple2<MRData,MRData>(key,new Tuple(fky.eval(value),value));
-                                            }
-                                            public boolean hasNext () {
-                                                return i < m;
-                                            }
-                                            public void remove () {}
-                                        };
-                                    }
-                                };
-                            }
-                        });
-                return xs.cogroup(ys)
-                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
-                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
-                                final Iterator<MRData> i = (MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fm,fc,fr,
-                                                                        bag(value._2._1),bag(value._2._2))).iterator();
-                                return new Iterable<MRData>() {
-                                    public Iterator<MRData> iterator() {
-                                        return new Iterator<MRData>() {
-                                            public MRData next () {
-                                                return i.next();
-                                            }
-                                            public boolean hasNext () {
-                                                return i.hasNext();
-                                            }
-                                            public void remove () {}
-                                        };
-                                    }
-                                };
-                            }
-                        });
-            case MapJoin(`mx,`my,`r,`x,`y):
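-                // broadcast join: collect the y input, broadcast it to all workers,
-                //   and probe it with each x element through an in-memory hash table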
-                final org.apache.mrql.Function fx = evalF(mx,env);
-                final org.apache.mrql.Function fy = evalF(my,env);
-                final org.apache.mrql.Function fr = evalF(r,env);
-                final Broadcast<List<Tuple2<MRData,MRData>>> ys
-                    = spark_context.broadcast(eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
-                                public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
-                                    return joinIterator(((Bag)fy.eval(value)).iterator());
-                                }
-                            }).collect());
-                return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
-                        final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
-                        public Iterable<MRData> call ( MRData value ) {
-                            final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
-                            return new Iterable<MRData>() {
-                                public Iterator<MRData> iterator() {
-                                    return new Iterator<MRData>() {
-                                        Tuple p;
-                                        Iterator<MRData> ix = null;
-                                        public MRData next () {
-                                            return ix.next();
-                                        }
-                                        public boolean hasNext () {
-                                            if (ix != null && ix.hasNext())
-                                                return true;
-                                            while (i.hasNext()) {
-                                                p = (Tuple)i.next();
-                                                MRData pd = built_table.get(p.first());
-                                                if (pd == null)
-                                                    continue;
-                                                Bag bb = ((Bag)fr.eval(new Tuple(p.second(),pd)));
-                                                ix = bb.iterator();
-                                                if (ix.hasNext())
-                                                    return true;
-                                            };
-                                            return false;
-                                        }
-                                        public void remove () {}
-                                    };
-                                }
-                            };
-                        }
-                    });
-            case BinarySource(`file,_):
-                String path = ((MR_string)evalE(file,env)).get();
-                new BinaryDataSource(path,Plan.conf);
-                return containerData(spark_context.sequenceFile(file.stringValue(),
-                                                                MRContainer.class,MRContainer.class,
-                                                                Config.nodes));
-            case ParsedSource(`parser,`file,...args):
-                String path = ((MR_string)evalE(file,env)).get();
-                Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
-                if (p == null)
-                    throw new Error("Unknown parser: "+parser);
-                new ParsedDataSource(path,p,args,Plan.conf);
-                dump_source_dir();
-                return containerData(spark_context.hadoopFile(path,ParsedInputFormat.class,
-                                                              MRContainer.class,MRContainer.class,
-                                                              Config.nodes));
-            case Merge(`x,`y):
-                return eval(x,env).union(eval(y,env));
-            case Repeat(lambda(`v,`b),`s,`n):
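-                // evaluate the loop body repeatedly, keeping the first component of each result,
-                //   while any result is flagged true and the step limit is not exceeded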
-                int max_num = ((MR_int)evalE(n,env)).get();
-                JavaRDD<MRData> rd;
-                JavaRDD<MRData> res = eval(s,env).cache();
-                int i = 0;
-                boolean cont = true;
-                do {
-                    rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
-                    res = rd.map(get_first).cache();
-                    Integer true_results
-                        = rd.aggregate(new Integer(0),
-                                       new Function2<Integer,MRData,Integer>() {
-                                           public Integer call ( Integer x, MRData y ) {
-                                               return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x;
-                                           }
-                                       },
-                                       new Function2<Integer,Integer,Integer>() {
-                                           public Integer call ( Integer x, Integer y ) { return x+y; }
-                                       });
-                    i++;
-                    cont = true_results > 0 && i <= max_num;
-                    System.err.println("*** Repeat #"+i+": "+true_results+" true results");
-                } while (cont);
-                return res;
-            case Closure(lambda(`v,`b),`s,`m):
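-                // re-evaluate the loop body until the result size stops growing or the step limit is reached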
-                int max_num = ((MR_int)evalE(m,env)).get();
-                JavaRDD<MRData> res = eval(s,env).cache();
-                long n = 0;
-                long old = 0;
-                int i = 0;
-                boolean cont = true;
-                do {
-                    res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
-                    old = n;
-                    n = res.count();
-                    i++;
-                    System.err.println("*** Repeat #"+i+": "+(n-old)+" new records");
-                } while (old < n && i <= max_num);
-                return res;
-            case Generator(`min,`max,`size):
-                DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
-                                            ((MR_long)evalE(max,env)).get(),
-                                            ((MR_long)evalE(size,env)).get());
-                JavaRDD<MRData> rd = null;
-                for ( DataSource d: ds.source )
-                    if (rd == null)
-                        rd = containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
-                                                                    MRContainer.class,MRContainer.class,1));
-                    else rd = rd.union(containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
-                                                                              MRContainer.class,MRContainer.class,1)));
-                return rd;
-            case let(`v,`u,`body):
-                return eval(body,new Environment(v.toString(),evalE(u,env),env));
-            case Let(`v,`u,`body):
-                return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env)),env));
-            case If(`c,`x,`y):
-                if (((MR_bool)evalE(c,env)).get())
-                    return eval(x,env);
-                else return eval(y,env);
-            case `v:
-                if (!v.is_variable())
-                    fail;
-                MRData x = variable_lookup(v.toString(),env);
-                if (x != null)
-                    if (x instanceof MR_rdd)
-                        return ((MR_rdd)x).rdd();
-                x = variable_lookup(v.toString(),global_env);
-                if (x != null)
-                    if (x instanceof MR_rdd)
-                        return ((MR_rdd)x).rdd();
-                throw new Error("Variable "+v+" is not bound");
-            };
-            throw new Error("Cannot evaluate the Spark plan: "+e);
-        } catch (Error msg) {
-            if (!Config.trace)
-                throw new Error(msg.getMessage());
-            System.err.println(msg.getMessage());
-            throw new Error("Evaluation error in: "+print_query(e));
-        } catch (Exception ex) {
-            System.err.println(ex.getMessage());
-            ex.printStackTrace();
-            throw new Error("Evaluation error in: "+print_query(e));
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/GeneratorInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/GeneratorInputFormat.java b/src/main/java/spark/GeneratorInputFormat.java
deleted file mode 100644
index c9e6066..0000000
--- a/src/main/java/spark/GeneratorInputFormat.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.io.*;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.*;
-
-
-/** the FileInputFormat for data generators: it creates HDFS files, where each file contains
- *  an (offset,size) pair that generates the range of values [offset,offset+size) */
-final public class GeneratorInputFormat extends MRQLFileInputFormat {
-    public static class GeneratorRecordReader implements RecordReader<MRContainer,MRContainer> {
-        final long offset;
-        final long size;
-        long index;
-        SequenceFile.Reader reader;
-
-        public GeneratorRecordReader ( FileSplit split,
-                                       Configuration conf ) throws IOException {
-            Path path = split.getPath();
-            FileSystem fs = path.getFileSystem(conf);
-            reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
-            MRContainer key = new MRContainer();
-            MRContainer value = new MRContainer();
-            reader.next(key,value);
-            offset = ((MR_long)((Tuple)(value.data())).first()).get();
-            size = ((MR_long)((Tuple)(value.data())).second()).get();
-            index = -1;
-        }
-
-        public MRContainer createKey () {
-            return new MRContainer(null);
-        }
-
-        public MRContainer createValue () {
-            return new MRContainer(null);
-        }
-
-        public boolean next ( MRContainer key, MRContainer value ) throws IOException {
-            index++;
-            value.set(new MR_long(offset+index));
-            key.set(new MR_long(index));
-            return index < size;
-        }
-
-        public long getPos () throws IOException { return index; }
-
-        public void close () throws IOException { reader.close(); }
-
-        public float getProgress () throws IOException {
-            return index / (float)size;
-        }
-    }
-
-    public RecordReader<MRContainer,MRContainer>
-              getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException {
-        return (RecordReader<MRContainer,MRContainer>)
-                      new GeneratorRecordReader((FileSplit)split,job);
-    }
-
-    /** Insert all results from the generators stored in path into a Bag.
-     *  The Bag is lazily constructed.
-     * @param path the path directory that contains the generator data (offset,size)
-     * @return a Bag that contains all data
-     */
-    public Bag materialize ( final Path path ) throws IOException {
-        Configuration conf = Plan.conf;
-        FileSystem fs = path.getFileSystem(conf);
-        final SequenceFile.Reader reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
-        final MRContainer key = new MRContainer();
-        final MRContainer value = new MRContainer();
-        return new Bag(new BagIterator () {
-                long offset = 0;
-                long size = 0;
-                long i = 0;
-                public boolean hasNext () {
-                    if (++i >= offset+size)
-                        try {
-                            if (!reader.next(key,value))
-                                return false;
-                            offset = ((MR_long)((Tuple)(value.data())).first()).get();
-                            size = ((MR_long)((Tuple)(value.data())).second()).get();
-                            i = offset;
-                        } catch (IOException e) {
-                            throw new Error("Cannot collect values from a generator");
-                        };
-                    return true;
-                }
-                public MRData next () {
-                    return new MR_long(i);
-                }
-            });
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/MRQLFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/MRQLFileInputFormat.java b/src/main/java/spark/MRQLFileInputFormat.java
deleted file mode 100644
index 73b0910..0000000
--- a/src/main/java/spark/MRQLFileInputFormat.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.io.*;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-
-
-/** A superclass for all MRQL FileInputFormats */
-abstract public class MRQLFileInputFormat extends FileInputFormat<MRContainer,MRContainer> {
-    public MRQLFileInputFormat () {}
-
-    /** record reader for Spark */
-    abstract public RecordReader<MRContainer,MRContainer>
-        getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException;
-
-    /** materialize the input file into a memory Bag */
-    public Bag materialize ( final Path file ) throws Exception {
-        final JobConf job = new JobConf(Plan.conf,MRQLFileInputFormat.class);
-        setInputPaths(job,file);
-        final InputSplit[] splits = getSplits(job,1);
-        final Reporter reporter = null;
-        final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job,reporter);
-        return new Bag(new BagIterator () {
-                RecordReader<MRContainer,MRContainer> reader = rd;
-                MRContainer key = reader.createKey();
-                MRContainer value = reader.createValue();
-                int i = 0;
-                public boolean hasNext () {
-                    try {
-                        if (reader.next(key,value))
-                            return true;
-                        do {
-                            if (++i >= splits.length)
-                                return false;
-                            reader.close();
-                            reader = getRecordReader(splits[i],job,reporter);
-                        } while (!reader.next(key,value));
-                        return true;
-                    } catch (IOException e) {
-                        throw new Error("Cannot collect values from an intermediate result");
-                    }
-                }
-                public MRData next () {
-                    return value.data();
-                }
-            });
-    }
-
-    /** materialize the entire dataset into a Bag
-     * @param x the DataSet in HDFS to collect values from
-     * @param strip is not used in MapReduce mode
-     * @return the Bag that contains the collected values
-     */
-    public final static Bag collect ( final DataSet x, boolean strip ) throws Exception {
-        Bag res = new Bag();
-        for ( DataSource s: x.source )
-            if (s instanceof RDDDataSource)
-                res = res.union(Evaluator.bag(((RDDDataSource)s).rdd));
-            else if (s.to_be_merged)
-                res = res.union(Plan.merge(s));
-            else res = res.union(s.inputFormat.newInstance().materialize(new Path(s.path)));
-        return res;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/ParsedInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/ParsedInputFormat.java b/src/main/java/spark/ParsedInputFormat.java
deleted file mode 100644
index 8ca7db6..0000000
--- a/src/main/java/spark/ParsedInputFormat.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.Trees;
-import java.io.*;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.conf.Configuration;
-
-
-/** A FileInputFormat for text files (CSV, XML, JSON, ...) */
-final public class ParsedInputFormat extends MRQLFileInputFormat {
-    public static class ParsedRecordReader implements RecordReader<MRContainer,MRContainer> {
-        final FSDataInputStream fsin;
-        final long start;
-        final long end;
-        Iterator<MRData> result;
-        Parser parser;
-
-        public ParsedRecordReader ( FileSplit split,
-                                    Configuration conf,
-                                    Class<? extends Parser> parser_class,
-                                    Trees args ) throws IOException {
-            start = split.getStart();
-            end = start + split.getLength();
-            Path file = split.getPath();
-            FileSystem fs = file.getFileSystem(conf);
-            fsin = fs.open(split.getPath());
-            try {
-                parser = parser_class.newInstance();
-            } catch (Exception ex) {
-                throw new Error("Unrecognized parser:"+parser_class);
-            };
-            parser.initialize(args);
-            parser.open(fsin,start,end);
-            result = null;
-        }
-
-        public MRContainer createKey () {
-            return new MRContainer();
-        }
-
-        public MRContainer createValue () {
-            return new MRContainer();
-        }
-
-        public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
-            while (result == null || !result.hasNext()) {
-                String s = parser.slice();
-                if (s == null)
-                    return false;
-                result = parser.parse(s).iterator();
-            };
-            value.set((MRData)result.next());
-            key.set(new MR_long(fsin.getPos()));
-            return true;
-        }
-
-        public synchronized long getPos () throws IOException { return fsin.getPos(); }
-
-        public synchronized void close () throws IOException { fsin.close(); }
-
-        public float getProgress () throws IOException {
-            if (end == start)
-                return 0.0f;
-            else return Math.min(1.0f, (getPos() - start) / (float)(end - start));
-        }
-    }
-
-    public RecordReader<MRContainer,MRContainer>
-              getRecordReader ( InputSplit split,
-                                JobConf job,
-                                Reporter reporter ) throws IOException {
-        Evaluator.load_source_dir();  // load the parsed source parameters from a file
-        String path = ((FileSplit)split).getPath().toString();
-        ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
-        return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/SparkBinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/SparkBinaryInputFormat.java b/src/main/java/spark/SparkBinaryInputFormat.java
new file mode 100644
index 0000000..f11c796
--- /dev/null
+++ b/src/main/java/spark/SparkBinaryInputFormat.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+
+/** Input format for Hadoop sequence files */
+final public class SparkBinaryInputFormat extends SparkMRQLFileInputFormat {
+    public static class BinaryInputRecordReader extends SequenceFileRecordReader<MRContainer,MRContainer> {
+        final MRContainer result = new MRContainer();
+
+        public BinaryInputRecordReader ( FileSplit split,
+                                         JobConf job ) throws IOException {
+            super(job,split);
+        }
+
+        @Override
+        public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+                boolean b = super.next(key,result);
+                value.set(result.data());
+                return b;
+        }
+    }
+
+    @Override
+    public RecordReader<MRContainer,MRContainer>
+              getRecordReader ( InputSplit split,
+                                JobConf job,
+                                Reporter reporter ) throws IOException {
+        String path = ((FileSplit)split).getPath().toString();
+        BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,job);
+        return new BinaryInputRecordReader((FileSplit)split,job);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/SparkEvaluator.gen
----------------------------------------------------------------------
diff --git a/src/main/java/spark/SparkEvaluator.gen b/src/main/java/spark/SparkEvaluator.gen
new file mode 100644
index 0000000..54d4da0
--- /dev/null
+++ b/src/main/java/spark/SparkEvaluator.gen
@@ -0,0 +1,713 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java_cup.runtime.*;
+import org.apache.mrql.gen.*;
+import java_cup.runtime.Scanner;
+import java.util.*;
+import java.io.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FileInputFormat;
+import scala.Tuple2;
+import spark.TaskContext;
+import spark.Partition;
+import spark.Accumulator;
+import spark.broadcast.Broadcast;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.Function2;
+import spark.api.java.function.PairFunction;
+import spark.api.java.function.FlatMapFunction;
+import spark.api.java.function.PairFlatMapFunction;
+import spark.api.java.function.Function;
+import spark.api.java.function.VoidFunction;
+
+
+/** Evaluates physical plans in Apache Spark mode */
+final public class SparkEvaluator extends Evaluator implements Serializable {
+    static JavaSparkContext spark_context;
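+    // locate the jars that contain the MRQL, gen, and scanner classes so they can be shipped to the Spark workers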
+    final static String spark_jar = SparkEvaluator.class.getProtectionDomain().getCodeSource().getLocation().toString();
+    final static String core_jar = Interpreter.class.getProtectionDomain().getCodeSource().getLocation().toString();
+    final static String gen_jar = Tree.class.getProtectionDomain().getCodeSource().getLocation().toString();
+    final static String scanner_jar = Scanner.class.getProtectionDomain().getCodeSource().getLocation().toString();
+    static String data_source_directory;
+    final static Bag empty_bag = new Bag();
+
+    /** initialize the Spark evaluator */
+    final public void init ( Configuration conf ) {
+        Config.spark_mode = true;
+        try {
+            if (Config.hadoop_mode && Config.local_mode)
+                spark_context = new JavaSparkContext("local["+Config.nodes+"]",
+                                                     "Apache MRQL Local Spark Evaluator");
+            else if (Config.hadoop_mode) {
+                HashMap<String,String> env = new HashMap<String,String>();
+                data_source_directory = System.getenv("FS_DEFAULT_NAME")+"/"+Plan.new_path(conf);
+                env.put("mrql.data.source.directory",data_source_directory);
+                spark_context = new JavaSparkContext(System.getenv("SPARK_MASTER"),
+                                                     "Apache MRQL Spark Evaluator",
+                                                     System.getenv("SPARK_HOME"),
+                                                     new String[]{spark_jar,core_jar,gen_jar,scanner_jar},
+                                                     env);
+                Plan.conf = spark_context.hadoopConfiguration();
+                FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
+            }
+        } catch (IOException ex) {
+            throw new Error(ex);
+        }
+    }
+
+    /** shutdown the Spark evaluator */
+    final public void shutdown ( Configuration conf ) {
+        spark_context.stop();
+        spark_context = null;
+        System.clearProperty("spark.driver.port");
+    }
+
+    final public void initialize_query () {
+        spark_context.clearJars();
+        spark_context.addJar(spark_jar);
+        spark_context.addJar(core_jar);
+        spark_context.addJar(gen_jar);
+        spark_context.addJar(scanner_jar);
+        Plan.distribute_compiled_arguments(Plan.conf);
+        if (Config.compile_functional_arguments)
+            spark_context.addJar(Plan.conf.get("mrql.jar.path"));
+    }
+
+    final public Configuration new_configuration () {
+        return new Configuration();
+    }
+
+    final public MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
+        throw new Error("You can only synchronize BSP tasks");
+    }
+
+    final public Bag distribute ( MR_string peerName, Bag s ) {
+        throw new Error("You can only distribute bags among BSP tasks");
+    }
+
+    final public Bag collect ( final DataSet x ) throws Exception {
+        return Plan.collect(x);
+    }
+
+    final public MRData bsp ( Tree plan, Environment env ) throws Exception {
+        throw new Error("You can not run a BSP task in Spark mode");
+    }
+
+    /** return the FileInputFormat for parsed files (CSV, XML, JSON, etc) */
+    final public Class<? extends MRQLFileInputFormat> parsedInputFormat () {
+        return SparkParsedInputFormat.class;
+    }
+
+    /** return the FileInputFormat for binary files */
+    final public Class<? extends MRQLFileInputFormat> binaryInputFormat () {
+        return SparkBinaryInputFormat.class;
+    }
+
+    /** return the FileInputFormat for data generator files */
+    final public Class<? extends MRQLFileInputFormat> generatorInputFormat () {
+        return SparkGeneratorInputFormat.class;
+    }
+
+    /** used by the master to send parsing details (e.g., record types) to workers */
+    public static void dump_source_dir () throws IOException {
+        if (Config.local_mode)
+            return;
+        DataSource.dataSourceDirectory.distribute(Plan.conf);
+        Path path = new Path(data_source_directory);
+        FileSystem fs = path.getFileSystem(Plan.conf);
+        PrintStream ps = new PrintStream(fs.create(path,true));
+        ps.println(Plan.conf.get("mrql.data.source.directory"));
+        ps.close();
+    }
+
+    /** executed by a worker when reading parsed input (see SparkParsedInputFormat) */
+    public static void load_source_dir () throws IOException {
+        if (Config.local_mode)
+            return;
+        if (Plan.conf == null)
+            Plan.conf = Evaluator.evaluator.new_configuration();
+        // the name of the file that contains the source directory details is passed
+        //   to workers through the HashMap environment in the JavaSparkContext
+        Path path = new Path(System.getenv("mrql.data.source.directory"));
+        FileSystem fs = path.getFileSystem(Plan.conf);
+        BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
+        Plan.conf.set("mrql.data.source.directory",ftp.readLine());
+        DataSource.dataSourceDirectory.read(Plan.conf);
+        ftp.close();
+    }
+
+    private static Function2<MRData,MRData,MRData> accumulator ( Tree acc_fnc, Environment env ) {
+        final org.apache.mrql.Function f = evalF(acc_fnc,env);
+        return new Function2<MRData,MRData,MRData>() {
+            public MRData call ( MRData x, MRData y ) {
+                return f.eval(new Tuple(x,y));
+            }
+        };
+    }
+
+    /** The Aggregate physical operator
+     * @param acc_fnc  the accumulator function from (T,T) to T
+     * @param zero  the zero element of type T
+     * @param plan the plan that constructs the dataset that contains the bag of values {T}
+     * @param env contains bindings from variables to values (MRData)
+     * @return the aggregation result of type T
+     */
+    final public MRData aggregate ( Tree acc_fnc,
+                                    Tree zero,
+                                    Tree plan,
+                                    Environment env ) throws Exception {
+        Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env);
+        MRData z = evalE(zero,env);
+        match plan {
+        case AggregateMap(`m,`acc,_,`s):
+            return evalD(#<cMap(`m,`s)>,env)
+                     .aggregate(z,accumulator(acc,env),f2);
+        case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
+            if (acc.equals(#<null>))
+                fail;
+            return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
+                     .aggregate(z,accumulator(acc,env),f2);
+        case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
+            if (acc.equals(#<null>))
+                fail;
+            return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
+                     .aggregate(z,accumulator(acc,env),f2);
+        case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
+            if (acc.equals(#<null>))
+                fail;
+            return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
+                     .aggregate(z,accumulator(acc,env),f2);
+        case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
+            if (acc.equals(#<null>))
+                fail;
+            return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
+                     .aggregate(z,accumulator(acc,env),f2);
+        };
+        throw new Error("Unrecognized aggregation: "+plan);
+    }
+
+    /** Evaluate a loop a fixed number of times */
+    final public Tuple loop ( Tree e, Environment env ) throws Exception {
+        match e {
+        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
+            int limit = ((MR_int)evalE(num,env)).get();
+            MR_rdd[] s = new MR_rdd[vs.length()];
+            for ( int i = 0; i < vs.length(); i++ )
+                s[i] = new MR_rdd(eval(ss.nth(i),env));
+            for ( int n = 0; n < limit; n++ ) {
+                Environment nenv = env;
+                for ( int i = 0; i < vs.length(); i ++ )
+                    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
+                for ( int i = 0; i < vs.length(); i ++ )
+                    s[i] = new MR_rdd(eval(bs.nth(i),nenv));
+            };
+            return new Tuple(s);
+        };
+        throw new Error("Wrong Loop format");
+    }
+
+    private static Bag bag ( final List<MRData> s ) {
+        final Iterator<MRData> i = s.iterator();
+        return new Bag(new BagIterator() {
+                public MRData next () {
+                    return i.next();
+                }
+                public boolean hasNext () {
+                    return i.hasNext();
+                }
+            });
+    }
+
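+    // a placeholder TaskContext used only to iterate over the partitions of a cached RDD on the driver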
+    final static TaskContext context = new TaskContext(0,0,0,null);
+
+    /** Convert a Spark RDD into a lazy bag
+     * @param rdd the Spark RDD
+     * @return a lazy bag that contains all RDD elements
+     */
+    public static Bag bag ( final JavaRDD<MRData> rdd ) {
+        final JavaRDD<MRData> rd = rdd.cache();
+        rd.count();  // force the evaluation of all pending Spark operations
+        final List<Partition> ps = rd.splits();
+        return new Bag(new BagIterator() {
+                Iterator<MRData> i = null;
+                int c = 0;
+                public MRData next () {
+                    return i.next();
+                }
+                public boolean hasNext () {
+                    do {
+                        if (i != null && i.hasNext()) 
+                            return true;
+                        if (c >= ps.size())
+                            return false;
+                        i = rdd.iterator(ps.get(c++),context);
+                    } while (true);
+                }
+            });
+    }
+
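+    // project the first component of a tuple (used to strip the loop flag from repeat results)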
+    private final static Function<MRData,MRData> get_first
+        = new Function<MRData,MRData>() {
+                   public MRData call ( MRData value ) {
+                       return ((Tuple)value).first();
+                   };
+    };
+
+    /** Evaluate an MRQL physical plan using Spark
+     * @param e the physical plan
+     * @param env contains bindings from variables to values (MRData)
+     * @param counter the name of the counter used in loops
+     * @return a DataSet
+     */
+    final public DataSet eval ( final Tree e,
+                                final Environment env,
+                                final String counter ) {
+        JavaRDD<MRData> rd = eval(e,env);
+        int count = 0;
+        if (!counter.equals("-")) {
+            final Accumulator<Integer> c = spark_context.intAccumulator(0);
+            rd = rd.cache();
+            rd.foreach(new VoidFunction<MRData>() {
+                    public void call ( MRData value ) {
+                        if (((MR_bool)((Tuple)value).second()).get())
+                            c.add(1);    // count the true results (the results that need another loop step)
+                    }
+                });
+            count = c.value();
+            rd = rd.map(get_first);
+        };
+        return new DataSet(new RDDDataSource(rd),count,0);
+    }
+
+    /** Evaluate an MRQL physical plan using Spark and print tracing info
+     * @param e the physical plan
+     * @param env contains bindings from variables to values (MRData)
+     * @return a Spark RDD
+     */
+    final public JavaRDD<MRData> eval ( final Tree e, final Environment env ) {
+        if (Config.trace_execution) {
+            tab_count += 3;
+            System.out.println(tabs(tab_count)+print_query(e));
+        };
+        final JavaRDD<MRData> res = evalD(e,env);
+        if (Config.trace_execution) 
+            try {
+                System.out.println(tabs(tab_count)+"-> "+res.collect());
+                tab_count -= 3;
+            } catch (Exception ex) {
+                throw new Error("Cannot collect the operator output: "+e);
+            };
+        return res;
+    }
+
+    /* convert an MRQL lambda to a Spark FlatMapFunction */
+    private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
+        final org.apache.mrql.Function f = evalF(fnc,env);
+        return new FlatMapFunction<MRData,MRData>() {
+            public Iterable<MRData> call ( MRData value ) {
+                return (Bag)f.eval(value);
+            }
+        };
+    }
+
+    /* group-by s and then reduce by fnc; if o is true, sort after group-by */
+    private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) {
+        match o {
+        case true:   // the result must be sorted
+            return s.groupBy(get_first)
+                .sortByKey()
+                .map(new Function<Tuple2<MRData,List<MRData>>,MRData>() {
+                        public MRData call ( Tuple2<MRData,List<MRData>> value ) {
+                            return new Tuple(value._1,bag(value._2));
+                        }
+                    })
+                .flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
+                        public MRData call ( MRData value ) {
+                            return ((Tuple)value).second();
+                        }
+                    });
+        };
+        return s.groupBy(get_first)
+            .map(new Function<Tuple2<MRData,List<MRData>>,MRData> () {
+                    public MRData call ( Tuple2<MRData,List<MRData>> value ) {
+                        final Iterator<MRData> i = value._2.iterator();
+                        return new Tuple(value._1,new Bag(new BagIterator() {
+                                public MRData next () {
+                                    return ((Tuple)i.next()).second();
+                                }
+                                public boolean hasNext () {
+                                    return i.hasNext();
+                                }
+                            }));
+                    }
+                })
+            .flatMap(cmap_fnc(fnc,env));
+    }
+
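+    // unwrap the MRContainer values read by a Hadoop input format into plain MRData,
+    // passing the global variable bindings to the workers on first use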
+    private static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
+        final Environment env = Interpreter.global_env;
+        return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
+                // need to pass the global bindings (the in-memory repeat vars) to workers
+                boolean first = true;
+                public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
+                    if (first) {
+                        Interpreter.set_global_bindings(env);
+                        first = false;
+                    };
+                    return value._2.data();
+                }
+            });
+    }
+
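+    // present a bag iterator over (key,value) tuples as an Iterable of Spark Tuple2 pairs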
+    private static Iterable<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
+        return new Iterable<Tuple2<MRData,MRData>>() {
+            public Iterator<Tuple2<MRData,MRData>> iterator() {
+                return new Iterator<Tuple2<MRData,MRData>> () {
+                    public Tuple2<MRData,MRData> next () {
+                        Tuple data = (Tuple)i.next();
+                        return new Tuple2<MRData,MRData>(data.first(),data.second());
+                    }
+                    public boolean hasNext () {
+                        return i.hasNext();
+                    }
+                    public void remove () {}
+                };
+            }
+        };
+    }
+
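+    // apply the combiner f to each partition: group the partition elements by key locally,
+    // combine every group, and pair each combined value with its group key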
+    private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
+        return new FlatMapFunction<Iterator<MRData>,MRData>() {
+                  public Iterable<MRData> call ( final Iterator<MRData> i ) {
+                      return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
+                              public MRData eval ( MRData x ) {
+                                  final MRData key = ((Tuple)x).first();
+                                  final Iterator<MRData> it = ((Bag)f.eval(x)).iterator();
+                                  return new Bag(new BagIterator() {
+                                          public MRData next () {
+                                              return new Tuple(key,it.next());
+                                          }
+                                          public boolean hasNext () {
+                                              return it.hasNext();
+                                          }
+                                      });
+                              }},
+                          MapReduceAlgebra.groupBy(new Bag(new BagIterator() {
+                                  public MRData next () {
+                                      return i.next();
+                                  }
+                                  public boolean hasNext () {
+                                      return i.hasNext();
+                                  }
+                              })));
+                  }
+        };
+    }
+
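+    // build the in-memory hash table for the map-side join, grouping the broadcast values by join key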
+    private static Hashtable<MRData,Bag> make_built_table ( List<Tuple2<MRData,MRData>> values ) {
+        Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
+        for ( Tuple2<MRData,MRData> t: values ) {
+            Bag entries = built_table.get(t._1);
+            built_table.put(t._1,
+                            (entries == null)
+                            ? (new Bag(t._2))
+                            : entries.add_element(t._2));
+        };
+        return built_table;
+    }
+
+    /** Evaluate MRQL physical operators using Spark
+     * @param e the physical plan
+     * @param env contains bindings from variables to values (MRData)
+     * @return a Spark RDD
+     */
+    final public JavaRDD<MRData> evalD ( final Tree e, final Environment env ) {
+        try {
+            match e {
+            case MapAggregateReduce(`m,`r,null,_,`s,`o):
+                return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
+            case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
+                return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
+            case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
+                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
+            case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
+                return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
+            case cMap(`f,`s):
+                return eval(s,env).flatMap(cmap_fnc(f,env));
+            case MapReduce(`m,`r,`s,`o):
+                return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
+            case MapCombineReduce(`m,`c,`r,`s,`o):
+                return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
+                               .mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
+            case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
+                return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
+            case CrossProduct(`mx,`my,`r,`x,`y):
+                final org.apache.mrql.Function fr = evalF(r,env);
+                return eval(x,env)
+                    .flatMap(cmap_fnc(mx,env))
+                    .cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
+                    .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
+                            public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
+                                return (Bag)fr.eval(new Tuple(value._1,value._2));
+                            }
+                        });
+            case MapReduce2(`mx,`my,`r,`x,`y,`o):
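+                // reduce-side equi-join: pair each input with its join key and cogroup the two inputs on that key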
+                final org.apache.mrql.Function fx = evalF(mx,env);
+                final org.apache.mrql.Function fy = evalF(my,env);
+                final org.apache.mrql.Function fr = evalF(r,env);
+                JavaPairRDD<MRData,MRData> xs
+                    = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+                                return joinIterator(((Bag)fx.eval(value)).iterator());
+                            }
+                        });
+                JavaPairRDD<MRData,MRData> ys
+                    = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+                                return joinIterator(((Bag)fy.eval(value)).iterator());
+                            }
+                        });
+                return xs.cogroup(ys)
+                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
+                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
+                                return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
+                            }
+                        });
+            case GroupByJoin(`kx,`ky,`gx,`gy,`mp,`c,`r,`x,`y,`o):
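+                // partial-replication join on an n x m grid of reducers: each x element is replicated
+                // n times and each y element m times, so that matching groups meet in a common reducer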
+                final int n = 10;
+                final int m = 10;
+                final org.apache.mrql.Function fkx = evalF(kx,env);
+                final org.apache.mrql.Function fky = evalF(ky,env);
+                final org.apache.mrql.Function fgx = evalF(gx,env);
+                final org.apache.mrql.Function fgy = evalF(gy,env);
+                final org.apache.mrql.Function fm = evalF(mp,env);
+                final org.apache.mrql.Function fc = evalF(c,env);
+                final org.apache.mrql.Function fr = evalF(r,env);
+                JavaPairRDD<MRData,MRData> xs
+                    = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
+                                return new Iterable<Tuple2<MRData,MRData>>() {
+                                    public Iterator<Tuple2<MRData,MRData>> iterator() {
+                                        return new Iterator<Tuple2<MRData,MRData>>() {
+                                            int i = 0;
+                                            public Tuple2<MRData,MRData> next () {
+                                                MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
+                                                i++;
+                                                return new Tuple2<MRData,MRData>(key,new Tuple(fkx.eval(value),value));
+                                            }
+                                            public boolean hasNext () {
+                                                return i < n;
+                                            }
+                                            public void remove () {}
+                                        };
+                                    }
+                                };
+                            }
+                        });
+                JavaPairRDD<MRData,MRData> ys
+                    = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                            public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
+                                return new Iterable<Tuple2<MRData,MRData>>() {
+                                    public Iterator<Tuple2<MRData,MRData>> iterator() {
+                                        return new Iterator<Tuple2<MRData,MRData>>() {
+                                            int i = 0;
+                                            public Tuple2<MRData,MRData> next () {
+                                                MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+i);
+                                                i++;
+                                                return new Tuple2<MRData,MRData>(key,new Tuple(fky.eval(value),value));
+                                            }
+                                            public boolean hasNext () {
+                                                return i < m;
+                                            }
+                                            public void remove () {}
+                                        };
+                                    }
+                                };
+                            }
+                        });
+                return xs.cogroup(ys)
+                    .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
+                            public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
+                                final Iterator<MRData> i = (MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fm,fc,fr,
+                                                                        bag(value._2._1),bag(value._2._2))).iterator();
+                                return new Iterable<MRData>() {
+                                    public Iterator<MRData> iterator() {
+                                        return new Iterator<MRData>() {
+                                            public MRData next () {
+                                                return i.next();
+                                            }
+                                            public boolean hasNext () {
+                                                return i.hasNext();
+                                            }
+                                            public void remove () {}
+                                        };
+                                    }
+                                };
+                            }
+                        });
+            case MapJoin(`mx,`my,`r,`x,`y):
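+                // broadcast (map-side) join: the right input y is collected at the driver, broadcast
+                // to the workers, and probed through an in-memory hash table built by each task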
+                final org.apache.mrql.Function fx = evalF(mx,env);
+                final org.apache.mrql.Function fy = evalF(my,env);
+                final org.apache.mrql.Function fr = evalF(r,env);
+                final Broadcast<List<Tuple2<MRData,MRData>>> ys
+                    = spark_context.broadcast(eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+                                public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+                                    return joinIterator(((Bag)fy.eval(value)).iterator());
+                                }
+                            }).collect());
+                return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
+                        final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
+                        public Iterable<MRData> call ( MRData value ) {
+                            final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
+                            return new Iterable<MRData>() {
+                                public Iterator<MRData> iterator() {
+                                    return new Iterator<MRData>() {
+                                        Tuple p;
+                                        Iterator<MRData> ix = null;
+                                        public MRData next () {
+                                            return ix.next();
+                                        }
+                                        public boolean hasNext () {
+                                            if (ix != null && ix.hasNext())
+                                                return true;
+                                            while (i.hasNext()) {
+                                                p = (Tuple)i.next();
+                                                MRData pd = built_table.get(p.first());
+                                                Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
+                                                ix = bb.iterator();
+                                                if (ix.hasNext())
+                                                    return true;
+                                            };
+                                            return false;
+                                        }
+                                        public void remove () {}
+                                    };
+                                }
+                            };
+                        }
+                    });
+            case BinarySource(`file,_):
+                String path = ((MR_string)evalE(file,env)).get();
+                new BinaryDataSource(path,Plan.conf);
+                return containerData(spark_context.sequenceFile(path,
+                                                                MRContainer.class,MRContainer.class,
+                                                                Config.nodes));
+            case ParsedSource(`parser,`file,...args):
+                String path = ((MR_string)evalE(file,env)).get();
+                Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+                if (p == null)
+                    throw new Error("Unknown parser: "+parser);
+                new ParsedDataSource(path,p,args,Plan.conf);
+                dump_source_dir();
+                return containerData(spark_context.hadoopFile(path,SparkParsedInputFormat.class,
+                                                              MRContainer.class,MRContainer.class,
+                                                              Config.nodes));
+            case Merge(`x,`y):
+                return eval(x,env).union(eval(y,env));
+            case Repeat(lambda(`v,`b),`s,`n):
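+                // iterate the loop body, counting the results flagged true, until no result needs
+                // another step or the iteration limit is reached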
+                int max_num = ((MR_int)evalE(n,env)).get();
+                JavaRDD<MRData> rd;
+                JavaRDD<MRData> res = eval(s,env).cache();
+                int i = 0;
+                boolean cont = true;
+                do {
+                    rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
+                    res = rd.map(get_first).cache();
+                    Integer true_results
+                        = rd.aggregate(new Integer(0),
+                                       new Function2<Integer,MRData,Integer>() {
+                                           public Integer call ( Integer x, MRData y ) {
+                                               return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x;
+                                           }
+                                       },
+                                       new Function2<Integer,Integer,Integer>() {
+                                           public Integer call ( Integer x, Integer y ) { return x+y; }
+                                       });
+                    i++;
+                    cont = true_results > 0 && i <= max_num;
+                    System.err.println("*** Repeat #"+i+": "+true_results+" true results");
+                } while (cont);
+                return res;
+            case Closure(lambda(`v,`b),`s,`m):
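+                // iterate the loop body until the result stops growing or the iteration limit is reached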
+                int max_num = ((MR_int)evalE(m,env)).get();
+                JavaRDD<MRData> res = eval(s,env).cache();
+                long n = 0;
+                long old = 0;
+                int i = 0;
+                boolean cont = true;
+                do {
+                    res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
+                    old = n;
+                    n = res.count();
+                    i++;
+                    System.err.println("*** Repeat #"+i+": "+(old-n)+" new records");
+                } while (old < n && i <= max_num);
+                return res;
+            case Generator(`min,`max,`size):
+                DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
+                                            ((MR_long)evalE(max,env)).get(),
+                                            ((MR_long)evalE(size,env)).get());
+                JavaRDD<MRData> rd = null;
+                for ( DataSource d: ds.source )
+                    if (rd == null)
+                        rd = containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class,
+                                                                    MRContainer.class,MRContainer.class,1));
+                    else rd = rd.union(containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class,
+                                                                              MRContainer.class,MRContainer.class,1)));
+                return rd;
+            case let(`v,`u,`body):
+                return eval(body,new Environment(v.toString(),evalE(u,env),env));
+            case Let(`v,`u,`body):
+                return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env)),env));
+            case If(`c,`x,`y):
+                if (((MR_bool)evalE(c,env)).get())
+                    return eval(x,env);
+                else return eval(y,env);
+            case `v:
+                if (!v.is_variable())
+                    fail;
+                MRData x = variable_lookup(v.toString(),env);
+                if (x != null)
+                    if (x instanceof MR_rdd)
+                        return ((MR_rdd)x).rdd();
+                x = variable_lookup(v.toString(),global_env);
+                if (x != null)
+                    if (x instanceof MR_rdd)
+                        return ((MR_rdd)x).rdd();
+                throw new Error("Variable "+v+" is not bound");
+            };
+            throw new Error("Cannot evaluate the Spark plan: "+e);
+        } catch (Error msg) {
+            if (!Config.trace)
+                throw new Error(msg.getMessage());
+            System.err.println(msg.getMessage());
+            throw new Error("Evaluation error in: "+print_query(e));
+        } catch (Exception ex) {
+            System.err.println(ex.getMessage());
+            ex.printStackTrace();
+            throw new Error("Evaluation error in: "+print_query(e));
+        }
+    }
+}