You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2014/03/13 15:24:35 UTC

[10/26] MRQL-32: Refactoring directory structure for Eclipse

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/Bag.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/Bag.java b/src/main/java/core/Bag.java
deleted file mode 100644
index b092f30..0000000
--- a/src/main/java/core/Bag.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.util.*;
-import java.io.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-
-
-/**
- *   A sequence of MRData.
- *   There are 3 kinds of Bag implementations, which are converted at run-time, when necessary:
- *   1) vector-based (materialized): used for small bags (when size is less than Config.max_materialized_bag);
- *   2) stream-based: can be traversed only once; implemented as Java iterators;
- *   3) spilled to a local file: can be accessed multiple times
- */
-public class Bag extends MRData implements Iterable<MRData> {
-    private final static long serialVersionUID = 64629834894869L;
-    enum Modes { STREAMED, MATERIALIZED, SPILLED };
-    private transient Modes mode;
-    private transient ArrayList<MRData> content;      // content of a materialized bag
-    private transient BagIterator iterator;           // iterator for a streamed bag
-    private transient boolean consumed;               // true, if the stream has already been used
-    private transient String path;                    // local path that contains the spilled bag
-    private transient SequenceFile.Writer writer;     // the file writer for spilled bags
-
-    /**
-     * Build an empty Bag in materialized mode, backed by a new ArrayList.
-     */
-    public Bag () {
-        this.content = new ArrayList<MRData>();
-        this.mode = Modes.MATERIALIZED;
-    }
-
-    /**
-     * Build an empty Bag in materialized mode whose backing ArrayList
-     * is pre-sized.
-     * @param size the initial capacity of the backing ArrayList
-     */
-    public Bag ( final int size ) {
-        this.content = new ArrayList<MRData>(size);
-        this.mode = Modes.MATERIALIZED;
-    }
-
-    /**
-     * in-memory Bag construction (an ArrayList) initialized with data
-     * @param as a vector of MRData to insert in the Bag
-     */
-    public Bag ( final MRData ...as ) {
-        mode = Modes.MATERIALIZED;
-        content = new ArrayList<MRData>(as.length);
-        for ( MRData a: as )
-            content.add(a);
-    }
-
-    /**
-     * in-memory Bag construction (an ArrayList) initialized with data
-     * @param as a vector of MRData to insert in the Bag
-     */
-    public Bag ( final List<MRData> as ) {
-        mode = Modes.MATERIALIZED;
-        content = new ArrayList<MRData>(as.size());
-        for ( MRData a: as )
-            content.add(a);
-    }
-
-    /**
-     * Build a lazy (streamed) Bag whose elements are produced on demand.
-     * The Bag starts out unconsumed.
-     * @param i the iterator that generates the Bag elements
-     */
-    public Bag ( final BagIterator i ) {
-        this.iterator = i;
-        this.consumed = false;
-        this.mode = Modes.STREAMED;
-    }
-
-    /** @return true when the Bag mode is MATERIALIZED (elements are held in the ArrayList) */
-    public boolean materialized () {
-        return Modes.MATERIALIZED == mode;
-    }
-
-    /** @return true when the Bag mode is STREAMED (elements come from an iterator) */
-    public boolean streamed () {
-        return Modes.STREAMED == mode;
-    }
-
-    /** @return true when the Bag mode is SPILLED (elements live in a local file) */
-    public boolean spilled () {
-        return Modes.SPILLED == mode;
-    }
-
-    /** count the elements of the Bag.
-     * A materialized Bag answers from its ArrayList; any other Bag is traversed
-     * once, which consumes a streamed Bag.
-     * @return the number of elements
-     */
-    public int size () {
-        if (materialized()) {
-            return content.size();
-        }
-        if (streamed() && consumed) {
-            throw new Error("*** The collection stream has already been consumed");
-        }
-        int count = 0;
-        final Iterator<MRData> walker = iterator();
-        while ( walker.hasNext() ) {
-            walker.next();
-            count++;
-        }
-        if (streamed()) {
-            consumed = true;
-        }
-        return count;
-    }
-
-    /** shrink the backing ArrayList to its current size; does nothing unless the Bag is materialized */
-    public void trim () {
-        if (!materialized()) {
-            return;
-        }
-        content.trimToSize();
-    }
-
-    /** get the n'th element of a Bag (cache it in memory if necessary)
-     * @param n the index
-     * @return the n'th element
-     */
-    public MRData get ( final int n ) {
-        if (materialized())
-            if (n < size())
-                return content.get(n);
-            else throw new Error("List index out of range: "+n);
-        if (streamed() && consumed)
-            throw new Error("*** The collection stream has already been consumed");
-        int i = 0;
-        for ( MRData e: this )
-            if (i++ == n)
-                return e;
-        if (streamed())
-            consumed = true;
-        throw new Error("Cannot retrieve the "+n+"th element of a sequence");
-    }
-
-    /** overwrite the element at a given position of a materialized Bag.
-     * Fails with an Error when the Bag is not materialized.
-     * @param n the zero-based index
-     * @param value the replacement value
-     * @return this Bag
-     */
-    public Bag set ( final int n, final MRData value ) {
-        if (materialized()) {
-            content.set(n,value);
-            return this;
-        }
-        throw new Error("Cannot replace an element of a non-materialized sequence");
-    }
-
-    /** add a new value to a Bag (cache it in memory if necessary)
-     * @param x the new value
-     */
-    public void add ( final MRData x ) {
-        materialize();
-        if (!spilled() && Config.hadoop_mode
-             && size() >= Config.max_materialized_bag)
-            spill();
-        if (spilled())
-            try {
-                if (writer == null) {   // writer was closed earlier for reading
-                    FileSystem fs = FileSystem.getLocal(Plan.conf);
-                    writer = SequenceFile.createWriter(fs,Plan.conf,new Path(path),
-                                                       MRContainer.class,NullWritable.class,
-                                                       SequenceFile.CompressionType.NONE);
-                    System.err.println("*** Appending elements to a spilled Bag: "+path);
-                };
-                writer.append(new MRContainer(x),NullWritable.get());
-            } catch (IOException e) {
-                throw new Error("Cannot append an element to a spilled Bag: "+path);
-            }
-        else content.add(x);
-    }
-
-    /** chainable form of add: append a value and hand back the Bag itself
-     * @param x the new value
-     * @return this Bag
-     */
-    public Bag add_element ( final MRData x ) {
-        this.add(x);
-        return this;
-    }
-
-    /** append every element of another Bag to this Bag, in iteration order.
-     * The other Bag is traversed through its iterator.
-     * @param b the Bag whose elements are copied
-     * @return this Bag
-     */
-    public Bag addAll ( final Bag b ) {
-        final Iterator<MRData> source = b.iterator();
-        while ( source.hasNext() ) {
-            add(source.next());
-        }
-        return this;
-    }
-
-    /** make this Bag empty (cache it in memory if necessary) */
-    public void clear () {
-        if (materialized())
-            content.clear();
-        else if (streamed()) {
-            if (writer != null)
-                try {
-                    writer.close();
-                } catch (IOException ex) {
-                    throw new Error(ex);
-                };
-            writer = null;
-            path = null;
-            mode = Modes.MATERIALIZED;
-            content = new ArrayList<MRData>(100);
-        };
-        mode = Modes.MATERIALIZED;
-        content = new ArrayList<MRData>();
-    }
-
-    /** load a streamed Bag into memory.
-     * Does nothing for a Bag that is already materialized or spilled. The elements
-     * are pulled from the stream and re-inserted with add(), so the Bag may end up
-     * spilled instead of materialized.
-     */
-    public void materialize () {
-        if (spilled() || materialized()) {
-            return;
-        }
-        // grab the stream before switching mode, since iterator() depends on the mode
-        final Iterator<MRData> source = iterator();
-        mode = Modes.MATERIALIZED;
-        writer = null;
-        path = null;
-        content = new ArrayList<MRData>(100);
-        for ( ; source.hasNext(); ) {
-            add(source.next());
-        }
-        if (materialized()) {    // add() may have spilled the Bag
-            content.trimToSize();
-        }
-        iterator = null;
-    }
-
-    private static Random random_generator = new Random();
-
-    private static String new_path ( FileSystem fs ) throws IOException {
-        Path p;
-        do {
-            p = new Path("file://"+Config.tmpDirectory+"/mrql"+(random_generator.nextInt(1000000)));
-        } while (p.getFileSystem(Plan.conf).exists(p));
-        String path = p.toString();
-        Plan.temporary_paths.add(path);
-        return path;
-    }
-
-    /** spill the Bag to a local file.
-     * Only acts in Hadoop mode on a Bag that is not already spilled: the current
-     * elements are written to a new SequenceFile, the Bag switches to SPILLED mode
-     * and its in-memory content is dropped. The writer is left open so that add()
-     * can keep appending.
-     */
-    private void spill () {
-        if (spilled() || !Config.hadoop_mode)
-            return;
-        try {
-            if (Plan.conf == null)
-                Plan.conf = Evaluator.evaluator.new_configuration();
-            final FileSystem fs = FileSystem.getLocal(Plan.conf);
-            path = new_path(fs);
-            System.err.println("*** Spilling a Bag to a local file: "+path);
-            final Path p = new Path(path);
-            writer = SequenceFile.createWriter(fs,Plan.conf,p,
-                                               MRContainer.class,NullWritable.class,
-                                               SequenceFile.CompressionType.NONE);
-            for ( MRData e: this )
-                writer.append(new MRContainer(e),NullWritable.get());
-            mode = Modes.SPILLED;
-            content = null;
-            iterator = null;
-        } catch (Exception ex) {
-            // keep the underlying failure attached for diagnosis
-            throw new Error("Cannot spill a Bag to a local file",ex);
-        }
-    }
-
-    /**
-     * sort the Bag (cache it in memory if necessary).
-     * If the Bag was spilled during caching, use external sorting
-     */
-    public void sort () {
-        materialize();
-        if (spilled())  // if it was spilled during materialize()
-            try {       // use external sorting
-                if (writer != null)
-                    writer.close();
-                FileSystem fs = FileSystem.getLocal(Plan.conf);
-                SequenceFile.Sorter sorter
-                    = new SequenceFile.Sorter(fs,new Plan.MRContainerKeyComparator(),
-                                              MRContainer.class,NullWritable.class,Plan.conf);
-                String out_path = new_path(fs);
-                System.err.println("*** Using external sorting on a spilled bag "+path+" -> "+out_path);
-                sorter.setMemory(64*1024*1024);
-                sorter.sort(new Path(path),new Path(out_path));
-                path = out_path;
-                writer = null;
-            } catch (Exception ex) {
-                throw new Error("Cannot sort a spilled bag");
-            }
-        else Collections.sort(content);
-    }
-
-    /** return the Bag Iterator */
-    public Iterator<MRData> iterator () {
-        if (spilled())
-            try {
-                if (writer != null)
-                    writer.close();
-                writer = null;
-                return new BagIterator () {
-                    final FileSystem fs = FileSystem.getLocal(Plan.conf);
-                    final SequenceFile.Reader reader = new SequenceFile.Reader(fs,new Path(path),Plan.conf);
-                    final MRContainer key = new MRContainer();
-                    final NullWritable value = NullWritable.get();
-                    MRData data;
-                    public boolean hasNext () {
-                        try {
-                            if (!reader.next(key,value)) {
-                                reader.close();
-                                return false;
-                            };
-                            data = key.data();
-                            return true;
-                        } catch (IOException e) {
-                            throw new Error("Cannot collect values from a spilled Bag");
-                        }
-                    }
-                    public MRData next () {
-                        return data;
-                    }
-                };
-            } catch (IOException e) {
-                throw new Error("Cannot collect values from a spilled Bag");
-            }
-        else if (materialized())
-            return content.iterator();
-        else {
-            if (consumed)  // this should never happen
-                throw new Error("*** The collection stream has already been consumed");
-            consumed = true;
-            return iterator;
-        }
-    }
-
-    /** load this Bag into memory and then ask every element to do the same
-     * through its own materializeAll
-     */
-    public void materializeAll () {
-        materialize();
-        final Iterator<MRData> walker = iterator();
-        while ( walker.hasNext() ) {
-            walker.next().materializeAll();
-        }
-    }
-
-    /** concatenate the elements of a given Bag to the elements of this Bag.
-     * Does not change either Bag
-     * @param s the given Bag
-     * @return a new Bag
-     */
-    public Bag union ( final Bag s ) {
-        final Iterator<MRData> i1 = iterator();
-        final Iterator<MRData> i2 = s.iterator();
-        return new Bag(new BagIterator () {
-                boolean first = true;
-                public boolean hasNext () {
-                    if (first)
-                        if (i1.hasNext())
-                            return true;
-                        else {
-                            first = false;
-                            return i2.hasNext();
-                        }
-                    else return i2.hasNext();
-                }
-                public MRData next () {
-                    if (first)
-                        return i1.next();
-                    else return i2.next();
-                }
-            });
-    }
-
-    /** test whether the Bag holds an element equal to a given value.
-     * A materialized Bag asks its ArrayList; any other Bag is scanned
-     * (which consumes a streamed Bag).
-     * @param x the element to find
-     * @return true if an equal element was found
-     */
-    public boolean contains ( final MRData x ) {
-        if (materialized()) {
-            return content.contains(x);
-        }
-        if (streamed() && consumed) {
-            throw new Error("*** The collection stream has already been consumed");
-        }
-        final Iterator<MRData> walker = iterator();
-        while ( walker.hasNext() ) {
-            if (x.equals(walker.next())) {
-                return true;
-            }
-        }
-        if (streamed()) {
-            consumed = true;
-        }
-        return false;
-    }
-
-    /** treat the Bag as a map (a Bag of (key,value) Tuples) and look up a key.
-     * Elements are scanned in order and the value of the first matching pair
-     * is returned; an Error is raised when no pair matches.
-     * @param key the search key
-     * @return the value associated with the key
-     */
-    public MRData map_find ( final MRData key ) {
-        if (streamed() && consumed) {
-            throw new Error("*** The collection stream has already been consumed");
-        }
-        final Iterator<MRData> walker = iterator();
-        while ( walker.hasNext() ) {
-            final Tuple pair = (Tuple) walker.next();
-            if (key.equals(pair.first())) {
-                return pair.second();
-            }
-        }
-        if (streamed()) {
-            consumed = true;
-        }
-        throw new Error("key "+key+" not found in map");
-    }
-
-    /** treat the Bag as a map (a Bag of (key,value) Tuples) and test whether
-     * some pair has the given key
-     * @param key the search key
-     * @return true if a pair with that key was found
-     */
-    public boolean map_contains ( final MRData key ) {
-        if (streamed() && consumed) {
-            throw new Error("*** The collection stream has already been consumed");
-        }
-        final Iterator<MRData> walker = iterator();
-        while ( walker.hasNext() ) {
-            final Tuple pair = (Tuple) walker.next();
-            if (key.equals(pair.first())) {
-                return true;
-            }
-        }
-        if (streamed()) {
-            consumed = true;
-        }
-        return false;
-    }
-
-    /** serialize the Bag.
-     * A materialized Bag is written as a BAG tag, its element count and its
-     * elements. Any other Bag is written as a LAZY_BAG tag, its elements and a
-     * closing END_OF_LAZY_BAG tag, so its size need not be known in advance.
-     */
-    final public void write ( DataOutput out ) throws IOException {
-        final boolean in_memory = materialized();
-        if (in_memory) {
-            out.writeByte(MRContainer.BAG);
-            WritableUtils.writeVInt(out,size());
-        } else {
-            out.writeByte(MRContainer.LAZY_BAG);
-        }
-        final Iterator<MRData> walker = iterator();
-        while ( walker.hasNext() ) {
-            walker.next().write(out);
-        }
-        if (!in_memory) {
-            out.writeByte(MRContainer.END_OF_LAZY_BAG);
-        }
-    }
-
-    /** deserialize a Bag that was written as an element count followed by
-     * that many elements
-     * @return the new Bag
-     */
-    final public static Bag read ( DataInput in ) throws IOException {
-        final int count = WritableUtils.readVInt(in);
-        final Bag result = new Bag(count);
-        int loaded = 0;
-        while ( loaded < count ) {
-            result.add(MRContainer.read(in));
-            loaded++;
-        }
-        return result;
-    }
-
-    /** deserialize a Bag that was written without an element count: elements
-     * are read until the MRContainer.end_of_lazy_bag marker is returned
-     * @return the new Bag
-     */
-    public static Bag lazy_read ( final DataInput in ) throws IOException {
-        final Bag result = new Bag(100);
-        for ( MRData item = MRContainer.read(in);
-              item != MRContainer.end_of_lazy_bag;
-              item = MRContainer.read(in) ) {
-            result.add(item);
-        }
-        if (result.materialized()) {
-            result.content.trimToSize();
-        }
-        return result;
-    }
-
-    /** the input serializer for Bag */
-    public void readFields ( DataInput in ) throws IOException {
-        int n = WritableUtils.readVInt(in);
-        mode = Modes.MATERIALIZED;
-        iterator = null;
-        path = null;
-        writer = null;
-        if (content == null)
-            content = new ArrayList<MRData>(n);
-        else {
-            content.clear();
-            content.ensureCapacity(n);
-        };
-        for ( int i = 0; i < n; i++ )
-            add(MRContainer.read(in));
-    }
-
-    private void writeObject ( ObjectOutputStream out ) throws IOException {
-        materialize();
-        WritableUtils.writeVInt(out,size());
-        for ( MRData e: this )
-            e.write(out);
-    }
-
-    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
-        int n = WritableUtils.readVInt(in);
-        mode = Modes.MATERIALIZED;
-        iterator = null;
-        path = null;
-        writer = null;
-        content = new ArrayList<MRData>(n);
-        for ( int i = 0; i < n; i++ )
-            add(MRContainer.read(in));
-    }
-
-    /** Java serialization hook for a stream without data for this class; nothing to restore */
-    private void readObjectNoData () throws ObjectStreamException {
-    }
-
-    /** compare this Bag with a given Bag element by element, in iteration order.
-     * The first pair of unequal elements decides the result; if one Bag is a
-     * prefix of the other, the shorter Bag is the smaller one.
-     * Both Bags are traversed through their iterators.
-     * @param x the Bag to compare with (must be a Bag)
-     * @return -1, 0, or 1 if this Bag is less than, equal to, or greater than x
-     */
-    public int compareTo ( MRData x ) {
-        Bag xt = (Bag)x;
-        Iterator<MRData> theirs = xt.iterator();
-        Iterator<MRData> mine = iterator();
-        for (;;) {
-            // ask each iterator exactly once per step
-            final boolean more_theirs = theirs.hasNext();
-            final boolean more_mine = mine.hasNext();
-            if (!more_theirs || !more_mine)
-                return (more_theirs) ? -1 : (more_mine) ? 1 : 0;
-            // compare this Bag's element against the other's (not the reverse)
-            int c = mine.next().compareTo(theirs.next());
-            if (c < 0)
-                return -1;
-            else if (c > 0)
-                return 1;
-        }
-    }
-
-    /** compare two serialized Bags directly on their byte representations.
-     * Each buffer is expected to start with the VInt element count, followed by the
-     * serialized elements; elements are compared pairwise with MRContainer.compare
-     * and the first non-zero result is returned. If all common elements compare
-     * equal, the Bag with more elements is the greater one.
-     * @param x the buffer holding the first Bag
-     * @param xs the offset of the first Bag in x
-     * @param xl the length available for the first Bag
-     * @param y the buffer holding the second Bag
-     * @param ys the offset of the second Bag in y
-     * @param yl the length available for the second Bag
-     * @param size size[0] is read after each element comparison and is set to xx+1
-     *             when all common elements compare equal
-     * @return a negative, zero, or positive value
-     */
-    final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
-        try {
-            // element counts and the number of bytes each count occupies
-            int xn = WritableComparator.readVInt(x,xs);
-            int xx = WritableUtils.decodeVIntSize(x[xs]);
-            int yn = WritableComparator.readVInt(y,ys);
-            int yy = WritableUtils.decodeVIntSize(y[ys]);
-            // xx and yy now serve as running offsets past the data consumed so far
-            for ( int i = 0; i < xn && i < yn; i++ ) {
-                int k = MRContainer.compare(x,xs+xx,xl-xx,y,ys+yy,yl-yy,size);
-                if (k != 0)
-                    return k;
-                // presumably size[0] holds the byte length of the element just compared;
-                // NOTE(review): the same length is applied to both buffers -- confirm that
-                // elements that compare equal always have equal serialized lengths
-                xx += size[0];
-                yy += size[0];
-            };
-            // NOTE(review): the extra byte presumably accounts for the type tag that
-            // precedes the Bag -- confirm against MRContainer.compare
-            size[0] = xx+1;
-            if (xn > yn)
-                return 1;
-            if (xn < yn)
-                return -1;
-            return 0;
-        } catch (IOException e) {
-            throw new Error(e);
-        }
-    }
-
-    /** is this Bag equal to another Bag?
-     * Two Bags are equal when they yield the same number of elements and the
-     * elements at the same positions are equal (order is important).
-     * Both Bags are traversed through their iterators.
-     * @param x the object to compare with
-     * @return true if x is a Bag with equal elements in the same order
-     */
-    public boolean equals ( Object x ) {
-        if (!(x instanceof Bag))
-            return false;
-        Bag xt = (Bag) x;
-        Iterator<MRData> theirs = xt.iterator();
-        Iterator<MRData> mine = iterator();
-        for (;;) {
-            // ask each iterator exactly once per step
-            final boolean more_theirs = theirs.hasNext();
-            final boolean more_mine = mine.hasNext();
-            if (!more_theirs || !more_mine)
-                // equal only if both Bags ran out at the same time
-                return more_theirs == more_mine;
-            if ( !theirs.next().equals(mine.next()) )
-                return false;
-        }
-    }
-
-    /** the hash code of this Bag: the XOR of 127 and the hash codes of its
-     * elements, made non-negative. The Bag is traversed through its iterator.
-     * @return a non-negative hash code
-     */
-    public int hashCode () {
-        int h = 127;
-        for ( MRData e: this )
-            h ^= e.hashCode();
-        // Math.abs(Integer.MIN_VALUE) is still negative, so map that one value to 0
-        return (h == Integer.MIN_VALUE) ? 0 : Math.abs(h);
-    }
-
-    /** show the first few Bag elements (controlled by -bag_print) */
-    public String toString () {
-        materialize();
-        StringBuffer b = new StringBuffer("{ ");
-        int i = 0;
-        for ( MRData e: this )
-            if ( i++ < Config.max_bag_size_print )
-                b.append(((i>1)?", ":"")+e);
-            else return b.append(", ... }").toString();
-        return b.append(" }").toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/BagIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/BagIterator.java b/src/main/java/core/BagIterator.java
deleted file mode 100644
index 0fd7354..0000000
--- a/src/main/java/core/BagIterator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.util.Iterator;
-
-/** an Iterator over Bags */
-abstract public class BagIterator implements Iterator<MRData> {
-    public void remove () {
-        throw new Error("Bag deletions are not permitted");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/BinaryDataSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/BinaryDataSource.java b/src/main/java/core/BinaryDataSource.java
deleted file mode 100644
index d4338ec..0000000
--- a/src/main/java/core/BinaryDataSource.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.hadoop.conf.Configuration;
-
-
-/** A DataSource for intermediate results and data dumps; it always uses the
- *  evaluator's binary input format */
-public final class BinaryDataSource extends DataSource {
-    /** a binary source with an explicit source number */
-    BinaryDataSource ( int source_num, String path, Configuration conf ) {
-        super(source_num,path,Evaluator.evaluator.binaryInputFormat(),conf);
-    }
-
-    /** a binary source with source number -1 */
-    BinaryDataSource ( String path, Configuration conf ) {
-        this(-1,path,conf);
-    }
-
-    /** the text form: "Binary", the source number and the path, joined by separator */
-    public String toString () {
-        final String head = "Binary"+separator+source_num;
-        return head+separator+path;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/ClassImporter.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/ClassImporter.java b/src/main/java/core/ClassImporter.java
deleted file mode 100644
index 8efc1ae..0000000
--- a/src/main/java/core/ClassImporter.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.lang.reflect.Method;
-import org.apache.mrql.gen.*;
-import java.util.*;
-
-/** imports external Java methods into MRQL */
-final public class ClassImporter {
-    final static boolean trace_imported_methods = false;
-
-    final static String[] object_methods
-        = { "hashCode", "getClass", "wait", "equals", "toString", "notify", "notifyAll" };
-
-    static Vector<MethodInfo> methods = new Vector<MethodInfo>();
-
-    /** import the methods of org.apache.mrql.SystemFunctions, unless some
-     * methods have already been imported */
-    public static void load_classes () {
-        if (methods == null) {
-            methods = new Vector<MethodInfo>();
-        }
-        if (!methods.isEmpty()) {
-            return;
-        }
-        importClass("org.apache.mrql.SystemFunctions");
-        //****** import your classes with user-defined functions here
-    }
-
-    /** is this one of the method names listed in object_methods? */
-    private static boolean object_method ( String s ) {
-        for ( String reserved: object_methods ) {
-            if (reserved.equals(s)) {
-                return true;
-            }
-        }
-        return false;
-    }
-
-    private static Tree getType ( Class<?> c ) {
-        String cn = c.getCanonicalName();
-        Class<?>[] inf = c.getInterfaces();
-        if (cn.equals("org.apache.mrql.MRData"))
-            return new VariableLeaf("any");
-        if (cn.startsWith("org.apache.mrql.MR_"))
-            return new VariableLeaf(cn.substring(19));
-        if (cn.equals("org.apache.mrql.Bag"))
-            return new Node("bag",new Trees(new VariableLeaf("any")));
-        if (cn.equals("org.apache.mrql.Inv"))
-            return new VariableLeaf("any");
-        if (cn.equals("org.apache.mrql.Union"))
-            return new VariableLeaf("union");
-        if (cn.equals("org.apache.mrql.Lambda"))
-            return new VariableLeaf("any");
-        if (inf.length > 0 && inf[0].equals("org.apache.mrql.MRData"))
-            return new VariableLeaf("any");
-        throw new Error("Unsupported type in imported method: "+cn);
-    }
-
-    private static Trees signature ( Method m ) {
-        Class<?> co = m.getReturnType();
-        Class<?>[] cs = m.getParameterTypes();
-        Trees as = new Trees(getType(co));
-        for (int i = 0; i < cs.length; i++)
-            as = as.append(getType(cs[i]));
-        return as;
-    }
-
-    /** the name of the imported method stored at a given position */
-    public static String method_name ( int method_number ) {
-        final MethodInfo info = methods.get(method_number);
-        return info.name;
-    }
-
-    /** the signature of the imported method stored at a given position */
-    public static Trees signature ( int method_number ) {
-        final MethodInfo info = methods.get(method_number);
-        return info.signature;
-    }
-
-    /** import all Java methods from a given Java class */
-    public static void importClass ( String class_name ) {
-        try {
-            Method[] ms = Class.forName(class_name).getMethods();
-            Vector<MethodInfo> mv = new Vector<MethodInfo>();
-            for (int i = 0; i < ms.length; i++)
-                if (!object_method(ms[i].getName()) && ms[i].getModifiers() == 9)
-                    try {
-                        Trees sig = signature(ms[i]);
-                        MethodInfo m = new MethodInfo(ms[i].getName(),sig,ms[i]);
-                        mv.add(m);
-                        methods.add(m);
-                    } catch ( Exception e ) {
-                        System.out.println("Warning: method "+ms[i].getName()+" cannot be imported");
-                        System.out.println(e);
-                        throw new Error("");
-                    };
-            Collections.sort(methods);
-            if (Translator.functions == null)
-                Translator.functions = Trees.nil;
-            for ( MethodInfo m: methods )
-                Translator.functions = Translator.functions.append(new Node(m.name,m.signature));
-            if (trace_imported_methods) {
-                System.out.print("Importing methods: ");
-                for (int i = 0; i < mv.size(); i++ )
-                    System.out.print(mv.get(i).name+mv.get(i).signature.tail()
-                                     +":"+mv.get(i).signature.head()+"  ");
-                System.out.println();
-            }
-        } catch (ClassNotFoundException x) {
-            throw new Error("Undefined class: "+class_name);
-        }
-    }
-
-    /** import a Java method with a given name from a given Java class.
-     * The first method with that name whose modifiers are exactly "public static"
-     * is declared in Translator.functions and added to the sorted method table.
-     * @param class_name the fully qualified name of the class
-     * @param method_name the name of the method
-     * @throws Error if the class is undefined or has no such method
-     */
-    public static void importMethod ( String class_name, String method_name ) {
-        // getModifiers() value of a method declared exactly "public static" (= 9)
-        final int public_static = java.lang.reflect.Modifier.PUBLIC | java.lang.reflect.Modifier.STATIC;
-        try {
-            Method[] ms = Class.forName(class_name).getMethods();
-            MethodInfo m = null;
-            for (int i = 0; i < ms.length; i++)
-                if (ms[i].getName().equals(method_name)
-                    && !object_method(ms[i].getName()) && ms[i].getModifiers() == public_static) {
-                    Trees sig = signature(ms[i]);
-                    m = new MethodInfo(ms[i].getName(),sig,ms[i]);
-                    // same guard as in importClass: nothing may have been declared yet
-                    if (Translator.functions == null)
-                        Translator.functions = Trees.nil;
-                    Translator.functions = Translator.functions.append(new Node(ms[i].getName(),sig));
-                    break;
-                }
-            if (m == null)
-                throw new Error("No such method: "+method_name);
-            methods.add(m);
-            Collections.sort(methods);
-            if (trace_imported_methods)
-                System.out.println("Importing method: "+m.name+m.signature.tail()
-                                   +":"+m.signature.head()+"  ");
-        } catch (ClassNotFoundException x) {
-            throw new Error("Undefined class: "+class_name);
-        }
-    }
-
-    /** print the name and the signature of every imported method on one line */
-    public static void print_methods () {
-        for ( MethodInfo m: methods ) {
-            final String entry = " "+m.name+":"+m.signature.tail()+"->"+m.signature.head();
-            System.out.print(entry);
-        }
-    }
-
-    /** look up an imported method by name and argument types.
-     * The method table is scanned in its stored (sorted) order and the first method
-     * with this name whose parameter types are supertypes of the argument types wins.
-     * @param method_name the given method name
-     * @param args the types of the method arguments
-     * @return the result type of the method, or null if no method matches
-     */
-    public static Tree find_method ( String method_name, Trees args ) {
-        for ( MethodInfo candidate: methods ) {
-            if (!candidate.name.equals(method_name)) {
-                continue;
-            }
-            if (TypeInference.subtype(args,candidate.signature.tail())) {
-                return candidate.signature.head();
-            }
-        }
-        return null;
-    }
-
-    /** return the method number of a system method with a given name over some expressions;
-     * When the method is overloaded, find the most specific (in terms of arg subtyping)
-     * @param method_name the given method name
-     * @param args the method expressions
-     * @return the method number
-     */
-    public static int find_method_number ( String method_name, Trees args ) {
-        for (int i = 0; i < methods.size(); i++ ) {
-            MethodInfo m = methods.get(i);
-            if (m.name.equals(method_name) && TypeInference.subtype(args,m.signature.tail()))
-                return i;
-        };
-        return -1;
-    }
-
-    /** call a system method with a given number over MRData
-     * @param method_number the method number
-     * @param args in input arguments
-     * @return the result of invoking this method over the args
-     */
-    public static MRData call ( int method_number, MRData... args ) {
-        if (method_number < 0 || method_number >= methods.size())
-            throw new Error("Run-time error (unknown method name)");
-        MethodInfo m = methods.get(method_number);
-        try {
-            return (MRData)m.method.invoke(null,(Object[])args);
-        } catch (Exception e) {
-            Tuple t = new Tuple(args.length);
-            for ( int i = 0; i < args.length; i++ )
-                t.set(i,args[i]);
-            System.err.println("Run-time error in method call: "+m.name+t+" of type "
-                               +m.signature.tail()+"->"+m.signature.head());
-            throw new Error(e.toString());
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/Compiler.gen
----------------------------------------------------------------------
diff --git a/src/main/java/core/Compiler.gen b/src/main/java/core/Compiler.gen
deleted file mode 100644
index 668cdc8..0000000
--- a/src/main/java/core/Compiler.gen
+++ /dev/null
@@ -1,584 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.*;
-import java.io.*;
-import javax.tools.*;
-import javax.tools.JavaCompiler.CompilationTask;
-import java.lang.reflect.*;
-import java.util.*;
-import java.net.*;
-import java.util.jar.*;
-import org.apache.hadoop.io.WritableComparable;
-
-
-/** compilation of MRQL expressions to Java code and then to Java bytecode */
-final public class Compiler extends Translator {
-    // the Java compiler provided by the running platform
-    static JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
-    // receives the diagnostics reported by the compilation tasks
-    static DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>();
-    // per-user scratch directory for the generated class files and jar
-    final static String tmp_dir = "/tmp/mrql_jar_"+System.getProperty("user.name");
-    // the path of the jar generated by the latest query compilation
-    public static String jar_path;
-    // the variables of the repeat loops seen so far; compileE accesses them
-    // through Interpreter.lookup_global_binding
-    static Trees in_memory_repeat_vars = #[];
-    // counter used for generating unique class names
-    static int lambda_num = 0;
-    // the number that identifies the current UserFunctions_ class
-    static int user_functions_num = 0;
-
-    /** An in-memory Java source file: it hands the generated code to the Java compiler
-     *  as a string, under a string:// URI inside the org.apache.mrql package */
-    final private static class JavaSourceFromString extends SimpleJavaFileObject {
-        final String code;
-
-        JavaSourceFromString ( String name, String code ) {
-            super(source_uri(name),Kind.SOURCE);
-            this.code = code;
-        }
-
-        /** the URI of the source of a class; dots in the name become path separators */
-        private static URI source_uri ( String class_name ) {
-            final String path = class_name.replace('.','/');
-            return URI.create("string:///org/apache/mrql/" + path + Kind.SOURCE.extension);
-        }
-
-        /** return the source code kept in memory */
-        @Override
-        public CharSequence getCharContent ( boolean ignoreEncodingErrors ) {
-            return code;
-        }
-    }
-
-    private static String new_lambda_name () {
-        return "MRQL_Lambda_"+(lambda_num++);
-    }
-
-    /** Copy a file or, recursively, a directory tree into a jar.
-     * @param source the file or directory to add
-     * @param offset the number of leading characters dropped from each path to form the entry name
-     * @param target the jar to write to
-     * @throws IOException if a directory cannot be listed, a file cannot be read,
-     *         or the jar cannot be written
-     */
-    private static void add2jar ( File source, int offset, JarOutputStream target ) throws IOException {
-        if (source.isDirectory()) {
-            String name = source.getPath();
-            // the root directory itself (path not longer than the offset) gets no entry
-            if (name.length() > offset) {
-                JarEntry entry = new JarEntry(name.substring(offset));
-                entry.setTime(source.lastModified());
-                target.putNextEntry(entry);
-                target.closeEntry();
-            };
-            // listFiles returns null when the directory cannot be read
-            File[] nestedFiles = source.listFiles();
-            if (nestedFiles == null)
-                throw new IOException("Cannot list the directory "+source);
-            for ( File nestedFile: nestedFiles )
-                add2jar(nestedFile,offset,target);
-        } else {
-            JarEntry entry = new JarEntry(source.getPath().substring(offset));
-            entry.setTime(source.lastModified());
-            target.putNextEntry(entry);
-            BufferedInputStream in = new BufferedInputStream(new FileInputStream(source));
-            try {
-                byte[] buffer = new byte[1024];
-                int count;
-                while ((count = in.read(buffer)) > 0)
-                    target.write(buffer,0,count);
-            } finally {
-                // release the file even if reading or writing fails
-                in.close();
-            };
-            target.closeEntry();
-        }
-    }
-
-    /** Delete a file or, recursively, a directory with all its contents.
-     * The result of each deletion is ignored, so a file that cannot be deleted is left behind.
-     * @param file the file or directory to delete
-     */
-    private static void remove ( File file ) throws IOException {
-        if (file.isDirectory()) {
-            // listFiles returns null when the directory cannot be read
-            File[] nestedFiles = file.listFiles();
-            if (nestedFiles != null)
-                for ( File nestedFile: nestedFiles )
-                    remove(nestedFile);
-        };
-        file.delete();
-    }
-
-    /** forget the repeat variables seen so far and delete the temporary
-     *  directory along with everything in it */
-    public static void clean () throws IOException {
-        in_memory_repeat_vars = #[];
-        final File scratch = new File(tmp_dir);
-        remove(scratch);
-    }
-
-    /** Walk an expression and prepare the functional arguments of the distributed plans
-     * for compilation. For every operator listed in plans_with_distributed_lambdas, each
-     * argument whose closure over Interpreter.global_env is a lambda gets a fresh class
-     * name, the Java source of the corresponding Function subclass is appended to out,
-     * and the argument is replaced by compiled(class_name,argument).
-     * Any other node is rebuilt from its recursively processed children.
-     * @param e the expression to process
-     * @param out the buffer that accumulates the generated Java source
-     * @return the rewritten expression
-     */
-    final private static Tree compile ( Tree e, StringBuffer out ) throws Exception {
-        match e {
-        case repeat(lambda(`v,`body),`s,`n):
-            // remember the repeat variable: compileE accesses such variables
-            // through Interpreter.lookup_global_binding
-            in_memory_repeat_vars = in_memory_repeat_vars.cons(v);
-            return #<repeat(lambda(`v,`(compile(body,out))),`(compile(s,out)),`n)>;
-        case `f(...al):
-            // only the plans with distributed lambdas are handled here;
-            // everything else falls through to the generic case below
-            if (!plans_with_distributed_lambdas.member(#<`f>))
-                fail;
-            Trees nl = #[];
-            for ( Tree a: al)
-                match Interpreter.closure(a,Interpreter.global_env) {
-                case lambda(`v,`body):
-                    String fname = new_lambda_name();
-                    StringBuffer sb = new StringBuffer(1000);
-                    sb.append("final class "+fname+" extends Function {\n");
-                    sb.append(fname+" () {}\n");
-                    sb.append("final public MRData eval ( final MRData "+v
-                              +" ) { return "+compileE(body)+"; }\n}\n");
-                    out.append(sb);
-                    // keep the original argument next to the name of its compiled class
-                    nl = nl.append(#<compiled(`fname,`a)>);
-                case _: nl = nl.append(compile(a,out));
-                };
-            return #<`f(...nl)>;
-        case `f(...al):
-            // generic node: process the children
-            Trees nl = #[];
-            for ( Tree a: al)
-                nl = nl.append(compile(a,out));
-            return #<`f(...nl)>;
-        };
-        return e;
-    }
-
-    // the Java source code generated for the query that is being compiled
-    private static StringBuffer out;
-
-    /** compile the functional arguments of the MRQL operators using the Java compiler;
-     * the generated classes are packed into a new jar whose location is stored in jar_path
-     * @param query the expression to compile
-     * @return the query with all functional arguments compiled to Java bytecode,
-     *         or the original query if an exception is raised along the way
-     */
-    final public static Tree compile ( Tree query ) {
-        try {
-            user_functions_num = lambda_num++;
-            // remove the old jar
-            if (jar_path != null)
-                remove(new File(jar_path));
-            jar_path = tmp_dir+"/mrql_args_"+(new Random().nextInt(1000000))+".jar";
-            out = new StringBuffer(1000);
-            out.append("package org.apache.mrql;\n");
-            Tree nq = compile(query,out);
-            // generate one static Java method for each global MRQL function
-            StringBuffer sb = new StringBuffer(1000);
-            for ( String f: global_functions )
-                match global_functions.lookup(f) {
-                case function(tuple(...params),`otp,`body):
-                    sb.append("final public static "+get_MR_type(otp)+" "+f);
-                    if (params.is_empty())
-                        sb.append(" ()");
-                    else {
-                        match params.head() {
-                        case bind(`v,`tp):
-                            sb.append(" ( final "+get_MR_type(tp)+" "+v);
-                        };
-                        for ( Tree var: params.tail() )
-                            match var {
-                            case bind(`v,`tp):
-                                sb.append(", final "+get_MR_type(tp)+" "+v);
-                            }
-                        sb.append(" )");
-                    };
-                    // the method body must be generated for parameterless functions too,
-                    // otherwise the generated class cannot be compiled
-                    sb.append(" { return ("+get_MR_type(otp)+")");
-                    sb.append(compileE(body));
-                    sb.append("; }\n");
-                };
-            out.append("final class UserFunctions_"+user_functions_num+" {\n");
-            out.append(sb);
-            out.append("}\n");
-            String code = out.toString();
-            //System.out.println(code);
-            JavaFileObject file = new JavaSourceFromString("UserFunctions_"+user_functions_num,code);
-            Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file);
-            List<String> optionList = new ArrayList<String>();
-            (new File(tmp_dir)).mkdir();
-            // the class files go to a fresh directory, which is removed after it is packed
-            String dir = tmp_dir+"/classes_"+(new Random().nextInt(1000000));
-            File fdir = new File(dir);
-            fdir.mkdir();
-            ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
-            String classpath = jar_path;
-            String separator = System.getProperty("path.separator");
-            for ( URL url: ((URLClassLoader) classLoader).getURLs() )
-                classpath += separator+url.getFile();
-            // use hadoop core jar
-            classpath += separator + WritableComparable.class.getProtectionDomain().getCodeSource().getLocation().toString();
-            optionList.addAll(Arrays.asList("-classpath",classpath));
-            optionList.addAll(Arrays.asList("-d",dir));
-            CompilationTask task = compiler.getTask(null,null,diagnostics,optionList,null,compilationUnits);
-            boolean success = task.call();
-            if (!success)
-                for ( Diagnostic d: diagnostics.getDiagnostics() )
-                    System.err.println("*** Compilation error at line "+d.getLineNumber()+" position "
-                                       +d.getColumnNumber()+": "+d.getMessage(Locale.US));
-            Manifest manifest = new Manifest();
-            manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION,"1.0");
-            JarOutputStream target = new JarOutputStream(new FileOutputStream(jar_path),manifest);
-            try {
-                add2jar(new File(dir+"/"),dir.length()+1,target);
-            } finally {
-                // do not leave the jar open when packing fails
-                target.close();
-            };
-            remove(fdir);
-            return nq;
-        } catch (Exception e) {
-            System.err.println("*** Warning: Unable to compile the query:\n"+query);
-            if (Config.trace)
-                e.printStackTrace(System.err);
-            return query;
-        }
-    }
-
-    /** Load the Java class of the anonymous function with name lambda_name and create
-     * an instance of it using its first declared constructor.
-     * The class is loaded by a new URLClassLoader whose URLs are the generated jar
-     * (jar_path) followed by the URLs of cl, and whose parent is cl.
-     * @param cl the class loader to extend; it is cast to URLClassLoader
-     * @param lambda_name the simple name of the generated class
-     * @return a new instance of the generated class
-     */
-    final public static Function compiled ( ClassLoader cl, String lambda_name ) throws Exception {
-        final URL[] parent_urls = ((URLClassLoader) cl).getURLs();
-        final URL[] search_path = new URL[parent_urls.length+1];
-        System.arraycopy(parent_urls,0,search_path,1,parent_urls.length);
-        search_path[0] = new URL("file://"+jar_path);
-        final URLClassLoader loader = new URLClassLoader(search_path,cl);
-        final Class c = loader.loadClass("org.apache.mrql."+lambda_name);
-        final Constructor cc = c.getDeclaredConstructors()[0];
-        // the generated classes and their constructors are not public
-        cc.setAccessible(true);
-        return (Function)cc.newInstance();
-    }
-
-    /** The name of the Java class used in the generated code for values of an MRQL type.
-     * @param type the MRQL type
-     * @return the class name for the basic types, union, bag, and list (the latter two
-     *         both give "Bag"); "MRData" for any other type
-     */
-    private static String get_MR_type ( Tree type ) {
-        match type {
-        case boolean: return "MR_bool";
-        case byte: return "MR_byte";
-        case short: return "MR_short";
-        case int: return "MR_int";
-        case long: return "MR_long";
-        case float: return "MR_float";
-        case double: return "MR_double";
-        case char: return "MR_char";
-        case string: return "MR_string";
-        case union: return "Union";
-        case bag(...): return "Bag";
-        case list(...): return "Bag";
-        };
-        // fall back to the type used for declaring lambda parameters and results
-        return "MRData";
-    }
-
-    private static Trees remove_duplicates ( Trees al ) {
-        if (al.is_empty())
-            return al;
-        Trees el = remove_duplicates(al.tail());
-        if (el.member(al.head()))
-            return el;
-        else return el.cons(al.head());
-    }
-
-    /** lambda lifting: generate Java code from an anonymous function */
-    private static String compilef ( String v, Tree body ) throws Exception {
-        String fname = new_lambda_name();
-        Trees free_vars = remove_duplicates(free_variables(body,#[`v]));
-        StringBuffer sb = new StringBuffer(1000);
-        sb.append("final class "+fname+" extends Function {\n");
-        for ( Tree var: free_vars )
-            sb.append("MRData "+var+"; ");
-        sb.append("\npublic "+fname+" (");
-        if (free_vars.is_empty())
-            sb.append(") {}\n");
-        else {
-            sb.append(" MRData "+free_vars.head());
-            for ( Tree var: free_vars.tail() )
-                sb.append(", MRData "+var);
-            sb.append(" ) { ");
-            for ( Tree var: free_vars )
-                sb.append("this."+var+" = "+var+"; ");
-            sb.append("}\n");
-        };
-        sb.append("final public MRData eval ( final MRData "+v
-                   +" ) { return "+compileE(body)+"; }\n}\n");
-        out.append(sb);
-        String s = "new "+fname+"(";
-        if (!free_vars.is_empty()) {
-            s += free_vars.head();
-            for ( Tree var: free_vars.tail() )
-                s += ","+var;
-        };
-        return s+")";
-    }
-
-    /** Generate the Java code of a functional argument.
-     * A lambda is lifted by compilef. A compiled(f,lm) node is recompiled from lm and
-     * its first child is overwritten with the new class name. A function over a tuple
-     * of parameters becomes an anonymous Function, wrapped in a Lambda, that binds
-     * each parameter to the corresponding component of its tuple argument.
-     * @param fnc the functional argument
-     * @return the generated Java expression
-     * @throws Exception if fnc has none of the above forms
-     */
-    private static String compileF ( Tree fnc ) throws Exception {
-        match fnc {
-        case lambda(`v,`b):
-            return compilef(v.toString(),b);
-        case compiled(`f,`lm):
-            // recompile the function
-            String s = compileF(lm);
-            // s starts with "new " followed by the class name and "("
-            ((Node)fnc).children().head = new VariableLeaf(s.substring(4,s.indexOf("(")));  //destructive
-            return s;
-        case function(tuple(...params),`tp,`body):
-            String ret = "new Lambda(new Function () { "
-                         +"final public MRData eval ( final MRData _x ) { ";
-            for ( int i = 0; i < params.length(); i++ )
-                match params.nth(i) {
-                case bind(`v,_):
-                    ret += "final MRData "+v+" = ((Tuple)_x).get("+i+"); ";
-                };
-            return ret+" return "+compileE(body)+"; } })";
-        };
-        throw new Exception("Ill-formed lambda: "+fnc);
-    }
-
-    private static String compileEL ( Trees el ) throws Exception {
-        if (el.is_empty())
-            return "";
-        String ret = compileE(el.head());
-        for ( Tree a: el.tail() )
-            ret += ","+compileE(a);
-        return ret;
-    }
-
-    /** Generate the Java code that evaluates an MRQL expression in memory.
-     * Constants and variables are handled first; the variables of the repeat loops are
-     * accessed through Interpreter.lookup_global_binding. The remaining expressions are
-     * dispatched on their form; whatever is not recognized here is passed to compileM.
-     * @param e the expression to compile (null is compiled like the null constant)
-     * @return a Java expression
-     * @throws Exception if the expression cannot be compiled
-     */
-    private static String compileE ( Tree e ) throws Exception {
-        if (e == null)
-            return "(new MR_byte(0))";
-        if (e.equals(#<true>))
-            return "(new MR_bool(true))";
-        else if (e.equals(#<false>))
-            return "(new MR_bool(false))";
-        else if (e.equals(#<null>))
-            return "(new MR_byte(0))";
-        else if (e.is_variable())
-            if (in_memory_repeat_vars.member(e))
-                return "Interpreter.lookup_global_binding(\""+e.toString()+"\")";
-            else return e.toString();
-        else if (e.is_long())
-            return "(new MR_int("+((LongLeaf)e).value()+"))";
-        else if (e.is_double())
-            return "(new MR_float("+((DoubleLeaf)e).value()+"))";
-        else if (e.is_string())
-            return "(new MR_string("+e.toString()+"))";
-        match e {
-        // the boolean connectives are compiled to Java operators, which short-circuit
-        case callM(and,_,`x,`y):
-            return "(new MR_bool(((MR_bool)"+compileE(x)
-                +").get() && ((MR_bool)"+compileE(y)+").get()))";
-        case callM(or,_,`x,`y):
-            return "(new MR_bool(((MR_bool)"+compileE(x)
-                +").get() || ((MR_bool)"+compileE(y)+").get()))";
-        case callM(not,_,`x):
-            return "(new MR_bool(!((MR_bool)"+compileE(x)+").get()))";
-        case callM(`f,`n,...args):
-            if (!n.is_long())
-                fail;
-            String ret = "SystemFunctions."+ClassImporter.method_name((int)((LongLeaf)n).value())+"(";
-            Trees sig = ClassImporter.signature((int)((LongLeaf)n).value());
-            // sig.nth(0) is skipped: the parameter types start at position 1
-            for (int i = 0; i < args.length(); i++)
-                ret += ((i > 0) ? ",(" : "(")+get_MR_type(sig.nth(i+1))+")("+compileE(args.nth(i))+")";
-            return ret+")";
-        case lambda(`v,`body):
-            return "new Lambda("+compilef(v.toString(),body)+")";
-        case nth(`x,`n):
-            return "(((Tuple)("+compileE(x)+")).get("+((LongLeaf)n).value()+"))";
-        case setNth(`x,`n,`v,`ret):
-            return "(((Tuple)("+compileE(x)+")).set("+((LongLeaf)n).value()+","+compileE(v)+","+compileE(ret)+"))";
-        case materialize(`u):
-            return "MapReduceAlgebra.materialize("+compileE(u)+")";
-        case let(`v,`u,`body):
-            // a let is compiled to the application of an anonymous function
-            return "(new Function () { public MRData eval ( final MRData "+v
-                +" ) { if ("+v+" instanceof Bag) ((Bag)"+v+").materialize(); return "
-                +compileE(body)+"; }; }).eval("+compileE(u)+")";
-        case cmap(`m,`s):
-            return "MapReduceAlgebra.cmap("+compileF(m)+",(Bag)("+compileE(s)+"))";
-        case filter(`p,`m,`s):
-            return "MapReduceAlgebra.filter("+compileF(p)+","+compileF(m)
-                   +",(Bag)"+compileE(s)+")";
-        case map(`m,`s):
-            return "MapReduceAlgebra.map("+compileF(m)+",(Bag)"+compileE(s)+")";
-        case range(`min,`max):
-            return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get(),"
-                   +"((MR_long)"+compileE(max)+").get())";
-        case call(`f,...args):
-            return "("+compileF(f)+".eval("+compileE(#<tuple(...args)>)+"))";
-        case tuple():
-            return "(new Tuple())";
-        case tuple(`x):
-            return "(new Tuple("+compileE(x)+"))";
-        case tuple(`a,...el):
-            String ret = "(new Tuple("+compileE(a);
-            for ( Tree x: el )
-                ret += ","+compileE(x);
-            return ret+"))";
-        case tagged_union(`n,`u):
-            return "(new Union((byte)"+((LongLeaf)n).value()+","+compileE(u)+"))";
-        case union_value(`x):
-            return "(((Union)"+compileE(x)+").value())";
-        case union_tag(`x):
-            return "(new MR_int(((Union)"+compileE(x)+").tag()))";
-        // used for shortcutting sync in bsp supersteps
-        case BAG():
-            return "SystemFunctions.bsp_empty_bag";
-        case TRUE():
-            return "SystemFunctions.bsp_true_value";
-        case FALSE():
-            return "SystemFunctions.bsp_false_value";
-        case `T():
-            if (is_collection(T))
-                return "(new Bag())";
-            else fail
-        // the element must be a pattern variable: a plain e in the pattern is a constant
-        // symbol, which made this case compile the whole expression again
-        case `T(`x):
-            if (is_collection(T))
-                return "(new Bag("+compileE(x)+"))";
-            else fail
-        case `T(`a,...el):
-            if (!is_collection(T))
-                fail;
-            String ret = "(new Bag("+compileE(a);
-            for ( Tree x: el )
-                ret += ",(MRData)"+compileE(x);
-            return ret+"))";
-        case if(`c,`x,`y):
-            return "((((MR_bool)"+compileE(c)+").get())?"+compileE(x)+":"+compileE(y)+")";
-        case synchronize(`peer,`b):
-            return "SystemFunctions.synchronize(((MR_string)"+compileE(peer)+"),(MR_bool)"+compileE(b)+")";
-        case distribute(`peer,`s):
-            return "SystemFunctions.distribute(((MR_string)"+compileE(peer)+"),(Bag)"+compileE(s)+")";
-        case mapReduce(`mx,`my,`s,_):
-            return "MapReduceAlgebra.mapReduce("+compileF(mx)+","+compileF(my)+",(Bag)("+compileE(s)+"))";
-        case mapReduce2(`mx,`my,`r,`x,`y,_):
-            return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)+","+compileF(r)
-                   +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
-        case mapJoin(`kx,`ky,`r,`x,`y):
-            return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)+","+compileF(r)
-                   +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
-        case join(`kx,`ky,`r,`x,`y):
-            return "MapReduceAlgebra.join("+compileF(kx)+","+compileF(ky)+","+compileF(r)
-                   +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
-        case groupBy(`s):
-            return "MapReduceAlgebra.groupBy((Bag)("+compileE(s)+"))";
-        case index(`x,`n):
-            return "SystemFunctions.index((Bag)("+compileE(x)+"),"+compileE(n)+")";
-        case range(`x,`i,`j):
-            return "SystemFunctions.range((Bag)("+compileE(x)+"),"+compileE(i)+","+compileE(j)+")";
-        case map_index(`x,`key):
-            return "((Bag)("+compileE(x)+")).map_find("+compileE(key)+")";
-        case aggregate(`acc,`zero,`s):
-            return "MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)+",(Bag)("+compileE(s)+"))";
-        case Aggregate(`acc,`zero,`s):
-            return "MapReducePlan.aggregate("+compileF(acc)+","+compileE(zero)+","+compileM(s)+")";
-        case mergeGroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
-            return "MapReduceAlgebra.mergeGroupByJoin("+compileF(kx)+","+compileF(ky)
-                   +","+compileF(gx)+","+compileF(gy)+","+compileF(m)+","+compileF(c)
-                   +","+compileF(r)+",(Bag)"+compileE(x)+",(Bag)"+compileE(y)+")";
-        case function(tuple(...params),`tp,`body):
-            return compileF(e);
-        case typed(`x,_):
-            return compileE(x);
-        // a call to a global function with explicit arguments: call the static method
-        case apply(`f,tuple(...args)):
-            if (!f.is_variable())
-                fail;
-            match global_functions.lookup(f.toString()) {
-            case function(tuple(...params),`otp,`body):
-                String ret = "UserFunctions_"+user_functions_num+"."+f+"(";
-                if (args.is_empty())
-                    return ret+")";
-                for ( int i = 0; i < params.length(); i++ )
-                    match params.nth(i) {
-                    case bind(_,`tp):
-                        ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")"+compileE(args.nth(i));
-                    };
-                return ret+")";
-            };
-        // a call to a global function over a tuple value: unpack the tuple components
-        case apply(`f,`arg):
-            if (!f.is_variable())
-                fail;
-            match global_functions.lookup(f.toString()) {
-            case function(tuple(...params),`otp,`body):
-                String ac = compileE(arg);
-                String ret = "UserFunctions_"+user_functions_num+"."+f+"(";
-                for ( int i = 0; i < params.length(); i++ )
-                    match params.nth(i) {
-                    case bind(_,`tp):
-                        ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")((Tuple)"+ac+").get("+i+")";
-                    };
-                return ret+")";
-            };
-        case apply(`f,`arg):
-            if (!f.is_variable())
-                return "("+compileF(f)+").eval("+compileE(arg)+")";
-            else return "(((Lambda)"+compileE(f)+").lambda().eval("+compileE(arg)+"))";
-        case Collect(`s):
-            return "Plan.collect("+compileM(s)+")";
-        case trace(`x):
-            return compileE(x);
-        case _:
-            return compileM(e);
-        };
-        throw new Exception("Cannot compile: "+e);
-    }
-
-    /** Generate the Java code that evaluates a physical plan in memory using the
-     * MapReduceAlgebra operations (or the Plan data sources in Hadoop mode).
-     * A plan that is fused with an aggregation is compiled as an aggregation over
-     * the corresponding plan without the aggregation.
-     * @param e the plan to compile
-     * @return a Java expression
-     * @throws Exception if the plan is neither one of the known plans nor a variable
-     */
-    final private static String compileM ( Tree e ) throws Exception {
-        match e {
-        case cMap(`f,`s):
-            return "MapReduceAlgebra.cmap("+compileF(f)+",(Bag)"+compileM(s)+")";
-        case AggregateMap(`f,`acc,`zero,`s):
-            return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
-                   +","+compileM(#<cMap(`f,`s)>)+"))";
-        case MapReduce(`m,`r,`s,_):
-            return "MapReduceAlgebra.mapReduce("+compileF(m)+","
-                   +compileF(r)+",(Bag)"+compileM(s)+")";
-        case MapAggregateReduce(`m,`r,`acc,`zero,`s,`o):
-            // the last argument is passed along: the MapReduce case above matches
-            // four arguments only
-            return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)
-                   +","+compileE(zero)+","+compileM(#<MapReduce(`m,`r,`s,`o)>)+"))";
-        case MapCombineReduce(`m,`c,`r,`s,_):
-            // the combiner is not used in memory
-            return "MapReduceAlgebra.mapReduce("+compileF(m)
-                   +","+compileF(r)+",(Bag)"+compileM(s)+")";
-        case MapReduce2(`mx,`my,`c,`r,`x,`y,_):
-            return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)
-                   +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
-        case MapReduce2(`mx,`my,`r,`x,`y,_):
-            return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)
-                   +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
-        case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,`o):
-            // the last argument is passed along: the MapReduce2 cases above match
-            // six or seven arguments only
-            return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
-                   +","+compileM(#< MapReduce2(`mx,`my,`r,`x,`y,`o)>)+"))";
-        case MapJoin(`kx,`ky,`r,`x,`y):
-            return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)
-                   +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
-        case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
-            return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)
-                   +","+compileE(zero)+","+compileM(#<MapJoin(`kx,`ky,`r,`x,`y)>)+"))";
-        case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
-            return "MapReduceAlgebra.groupByJoin("+compileF(kx)+","+compileF(ky)
-                   +","+compileF(gx)+","+compileF(gy)+","+compileF(m)+","+compileF(c)
-                   +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
-        case CrossProduct(`mx,`my,`r,`x,`y):
-            return "MapReduceAlgebra.crossProduct("+compileF(mx)+","+compileF(my)
-                   +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
-        case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
-            return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
-                   +","+compileM(#<CrossProduct(`mx,`my,`r,`x,`y)>)+"))";
-        case BSPSource(`n,BinarySource(`file,_)):
-            if (Config.hadoop_mode)
-                return "Plan.binarySource("+compileE(n)+",((MR_string)"
-                       +compileE(file)+").get())";
-            else return "(Bag)MapReduceAlgebra.read_binary("+compileE(n)+",((MR_string)"
-                        +compileE(file)+").get())";
-        case BinarySource(`file,_):
-            if (Config.hadoop_mode)
-                return "Plan.binarySource(((MR_string)"+compileE(file)+").get())";
-            else return "(Bag)MapReduceAlgebra.read_binary(((MR_string)"+compileE(file)+").get())";
-        case BSPSource(`n,ParsedSource(`parser,`file,...args)):
-            if (!(n instanceof LongLeaf))
-                fail;
-            if (!Config.hadoop_mode)
-                return "MapReduceAlgebra.parsedSource(((MR_int)"+compileE(n)+").get(),\""
-                       +parser+"\",((MR_string)"+compileE(file)+").get(),"
-                       +reify(args)+")";
-            Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
-            if (p == null)
-                throw new Error("Unknown parser: "+parser);
-            return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get(),"
-                   +reify(args)+")";
-        case ParsedSource(`parser,`file,...args):
-            if (!Config.hadoop_mode)
-                return "MapReduceAlgebra.parsedSource(\""+parser+"\",((MR_string)"
-                       +compileE(file)+").get(),"+reify(args)+")";
-            Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
-            if (p == null)
-                throw new Error("Unknown parser: "+parser);
-            return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get(),"
-                   +reify(args)+")";
-        case Merge(`x,`y):
-            return "((Bag)"+compileM(x)+").union((Bag)"+compileM(y)+")";
-        case Generator(`min,`max,`size):
-            return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get(),"
-                   +"((MR_long)"+compileE(max)+").get())";
-        case BSP(`n,`superstep,`state,`o,...as):
-            String ds = "";
-            for ( Tree a: as )
-                ds += ",(Bag)("+compileM(a)+")";
-            // ds.substring(1) drops the leading comma
-            return "MapReduceAlgebra.BSP("+((LongLeaf)n).value()+","
-                   +compileF(superstep)+","+compileE(state)+","+o+","
-                   +"new Bag[]{"+ds.substring(1)+"})";
-        case `v:
-            if (v.is_variable())
-                return v.toString();
-        };
-        throw new Exception("Cannot compile: "+e);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/Config.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/Config.java b/src/main/java/core/Config.java
deleted file mode 100644
index a957d89..0000000
--- a/src/main/java/core/Config.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.VariableLeaf;
-import java.util.ArrayList;
-import java.io.FileInputStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-
-/**
- * MRQL configuration parameters.
- * Every parameter is a static field. The fields are set from the command line by
- * {@link #parse_args}, stored into a Hadoop Configuration by {@link #write} and
- * loaded back from it by {@link #read}.
- */
-final public class Config {
-    // true after read() has loaded the parameters from a Configuration
-    public static boolean loaded = false;
-
-    // true for using Hadoop HDFS file-system
-    public static boolean hadoop_mode = false;
-    // true for local execution (one node)
-    public static boolean local_mode = false;
-    // true for distributed execution (set by the -dist option)
-    public static boolean distributed_mode = false;
-    // true for Hadoop map-reduce mode
-    public static boolean map_reduce_mode = false;
-    // true, for BSP mode using Hama
-    public static boolean bsp_mode = false;
-    // true, for Spark mode
-    public static boolean spark_mode = false;
-    // if true, it process the input interactively
-    public static boolean interactive = true;
-    // compile the MR functional arguments to Java bytecode at run-time
-    // (each task-tracker repeats the compilation at the MR setup time)
-    public static boolean compile_functional_arguments = true;
-    // if true, generates info about all compilation and optimization steps
-    public static boolean trace = false;
-    // number of worker nodes
-    public static int nodes = 2;
-    // true, to disable mapJoin
-    public static boolean noMapJoin = false;
-    // max distributed cache size for MapJoin (fragment-replicate join) in MBs
-    public static int mapjoin_size = 50;
-    // max entries for in-mapper combiner before they are flushed out
-    public static int map_cache_size = 100000;
-    // max number of bag elements to print
-    public static int max_bag_size_print = 20;
-    // max size of materialized vector before is spilled to a file:
-    public static int max_materialized_bag = 500000;
-    // max number of incoming messages before a sub-sync()
-    public static int bsp_msg_size = Integer.MAX_VALUE;
-    // number of elements per mapper to process the range min...max
-    public static long range_split_size = 100000;
-    // max number of streams to merge simultaneously
-    public static int max_merged_streams = 100;
-    // the directory for temporary files and spilled bags
-    public static String tmpDirectory = "/tmp/mrql_"+System.getProperty("user.name");
-    // true, if we want to derive a combine function for MapReduce
-    public static boolean use_combiner = true;
-    // true, if we can use the rule that fuses a groupBy with a join over the same key
-    public static boolean groupJoinOpt = true;
-    // true, if we can use the rule that converts a self-join into a simple mapreduce
-    public static boolean selfJoinOpt = true;
-    // true for run-time trace of plans
-    public static boolean trace_execution = false;
-    // true for extensive run-time trace of expressions & plans
-    public static boolean trace_exp_execution = false;
-    // true if you don't want to print statistics
-    public static boolean quiet_execution = false;
-    // true if this is during testing
-    public static boolean testing = false;
-    // true to display INFO log messages
-    public static boolean info = false;
-
-    /** store the configuration parameters
-     * @param conf the Hadoop configuration that receives the current parameter values
-     */
-    public static void write ( Configuration conf ) {
-        conf.setBoolean("mrql.hadoop.mode",hadoop_mode);
-        conf.setBoolean("mrql.local.mode",local_mode);
-        conf.setBoolean("mrql.distributed.mode",distributed_mode);
-        conf.setBoolean("mrql.map.reduce.mode",map_reduce_mode);
-        conf.setBoolean("mrql.bsp.mode",bsp_mode);
-        conf.setBoolean("mrql.spark.mode",spark_mode);
-        conf.setBoolean("mrql.interactive",interactive);
-        conf.setBoolean("mrql.compile.functional.arguments",compile_functional_arguments);
-        conf.setBoolean("mrql.trace",trace);
-        conf.setInt("mrql.nodes",nodes);
-        conf.setInt("mrql.mapjoin.size",mapjoin_size);
-        conf.setInt("mrql.in.mapper.size",map_cache_size);
-        conf.setInt("mrql.max.bag.size.print",max_bag_size_print);
-        conf.setInt("mrql.max.materialized.bag",max_materialized_bag);
-        conf.setInt("mrql.bsp.msg.size",bsp_msg_size);
-        conf.setLong("mrql.range.split.size",range_split_size);
-        conf.setInt("mrql.max.merged.streams",max_merged_streams);
-        conf.set("mrql.tmp.directory",tmpDirectory);
-        conf.setBoolean("mrql.use.combiner",use_combiner);
-        conf.setBoolean("mrql.group.join.opt",groupJoinOpt);
-        conf.setBoolean("mrql.self.join.opt",selfJoinOpt);
-        conf.setBoolean("mrql.trace.execution",trace_execution);
-        conf.setBoolean("mrql.trace.exp.execution",trace_exp_execution);
-        conf.setBoolean("mrql.quiet.execution",quiet_execution);
-        conf.setBoolean("mrql.testing",testing);
-        conf.setBoolean("mrql.info",info);
-    }
-
-    /** load the configuration parameters; only the first call has an effect.
-     * A parameter that is missing from the configuration keeps its current value.
-     * @param conf the Hadoop configuration to read the parameter values from
-     */
-    public static void read ( Configuration conf ) {
-        if (loaded)
-            return;
-        loaded = true;
-        hadoop_mode = conf.getBoolean("mrql.hadoop.mode",hadoop_mode);
-        local_mode = conf.getBoolean("mrql.local.mode",local_mode);
-        distributed_mode = conf.getBoolean("mrql.distributed.mode",distributed_mode);
-        map_reduce_mode = conf.getBoolean("mrql.map.reduce.mode",map_reduce_mode);
-        bsp_mode = conf.getBoolean("mrql.bsp.mode",bsp_mode);
-        spark_mode = conf.getBoolean("mrql.spark.mode",spark_mode);
-        interactive = conf.getBoolean("mrql.interactive",interactive);
-        compile_functional_arguments = conf.getBoolean("mrql.compile.functional.arguments",compile_functional_arguments);
-        trace = conf.getBoolean("mrql.trace",trace);
-        nodes = conf.getInt("mrql.nodes",nodes);
-        mapjoin_size = conf.getInt("mrql.mapjoin.size",mapjoin_size);
-        map_cache_size = conf.getInt("mrql.in.mapper.size",map_cache_size);
-        max_bag_size_print = conf.getInt("mrql.max.bag.size.print",max_bag_size_print);
-        max_materialized_bag = conf.getInt("mrql.max.materialized.bag",max_materialized_bag);
-        bsp_msg_size = conf.getInt("mrql.bsp.msg.size",bsp_msg_size);
-        range_split_size = conf.getLong("mrql.range.split.size",range_split_size);
-        max_merged_streams = conf.getInt("mrql.max.merged.streams",max_merged_streams);
-        // use the current value as the default, so that a missing key does not null the directory
-        tmpDirectory = conf.get("mrql.tmp.directory",tmpDirectory);
-        use_combiner = conf.getBoolean("mrql.use.combiner",use_combiner);
-        groupJoinOpt = conf.getBoolean("mrql.group.join.opt",groupJoinOpt);
-        selfJoinOpt = conf.getBoolean("mrql.self.join.opt",selfJoinOpt);
-        trace_execution = conf.getBoolean("mrql.trace.execution",trace_execution);
-        trace_exp_execution = conf.getBoolean("mrql.trace.exp.execution",trace_exp_execution);
-        quiet_execution = conf.getBoolean("mrql.quiet.execution",quiet_execution);
-        testing = conf.getBoolean("mrql.testing",testing);
-        info = conf.getBoolean("mrql.info",info);
-    }
-
-    // the command line arguments that follow the query file name
-    public static ArrayList<String> extra_args = new ArrayList<String>();
-
-    /** read configuration parameters from the Main args.
-     * The first argument that is not an option becomes Main.query_file (and turns off
-     * the interactive mode); any further non-option arguments are collected in extra_args.
-     * The collected arguments are also bound to the global variable "args".
-     * @param args the command line arguments
-     * @param conf the Hadoop configuration; it is stored in Plan.conf and, in hadoop mode,
-     *             it receives the parameter values
-     * @return a Bag with the extra arguments as MR_strings
-     * @throws Error if an option is unknown, or its value is missing or out of range
-     */
-    public static Bag parse_args ( String args[], Configuration conf ) throws Exception {
-        int i = 0;
-        extra_args = new ArrayList<String>();
-        ClassImporter.load_classes();
-        interactive = true;
-        while (i < args.length) {
-            if (args[i].equals("-local")) {
-                local_mode = true;
-                i++;
-            } else if (args[i].equals("-dist")) {
-                distributed_mode = true;
-                i++;
-            } else if (args[i].equals("-reducers")) {
-                if (++i >= args.length)
-                    throw new Error("Expected number of reductions");
-                nodes = Integer.parseInt(args[i]);
-                i++;
-            } else if (args[i].equals("-bsp")) {
-                bsp_mode = true;
-                i++;
-            } else if (args[i].equals("-spark")) {
-                spark_mode = true;
-                i++;
-            } else if (args[i].equals("-bsp_tasks")) {
-                // || (not &&): reject a missing value as well as a value below 1
-                if (++i >= args.length || Integer.parseInt(args[i]) < 1)
-                    throw new Error("Expected max number of bsp tasks > 1");
-                nodes = Integer.parseInt(args[i]);
-                i++;
-            } else if (args[i].equals("-nodes")) {
-                // || (not &&): reject a missing value as well as a value below 1
-                if (++i >= args.length || Integer.parseInt(args[i]) < 1)
-                    throw new Error("Expected number of nodes > 1");
-                nodes = Integer.parseInt(args[i]);
-                i++;
-            } else if (args[i].equals("-bsp_msg_size")) {
-                // || (not &&): reject a missing value as well as a value below 10000
-                if (++i >= args.length || Integer.parseInt(args[i]) < 10000)
-                    throw new Error("Expected max number of bsp messages before subsync() > 10000");
-                bsp_msg_size = Integer.parseInt(args[i]);
-                i++;
-            } else if (args[i].equals("-mapjoin_size")) {
-                if (++i >= args.length)
-                    throw new Error("Expected number of MBs");
-                mapjoin_size = Integer.parseInt(args[i]);
-                i++;
-            } else if (args[i].equals("-cache_size")) {
-                if (++i >= args.length)
-                    throw new Error("Expected number of entries");
-                map_cache_size = Integer.parseInt(args[i]);
-                i++;
-            } else if (args[i].equals("-tmp")) {
-                if (++i >= args.length)
-                    throw new Error("Expected a temporary directory");
-                tmpDirectory = args[i];
-                i++;
-            } else if (args[i].equals("-bag_size")) {
-                // || (not &&): reject a missing value as well as a value below 10000
-                if (++i >= args.length || Integer.parseInt(args[i]) < 10000)
-                    throw new Error("Expected max size of materialized bag > 10000");
-                max_materialized_bag = Integer.parseInt(args[i]);
-                i++;
-            } else if (args[i].equals("-bag_print")) {
-                if (++i >= args.length)
-                    throw new Error("Expected number of bag elements to print");
-                max_bag_size_print = Integer.parseInt(args[i]);
-                i++;
-            } else if (args[i].equals("-split_size")) {
-                if (++i >= args.length)
-                    throw new Error("Expected a split size");
-                range_split_size = Long.parseLong(args[i]);
-                i++;
-            } else if (args[i].equals("-max_merged")) {
-                if (++i >= args.length)
-                    throw new Error("Expected a max number of merged streams");
-                max_merged_streams = Integer.parseInt(args[i]);
-                i++;
-            } else if (args[i].equals("-trace")) {
-                trace = true;
-                i++;
-            } else if (args[i].equals("-C")) {
-                compile_functional_arguments = true;
-                i++;
-            } else if (args[i].equals("-NC")) {
-                compile_functional_arguments = false;
-                i++;
-            } else if (args[i].equals("-P")) {
-                trace_execution = true;
-                i++;
-            } else if (args[i].equals("-quiet")) {
-                quiet_execution = true;
-                i++;
-            } else if (args[i].equals("-info")) {
-                info = true;
-                i++;
-            } else if (args[i].equals("-trace_execution")) {
-                trace_execution = true;
-                trace_exp_execution = true;
-                compile_functional_arguments = false;
-                i++;
-            } else if (args[i].equals("-methods")) {
-                System.out.print("\nImported methods: ");
-                ClassImporter.print_methods();
-                System.out.println();
-                System.out.print("\nAggregations:");
-                Translator.print_aggregates();
-                System.out.println();
-                i++;
-            } else if (args[i].charAt(0) == '-')
-                throw new Error("Unknown MRQL parameter: "+args[i]);
-            else {
-                // the first plain argument is the query file; the rest are query arguments
-                if (interactive) {
-                    Main.query_file = args[i++];
-                    interactive = false;
-                } else extra_args.add(args[i++]);
-            }
-        }
-        if (hadoop_mode)
-            write(conf);
-        Plan.conf = conf;
-        Bag b = new Bag();
-        for ( String s: extra_args )
-            b.add(new MR_string(s));
-        Interpreter.new_global_binding(new VariableLeaf("args").value(),b);
-        return b;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/DataSet.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/DataSet.java b/src/main/java/core/DataSet.java
deleted file mode 100644
index efe5646..0000000
--- a/src/main/java/core/DataSet.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.util.List;
-import java.util.ArrayList;
-import org.apache.hadoop.conf.Configuration;
-
-
-/** The domain of the MRQL physical algebra is a set of DataSources */
-public class DataSet {
-    public ArrayList<DataSource> source;  // multiple sources
-    public long counter;  // a Hadoop user-defined counter used by the `repeat' operator
-    public long records;  // total number of dataset records
-
-    /** Construct a DataSet that contains one DataSource
-     * @param s the given DataSource
-     * @param counter a Hadoop user-defined counter used by the `repeat' operator
-     * @param records total number of dataset records
-     */
-    DataSet ( DataSource s, long counter, long records ) {
-        source = new ArrayList<DataSource>();
-        source.add(s);
-        this.counter = counter;
-        this.records = records;
-    }
-
-    /** Construct a set of DataSources
-     * @param counter a Hadoop user-defined counter used by the `repeat' operator
-     * @param records total number of dataset records
-     */
-    DataSet ( long counter, long records ) {
-        source = new ArrayList<DataSource>();
-        this.counter = counter;
-        this.records = records;
-    }
-
-    /** add a DataSource to this DataSet */
-    public void add ( DataSource s ) {
-        source.add(s);
-    }
-
-    /** merge this DataSet with the given DataSet
-     * (appends its sources and adds up the counters and the record counts)
-     */
-    public void merge ( DataSet ds ) {
-        source.addAll(ds.source);
-        counter += ds.counter;
-        records += ds.records;
-    }
-
-    /** dataset size in bytes
-     * @return the sum of the sizes of all the data sources
-     */
-    public long size ( Configuration conf ) {
-        long n = 0;
-        for (DataSource s: source)
-            n += s.size(conf);
-        return n;
-    }
-
-    /** return a single DataSource path by merging all the DataSource paths in this DataSet
-     * @return the source paths separated by commas
-     * @throws IndexOutOfBoundsException if this DataSet has no sources
-     */
-    public String merge () {
-        StringBuilder path = new StringBuilder(source.get(0).path);
-        for ( int i = 1; i < source.size(); i++ )
-            path.append(",").append(source.get(i).path);
-        return path.toString();
-    }
-
-    /** return the first num values
-     * @param num the max number of values to return; a negative number returns all
-     *            the values of all the sources (as DataSource.take does for one source)
-     * @return the values, taken from the sources in order
-     */
-    public List<MRData> take ( int num ) {
-        ArrayList<MRData> res = new ArrayList<MRData>();
-        for ( DataSource s: source ) {
-            if (num < 0)
-                res.addAll(s.take(num));
-            else {
-                // what is still missing is always relative to the requested total
-                int remaining = num-res.size();
-                if (remaining <= 0)
-                    return res;
-                res.addAll(s.take(remaining));
-            }
-        }
-        return res;
-    }
-
-    /** accumulate all dataset values
-     * @param zero the initial value of the accumulation
-     * @param acc the accumulator function, applied by each DataSource in turn
-     * @return the accumulated value
-     */
-    public MRData reduce ( MRData zero, Function acc ) {
-        MRData res = zero;
-        for ( DataSource s: source )
-            res = s.reduce(res,acc);
-        return res;
-    }
-
-    /** print the counter followed by the data sources, as {@code <counter,source,...>} */
-    public String toString () {
-        StringBuilder p = new StringBuilder("<").append(counter);
-        for (DataSource s: source)
-            p.append(",").append(s);
-        return p.append(">").toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/DataSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/DataSource.java b/src/main/java/core/DataSource.java
deleted file mode 100644
index 6a0ef4f..0000000
--- a/src/main/java/core/DataSource.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.*;
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.Configuration;
-
-
-/** A DataSource is any input data source, such as a text file, a key/value map, a data base, an intermediate file, etc */
-public class DataSource {
-    // separates the fields of an encoded DataSource (see read)
-    final public static String separator = "%%%";
-    public static DataSourceDirectory dataSourceDirectory = new DataSourceDirectory();
-    public static ParserDirectory parserDirectory = new ParserDirectory();
-    // true after the built-in parsers have been registered
-    private static boolean loaded = false;
-
-    public String path;
-    public Class<? extends MRQLFileInputFormat> inputFormat;
-    public int source_num;
-    public boolean to_be_merged;    // if the path is a directory with multiple files, merge them
-
-    /** A dictionary that maps parser names to Parser classes */
-    final static class ParserDirectory extends HashMap<String,Class<? extends Parser>> {
-    }
-
-    /** A dictionary that maps data source paths to DataSource data.
-     * It assumes that each path can only be associated with a single data source format and parser
-     */
-    final static class DataSourceDirectory extends HashMap<String,DataSource> {
-        /** replace the content of this dictionary with the directory stored in the configuration
-         * (the inverse of distribute); the dictionary is left empty if nothing was stored
-         */
-        public void read ( Configuration conf ) {
-            clear();
-            String directory = conf.get("mrql.data.source.directory");
-            // a missing property or an empty directory has no entries to decode
-            if (directory == null || directory.equals(""))
-                return;
-            for ( String s: directory.split("@@@") ) {
-                String[] p = s.split("===");
-                put(p[0],DataSource.read(p[1],conf));
-            }
-        }
-
-        /** encode the dictionary as path===source entries separated by @@@ */
-        public String toString () {
-            StringBuilder s = new StringBuilder();
-            // use the entries directly: get(String) does prefix matching, not an exact lookup
-            for ( Map.Entry<String,DataSource> e: entrySet() ) {
-                if (s.length() > 0)
-                    s.append("@@@");
-                s.append(e.getKey()).append("===").append(e.getValue());
-            }
-            return s.toString();
-        }
-
-        /** find the DataSource whose path is a prefix of the given name
-         * @param name a path name
-         * @return the DataSource with the longest path that is a prefix of name,
-         *         or null if there is no such DataSource
-         */
-        public DataSource get ( String name ) {
-            DataSource best = null;
-            int best_length = -1;
-            for ( Map.Entry<String,DataSource> e: entrySet() ) {
-                String key = e.getKey();
-                // prefer the most specific path, so that the result
-                // does not depend on the iteration order of the map
-                if (key.length() > best_length && name.startsWith(key)) {
-                    best = e.getValue();
-                    best_length = key.length();
-                }
-            }
-            return best;
-        }
-
-        /** store the encoded dictionary in the configuration */
-        public void distribute ( Configuration conf ) {
-            conf.set("mrql.data.source.directory",toString());
-        }
-    }
-
-    DataSource () {}
-
-    /** Construct a DataSource and register it in the dataSourceDirectory
-     *  under the path reported by the file system for the given path
-     * @param source_num the data source number
-     * @param path the path of the data source
-     * @param inputFormat the input format used for reading the data source
-     * @param conf the Hadoop configuration
-     * @throws Error if the file status of the path cannot be retrieved
-     */
-    DataSource ( int source_num,
-                 String path,
-                 Class<? extends MRQLFileInputFormat> inputFormat,
-                 Configuration conf ) {
-        this.source_num = source_num;
-        this.path = path;
-        this.inputFormat = inputFormat;
-        to_be_merged = false;
-        try {
-            Path p = new Path(path);
-            FileSystem fs = p.getFileSystem(conf);
-            String complete_path = fs.getFileStatus(p).getPath().toString();
-            this.path = complete_path;
-            dataSourceDirectory.put(this.path,this);
-        } catch (IOException e) {
-            throw new Error(e);
-        }
-    }
-
-    /** register the xml, json, and line parsers (done only once) */
-    public static void loadParsers() {
-        if (!loaded) {
-            DataSource.parserDirectory.put("xml",XMLParser.class);
-            DataSource.parserDirectory.put("json",JsonFormatParser.class);
-            DataSource.parserDirectory.put("line",LineParser.class);
-            loaded = true;
-        }
-    }
-
-    static {
-        loadParsers();
-    }
-
-    /** the length of a file or, for a directory, the total length of its direct entries */
-    private static long size ( Path path, Configuration conf ) throws IOException {
-        FileSystem fs = path.getFileSystem(conf);
-        FileStatus s = fs.getFileStatus(path);
-        if (!s.isDir())
-            return s.getLen();
-        long size = 0;
-        for ( FileStatus f: fs.listStatus(path) )
-            size += f.getLen();
-        return size;
-    }
-
-    /** data set size in bytes */
-    public long size ( Configuration conf ) {
-        try {
-            return size(new Path(path),conf);
-        } catch (IOException e) {
-            throw new Error(e);
-        }
-    }
-
-    /** decode a DataSource from a string of fields separated by the separator.
-     * The first field is the kind of the source (Binary, Generator, or Text)
-     * and the second field is the source number.
-     * @param buffer the encoded DataSource
-     * @param conf the Hadoop configuration
-     * @return the decoded DataSource
-     * @throws Error if the kind is not recognized or the buffer cannot be decoded
-     */
-    public static DataSource read ( String buffer, Configuration conf ) {
-        try {
-            String[] s = buffer.split(separator);
-            int n = Integer.parseInt(s[1]);
-            if (s[0].equals("Binary"))
-                return new BinaryDataSource(n,s[2],conf);
-            else if (s[0].equals("Generator"))
-                return new GeneratorDataSource(n,s[2],conf);
-            else if (s[0].equals("Text"))
-                return new ParsedDataSource(n,s[3],parserDirectory.get(s[2]),((Node)Tree.parse(s[4])).children(),conf);
-            else throw new Error("Unrecognized data source: "+s[0]);
-        } catch (Exception e) {
-            throw new Error(e);
-        }
-    }
-
-    /** find the DataSource of a path; if the dataSourceDirectory is empty,
-     *  it is first loaded from the configuration
-     * @return the DataSource, or null if no registered path is a prefix of the given path
-     */
-    public static DataSource get ( String path, Configuration conf ) {
-        if (dataSourceDirectory.isEmpty())
-            dataSourceDirectory.read(conf);
-        return dataSourceDirectory.get(path);
-    }
-
-    /** find the DataSource of remote_path, change its path to local_path,
-     *  and register it under local_path too
-     */
-    public static DataSource getCached ( String remote_path, String local_path, Configuration conf ) {
-        DataSource ds = get(remote_path,conf);
-        ds.path = local_path;
-        dataSourceDirectory.put(local_path,ds);
-        return ds;
-    }
-
-    /** return the first num values
-     * @param num the max number of values to return; a negative number returns all values
-     */
-    public List<MRData> take ( int num ) {
-        try {
-            ArrayList<MRData> res = new ArrayList<MRData>();
-            Iterator<MRData> it = inputFormat.newInstance().materialize(new Path(path)).iterator();
-            for ( int i = num; (num < 0 || i > 0) && it.hasNext(); i-- )
-                if (Config.hadoop_mode && Config.bsp_mode)
-                    res.add(((Tuple)it.next()).get(1));  // strip tag in BSP mode
-                else res.add(it.next());
-            return res;
-        } catch (Exception ex) {
-            throw new Error(ex);
-        }
-    }
-
-    // the argument passed to the accumulator in reduce: (accumulated value, next value);
-    // it is shared and overwritten at every step
-    static Tuple tuple_container = new Tuple(new Tuple(),new Tuple());
-
-    /** accumulate all datasource values
-     * @param zero the initial value of the accumulation
-     * @param acc the accumulator function, applied to the pair (accumulated value, next value)
-     * @return the accumulated value
-     */
-    public MRData reduce ( MRData zero, final Function acc ) {
-        try {
-            MRData res = zero;
-            for ( MRData x: inputFormat.newInstance().materialize(new Path(path)) ) {
-                if (Config.hadoop_mode && Config.bsp_mode)
-                    x = ((Tuple)x).get(1); // strip tag in BSP mode
-                tuple_container.set(0,res).set(1,x);
-                res = acc.eval(tuple_container);
-            }
-            return res;
-        } catch (Exception ex) {
-            throw new Error(ex);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/Environment.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/Environment.java b/src/main/java/core/Environment.java
deleted file mode 100644
index da8dd84..0000000
--- a/src/main/java/core/Environment.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.io.*;
-import org.apache.mrql.gen.Tree;
-
-
-/**
- * The run-time environment for in-memory evaluation: a linked list of
- * bindings from variable names to MRData values.
- */
-final public class Environment implements Serializable {
-    public String name;
-    public MRData value;
-    public Environment next;
-
-    /** Construct a binding that is placed in front of an existing environment
-     * @param n the variable name
-     * @param v the value bound to the variable
-     * @param next the rest of the environment
-     */
-    Environment ( String n, MRData v, Environment next ) {
-        this.next = next;
-        this.value = v;
-        this.name = n;
-    }
-
-    /** serialize the binding using the default field serialization */
-    private void writeObject ( ObjectOutputStream stream ) throws IOException {
-        stream.defaultWriteObject();
-    }
-
-    /** deserialize the binding and then pass the name through Tree.add
-     *  (presumably to re-register the name with the Tree name table -- TODO confirm)
-     */
-    private void readObject ( ObjectInputStream stream ) throws IOException, ClassNotFoundException {
-        stream.defaultReadObject();
-        this.name = Tree.add(this.name);
-    }
-}