Posted to commits@mrql.apache.org by fe...@apache.org on 2014/03/13 15:24:49 UTC

[24/26] MRQL-32: Refactoring directory structure for Eclipse

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Bag.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Bag.java b/core/src/main/java/org/apache/mrql/Bag.java
new file mode 100644
index 0000000..b092f30
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Bag.java
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.*;
+import java.io.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+
+
+/**
+ *   A sequence of MRData.
+ *   There are 3 kinds of Bag implementations, which are converted at run-time, when necessary:
+ *   1) vector-based (materialized): used for small bags (when size is less than Config.max_materialized_bag);
+ *   2) stream-based: can be traversed only once; implemented as Java iterators;
+   *   3) spilled to a local file: can be accessed multiple times.
+ */
+public class Bag extends MRData implements Iterable<MRData> {
+    private final static long serialVersionUID = 64629834894869L;
+    enum Modes { STREAMED, MATERIALIZED, SPILLED };
+    private transient Modes mode;
+    private transient ArrayList<MRData> content;      // content of a materialized bag
+    private transient BagIterator iterator;           // iterator for a streamed bag
+    private transient boolean consumed;               // true, if the stream has already been used
+    private transient String path;                    // local path that contains the spilled bag
+    private transient SequenceFile.Writer writer;     // the file writer for spilled bags
+
+    /**
+     * create an empty bag as an ArrayList
+     */
+    public Bag () {
+        mode = Modes.MATERIALIZED;
+        content = new ArrayList<MRData>();
+    }
+
+    /**
+     * create an empty bag as an ArrayList with a given capacity
+     * @param size initial capacity
+     */
+    public Bag ( final int size ) {
+        mode = Modes.MATERIALIZED;
+        content = new ArrayList<MRData>(size);
+    }
+
+    /**
+     * in-memory Bag construction (an ArrayList) initialized with data
+     * @param as a vector of MRData to insert in the Bag
+     */
+    public Bag ( final MRData ...as ) {
+        mode = Modes.MATERIALIZED;
+        content = new ArrayList<MRData>(as.length);
+        for ( MRData a: as )
+            content.add(a);
+    }
+
+    /**
+     * in-memory Bag construction (an ArrayList) initialized with data
+     * @param as a vector of MRData to insert in the Bag
+     */
+    public Bag ( final List<MRData> as ) {
+        mode = Modes.MATERIALIZED;
+        content = new ArrayList<MRData>(as.size());
+        for ( MRData a: as )
+            content.add(a);
+    }
+
+    /**
+     * lazy construction (stream-based) of a Bag
+     * @param i the Iterator that generates the Bag elements
+     */
+    public Bag ( final BagIterator i ) {
+        mode = Modes.STREAMED;
+        iterator = i;
+        consumed = false;
+    }
+
+    /** is the Bag stored in an ArrayList? */
+    public boolean materialized () {
+        return mode == Modes.MATERIALIZED;
+    }
+
+    /** is the Bag stream-based? */
+    public boolean streamed () {
+        return mode == Modes.STREAMED;
+    }
+
+    /** is the Bag spilled into a file? */
+    public boolean spilled () {
+        return mode == Modes.SPILLED;
+    }
+
+    /** return the Bag size (cache it in memory if necessary) */
+    public int size () {
+        if (materialized())
+            return content.size();
+        if (streamed() && consumed)
+            throw new Error("*** The collection stream has already been consumed");
+        int i = 0;
+        for ( MRData e: this )
+            i++;
+        if (streamed())
+            consumed = true;
+        return i;
+    }
+
+    /** trim the ArrayList that caches the Bag */
+    public void trim () {
+        if (materialized())
+            content.trimToSize();
+    }
+
+    /** get the n'th element of a Bag (cache it in memory if necessary)
+     * @param n the index
+     * @return the n'th element
+     */
+    public MRData get ( final int n ) {
+        if (materialized())
+            if (n < size())
+                return content.get(n);
+            else throw new Error("List index out of range: "+n);
+        if (streamed() && consumed)
+            throw new Error("*** The collection stream has already been consumed");
+        int i = 0;
+        for ( MRData e: this )
+            if (i++ == n)
+                return e;
+        if (streamed())
+            consumed = true;
+        throw new Error("Cannot retrieve the "+n+"th element of a sequence");
+    }
+
+    /** replace the n'th element of a Bag with a new value
+     * @param n the index
+     * @param value the new value
+     * @return the Bag
+     */
+    public Bag set ( final int n, final MRData value ) {
+        if (!materialized())
+            throw new Error("Cannot replace an element of a non-materialized sequence");
+        content.set(n,value);
+        return this;
+    }
+
+    /** add a new value to a Bag (cache it in memory if necessary)
+     * @param x the new value
+     */
+    public void add ( final MRData x ) {
+        materialize();
+        if (!spilled() && Config.hadoop_mode
+             && size() >= Config.max_materialized_bag)
+            spill();
+        if (spilled())
+            try {
+                if (writer == null) {   // writer was closed earlier for reading
+                    FileSystem fs = FileSystem.getLocal(Plan.conf);
+                    writer = SequenceFile.createWriter(fs,Plan.conf,new Path(path),
+                                                       MRContainer.class,NullWritable.class,
+                                                       SequenceFile.CompressionType.NONE);
+                    System.err.println("*** Appending elements to a spilled Bag: "+path);
+                };
+                writer.append(new MRContainer(x),NullWritable.get());
+            } catch (IOException e) {
+                throw new Error("Cannot append an element to a spilled Bag: "+path);
+            }
+        else content.add(x);
+    }
+
+    /** add a new value to a Bag (cache it in memory if necessary)
+     * @param x the new value
+     * @return the Bag
+     */
+    public Bag add_element ( final MRData x ) {
+        add(x);
+        return this;
+    }
+
+    /** add the elements of a Bag to the end of this Bag
+     * @param b the Bag whose elements are copied
+     * @return the Bag
+     */
+    public Bag addAll ( final Bag b ) {
+        for ( MRData e: b )
+            add(e);
+        return this;
+    }
+
+    /** make this Bag empty (the result is a materialized empty Bag) */
+    public void clear () {
+        if (materialized())
+            content.clear();
+        else if (streamed()) {
+            if (writer != null)
+                try {
+                    writer.close();
+                } catch (IOException ex) {
+                    throw new Error(ex);
+                };
+            writer = null;
+            path = null;
+            mode = Modes.MATERIALIZED;
+            content = new ArrayList<MRData>(100);
+        };
+        mode = Modes.MATERIALIZED;
+        content = new ArrayList<MRData>();
+    }
+
+    /** cache the Bag to an ArrayList when it is absolutely necessary */
+    public void materialize () {
+        if (materialized() || spilled())
+            return;
+        Iterator<MRData> iter = iterator();
+        mode = Modes.MATERIALIZED;
+        writer = null;
+        path = null;
+        content = new ArrayList<MRData>(100);
+        while ( iter.hasNext() )
+            add(iter.next());
+        if (materialized())    // it may have been spilled
+            content.trimToSize();
+        iterator = null;
+    }
+
+    private static Random random_generator = new Random();
+
+    private static String new_path ( FileSystem fs ) throws IOException {
+        Path p;
+        do {
+            p = new Path("file://"+Config.tmpDirectory+"/mrql"+(random_generator.nextInt(1000000)));
+        } while (p.getFileSystem(Plan.conf).exists(p));
+        String path = p.toString();
+        Plan.temporary_paths.add(path);
+        return path;
+    }
+
+    /** spill the Bag to a local file */
+    private void spill () {
+        if (!spilled() && Config.hadoop_mode)
+            try {
+                if (Plan.conf == null)
+                    Plan.conf = Evaluator.evaluator.new_configuration();
+                final FileSystem fs = FileSystem.getLocal(Plan.conf);
+                path = new_path(fs);
+                System.err.println("*** Spilling a Bag to a local file: "+path);
+                final Path p = new Path(path);
+                writer = SequenceFile.createWriter(fs,Plan.conf,p,
+                                                   MRContainer.class,NullWritable.class,
+                                                   SequenceFile.CompressionType.NONE);
+                for ( MRData e: this )
+                    writer.append(new MRContainer(e),NullWritable.get());
+                mode = Modes.SPILLED;
+                content = null;
+                iterator = null;
+            } catch (Exception e) {
+                throw new Error("Cannot spill a Bag to a local file");
+            }
+    }
+
+    /**
+     * sort the Bag (cache it in memory if necessary).
+     * If the Bag was spilled during caching, use external sorting
+     */
+    public void sort () {
+        materialize();
+        if (spilled())  // if it was spilled during materialize()
+            try {       // use external sorting
+                if (writer != null)
+                    writer.close();
+                FileSystem fs = FileSystem.getLocal(Plan.conf);
+                SequenceFile.Sorter sorter
+                    = new SequenceFile.Sorter(fs,new Plan.MRContainerKeyComparator(),
+                                              MRContainer.class,NullWritable.class,Plan.conf);
+                String out_path = new_path(fs);
+                System.err.println("*** Using external sorting on a spilled bag "+path+" -> "+out_path);
+                sorter.setMemory(64*1024*1024);
+                sorter.sort(new Path(path),new Path(out_path));
+                path = out_path;
+                writer = null;
+            } catch (Exception ex) {
+                throw new Error("Cannot sort a spilled bag");
+            }
+        else Collections.sort(content);
+    }
+
+    /** return the Bag Iterator */
+    public Iterator<MRData> iterator () {
+        if (spilled())
+            try {
+                if (writer != null)
+                    writer.close();
+                writer = null;
+                return new BagIterator () {
+                    final FileSystem fs = FileSystem.getLocal(Plan.conf);
+                    final SequenceFile.Reader reader = new SequenceFile.Reader(fs,new Path(path),Plan.conf);
+                    final MRContainer key = new MRContainer();
+                    final NullWritable value = NullWritable.get();
+                    MRData data;
+                    public boolean hasNext () {
+                        try {
+                            if (!reader.next(key,value)) {
+                                reader.close();
+                                return false;
+                            };
+                            data = key.data();
+                            return true;
+                        } catch (IOException e) {
+                            throw new Error("Cannot collect values from a spilled Bag");
+                        }
+                    }
+                    public MRData next () {
+                        return data;
+                    }
+                };
+            } catch (IOException e) {
+                throw new Error("Cannot collect values from a spilled Bag");
+            }
+        else if (materialized())
+            return content.iterator();
+        else {
+            if (consumed)  // this should never happen
+                throw new Error("*** The collection stream has already been consumed");
+            consumed = true;
+            return iterator;
+        }
+    }
+
+    /** fully cache this MRData in memory by materializing all Bags at any place and depth in it */
+    public void materializeAll () {
+        materialize();
+        for (MRData e: this)
+            e.materializeAll();
+    }
+
+    /** concatenate the elements of a given Bag to the elements of this Bag.
+     * Does not change either Bag
+     * @param s the given Bag
+     * @return a new Bag
+     */
+    public Bag union ( final Bag s ) {
+        final Iterator<MRData> i1 = iterator();
+        final Iterator<MRData> i2 = s.iterator();
+        return new Bag(new BagIterator () {
+                boolean first = true;
+                public boolean hasNext () {
+                    if (first)
+                        if (i1.hasNext())
+                            return true;
+                        else {
+                            first = false;
+                            return i2.hasNext();
+                        }
+                    else return i2.hasNext();
+                }
+                public MRData next () {
+                    if (first)
+                        return i1.next();
+                    else return i2.next();
+                }
+            });
+    }
+
+    /** does this Bag contain an element?
+     * Cache this Bag in memory before testing, if necessary
+     * @param x the element to find
+     */
+    public boolean contains ( final MRData x ) {
+        if (materialized())
+            return content.contains(x);
+        if (streamed() && consumed)
+            throw new Error("*** The collection stream has already been consumed");
+        for ( MRData e: this )
+            if (x.equals(e))
+                return true;
+        if (streamed())
+            consumed = true;
+        return false;
+    }
+
+    /** if this Bag is a Map from keys to values (a Bag of (key,value) pairs),
+     * find the value with the given key; raise an error if not found
+     * @param key the search key
+     * @return the value associated with the key
+     */
+    public MRData map_find ( final MRData key ) {
+        if (streamed() && consumed)
+            throw new Error("*** The collection stream has already been consumed");
+        for ( MRData e: this ) {
+            Tuple p = (Tuple) e;
+            if (key.equals(p.first()))
+                return p.second();
+        };
+        if (streamed())
+            consumed = true;
+        throw new Error("key "+key+" not found in map");
+    }
+
+    /** if this Bag is a Map from keys to values (a Bag of (key,value) pairs),
+     * does it contain a given key?
+     * @param key the search key
+     */
+    public boolean map_contains ( final MRData key ) {
+        if (streamed() && consumed)
+            throw new Error("*** The collection stream has already been consumed");
+        for ( MRData e: this )
+            if (key.equals(((Tuple)e).first()))
+                return true;
+        if (streamed())
+            consumed = true;
+        return false;
+    }
+
+    /** the output serializer for Bag.
+     * Stream-based Bags are serialized lazily (without having to cache the Bag in memory)
+     */
+    final public void write ( DataOutput out ) throws IOException {
+        if (materialized()) {
+            out.writeByte(MRContainer.BAG);
+            WritableUtils.writeVInt(out,size());
+            for ( MRData e: this )
+                e.write(out);
+        } else {
+            out.writeByte(MRContainer.LAZY_BAG);
+            for ( MRData e: this )
+                e.write(out);
+            out.writeByte(MRContainer.END_OF_LAZY_BAG);
+        }
+    }
+
+    /** the input serializer for Bag */
+    final public static Bag read ( DataInput in ) throws IOException {
+        int n = WritableUtils.readVInt(in);
+        Bag bag = new Bag(n);
+        for ( int i = 0; i < n; i++ )
+            bag.add(MRContainer.read(in));
+        return bag;
+    }
+
+    /** a lazy input serializer for a Bag (it doesn't need to cache a Bag in memory) */
+    public static Bag lazy_read ( final DataInput in ) throws IOException {
+        Bag bag = new Bag(100);
+        MRData data = MRContainer.read(in);
+        while (data != MRContainer.end_of_lazy_bag) {
+            bag.add(data);
+            data = MRContainer.read(in);
+        };
+        if (bag.materialized())
+            bag.content.trimToSize();
+        return bag;
+    }
+
+    /** the input serializer for Bag */
+    public void readFields ( DataInput in ) throws IOException {
+        int n = WritableUtils.readVInt(in);
+        mode = Modes.MATERIALIZED;
+        iterator = null;
+        path = null;
+        writer = null;
+        if (content == null)
+            content = new ArrayList<MRData>(n);
+        else {
+            content.clear();
+            content.ensureCapacity(n);
+        };
+        for ( int i = 0; i < n; i++ )
+            add(MRContainer.read(in));
+    }
+
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        materialize();
+        WritableUtils.writeVInt(out,size());
+        for ( MRData e: this )
+            e.write(out);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        int n = WritableUtils.readVInt(in);
+        mode = Modes.MATERIALIZED;
+        iterator = null;
+        path = null;
+        writer = null;
+        content = new ArrayList<MRData>(n);
+        for ( int i = 0; i < n; i++ )
+            add(MRContainer.read(in));
+    }
+
+    private void readObjectNoData () throws ObjectStreamException { };
+
+    /** compare this Bag with a given Bag by comparing their associated elements */
+    public int compareTo ( MRData x ) {
+        Bag xt = (Bag)x;
+        Iterator<MRData> xi = xt.iterator();
+        Iterator<MRData> yi = iterator();
+        while ( xi.hasNext() && yi.hasNext() ) {
+            int c = yi.next().compareTo(xi.next());   // this element vs the given Bag's element
+            if (c < 0)
+                return -1;
+            else if (c > 0)
+                return 1;
+        };
+        if (xi.hasNext())
+            return -1;
+        else if (yi.hasNext())
+            return 1;
+        else return 0;
+    }
+
+    /** compare two serialized Bags by comparing their elements in raw-byte form */
+    final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
+        try {
+            int xn = WritableComparator.readVInt(x,xs);
+            int xx = WritableUtils.decodeVIntSize(x[xs]);
+            int yn = WritableComparator.readVInt(y,ys);
+            int yy = WritableUtils.decodeVIntSize(y[ys]);
+            for ( int i = 0; i < xn && i < yn; i++ ) {
+                int k = MRContainer.compare(x,xs+xx,xl-xx,y,ys+yy,yl-yy,size);
+                if (k != 0)
+                    return k;
+                xx += size[0];
+                yy += size[0];
+            };
+            size[0] = xx+1;
+            if (xn > yn)
+                return 1;
+            if (xn < yn)
+                return -1;
+            return 0;
+        } catch (IOException e) {
+            throw new Error(e);
+        }
+    }
+
+    /** is this Bag equal to another Bag (order is important) */
+    public boolean equals ( Object x ) {
+        if (!(x instanceof Bag))
+            return false;
+        Bag xt = (Bag) x;
+        Iterator<MRData> xi = xt.iterator();
+        Iterator<MRData> yi = iterator();
+        while ( xi.hasNext() && yi.hasNext() )
+            if ( !xi.next().equals(yi.next()) )
+                return false;
+        return !xi.hasNext() && !yi.hasNext();   // equal only if both iterators are exhausted
+    }
+
+    /** the hash code of this Bag is the XOR of the hash code of its elements */
+    public int hashCode () {
+        int h = 127;
+        for ( MRData e: this )
+            h ^= e.hashCode();
+        return Math.abs(h);
+    }
+
+    /** show the first few Bag elements (controlled by -bag_print) */
+    public String toString () {
+        materialize();
+        StringBuffer b = new StringBuffer("{ ");
+        int i = 0;
+        for ( MRData e: this )
+            if ( i++ < Config.max_bag_size_print )
+                b.append(((i>1)?", ":"")+e);
+            else return b.append(", ... }").toString();
+        return b.append(" }").toString();
+    }
+}
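
A minimal usage sketch of the three Bag modes above (illustrative only; it
assumes the MRQL runtime classes from this patch, such as MR_int, and Hadoop
mode for the spilling step):

    // materialized: small bags are backed by an ArrayList
    Bag b = new Bag(new MR_int(1), new MR_int(2), new MR_int(3));
    b.sort();                          // in-memory Collections.sort

    // streamed: elements are produced lazily and may be traversed only once
    Bag s = new Bag(new BagIterator() {
        int i = 0;
        public boolean hasNext () { return i < 1000000; }
        public MRData next () { return new MR_int(i++); }
    });
    s.materialize();                   // caches the stream; in Hadoop mode, add()
                                       // spills to a local SequenceFile once the
                                       // size reaches Config.max_materialized_bag,
                                       // after which the Bag can be read many times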

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/BagIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/BagIterator.java b/core/src/main/java/org/apache/mrql/BagIterator.java
new file mode 100644
index 0000000..0fd7354
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/BagIterator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.Iterator;
+
+/** an Iterator over the elements of a Bag */
+abstract public class BagIterator implements Iterator<MRData> {
+    public void remove () {
+        throw new Error("Bag deletions are not permitted");
+    }
+}
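
BagIterator only pins down remove() (Bag deletions are rejected); concrete
subclasses supply hasNext()/next(). A hedged sketch of a lazy generator built
on it, in the style of Bag.union() above (the helper name range is
illustrative, not part of the patch):

    static Bag range ( final long min, final long max ) {
        return new Bag(new BagIterator () {
            long i = min;
            public boolean hasNext () { return i <= max; }
            public MRData next () { return new MR_long(i++); }
        });
    }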

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/BinaryDataSource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/BinaryDataSource.java b/core/src/main/java/org/apache/mrql/BinaryDataSource.java
new file mode 100644
index 0000000..d4338ec
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/BinaryDataSource.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.hadoop.conf.Configuration;
+
+
+/** A DataSource used for storing intermediate results and data dumps */
+final public class BinaryDataSource extends DataSource {
+    BinaryDataSource ( int source_num, String path, Configuration conf ) {
+        super(source_num,path,Evaluator.evaluator.binaryInputFormat(),conf);
+    }
+
+    BinaryDataSource ( String path, Configuration conf ) {
+        super(-1,path,Evaluator.evaluator.binaryInputFormat(),conf);
+    }
+
+    public String toString () {
+        return "Binary"+separator+source_num+separator+path;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/ClassImporter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/ClassImporter.java b/core/src/main/java/org/apache/mrql/ClassImporter.java
new file mode 100644
index 0000000..8efc1ae
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/ClassImporter.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.lang.reflect.Method;
+import org.apache.mrql.gen.*;
+import java.util.*;
+
+/** imports external Java methods into MRQL */
+final public class ClassImporter {
+    final static boolean trace_imported_methods = false;
+
+    final static String[] object_methods
+        = { "hashCode", "getClass", "wait", "equals", "toString", "notify", "notifyAll" };
+
+    static Vector<MethodInfo> methods = new Vector<MethodInfo>();
+
+    public static void load_classes () {
+        if (methods == null)
+            methods = new Vector<MethodInfo>();
+        if (methods.size() == 0) {
+            importClass("org.apache.mrql.SystemFunctions");
+            //****** import your classes with user-defined functions here
+        }
+    }
+
+    private static boolean object_method ( String s ) {
+        for (int i = 0; i < object_methods.length; i++)
+            if (object_methods[i].equals(s))
+                return true;
+        return false;
+    }
+
+    private static Tree getType ( Class<?> c ) {
+        String cn = c.getCanonicalName();
+        Class<?>[] inf = c.getInterfaces();
+        if (cn.equals("org.apache.mrql.MRData"))
+            return new VariableLeaf("any");
+        if (cn.startsWith("org.apache.mrql.MR_"))
+            return new VariableLeaf(cn.substring(19));
+        if (cn.equals("org.apache.mrql.Bag"))
+            return new Node("bag",new Trees(new VariableLeaf("any")));
+        if (cn.equals("org.apache.mrql.Inv"))
+            return new VariableLeaf("any");
+        if (cn.equals("org.apache.mrql.Union"))
+            return new VariableLeaf("union");
+        if (cn.equals("org.apache.mrql.Lambda"))
+            return new VariableLeaf("any");
+        if (inf.length > 0 && inf[0].getName().equals("org.apache.mrql.MRData"))
+            return new VariableLeaf("any");
+        throw new Error("Unsupported type in imported method: "+cn);
+    }
+
+    private static Trees signature ( Method m ) {
+        Class<?> co = m.getReturnType();
+        Class<?>[] cs = m.getParameterTypes();
+        Trees as = new Trees(getType(co));
+        for (int i = 0; i < cs.length; i++)
+            as = as.append(getType(cs[i]));
+        return as;
+    }
+
+    public static String method_name ( int method_number ) {
+        return methods.get(method_number).name;
+    }
+
+    public static Trees signature ( int method_number ) {
+        return methods.get(method_number).signature;
+    }
+
+    /** import all Java methods from a given Java class */
+    public static void importClass ( String class_name ) {
+        try {
+            Method[] ms = Class.forName(class_name).getMethods();
+            Vector<MethodInfo> mv = new Vector<MethodInfo>();
+            for (int i = 0; i < ms.length; i++)
+                if (!object_method(ms[i].getName()) && ms[i].getModifiers() == 9)   // public static only
+                    try {
+                        Trees sig = signature(ms[i]);
+                        MethodInfo m = new MethodInfo(ms[i].getName(),sig,ms[i]);
+                        mv.add(m);
+                        methods.add(m);
+                    } catch ( Exception e ) {
+                        System.out.println("Warning: method "+ms[i].getName()+" cannot be imported");
+                        System.out.println(e);
+                        throw new Error("");
+                    };
+            Collections.sort(methods);
+            if (Translator.functions == null)
+                Translator.functions = Trees.nil;
+            for ( MethodInfo m: methods )
+                Translator.functions = Translator.functions.append(new Node(m.name,m.signature));
+            if (trace_imported_methods) {
+                System.out.print("Importing methods: ");
+                for (int i = 0; i < mv.size(); i++ )
+                    System.out.print(mv.get(i).name+mv.get(i).signature.tail()
+                                     +":"+mv.get(i).signature.head()+"  ");
+                System.out.println();
+            }
+        } catch (ClassNotFoundException x) {
+            throw new Error("Undefined class: "+class_name);
+        }
+    }
+
+    /** import a Java method with a given name from a given Java class */
+    public static void importMethod ( String class_name, String method_name ) {
+        try {
+            Method[] ms = Class.forName(class_name).getMethods();
+            MethodInfo m = null;
+            for (int i = 0; i < ms.length; i++)
+                if (ms[i].getName().equals(method_name)
+                    && !object_method(ms[i].getName()) && ms[i].getModifiers() == 9) {   // public static only
+                    Trees sig = signature(ms[i]);
+                    m = new MethodInfo(ms[i].getName(),sig,ms[i]);
+                    Translator.functions = Translator.functions.append(new Node(ms[i].getName(),sig));
+                    break;
+                };
+            if (m == null)
+                throw new Error("No such method: "+method_name);
+            methods.add(m);
+            Collections.sort(methods);
+            if (trace_imported_methods)
+                System.out.println("Importing method: "+m.name+m.signature.tail()
+                                   +":"+m.signature.head()+"  ");
+        } catch (ClassNotFoundException x) {
+            throw new Error("Undefined class: "+class_name);
+        }
+    }
+
+    public static void print_methods () {
+        for (int i = 0; i < methods.size(); i++ ) {
+            MethodInfo m = methods.get(i);
+            System.out.print(" "+m.name+":"+m.signature.tail()+"->"+m.signature.head());
+        };
+    }
+
+    /** return the method specification of a system method with a given name over some expressions;
+     * when the method is overloaded, find the most specific (in terms of arg subtyping)
+     * @param method_name the given method name
+     * @param args the method expressions
+     * @return the method specification
+     */
+    public static Tree find_method ( String method_name, Trees args ) {
+        for (int i = 0; i < methods.size(); i++ ) {
+            MethodInfo m = methods.get(i);
+            if (m.name.equals(method_name) && TypeInference.subtype(args,m.signature.tail()))
+                return m.signature.head();
+        };
+        return null;
+    }
+
+    /** return the method number of a system method with a given name over some expressions;
+     * when the method is overloaded, find the most specific (in terms of arg subtyping)
+     * @param method_name the given method name
+     * @param args the method expressions
+     * @return the method number
+     */
+    public static int find_method_number ( String method_name, Trees args ) {
+        for (int i = 0; i < methods.size(); i++ ) {
+            MethodInfo m = methods.get(i);
+            if (m.name.equals(method_name) && TypeInference.subtype(args,m.signature.tail()))
+                return i;
+        };
+        return -1;
+    }
+
+    /** call a system method with a given number over MRData
+     * @param method_number the method number
+     * @param args the input arguments
+     * @return the result of invoking this method over the args
+     */
+    public static MRData call ( int method_number, MRData... args ) {
+        if (method_number < 0 || method_number >= methods.size())
+            throw new Error("Run-time error (unknown method name)");
+        MethodInfo m = methods.get(method_number);
+        try {
+            return (MRData)m.method.invoke(null,(Object[])args);
+        } catch (Exception e) {
+            Tuple t = new Tuple(args.length);
+            for ( int i = 0; i < args.length; i++ )
+                t.set(i,args[i]);
+            System.err.println("Run-time error in method call: "+m.name+t+" of type "
+                               +m.signature.tail()+"->"+m.signature.head());
+            throw new Error(e.toString());
+        }
+    }
+}
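
The intended calling sequence for imported methods, as a hedged sketch (the
class org.example.MyFuncs and its public static method plus are hypothetical;
find_method_number expects the argument types as a Trees list):

    ClassImporter.load_classes();                        // imports SystemFunctions
    ClassImporter.importMethod("org.example.MyFuncs","plus");
    int mn = ClassImporter.find_method_number("plus",arg_types);
    MRData sum = ClassImporter.call(mn,new MR_int(1),new MR_int(2));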

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Compiler.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Compiler.gen b/core/src/main/java/org/apache/mrql/Compiler.gen
new file mode 100644
index 0000000..668cdc8
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Compiler.gen
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.io.*;
+import javax.tools.*;
+import javax.tools.JavaCompiler.CompilationTask;
+import java.lang.reflect.*;
+import java.util.*;
+import java.net.*;
+import java.util.jar.*;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/** compilation of MRQL expressions to Java code and then to Java bytecode */
+final public class Compiler extends Translator {
+    static JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
+    static DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>();
+    final static String tmp_dir = "/tmp/mrql_jar_"+System.getProperty("user.name");
+    public static String jar_path;
+    static Trees in_memory_repeat_vars = #[];
+    static int lambda_num = 0;
+    static int user_functions_num = 0;
+
+    /** a Java source string kept in memory, used as a compilation unit by the Java compiler */
+    final private static class JavaSourceFromString extends SimpleJavaFileObject {
+        final String code;
+
+        JavaSourceFromString ( String name, String code ) {
+            super(URI.create("string:///org/apache/mrql/" + name.replace('.','/') + Kind.SOURCE.extension),Kind.SOURCE);
+            this.code = code;
+        }
+
+        @Override
+        public CharSequence getCharContent ( boolean ignoreEncodingErrors ) {
+            return code;
+        }
+    }
+
+    private static String new_lambda_name () {
+        return "MRQL_Lambda_"+(lambda_num++);
+    }
+
+    private static void add2jar ( File source, int offset, JarOutputStream target ) throws IOException {
+        if (source.isDirectory()) {
+            String name = source.getPath();
+            if (name.length() > offset) {
+                JarEntry entry = new JarEntry(name.substring(offset));
+                entry.setTime(source.lastModified());
+                target.putNextEntry(entry);
+                target.closeEntry();
+            };
+            for ( File nestedFile: source.listFiles() )
+                add2jar(nestedFile,offset,target);
+        } else {
+            JarEntry entry = new JarEntry(source.getPath().substring(offset));
+            entry.setTime(source.lastModified());
+            target.putNextEntry(entry);
+            BufferedInputStream in = new BufferedInputStream(new FileInputStream(source));
+            byte[] buffer = new byte[1024];
+            int count;
+            while ((count = in.read(buffer)) > 0)
+                target.write(buffer,0,count);
+            target.closeEntry();
+            in.close();
+        }
+    }
+
+    private static void remove ( File file ) throws IOException {
+        if (file.isDirectory())
+            for ( File nestedFile: file.listFiles() )
+                remove(nestedFile);
+        file.delete();
+    }
+
+    public static void clean () throws IOException {
+        in_memory_repeat_vars = #[];
+        remove(new File(tmp_dir));
+    }
+
+    final private static Tree compile ( Tree e, StringBuffer out ) throws Exception {
+        match e {
+        case repeat(lambda(`v,`body),`s,`n):
+            in_memory_repeat_vars = in_memory_repeat_vars.cons(v);
+            return #<repeat(lambda(`v,`(compile(body,out))),`(compile(s,out)),`n)>;
+        case `f(...al):
+            if (!plans_with_distributed_lambdas.member(#<`f>))
+                fail;
+            Trees nl = #[];
+            for ( Tree a: al)
+                match Interpreter.closure(a,Interpreter.global_env) {
+                case lambda(`v,`body):
+                    String fname = new_lambda_name();
+                    StringBuffer sb = new StringBuffer(1000);
+                    sb.append("final class "+fname+" extends Function {\n");
+                    sb.append(fname+" () {}\n");
+                    sb.append("final public MRData eval ( final MRData "+v
+                              +" ) { return "+compileE(body)+"; }\n}\n");
+                    out.append(sb);
+                    nl = nl.append(#<compiled(`fname,`a)>);
+                case _: nl = nl.append(compile(a,out));
+                };
+            return #<`f(...nl)>;
+        case `f(...al):
+            Trees nl = #[];
+            for ( Tree a: al)
+                nl = nl.append(compile(a,out));
+            return #<`f(...nl)>;
+        };
+        return e;
+    }
+
+    private static StringBuffer out;
+
+    /** compile the functional arguments of the MRQL operators using the Java compiler
+     * @param query the expression to compile
+     * @return the query with all functional arguments compiled to Java bytecode
+     */
+    final public static Tree compile ( Tree query ) {
+        try {
+            user_functions_num = lambda_num++;
+            // remove the old jar
+            if (jar_path != null)
+                remove(new File(jar_path));
+            jar_path = tmp_dir+"/mrql_args_"+(new Random().nextInt(1000000))+".jar";
+            out = new StringBuffer(1000);
+            out.append("package org.apache.mrql;\n");
+            Tree nq = compile(query,out);
+            StringBuffer sb = new StringBuffer(1000);
+            for ( String f: global_functions )
+                match global_functions.lookup(f) {
+                case function(tuple(...params),`otp,`body):
+                    sb.append("final public static "+get_MR_type(otp)+" "+f);
+                    if (params.is_empty())
+                        sb.append(" ()");
+                    else {
+                        match params.head() {
+                        case bind(`v,`tp):
+                            sb.append(" ( final "+get_MR_type(tp)+" "+v);
+                        };
+                        for ( Tree var: params.tail() )
+                            match var {
+                            case bind(`v,`tp):
+                                sb.append(", final "+get_MR_type(tp)+" "+v);
+                            }
+                        sb.append(" )");
+                    };
+                    // emit the method body for both 0-ary and n-ary functions
+                    sb.append(" { return ("+get_MR_type(otp)+")");
+                    sb.append(compileE(body));
+                    sb.append("; }\n");
+                };
+            out.append("final class UserFunctions_"+user_functions_num+" {\n");
+            out.append(sb);
+            out.append("}\n");
+            String code = out.toString();
+            //System.out.println(code);
+            JavaFileObject file = new JavaSourceFromString("UserFunctions_"+user_functions_num,code);
+            Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file);
+            List<String> optionList = new ArrayList<String>();
+            (new File(tmp_dir)).mkdir();
+            String dir = tmp_dir+"/classes_"+(new Random().nextInt(1000000));
+            File fdir = new File(dir);
+            fdir.mkdir();
+            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+            String classpath = jar_path;
+            String separator = System.getProperty("path.separator");
+            for ( URL url: ((URLClassLoader) classLoader).getURLs() )
+                classpath += separator+url.getFile();
+            // use hadoop core jar
+            classpath += separator + WritableComparable.class.getProtectionDomain().getCodeSource().getLocation().toString();
+            optionList.addAll(Arrays.asList("-classpath",classpath));
+            optionList.addAll(Arrays.asList("-d",dir));
+            CompilationTask task = compiler.getTask(null,null,diagnostics,optionList,null,compilationUnits);
+            boolean success = task.call();
+            if (!success)
+                for ( Diagnostic d: diagnostics.getDiagnostics() )
+                    System.err.println("*** Compilation error at line "+d.getLineNumber()+" position "
+                                       +d.getColumnNumber()+": "+d.getMessage(Locale.US));
+            Manifest manifest = new Manifest();
+            manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION,"1.0");
+            JarOutputStream target = new JarOutputStream(new FileOutputStream(jar_path),manifest);
+            add2jar(new File(dir+"/"),dir.length()+1,target);
+            target.close();
+            remove(fdir);
+            return nq;
+        } catch (Exception e) {
+            System.err.println("*** Warning: Unable to compile the query:\n"+query);
+            if (Config.trace)
+                e.printStackTrace(System.err);
+            return query;
+        }
+    }
+
+    /** load the Java class of the anonymous function with name lambda_name */
+    final public static Function compiled ( ClassLoader cl, String lambda_name ) throws Exception {
+        URL[] urls = ((URLClassLoader) cl).getURLs();
+        URL[] new_urls = new URL[urls.length+1];
+        for ( int i = 0; i < urls.length; i++ )
+            new_urls[i+1] = urls[i];
+        new_urls[0] = new URL("file://"+jar_path);
+        URLClassLoader loader = new URLClassLoader(new_urls,cl);
+        Class c = loader.loadClass("org.apache.mrql."+lambda_name);
+        Constructor cc = c.getDeclaredConstructors()[0];
+        cc.setAccessible(true);
+        return (Function)cc.newInstance();
+    }
+
+    /** The Java type of an MRQL type */
+    private static String get_MR_type ( Tree type ) {
+        match type {
+        case boolean: return "MR_bool";
+        case byte: return "MR_byte";
+        case short: return "MR_short";
+        case int: return "MR_int";
+        case long: return "MR_long";
+        case float: return "MR_float";
+        case double: return "MR_double";
+        case char: return "MR_char";
+        case string: return "MR_string";
+        case union: return "Union";
+        case bag(...): return "Bag";
+        case list(...): return "Bag";
+        };
+        return "MRData";
+    }
+
+    private static Trees remove_duplicates ( Trees al ) {
+        if (al.is_empty())
+            return al;
+        Trees el = remove_duplicates(al.tail());
+        if (el.member(al.head()))
+            return el;
+        else return el.cons(al.head());
+    }
+
+    /** lambda lifting: generate Java code from an anonymous function */
+    private static String compilef ( String v, Tree body ) throws Exception {
+        String fname = new_lambda_name();
+        Trees free_vars = remove_duplicates(free_variables(body,#[`v]));
+        StringBuffer sb = new StringBuffer(1000);
+        sb.append("final class "+fname+" extends Function {\n");
+        for ( Tree var: free_vars )
+            sb.append("MRData "+var+"; ");
+        sb.append("\npublic "+fname+" (");
+        if (free_vars.is_empty())
+            sb.append(") {}\n");
+        else {
+            sb.append(" MRData "+free_vars.head());
+            for ( Tree var: free_vars.tail() )
+                sb.append(", MRData "+var);
+            sb.append(" ) { ");
+            for ( Tree var: free_vars )
+                sb.append("this."+var+" = "+var+"; ");
+            sb.append("}\n");
+        };
+        sb.append("final public MRData eval ( final MRData "+v
+                   +" ) { return "+compileE(body)+"; }\n}\n");
+        out.append(sb);
+        String s = "new "+fname+"(";
+        if (!free_vars.is_empty()) {
+            s += free_vars.head();
+            for ( Tree var: free_vars.tail() )
+                s += ","+var;
+        };
+        return s+")";
+    }
+
+    private static String compileF ( Tree fnc ) throws Exception {
+        match fnc {
+        case lambda(`v,`b):
+            return compilef(v.toString(),b);
+        case compiled(`f,`lm):
+            // recompile the function
+            String s = compileF(lm);
+            ((Node)fnc).children().head = new VariableLeaf(s.substring(4,s.indexOf("(")));  //destructive
+            return s;
+        case function(tuple(...params),`tp,`body):
+            String ret = "new Lambda(new Function () { "
+                         +"final public MRData eval ( final MRData _x ) { ";
+            for ( int i = 0; i < params.length(); i++ )
+                match params.nth(i) {
+                case bind(`v,_):
+                    ret += "final MRData "+v+" = ((Tuple)_x).get("+i+"); ";
+                };
+            return ret+" return "+compileE(body)+"; } })";
+        };
+        throw new Exception("Ill-formed lambda: "+fnc);
+    }
+
+    private static String compileEL ( Trees el ) throws Exception {
+        if (el.is_empty())
+            return "";
+        String ret = compileE(el.head());
+        for ( Tree a: el.tail() )
+            ret += ","+compileE(a);
+        return ret;
+    }
+
+    private static String compileE ( Tree e ) throws Exception {
+        if (e == null)
+            return "(new MR_byte(0))";
+        if (e.equals(#<true>))
+            return "(new MR_bool(true))";
+        else if (e.equals(#<false>))
+            return "(new MR_bool(false))";
+        else if (e.equals(#<null>))
+            return "(new MR_byte(0))";
+        else if (e.is_variable())
+            if (in_memory_repeat_vars.member(e))
+                return "Interpreter.lookup_global_binding(\""+e.toString()+"\")";
+            else return e.toString();
+        else if (e.is_long())
+            return "(new MR_int("+((LongLeaf)e).value()+"))";
+        else if (e.is_double())
+            return "(new MR_float("+((DoubleLeaf)e).value()+"))";
+        else if (e.is_string())
+            return "(new MR_string("+e.toString()+"))";
+        match e {
+        case callM(and,_,`x,`y):
+            return "(new MR_bool(((MR_bool)"+compileE(x)
+                +").get() && ((MR_bool)"+compileE(y)+").get()))";
+        case callM(or,_,`x,`y):
+            return "(new MR_bool(((MR_bool)"+compileE(x)
+                +").get() || ((MR_bool)"+compileE(y)+").get()))";
+        case callM(not,_,`x):
+            return "(new MR_bool(!((MR_bool)"+compileE(x)+").get()))";
+        case callM(`f,`n,...args):
+            if (!n.is_long())
+                fail;
+            String ret = "SystemFunctions."+ClassImporter.method_name((int)((LongLeaf)n).value())+"(";
+            Trees sig = ClassImporter.signature((int)((LongLeaf)n).value());
+            for (int i = 0; i < args.length(); i++)
+                ret += ((i > 0) ? ",(" : "(")+get_MR_type(sig.nth(i+1))+")("+compileE(args.nth(i))+")";
+            return ret+")";
+        case lambda(`v,`body):
+            return "new Lambda("+compilef(v.toString(),body)+")";
+        case nth(`x,`n):
+            return "(((Tuple)("+compileE(x)+")).get("+((LongLeaf)n).value()+"))";
+        case setNth(`x,`n,`v,`ret):
+            return "(((Tuple)("+compileE(x)+")).set("+((LongLeaf)n).value()+","+compileE(v)+","+compileE(ret)+"))";
+        case materialize(`u):
+            return "MapReduceAlgebra.materialize("+compileE(u)+")";
+        case let(`v,`u,`body):
+            return "(new Function () { public MRData eval ( final MRData "+v
+                +" ) { if ("+v+" instanceof Bag) ((Bag)"+v+").materialize(); return "
+                +compileE(body)+"; }; }).eval("+compileE(u)+")";
+        case cmap(`m,`s):
+            return "MapReduceAlgebra.cmap("+compileF(m)+",(Bag)("+compileE(s)+"))";
+        case filter(`p,`m,`s):
+            return "MapReduceAlgebra.filter("+compileF(p)+","+compileF(m)
+                   +",(Bag)"+compileE(s)+")";
+        case map(`m,`s):
+            return "MapReduceAlgebra.map("+compileF(m)+",(Bag)"+compileE(s)+")";
+        case range(`min,`max):
+            return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get(),"
+                   +"((MR_long)"+compileE(max)+").get())";
+        case call(`f,...args):
+            return "("+compileF(f)+".eval("+compileE(#<tuple(...args)>)+"))";
+        case tuple():
+            return "(new Tuple())";
+        case tuple(`x):
+            return "(new Tuple("+compileE(x)+"))";
+        case tuple(`a,...el):
+            String ret = "(new Tuple("+compileE(a);
+            for ( Tree x: el )
+                ret += ","+compileE(x);
+            return ret+"))";
+        case tagged_union(`n,`u):
+            return "(new Union((byte)"+((LongLeaf)n).value()+","+compileE(u)+"))";
+        case union_value(`x):
+            return "(((Union)"+compileE(x)+").value())";
+        case union_tag(`x):
+            return "(new MR_int(((Union)"+compileE(x)+").tag()))";
+        // used for shortcutting sync in bsp supersteps
+        case BAG():
+            return "SystemFunctions.bsp_empty_bag";
+        case TRUE():
+            return "SystemFunctions.bsp_true_value";
+        case FALSE():
+            return "SystemFunctions.bsp_false_value";
+        case `T():
+            if (is_collection(T))
+                return "(new Bag())";
+            else fail
+        case `T(e):
+            if (is_collection(T))
+                return "(new Bag("+compileE(e)+"))";
+            else fail
+        case `T(`a,...el):
+            if (!is_collection(T))
+                fail;
+            String ret = "(new Bag("+compileE(a);
+            for ( Tree x: el )
+                ret += ",(MRData)"+compileE(x);
+            return ret+"))";
+        case if(`c,`x,`y):
+            return "((((MR_bool)"+compileE(c)+").get())?"+compileE(x)+":"+compileE(y)+")";
+        case synchronize(`peer,`b):
+            return "SystemFunctions.synchronize(((MR_string)"+compileE(peer)+"),(MR_bool)"+compileE(b)+")";
+        case distribute(`peer,`s):
+            return "SystemFunctions.distribute(((MR_string)"+compileE(peer)+"),(Bag)"+compileE(s)+")";
+        case mapReduce(`mx,`my,`s,_):
+            return "MapReduceAlgebra.mapReduce("+compileF(mx)+","+compileF(my)+",(Bag)("+compileE(s)+"))";
+        case mapReduce2(`mx,`my,`r,`x,`y,_):
+            return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)+","+compileF(r)
+                   +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
+        case mapJoin(`kx,`ky,`r,`x,`y):
+            return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)+","+compileF(r)
+                   +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
+        case join(`kx,`ky,`r,`x,`y):
+            return "MapReduceAlgebra.join("+compileF(kx)+","+compileF(ky)+","+compileF(r)
+                   +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
+        case groupBy(`s):
+            return "MapReduceAlgebra.groupBy((Bag)("+compileE(s)+"))";
+        case index(`x,`n):
+            return "SystemFunctions.index((Bag)("+compileE(x)+"),"+compileE(n)+")";
+        case range(`x,`i,`j):
+            return "SystemFunctions.range((Bag)("+compileE(x)+"),"+compileE(i)+","+compileE(j)+")";
+        case map_index(`x,`key):
+            return "((Bag)("+compileE(x)+")).map_find("+compileE(key)+")";
+        case aggregate(`acc,`zero,`s):
+            return "MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)+",(Bag)("+compileE(s)+"))";
+        case Aggregate(`acc,`zero,`s):
+            return "MapReducePlan.aggregate("+compileF(acc)+","+compileE(zero)+","+compileM(s)+")";
+        case mergeGroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
+            return "MapReduceAlgebra.mergeGroupByJoin("+compileF(kx)+","+compileF(ky)
+                   +","+compileF(gx)+","+compileF(gy)+","+compileF(m)+","+compileF(c)
+                   +","+compileF(r)+",(Bag)"+compileE(x)+",(Bag)"+compileE(y)+")";
+        case function(tuple(...params),`tp,`body):
+            return compileF(e);
+        case typed(`x,_):
+            return compileE(x);
+        case apply(`f,tuple(...args)):
+            if (!f.is_variable())
+                fail;
+            match global_functions.lookup(f.toString()) {
+            case function(tuple(...params),`otp,`body):
+                String ret = "UserFunctions_"+user_functions_num+"."+f+"(";
+                if (args.is_empty())
+                    return ret+")";
+                for ( int i = 0; i < params.length(); i++ )
+                    match params.nth(i) {
+                    case bind(_,`tp):
+                        ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")"+compileE(args.nth(i));
+                    };
+                return ret+")";
+            };
+        case apply(`f,`arg):
+            if (!f.is_variable())
+                fail;
+            match global_functions.lookup(f.toString()) {
+            case function(tuple(...params),`otp,`body):
+                String ac = compileE(arg);
+                String ret = "UserFunctions_"+user_functions_num+"."+f+"(";
+                for ( int i = 0; i < params.length(); i++ )
+                    match params.nth(i) {
+                    case bind(_,`tp):
+                        ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")((Tuple)"+ac+").get("+i+")";
+                    };
+                return ret+")";
+            };
+        case apply(`f,`arg):
+            if (!f.is_variable())
+                return "("+compileF(f)+").eval("+compileE(arg)+")";
+            else return "(((Lambda)"+compileE(f)+").lambda().eval("+compileE(arg)+"))";
+        case Collect(`s):
+            return "Plan.collect("+compileM(s)+")";
+        case trace(`x):
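+            // the trace(...) wrapper is dropped here: the traced expression is
+            // compiled as-is, without run-time tracing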
+            return compileE(x);
+        case _:
+            return compileM(e);
+        };
+        throw new Exception("Cannot compile: "+e);
+    }
+
+    final private static String compileM ( Tree e ) throws Exception {
+        match e {
+        case cMap(`f,`s):
+            return "MapReduceAlgebra.cmap("+compileF(f)+",(Bag)"+compileM(s)+")";
+        case AggregateMap(`f,`acc,`zero,`s):
+            return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
+                   +","+compileM(#<cMap(`f,`s)>)+"))";
+        case MapReduce(`m,`r,`s,_):
+            return "MapReduceAlgebra.mapReduce("+compileF(m)+","
+                   +compileF(r)+",(Bag)"+compileM(s)+")";
+        case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
+            return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)
+                   +","+compileE(zero)+","+compileM(#<MapReduce(`m,`r,`s)>)+"))";
+        case MapCombineReduce(`m,`c,`r,`s,_):
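+            // the combiner `c` is dropped here: combining is an optimization for
+            // distributed execution and is unnecessary for in-memory evaluation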
+            return "MapReduceAlgebra.mapReduce("+compileF(m)
+                   +","+compileF(r)+",(Bag)"+compileM(s)+")";
+        case MapReduce2(`mx,`my,`c,`r,`x,`y,_):
+            return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)
+                   +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
+        case MapReduce2(`mx,`my,`r,`x,`y,_):
+            return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)
+                   +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
+        case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
+            return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
+                   +","+compileM(#< MapReduce2(`mx,`my,`r,`x,`y)>)+"))";
+        case MapJoin(`kx,`ky,`r,`x,`y):
+            return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)
+                   +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
+        case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
+            return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)
+                   +","+compileE(zero)+","+compileM(#<MapJoin(`kx,`ky,`r,`x,`y)>)+"))";
+        case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
+            return "MapReduceAlgebra.groupByJoin("+compileF(kx)+","+compileF(ky)
+                   +","+compileF(gx)+","+compileF(gy)+","+compileF(m)+","+compileF(c)
+                   +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
+        case CrossProduct(`mx,`my,`r,`x,`y):
+            return "MapReduceAlgebra.crossProduct("+compileF(mx)+","+compileF(my)
+                   +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
+        case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
+            return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
+                   +","+compileM(#<CrossProduct(`mx,`my,`r,`x,`y)>)+"))";
+        case BSPSource(`n,BinarySource(`file,_)):
+            if (Config.hadoop_mode)
+                return "Plan.binarySource("+compileE(n)+",((MR_string)"
+                       +compileE(file)+").get())";
+            else return "(Bag)MapReduceAlgebra.read_binary("+compileE(n)+",((MR_string)"
+                        +compileE(file)+").get())";
+        case BinarySource(`file,_):
+            if (Config.hadoop_mode)
+                return "Plan.binarySource(((MR_string)"+compileE(file)+").get())";
+            else return "(Bag)MapReduceAlgebra.read_binary(((MR_string)"+compileE(file)+").get())";
+        case BSPSource(`n,ParsedSource(`parser,`file,...args)):
+            if (!(n instanceof LongLeaf))
+                fail;
+            if (!Config.hadoop_mode)
+                return "MapReduceAlgebra.parsedSource(((MR_int)"+compileE(n)+").get(),\""
+                       +parser+"\",((MR_string)"+compileE(file)+").get(),"
+                       +reify(args)+")";
+            Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+            if (p == null)
+                throw new Error("Unknown parser: "+parser);
+            return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get(),"
+                   +reify(args)+")";
+        case ParsedSource(`parser,`file,...args):
+            if (!Config.hadoop_mode)
+                return "MapReduceAlgebra.parsedSource(\""+parser+"\",((MR_string)"
+                       +compileE(file)+").get(),"+reify(args)+")";
+            Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+            if (p == null)
+                throw new Error("Unknown parser: "+parser);
+            return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get(),"
+                   +reify(args)+")";
+        case Merge(`x,`y):
+            return "((Bag)"+compileM(x)+").union((Bag)"+compileM(y)+")";
+        case Generator(`min,`max,`size):
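+            // the `size` hint is unused for in-memory generation; it is presumably
+            // only needed to split the range across mappers in distributed mode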
+            return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get(),"
+                   +"((MR_long)"+compileE(max)+").get())";
+        case BSP(`n,`superstep,`state,`o,...as):
+            String ds = "";
+            for ( Tree a: as )
+                ds += ",(Bag)("+compileM(a)+")";
+            return "MapReduceAlgebra.BSP("+((LongLeaf)n).value()+","
+                   +compileF(superstep)+","+compileE(state)+","+o+","
+                   +"new Bag[]{"+ds.substring(1)+"})";
+        case `v:
+            if (v.is_variable())
+                return v.toString();
+        };
+        throw new Exception("Cannot compile: "+e);
+    }
+}
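
Note on the compiler above: compileE/compileM translate an MRQL physical plan
into the text of a Java expression. A minimal sketch of the output shape, using
the cMap case:

    // generated for the plan term cMap(f,s), where f is compiled by compileF
    // and s by compileM:
    MapReduceAlgebra.cmap(<compiled f>,(Bag)<compiled s>)

The generated source is then compiled to Java bytecode at run time when
Config.compile_functional_arguments is set (see Config.java below).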

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Config.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Config.java b/core/src/main/java/org/apache/mrql/Config.java
new file mode 100644
index 0000000..a957d89
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Config.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.VariableLeaf;
+import java.util.ArrayList;
+import java.io.FileInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/** MRQL configuration parameters */
+final public class Config {
+    public static boolean loaded = false;
+
+    // true for using Hadoop HDFS file-system
+    public static boolean hadoop_mode = false;
+    // true for local execution (one node)
+    public static boolean local_mode = false;
+    // true for distributed execution (multiple nodes)
+    public static boolean distributed_mode = false;
+    // true for Hadoop map-reduce mode
+    public static boolean map_reduce_mode = false;
+    // true, for BSP mode using Hama
+    public static boolean bsp_mode = false;
+    // true, for Spark mode
+    public static boolean spark_mode = false;
+    // if true, the input is processed interactively
+    public static boolean interactive = true;
+    // compile the MR functional arguments to Java bytecode at run-time
+    // (each task-tracker repeats the compilation at the MR setup time)
+    public static boolean compile_functional_arguments = true;
+    // if true, generates info about all compilation and optimization steps
+    public static boolean trace = false;
+    // number of worker nodes
+    public static int nodes = 2;
+    // true, to disable mapJoin
+    public static boolean noMapJoin = false;
+    // max distributed cache size for MapJoin (fragment-replicate join) in MBs
+    public static int mapjoin_size = 50;
+    // max entries for in-mapper combiner before they are flushed out
+    public static int map_cache_size = 100000;
+    // max number of bag elements to print
+    public static int max_bag_size_print = 20;
+    // max size of a materialized vector before it is spilled to a file
+    public static int max_materialized_bag = 500000;
+    // max number of incoming messages before a sub-sync()
+    public static int bsp_msg_size = Integer.MAX_VALUE;
+    // number of elements per mapper to process the range min...max
+    public static long range_split_size = 100000;
+    // max number of streams to merge simultaneously
+    public static int max_merged_streams = 100;
+    // the directory for temporary files and spilled bags
+    public static String tmpDirectory = "/tmp/mrql_"+System.getProperty("user.name");
+    // true, if we want to derive a combine function for MapReduce
+    public static boolean use_combiner = true;
+    // true, if we can use the rule that fuses a groupBy with a join over the same key
+    public static boolean groupJoinOpt = true;
+    // true, if we can use the rule that converts a self-join into a simple mapreduce
+    public static boolean selfJoinOpt = true;
+    // true for run-time trace of plans
+    public static boolean trace_execution = false;
+    // true for extensive run-time trace of expressions & plans
+    public static boolean trace_exp_execution = false;
+    // true if you don't want to print statistics
+    public static boolean quiet_execution = false;
+    // true if this is during testing
+    public static boolean testing = false;
+    // true to display INFO log messages
+    public static boolean info = false;
+
+    /** store the configuration parameters */
+    public static void write ( Configuration conf ) {
+        conf.setBoolean("mrql.hadoop.mode",hadoop_mode);
+        conf.setBoolean("mrql.local.mode",local_mode);
+        conf.setBoolean("mrql.distributed.mode",distributed_mode);
+        conf.setBoolean("mrql.map.reduce.mode",map_reduce_mode);
+        conf.setBoolean("mrql.bsp.mode",bsp_mode);
+        conf.setBoolean("mrql.spark.mode",spark_mode);
+        conf.setBoolean("mrql.interactive",interactive);
+        conf.setBoolean("mrql.compile.functional.arguments",compile_functional_arguments);
+        conf.setBoolean("mrql.trace",trace);
+        conf.setInt("mrql.nodes",nodes);
+        conf.setInt("mrql.mapjoin.size",mapjoin_size);
+        conf.setInt("mrql.in.mapper.size",map_cache_size);
+        conf.setInt("mrql.max.bag.size.print",max_bag_size_print);
+        conf.setInt("mrql.max.materialized.bag",max_materialized_bag);
+        conf.setInt("mrql.bsp.msg.size",bsp_msg_size);
+        conf.setLong("mrql.range.split.size",range_split_size);
+        conf.setInt("mrql.max.merged.streams",max_merged_streams);
+        conf.set("mrql.tmp.directory",tmpDirectory);
+        conf.setBoolean("mrql.use.combiner",use_combiner);
+        conf.setBoolean("mrql.group.join.opt",groupJoinOpt);
+        conf.setBoolean("mrql.self.join.opt",selfJoinOpt);
+        conf.setBoolean("mrql.trace.execution",trace_execution);
+        conf.setBoolean("mrql.trace.exp.execution",trace_exp_execution);
+        conf.setBoolean("mrql.quiet.execution",quiet_execution);
+        conf.setBoolean("mrql.testing",testing);
+        conf.setBoolean("mrql.info",info);
+    }
+
+    /** load the configuration parameters */
+    public static void read ( Configuration conf ) {
+        if (loaded)
+            return;
+        loaded = true;
+        hadoop_mode = conf.getBoolean("mrql.hadoop.mode",hadoop_mode);
+        local_mode = conf.getBoolean("mrql.local.mode",local_mode);
+        distributed_mode = conf.getBoolean("mrql.distributed.mode",distributed_mode);
+        map_reduce_mode = conf.getBoolean("mrql.map.reduce.mode",map_reduce_mode);
+        bsp_mode = conf.getBoolean("mrql.bsp.mode",bsp_mode);
+        spark_mode = conf.getBoolean("mrql.spark.mode",spark_mode);
+        interactive = conf.getBoolean("mrql.interactive",interactive);
+        compile_functional_arguments = conf.getBoolean("mrql.compile.functional.arguments",compile_functional_arguments);
+        trace = conf.getBoolean("mrql.trace",trace);
+        nodes = conf.getInt("mrql.nodes",nodes);
+        mapjoin_size = conf.getInt("mrql.mapjoin.size",mapjoin_size);
+        map_cache_size = conf.getInt("mrql.in.mapper.size",map_cache_size);
+        max_bag_size_print = conf.getInt("mrql.max.bag.size.print",max_bag_size_print);
+        max_materialized_bag = conf.getInt("mrql.max.materialized.bag",max_materialized_bag);
+        bsp_msg_size = conf.getInt("mrql.bsp.msg.size",bsp_msg_size);
+        range_split_size = conf.getLong("mrql.range.split.size",range_split_size);
+        max_merged_streams = conf.getInt("mrql.max.merged.streams",max_merged_streams);
+        tmpDirectory = conf.get("mrql.tmp.directory",tmpDirectory);
+        use_combiner = conf.getBoolean("mrql.use.combiner",use_combiner);
+        groupJoinOpt = conf.getBoolean("mrql.group.join.opt",groupJoinOpt);
+        selfJoinOpt = conf.getBoolean("mrql.self.join.opt",selfJoinOpt);
+        trace_execution = conf.getBoolean("mrql.trace.execution",trace_execution);
+        trace_exp_execution = conf.getBoolean("mrql.trace.exp.execution",trace_exp_execution);
+        quiet_execution = conf.getBoolean("mrql.quiet.execution",quiet_execution);
+        testing = conf.getBoolean("mrql.testing",testing);
+        info = conf.getBoolean("mrql.info",info);
+    }
+
+    public static ArrayList<String> extra_args = new ArrayList<String>();
+
+    /** read configuration parameters from the Main args */
+    public static Bag parse_args ( String args[], Configuration conf ) throws Exception {
+        int i = 0;
+        int iargs = 0;
+        extra_args = new ArrayList<String>();
+        ClassImporter.load_classes();
+        interactive = true;
+        while (i < args.length) {
+            if (args[i].equals("-local")) {
+                local_mode = true;
+                i++;
+            } else if (args[i].equals("-dist")) {
+                distributed_mode = true;
+                i++;
+            } else if (args[i].equals("-reducers")) {
+                if (++i >= args.length)
+                    throw new Error("Expected number of reductions");
+                nodes = Integer.parseInt(args[i]);
+                i++;
+            } else if (args[i].equals("-bsp")) {
+                bsp_mode = true;
+                i++;
+            } else if (args[i].equals("-spark")) {
+                spark_mode = true;
+                i++;
+            } else if (args[i].equals("-bsp_tasks")) {
+                if (++i >= args.length || Integer.parseInt(args[i]) < 1)
+                    throw new Error("Expected max number of bsp tasks >= 1");
+                nodes = Integer.parseInt(args[i]);
+                i++;
+            } else if (args[i].equals("-nodes")) {
+                if (++i >= args.length || Integer.parseInt(args[i]) < 1)
+                    throw new Error("Expected number of nodes >= 1");
+                nodes = Integer.parseInt(args[i]);
+                i++;
+            } else if (args[i].equals("-bsp_msg_size")) {
+                if (++i >= args.length || Integer.parseInt(args[i]) < 10000)
+                    throw new Error("Expected max number of bsp messages before subsync() >= 10000");
+                bsp_msg_size = Integer.parseInt(args[i]);
+                i++;
+            } else if (args[i].equals("-mapjoin_size")) {
+                if (++i >= args.length)
+                    throw new Error("Expected number of MBs");
+                mapjoin_size = Integer.parseInt(args[i]);
+                i++;
+            } else if (args[i].equals("-cache_size")) {
+                if (++i >= args.length)
+                    throw new Error("Expected number of entries");
+                map_cache_size = Integer.parseInt(args[i]);
+                i++;
+            } else if (args[i].equals("-tmp")) {
+                if (++i >= args.length)
+                    throw new Error("Expected a temporary directory");
+                tmpDirectory = args[i];
+                i++;
+            } else if (args[i].equals("-bag_size")) {
+                if (++i >= args.length || Integer.parseInt(args[i]) < 10000)
+                    throw new Error("Expected max size of materialized bag >= 10000");
+                max_materialized_bag = Integer.parseInt(args[i]);
+                i++;
+            } else if (args[i].equals("-bag_print")) {
+                if (++i >= args.length)
+                    throw new Error("Expected number of bag elements to print");
+                max_bag_size_print = Integer.parseInt(args[i]);
+                i++;
+            } else if (args[i].equals("-split_size")) {
+                if (++i >= args.length)
+                    throw new Error("Expected a split size");
+                range_split_size = Long.parseLong(args[i]);
+                i++;
+            } else if (args[i].equals("-max_merged")) {
+                if (++i >= args.length)
+                    throw new Error("Expected a max number of merged streams");
+                max_merged_streams = Integer.parseInt(args[i]);
+                i++;
+            } else if (args[i].equals("-trace")) {
+                trace = true;
+                i++;
+            } else if (args[i].equals("-C")) {
+                compile_functional_arguments = true;
+                i++;
+            } else if (args[i].equals("-NC")) {
+                compile_functional_arguments = false;
+                i++;
+            } else if (args[i].equals("-P")) {
+                trace_execution = true;
+                i++;
+            } else if (args[i].equals("-quiet")) {
+                quiet_execution = true;
+                i++;
+            } else if (args[i].equals("-info")) {
+                info = true;
+                i++;
+            } else if (args[i].equals("-trace_execution")) {
+                trace_execution = true;
+                trace_exp_execution = true;
+                compile_functional_arguments = false;
+                i++;
+            } else if (args[i].equals("-methods")) {
+                System.out.print("\nImported methods: ");
+                ClassImporter.print_methods();
+                System.out.println();
+                System.out.print("\nAggregations:");
+                Translator.print_aggregates();
+                System.out.println();
+                i++;
+            } else if (args[i].charAt(0) == '-')
+                throw new Error("Unknown MRQL parameter: "+args[i]);
+            else {
+                if (interactive) {
+                    Main.query_file = args[i++];
+                    interactive = false;
+                } else extra_args.add(args[i++]);
+            }
+        };
+        if (hadoop_mode)
+            write(conf);
+        Plan.conf = conf;
+        Bag b = new Bag();
+        for ( String s: extra_args )
+            b.add(new MR_string(s));
+        Interpreter.new_global_binding(new VariableLeaf("args").value(),b);
+        return b;
+    }
+}
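
Note on Config.parse_args above: a rough usage sketch of the command-line
protocol it implements (the flags and file names here are hypothetical; this
mirrors what the Main driver does with its arguments):

    Configuration conf = new Configuration();
    // -local: one-node execution; -nodes 4: four worker nodes; the first
    // non-flag argument becomes the query file and the remaining arguments
    // are returned as a Bag bound to the MRQL variable `args`
    Bag extra = Config.parse_args(new String[]{"-local","-nodes","4","q.mrql","a1"},conf);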

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/DataSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/DataSet.java b/core/src/main/java/org/apache/mrql/DataSet.java
new file mode 100644
index 0000000..efe5646
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/DataSet.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.List;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+
+
+/** The domain of the MRQL physical algebra is a set of DataSources */
+public class DataSet {
+    public ArrayList<DataSource> source;  // multiple sources
+    public long counter;  // a Hadoop user-defined counter used by the `repeat' operator
+    public long records;  // total number of dataset records
+
+    /** Construct a DataSet that contains one DataSource
+     * @param s the given DataSource
+     * @param counter a Hadoop user-defined counter used by the `repeat' operator
+     * @param records total number of dataset records
+     */
+    DataSet ( DataSource s, long counter, long records ) {
+        source = new ArrayList<DataSource>();
+        source.add(s);
+        this.counter = counter;
+        this.records = records;
+    }
+
+    /** Construct a set of DataSources
+     * @param counter a Hadoop user-defined counter used by the `repeat' operator
+     * @param records total number of dataset records
+     */
+    DataSet ( long counter, long records ) {
+        source = new ArrayList<DataSource>();
+        this.counter = counter;
+        this.records = records;
+    }
+
+    /** add a DataSource to this DataSet */
+    public void add ( DataSource s ) {
+        source.add(s);
+    }
+
+    /** merge this DataSet with the given DataSet */
+    public void merge ( DataSet ds ) {
+        source.addAll(ds.source);
+        counter += ds.counter;
+        records += ds.records;
+    }
+
+    /** dataset size in bytes */
+    public long size ( Configuration conf ) {
+        long n = 0;
+        for (DataSource s: source)
+            n += s.size(conf);
+        return n;
+    }
+
+    /** return a single DataSource path by merging all the DataSource paths in this DataSet */
+    public String merge () {
+        Object[] ds = source.toArray();
+        String path = ((DataSource)ds[0]).path.toString();
+        for ( int i = 1; i < ds.length; i++ )
+            path += ","+((DataSource)ds[i]).path;
+        return path;
+    }
+
+    /** return the first num values */
+    public List<MRData> take ( int num ) {
+        ArrayList<MRData> res = new ArrayList<MRData>();
+        for ( DataSource s: source ) {
+            // ask each source only for the values still missing
+            res.addAll(s.take(num-res.size()));
+            if (res.size() >= num)
+                return res;
+        };
+        return res;
+    }
+
+    /** accumulate all dataset values */
+    public MRData reduce ( MRData zero, Function acc ) {
+        MRData res = zero;
+        for ( DataSource s: source )
+            res = s.reduce(res,acc);
+        return res;
+    }
+
+    public String toString () {
+        String p = "<"+counter;
+        for (DataSource s: source)
+            p += ","+s;
+        return p+">";
+    }
+}
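
Note on DataSet above: a minimal sketch of how a multi-source DataSet is built
and merged into a single input path list (src1 and src2 are hypothetical
DataSources; the constructors are package-private, so this only compiles
inside org.apache.mrql):

    DataSet ds = new DataSet(src1,0,0);  // one source, zero counter and record count
    ds.add(src2);                        // add a second input source
    String paths = ds.merge();           // "path1,path2", e.g. for a multi-input job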

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/DataSource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/DataSource.java b/core/src/main/java/org/apache/mrql/DataSource.java
new file mode 100644
index 0000000..6a0ef4f
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/DataSource.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+
+
+/** A DataSource is any input data source, such as a text file, a key/value map, a database, an intermediate file, etc */
+public class DataSource {
+    final public static String separator = "%%%";
+    public static DataSourceDirectory dataSourceDirectory = new DataSourceDirectory();
+    public static ParserDirectory parserDirectory = new ParserDirectory();
+    private static boolean loaded = false;
+
+    public String path;
+    public Class<? extends MRQLFileInputFormat> inputFormat;
+    public int source_num;
+    public boolean to_be_merged;    // if the path is a directory with multiple files, merge them
+
+    final static class ParserDirectory extends HashMap<String,Class<? extends Parser>> {
+    }
+
+    /** A dictionary that maps data source paths to DataSource data.
+     * It assumes that each path can only be associated with a single data source format and parser
+     */
+    final static class DataSourceDirectory extends HashMap<String,DataSource> {
+        public void read ( Configuration conf ) {
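+            // the serialized directory has the form "path===source@@@path===source@@@...",
+            // where each source string is parsed by DataSource.read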
+            clear();
+            for ( String s: conf.get("mrql.data.source.directory").split("@@@") ) {
+                String[] p = s.split("===");
+                put(p[0],DataSource.read(p[1],conf));
+            }
+        }
+
+        public String toString () {
+            String s = "";
+            for ( String k: keySet() )
+                s += "@@@"+k+"==="+get(k);
+            if (s.equals(""))
+                return s;
+            else return s.substring(3);
+        }
+
+        public DataSource get ( String name ) {
+            for ( Map.Entry<String,DataSource> e: entrySet() )
+                if (name.startsWith(e.getKey()))
+                    return e.getValue();
+            return null;
+        }
+
+        public void distribute ( Configuration conf ) {
+            conf.set("mrql.data.source.directory",toString());
+        }
+    }
+
+    DataSource () {}
+
+    DataSource ( int source_num,
+                 String path,
+                 Class<? extends MRQLFileInputFormat> inputFormat,
+                 Configuration conf ) {
+        this.source_num = source_num;
+        this.path = path;
+        this.inputFormat = inputFormat;
+        to_be_merged = false;
+        try {
+            Path p = new Path(path);
+            FileSystem fs = p.getFileSystem(conf);
+            String complete_path = fs.getFileStatus(p).getPath().toString();
+            //String complete_path = "file:"+path;
+            this.path = complete_path;
+            dataSourceDirectory.put(this.path,this);
+        } catch (IOException e) {
+            throw new Error(e);
+        }
+    }
+
+    public static void loadParsers() {
+        if (!loaded) {
+            DataSource.parserDirectory.put("xml",XMLParser.class);
+            DataSource.parserDirectory.put("json",JsonFormatParser.class);
+            DataSource.parserDirectory.put("line",LineParser.class);
+            loaded = true;
+        }
+    }
+
+    static {
+        loadParsers();
+    }
+
+    private static long size ( Path path, Configuration conf ) throws IOException {
+        FileStatus s = path.getFileSystem(conf).getFileStatus(path);
+        if (!s.isDir())
+            return s.getLen();
+        long size = 0;
+        for ( FileStatus fs: path.getFileSystem(conf).listStatus(path) )
+            size += fs.getLen();
+        return size;
+    }
+
+    /** data set size in bytes */
+    public long size ( Configuration conf ) {
+        try {
+            return size(new Path(path),conf);
+        } catch (IOException e) {
+            throw new Error(e);
+        }
+    }
+
+    public static DataSource read ( String buffer, Configuration conf ) {
+        try {
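+            // the buffer fields are separated by "%%%": the source kind, the source
+            // number, and then kind-specific fields (a path for Binary/Generator;
+            // a parser name, path, and parser-args tree for Text)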
+            String[] s = buffer.split(separator);
+            int n = Integer.parseInt(s[1]);
+            if (s[0].equals("Binary"))
+                return new BinaryDataSource(n,s[2],conf);
+            else if (s[0].equals("Generator"))
+                return new GeneratorDataSource(n,s[2],conf);
+            else if (s[0].equals("Text"))
+                return new ParsedDataSource(n,s[3],parserDirectory.get(s[2]),((Node)Tree.parse(s[4])).children(),conf);
+            else throw new Error("Unrecognized data source: "+s[0]);
+        } catch (Exception e) {
+            throw new Error(e);
+        }
+    }
+
+    public static DataSource get ( String path, Configuration conf ) {
+        if (dataSourceDirectory.isEmpty())
+            dataSourceDirectory.read(conf);
+        return dataSourceDirectory.get(path);
+    }
+
+    public static DataSource getCached ( String remote_path, String local_path, Configuration conf ) {
+        DataSource ds = get(remote_path,conf);
+        ds.path = local_path;
+        dataSourceDirectory.put(local_path,ds);
+        return ds;
+    }
+
+    /** return the first num values */
+    public List<MRData> take ( int num ) {
+        try {
+            ArrayList<MRData> res = new ArrayList<MRData>();
+            Iterator<MRData> it = inputFormat.newInstance().materialize(new Path(path)).iterator();
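+            // a negative num means "take all available values"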
+            for ( int i = num; (num < 0 || i > 0) && it.hasNext(); i-- )
+                if (Config.hadoop_mode && Config.bsp_mode)
+                    res.add(((Tuple)it.next()).get(1));  // strip tag in BSP mode
+                else res.add(it.next());
+            return res;
+        } catch (Exception ex) {
+            throw new Error(ex);
+        }
+    }
+
+    static Tuple tuple_container = new Tuple(new Tuple(),new Tuple());
+
+    /** accumulate all datasource values */
+    public MRData reduce ( MRData zero, final Function acc ) {
+        try {
+            MRData res = zero;
+            for ( MRData x: inputFormat.newInstance().materialize(new Path(path)) ) {
+                if (Config.hadoop_mode && Config.bsp_mode)
+                    x = ((Tuple)x).get(1); // strip tag in BSP mode
+                tuple_container.set(0,res).set(1,x);
+                res = acc.eval(tuple_container);
+            };
+            return res;
+        } catch (Exception ex) {
+            throw new Error(ex);
+        }
+    }
+}
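
Note on DataSource.read above: the buffer it parses is a record of
"%%%"-separated fields (the paths below are hypothetical):

    Binary%%%2%%%hdfs://host/tmp/mrql/data             a BinaryDataSource
    Generator%%%1%%%hdfs://host/tmp/mrql/range         a GeneratorDataSource
    Text%%%1%%%xml%%%hdfs://host/data.xml%%%args()     a ParsedDataSource (the last field is the parser-arguments tree)

The whole dataSourceDirectory is in turn serialized as "path===source" entries
joined with "@@@" (see DataSourceDirectory.toString and read above).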