You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2014/03/13 15:24:48 UTC

[23/26] MRQL-32: Refactoring directory structure for Eclipse

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Environment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Environment.java b/core/src/main/java/org/apache/mrql/Environment.java
new file mode 100644
index 0000000..da8dd84
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Environment.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.*;
+import org.apache.mrql.gen.Tree;
+
+
+/**
+ * The run-time environment for in-memory evaluation: a singly-linked chain of
+ * bindings, each one associating a variable name with an MRData value.
+ */
+public final class Environment implements Serializable {
+    /** the name of the variable bound by this link */
+    public String name;
+    /** the value bound to the variable */
+    public MRData value;
+    /** the remaining bindings of the chain (null at the end of the chain) */
+    public Environment next;
+
+    /** prepend a binding from n to v to the chain given by next */
+    Environment ( String n, MRData v, Environment next ) {
+        this.name = n;
+        this.value = v;
+        this.next = next;
+    }
+
+    /** serialize this link using the default field-by-field format */
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        out.defaultWriteObject();
+    }
+
+    /** deserialize this link and then pass the name through Tree.add
+     *  (presumably to re-register it with the Gen name table -- confirm against Tree.add)
+     */
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        this.name = Tree.add(this.name);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Evaluator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Evaluator.java b/core/src/main/java/org/apache/mrql/Evaluator.java
new file mode 100644
index 0000000..28d356f
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Evaluator.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java_cup.runtime.*;
+import org.apache.mrql.gen.*;
+import java.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+
+
+/** Evaluates physical plans using one of the evaluation engines */
+abstract public class Evaluator extends Interpreter {
+
+    /** the current MRQL evaluator */
+    public static Evaluator evaluator;
+
+    /** initialize the evaluator */
+    abstract public void init ( Configuration conf );
+
+    /** shutdown the evaluator */
+    abstract public void shutdown ( Configuration conf );
+
+    /** initialize the query evaluation */
+    abstract public void initialize_query ();
+
+    /** create a new evaluation configuration */
+    abstract public Configuration new_configuration ();
+
+    /** synchronize peers in BSP mode (this default implementation always throws an Error) */
+    public MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
+        throw new Error("You can only synchronize BSP tasks");
+    }
+
+    /** distribute a bag among peers in BSP mode (this default implementation always throws an Error) */
+    public Bag distribute ( MR_string peerName, Bag s ) {
+        throw new Error("You can only distribute bags among BSP tasks");
+    }
+
+    /** run a BSP task (this default implementation always throws an Error) */
+    public MRData bsp ( Tree plan, Environment env ) throws Exception {
+        throw new Error("You can only run a BSP task in BSP mode");
+    }
+
+    /** return the FileInputFormat for parsed files (CSV, XML, JSON, etc) */
+    abstract public Class<? extends MRQLFileInputFormat> parsedInputFormat ();
+
+    /** return the FileInputFormat for binary files */
+    abstract public Class<? extends MRQLFileInputFormat> binaryInputFormat ();
+
+    /** return the FileInputFormat for data generator files */
+    abstract public Class<? extends MRQLFileInputFormat> generatorInputFormat ();
+
+    /** The Aggregate physical operator
+     * @param acc_fnc  the accumulator function from (T,T) to T
+     * @param zero  the zero element of type T
+     * @param plan the plan that constructs the dataset that contains the bag of values {T}
+     * @param env contains bindings from variables to values (MRData)
+     * @return the aggregation result of type T
+     */
+    abstract public MRData aggregate ( Tree acc_fnc,
+                                       Tree zero,
+                                       Tree plan,
+                                       Environment env ) throws Exception;
+
+    /** Evaluate a loop a fixed number of times */
+    abstract public Tuple loop ( Tree e, Environment env ) throws Exception;
+
+    /** Evaluate a MRQL physical plan and print tracing info
+     * @param e the physical plan
+     * @param env contains bindings from variables to values (MRData)
+     * @param counter a name supplied by the caller; how it is used is up to the implementing engine
+     * @return a DataSet (stored in HDFS)
+     */
+    abstract public DataSet eval ( final Tree e,
+                                   final Environment env,
+                                   final String counter );
+
+    // scratch objects shared by every call of dump: they are overwritten for each
+    // record that is appended, so dump must not be run by two threads at once
+    final static MR_long counter_key = new MR_long(0);
+    final static MRContainer counter_container = new MRContainer(counter_key);
+    final static MRContainer value_container = new MRContainer(new MR_int(0));
+
+    /** Dump MRQL data into a sequence file. The type is printed into a companion
+     *  file named file+".type"; the data are appended to the sequence file as
+     *  (index,value) pairs, one pair per bag element, or as a single pair with
+     *  index 0 if the data is not a bag.
+     * @param file the path name of the sequence file
+     * @param type the type of the data
+     * @param data the data to dump (an MR_dataset is collected first)
+     * @throws Exception if a file cannot be created or written
+     */
+    public void dump ( String file, Tree type, MRData data ) throws Exception {
+        Path path = new Path(file);
+        FileSystem fs = path.getFileSystem(Plan.conf);
+        PrintStream ftp = new PrintStream(fs.create(path.suffix(".type")));
+        try {
+            ftp.print("2@"+type.toString()+"\n");
+        } finally {
+            // close the type file even if printing fails
+            ftp.close();
+        }
+        SequenceFile.Writer writer
+            = new SequenceFile.Writer(fs,Plan.conf,path,
+                                      MRContainer.class,MRContainer.class);
+        try {
+            if (data instanceof MR_dataset)
+                data = Plan.collect(((MR_dataset)data).dataset());
+            if (data instanceof Bag) {
+                long i = 0;
+                for ( MRData e: (Bag)data ) {
+                    counter_key.set(i++);
+                    value_container.set(e);
+                    writer.append(counter_container,value_container);
+                }
+            } else {
+                counter_key.set(0);
+                value_container.set(data);
+                writer.append(counter_container,value_container);
+            }
+        } finally {
+            // do not leak the writer if collecting or appending fails
+            writer.close();
+        }
+    }
+
+    /** Dump MRQL data into a text CSV file. A collection of tuples is printed one
+     *  tuple per line with comma-separated components; any other collection is
+     *  printed one element per line; a non-collection value is printed on one line.
+     * @param file the path name of the text file
+     * @param type the type of the data
+     * @param data the data to dump (an MR_dataset is collected first)
+     * @throws Exception if the file cannot be created or the data do not match the type
+     */
+    public void dump_text ( String file, Tree type, MRData data ) throws Exception {
+        final int ps = Config.max_bag_size_print;
+        // -1 is set while dumping and the saved value is restored at the end
+        // (presumably -1 lifts the limit on the number of printed bag elements -- confirm in Config)
+        Config.max_bag_size_print = -1;
+        try {
+            final PrintStream out = (Config.hadoop_mode)
+                                     ? Plan.print_stream(file)
+                                     : new PrintStream(file);
+            try {
+                if (data instanceof MR_dataset)
+                    data = Plan.collect(((MR_dataset)data).dataset());
+                if (Translator.collection_type(type)) {
+                    Tree tp = ((Node)type).children().head();
+                    if (tp instanceof Node && ((Node)tp).name().equals("tuple")) {
+                        Trees ts = ((Node)tp).children();
+                        for ( MRData x: (Bag)data ) {
+                            Tuple t = (Tuple)x;
+                            out.print(print(t.get((short)0),ts.nth(0)));
+                            for ( short i = 1; i < t.size(); i++ )
+                                out.print(","+print(t.get(i),ts.nth(i)));
+                            out.println();
+                        }
+                    } else for ( MRData x: (Bag)data )
+                               out.println(print(x,tp));
+                // NOTE(review): this branch prints with query_type rather than with the
+                // type parameter -- confirm that this is intentional
+                } else out.println(print(data,query_type));
+            } finally {
+                // close the output even if a cast or a print fails
+                out.close();
+            }
+        } finally {
+            // always restore the printing limit, otherwise a failed dump changes
+            // how every later result is printed
+            Config.max_bag_size_print = ps;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Function.java b/core/src/main/java/org/apache/mrql/Function.java
new file mode 100644
index 0000000..1f1d4e1
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Function.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.*;
+
+
+/**
+ * An anonymous function from MRData to MRData (a lambda abstraction).
+ * Subclasses must provide a concrete implementation for eval (the lambda body).
+ * The class is Serializable, so instances can be written to an object stream.
+ */
+abstract public class Function implements Serializable {
+    /**
+     * Evaluate the anonymous function from MRData to MRData
+     * @param arg the operand the function is applied to
+     * @return the result of applying the function to arg
+     */
+    abstract public MRData eval ( final MRData arg );
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/GeneratorDataSource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/GeneratorDataSource.java b/core/src/main/java/org/apache/mrql/GeneratorDataSource.java
new file mode 100644
index 0000000..380f50d
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/GeneratorDataSource.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+
+
+/** A DataSource used for processing the range min..max */
+final public class GeneratorDataSource extends DataSource {
+    /** create a generator source with the given source number
+     *  (the input format is taken from the current evaluator)
+     */
+    GeneratorDataSource ( int source_num, String path, Configuration conf ) {
+        super(source_num,path,Evaluator.evaluator.generatorInputFormat(),conf);
+    }
+
+    /** create a generator source with source number -1 */
+    GeneratorDataSource ( String path, Configuration conf ) {
+        this(-1,path,conf);
+    }
+
+    /** The size of the data produced by a generator path.
+     * @param path a generator file or a directory of generator files
+     * @param conf the configuration used to resolve the file system of the path
+     * @return Config.range_split_size*8 for a single file, or that amount
+     *         multiplied by the number of directory entries for a directory
+     * @throws IOException if the status of the path cannot be read
+     */
+    public static long size ( Path path, Configuration conf ) throws IOException {
+        // resolve the file system once instead of once per query
+        final FileSystem fs = path.getFileSystem(conf);
+        // each file generates range_split_size long integers (8 bytes each);
+        // the 8L forces long arithmetic so that the product cannot overflow an int
+        final long file_size = 8L*Config.range_split_size;
+        if (!fs.getFileStatus(path).isDir())
+            return file_size;
+        // every entry of the directory counts as one generator file
+        return file_size*fs.listStatus(path).length;
+    }
+
+    /** the string form: "Generator", the source number, and the path, joined by the separator */
+    @Override
+    public String toString () {
+        return "Generator"+separator+source_num+separator+path;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Interpreter.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
new file mode 100644
index 0000000..97c76ce
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -0,0 +1,833 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.io.PrintStream;
+
+
+/** The MRQL interpreter */
+public class Interpreter extends TypeInference {
+
+    /** the Gen tree lambda(x,bag(x)) (presumably a mapper that wraps its input in a bag) */
+    public final static Tree identity_mapper = #<lambda(x,bag(x))>;
+
+    /** the head of the chain of global variable bindings (null when there are none) */
+    protected static Environment global_env = null;
+
+    /** retrieve variable binding */
+    public final static MRData variable_lookup ( final String v, final Environment environment ) {
+        for ( Environment env = environment; env != null; env = env.next ) {
+            if (v.equals(env.name))
+                return env.value;
+        };
+        return null;
+    }
+
+    /** Insert a new global variable binding at the head of the global chain.
+     *  A Bag value is materialized before it is stored.
+     * @param var the variable name
+     * @param value the value to bind to the variable
+     */
+    public final static void new_global_binding ( final String var, final MRData value ) {
+        if (value instanceof Bag) {
+            final Bag bag = (Bag)value;
+            bag.materialize();
+        }
+        final Environment binding = new Environment(var,value,global_env);
+        global_env = binding;
+    }
+
+    /** Remove every global binding of a variable.
+     *  Does nothing if the variable has no global binding.
+     * @param v the name of the variable to unbind
+     */
+    public static void remove_global_binding ( String v ) {
+        // drop the matching bindings at the head of the chain;
+        // names are compared with equals because they are not guaranteed to be interned
+        while (global_env != null && v.equals(global_env.name))
+            global_env = global_env.next;
+        if (global_env == null)
+            return;
+        // unlink the matching bindings from the rest of the chain; env advances only
+        // when nothing was unlinked, so consecutive matches are all removed and
+        // env itself never becomes null
+        Environment env = global_env;
+        while (env.next != null)
+            if (v.equals(env.next.name))
+                env.next = env.next.next;
+            else env = env.next;
+    }
+
+    /** Retrieve a global variable binding.
+     * @param v the variable name
+     * @return the value of the first global binding named v, or null if there is none
+     */
+    public static MRData lookup_global_binding ( String v ) {
+        // the same search as for a local environment, started at the global chain
+        return variable_lookup(v,global_env);
+    }
+
+    /** replace the whole chain of global bindings
+     * @param env the new head of the global chain (null means no global bindings)
+     */
+    public static void set_global_bindings ( Environment env ) {
+        global_env = env;
+    }
+
+    /** the method number that ClassImporter reports for "coerce" with signature [any,int];
+     *  it is embedded in the callM(coerce,...) trees built by reify
+     */
+    final static int coerce_method = ClassImporter.find_method_number("coerce",#[any,int]);
+
+    /** untyped reify: not type-correct but will not crash the run-time system.
+     *  Converts a value back into a Gen tree: a Bag becomes list(...), a Tuple
+     *  becomes tuple(...), and strings and numbers become leaves or coerce calls.
+     * @param x the value to convert
+     * @return a tree that denotes the value
+     * @throws Error if x is not one of the value kinds handled below
+     */
+    private final static Tree reify ( final MRData x ) {
+        if (x instanceof Bag) {
+            Bag b = (Bag)x;
+            Trees as = #[];
+            // reify the elements recursively, keeping their order
+            for ( MRData e: b)
+                as = as.append(reify(e));
+            return #<list(...as)>;
+        } else if (x instanceof Tuple) {
+            Tuple t = (Tuple)x;
+            Trees as = #[];
+            for ( short i = 0; i < t.size(); i++ )
+                as = as.append(reify(t.get(i)));
+            return #<tuple(...as)>;
+        } else if (x instanceof MR_string)
+            return new StringLeaf(((MR_string)x).get());
+        else if (x instanceof MR_short)
+            // a short is embedded as a literal wrapped in a call to coerce with the SHORT tag
+            return #<callM(coerce,`coerce_method,`(((MR_short)x).get()),`(MRContainer.SHORT))>;
+        else if (x instanceof MR_int)
+            return #<`(((MR_int)x).get())>;
+        else if (x instanceof MR_long)
+            // the long is cast to int first, so values outside the int range are truncated
+            return #<callM(coerce,`coerce_method,`((int)((MR_long)x).get()),`(MRContainer.LONG))>;
+        else if (x instanceof MR_float)
+            return #<`(((MR_float)x).get())>;
+        else if (x instanceof MR_double)
+            // the double is cast to float first, so precision may be lost
+            return #<callM(coerce,`coerce_method,`((float)(((MR_double)x).get())),`(MRContainer.DOUBLE))>;
+        throw new Error("wrong MRData: "+x);
+    }
+
+    /** Build an in-memory function from a one-variable lambda.
+     * @param v the name of the lambda variable
+     * @param body the lambda body
+     * @param env the environment the body is evaluated in, extended with v
+     * @return a Function that binds its argument to v and evaluates the body
+     */
+    private final static Function evalf ( final String v,
+                                          final Tree body,
+                                          final Environment env ) {
+        final Function fnc = new Function() {
+            final public MRData eval ( final MRData arg ) {
+                // the argument shadows any outer binding of v
+                final Environment extended = new Environment(v,arg,env);
+                return evalE(body,extended);
+            }
+        };
+        return fnc;
+    }
+
+    /** Evaluate an MRQL function in memory.
+     * @param fnc a compiled(...), lambda(...), or function(...) tree
+     * @param env the environment the function body is evaluated in
+     * @return the Function that implements fnc
+     * @throws Error if fnc matches none of the three forms
+     */
+    public final static Function evalF ( Tree fnc, Environment env ) {
+        match fnc {
+        case compiled(`ln,`lm,...vars):
+            try {
+                // load the compiled lambda by name through the context class loader
+                return Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString());
+            } catch (Exception ex) {
+                // fall back to interpreting the lambda tree lm carried by the compiled node
+                System.err.println("*** Unable to retrieve the compiled lambda: "+fnc);
+                return ((Lambda) evalE(lm)).lambda();
+            }
+        case lambda(`v,`b):
+            return evalf(v.toString(),b,env);
+        case function(tuple(...params),`tp,`body):
+            // collect the parameter names; the argument of the function is a tuple
+            // whose components are bound to these names by evalT
+            String[] as = new String[params.length()];
+            int i = 0;
+            for ( Tree param: params )
+                match param {
+                case `bind(`v,_):
+                    as[i++] = v.toString();
+                };
+            return evalT(as,body,env);
+        };
+        throw new Error("Ill-formed lambda: "+fnc);
+    }
+
+    /** evaluate an MRQL function in memory */
+    private final static Function evalT ( final String[] params,
+                                          final Tree body,
+                                          final Environment env ) {
+        return new Function() {
+            final public MRData eval ( final MRData x ) {
+                Environment new_env = env;
+                for ( int i = 0; i < params.length; i++ )
+                    new_env = new Environment(params[i],((Tuple)x).get(i),new_env);
+                return evalE(body,new_env);
+            }
+        };
+    }
+
+    // the string forms of the trees true, false, and null; evalEE compares variable
+    // names against them with ==, which relies on both sides being the same String object
+    final static String true_name = #<true>.toString();
+    final static String false_name = #<false>.toString();
+    final static String null_name = #<null>.toString();
+    // the run-time values that the names above evaluate to (null is a Tuple created with size 0)
+    final static MRData null_value = new Tuple(0);
+    final static MRData true_value = new MR_bool(true);
+    final static MRData false_value = new MR_bool(false);
+
+    // the current indentation of the evaluation trace; it starts at -3 because evalE
+    // adds 3 before printing, which puts the outermost expression at column 0
+    static int tab_count = -3;
+
+    public static String tabs ( int n ) {
+        StringBuffer b = new StringBuffer();
+        for ( int i = 0; i < n; i++)
+            b.append(' ');
+        return b.toString();
+    }
+
+    /** Evaluate an MRQL expression in memory and print tracing info.
+     *  When Config.trace_exp_execution is set, the expression is printed before it is
+     *  evaluated and its result after, indented by 3 more spaces per nesting level.
+     * @param e the expression to evaluate
+     * @param env contains bindings from variables to values (MRData)
+     * @return the value of the expression
+     */
+    final static MRData evalE ( final Tree e, final Environment env ) {
+        if (!Config.trace_exp_execution)
+            return evalEE(e,env);
+        tab_count += 3;
+        try {
+            System.out.println(tabs(tab_count)+print_query(e));
+            final MRData res = evalEE(e,env);
+            System.out.println(tabs(tab_count)+"-> "+res);
+            return res;
+        } finally {
+            // undo the indentation even when the evaluation throws, so that the
+            // trace of the next query starts again at the outermost level
+            tab_count -= 3;
+        }
+    }
+
+    /** evaluate an MRQL expression in memory */
+    private final static MRData evalEE ( final Tree e, final Environment env ) {
+        try {
+            if (e.is_variable()) {
+                String v = e.toString();
+                if (v == true_name)
+                    return true_value;
+                else if (v == false_name)
+                    return false_value;
+                else if (v == null_name)
+                    return null_value;
+                MRData x = variable_lookup(v,env);
+                if (x != null)
+                    return x;
+                x = lookup_global_binding(v);
+                if (x == null)
+                    throw new Error("Variable "+v+" is not bound");
+                return x;
+            } else if (e.is_long())
+                return new MR_int((int)e.longValue());
+            else if (e.is_double())
+                return new MR_float((float)e.doubleValue());
+            else if (e.is_string())
+                return new MR_string(e.stringValue());
+        match e {
+        case callM(and,_,`x,`y):  // lazy
+            return (((MR_bool)evalE(x,env)).get()) ? evalE(y,env) : false_value;
+        case callM(or,_,`x,`y):
+            return (((MR_bool)evalE(x,env)).get()) ? true_value : evalE(y,env);
+        case callM(`f,`n,...args):   // internal function call
+            MRData[] as = new MRData[args.length()];
+            for ( int i = 0; i < args.length(); i++ )
+                as[i] = evalE(args.nth(i),env);
+            return ClassImporter.call((int)n.longValue(),as);
+        case compiled(`ln,_,...vars):
+            return new Lambda(Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString()));
+        case lambda(`v,`body):
+            return new Lambda(evalf(v.toString(),body,env));
+        case nth(`x,`n):
+            return ((Tuple)evalE(x,env)).get((int)n.longValue());
+        case setNth(`x,`n,`v,`ret):
+            return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
+        case materialize(`u):
+            return MapReduceAlgebra.materialize(evalE(u,env));
+        case let(`v,`u,`body):
+            MRData x = evalE(u,env);
+            if (x instanceof Bag)
+                ((Bag)x).materialize();
+            return evalE(body,new Environment(v.toString(),x,env));
+        case cmap(`f,`s):
+            return MapReduceAlgebra.cmap(evalF(f,env),(Bag)evalE(s,env));
+        case filter(`p,`m,`s):
+            return MapReduceAlgebra.filter(evalF(p,env),evalF(m,env),(Bag)evalE(s,env));
+        case map(`m,`s):
+            return MapReduceAlgebra.map(evalF(m,env),(Bag)evalE(s,env));
+        case repeat(lambda(`v,`b),`s,`n):
+            final String nm = v.toString();
+            final Tree body = b;
+            if (Config.hadoop_mode) {
+                Function loop_fnc = new Function () {
+                        public MRData eval ( MRData s ) {
+                            new_global_binding(nm,s);
+                            MRData val = new MR_dataset(Evaluator.evaluator.eval(body,new Environment(nm,s,env),nm));
+                            return val;
+                        }; };
+                return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+            } else {
+                Function loop_fnc = new Function () {
+                        public MRData eval ( MRData s ) {
+                            new_global_binding(nm,s);
+                            return evalM(body,new Environment(nm,s,env));
+                        }; };
+                return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+            }
+        case repeat(`lm,`s,`n):
+            return MapReduceAlgebra.repeat(evalF(lm,env),(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+        case range(`min,`max):
+            return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
+                                              ((MR_long)evalE(max,env)).get());
+        case call(`f,...args):
+            Tuple t = new Tuple(args.length());
+            int i = 0;
+            for ( Tree a: args )
+                t.set(i++,evalE(a,env));
+            return evalF(f,env).eval(t);
+        case tuple(`x,`y):
+            return new Tuple(evalE(x,env),evalE(y,env));
+        case tuple(`x,`y,`z):
+            return new Tuple(evalE(x,env),evalE(y,env),evalE(z,env));
+        case tuple(...el):
+            Tuple t = new Tuple(el.length());
+            int i = 0;
+            for ( Tree a: el )
+                t.set(i++,evalE(a,env));
+            return t;
+        case tagged_union(`n,`u):
+            return new Union((byte)n.longValue(),evalE(u,env));
+        case union_value(`x):
+            return ((Union)evalE(x,env)).value();
+        case union_tag(`x):
+            return new MR_int(((Union)evalE(x,env)).tag());
+        // used for shortcutting sync in bsp supersteps
+        case BAG():
+            return SystemFunctions.bsp_empty_bag;
+        case TRUE():
+            return SystemFunctions.bsp_true_value;
+        case FALSE():
+            return SystemFunctions.bsp_false_value;
+        case `T(...el):
+            if (!is_collection(T))
+                fail;
+            if (el.is_empty())
+                return new Bag();
+            Bag b = new Bag(el.length());
+            for ( Tree a: el )
+                b.add(evalE(a,env));
+            return b;
+        case if(`c,`x,`y):
+            if (((MR_bool)evalE(c,env)).get())
+                return evalE(x,env);
+            else return evalE(y,env);
+        case Collect(`s):
+            try {
+                if (Config.hadoop_mode)
+                    return Plan.collect(Evaluator.evaluator.eval(s,env,"-"));
+                Bag b = evalS(s,env);
+                b.materialize();
+                return b;
+            } catch (Exception ex) { throw new Error(ex); }
+        case dataset_size(`x):
+            return new MR_long(Plan.size(Evaluator.evaluator.eval(x,env,"-")) / (1024*1024));
+        case synchronize(`peer,`b):
+            return Evaluator.evaluator.synchronize(((MR_string)evalE(peer,env)),(MR_bool)evalE(b,env));
+        case distribute(`peer,`s):
+            return Evaluator.evaluator.distribute(((MR_string)evalE(peer,env)),(Bag)evalE(s,env));
+        case mapReduce(`m,`r,`s,_):
+            return MapReduceAlgebra.mapReduce(evalF(m,env),
+                                              evalF(r,env),
+                                              (Bag)evalE(s,env));
+        case mapReduce2(`mx,`my,`r,`x,`y,_):
+            return MapReduceAlgebra.mapReduce2(
+                                evalF(mx,env),
+                                evalF(my,env),
+                                evalF(r,env),
+                                (Bag)evalE(x,env),
+                                (Bag)evalE(y,env));
+        case mapJoin(`kx,`ky,`r,`x,`y):
+            return MapReduceAlgebra.mapJoin(
+                                evalF(kx,env),
+                                evalF(ky,env),
+                                evalF(r,env),
+                                (Bag)evalE(x,env),
+                                (Bag)evalE(y,env));
+        case crossProduct(`mx,`my,`r,`x,`y):
+            return MapReduceAlgebra.crossProduct(
+                              evalF(mx,env),
+                              evalF(my,env),
+                              evalF(r,env),
+                              (Bag)evalE(x,env),
+                              (Bag)evalE(y,env));
+        case groupBy(`s):
+            return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
+        case orderBy(`s):
+            return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
+        case index(`x,`n):
+            MRData xv = evalE(x,env);
+            MRData nv = evalE(n,env);
+            final int k = (int)((MR_int)nv).get();
+            if (k < 0)
+                throw new Error("Negative list index: "+k);
+            if (xv instanceof MR_dataset) {
+                List<MRData> res = ((MR_dataset)xv).dataset().take(k+1);
+                if (k >= res.size())
+                    throw new Error("List index out of bounds: "+k);
+                return res.get(k);
+            };
+            Bag b = (Bag)xv;
+            return b.get(k);
+        case range(`x,`i,`j):
+            MRData xv = evalE(x,env);
+            MRData ni = evalE(i,env);
+            MRData nj = evalE(j,env);
+            int ki = (int)((MR_int)ni).get();
+            int kj = (int)((MR_int)nj).get();
+            if (ki < 0 || kj < ki)
+                throw new Error("Wrong list range: ["+ki+","+kj+"]");
+            Iterator<MRData> it = (xv instanceof MR_dataset)
+                                   ? ((MR_dataset)xv).dataset().take(kj+1).iterator()
+                                   : ((Bag)xv).iterator();
+            Bag s = new Bag();
+            for ( int n = 0; it.hasNext() && n < ki; n++ )
+                it.next();
+            for ( int n = ki; it.hasNext() && n <= kj; n++ )
+                s.add(it.next());
+            return s;
+        case map_index(`x,`key):
+            MRData xv = evalE(x,env);
+            final MRData nk = evalE(key,env);
+            if (xv instanceof MR_dataset) {
+                xv = ((MR_dataset)xv).dataset().reduce(new Tuple(),new Function() {
+                        public MRData eval ( MRData value ) {
+                            Tuple p = (Tuple)value;
+                            Tuple y = (Tuple)p.second();
+                            return (y.first().equals(nk)) ? y.second() : p.first();
+                        }
+                    });
+                if (xv instanceof Tuple && ((Tuple)xv).size() == 0)
+                    throw new Error("Map key not found: "+nk);
+                return xv;
+            };
+            return ((Bag)xv).map_find(nk);
+        case aggregate(`acc,`zero,`s):
+            return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                              (Bag)evalE(s,env));
+        case Aggregate(`acc,`zero,`s):
+            if (Config.hadoop_mode)
+                return Evaluator.evaluator.aggregate(closure(acc,env),zero,s,env);
+            else return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),evalM(s,env));
+        case mergeGroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
+            return MapReduceAlgebra.mergeGroupByJoin(evalF(kx,env),evalF(ky,env),evalF(gx,env),evalF(gy,env),
+                                                     evalF(m,env),evalF(c,env),evalF(r,env),
+                                                     (Bag)evalE(x,env),(Bag)evalE(y,env));
+        case BSP(tuple(...ns),`superstep,`state,`o,...as):
+            if (Config.hadoop_mode)
+                return Evaluator.evaluator.bsp(e,env);
+            Bag[] ds = new Bag[as.length()];
+            for ( int i = 0; i < ds.length; i++ )
+                ds[i] = evalM(as.nth(i),env);
+            int[] nn = new int[ns.length()];
+            for ( int i = 0; i < ns.length(); i++ )
+                nn[i] = (int)((LongLeaf)ns.nth(i)).value();
+            return MapReduceAlgebra.BSP(nn,
+                                        evalF(superstep,env),
+                                        evalE(state,env),
+                                        o.equals(#<true>),
+                                        ds);
+        case BSP(`n,`superstep,`state,`o,...as):
+            if (Config.hadoop_mode)
+                return Evaluator.evaluator.bsp(e,env);
+            Bag[] ds = new Bag[as.length()];
+            for ( int i = 0; i < ds.length; i++ )
+                ds[i] = evalM(as.nth(i),env);
+            return MapReduceAlgebra.BSP(new int[]{(int)((LongLeaf)n).value()},
+                                        evalF(superstep,env),
+                                        evalE(state,env),
+                                        o.equals(#<true>),
+                                        ds);
+        case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
+            if (Config.hadoop_mode)
+                return Evaluator.evaluator.loop(e,env);
+            int limit = ((MR_int)evalE(num,env)).get();
+            Bag[] s = new Bag[vs.length()];
+            for ( int i = 0; i < vs.length(); i++ )
+                s[i] = evalM(ss.nth(i),env);
+            for ( int n = 0; n < limit; n++ ) {
+                Environment nenv = env;
+                for ( int i = 0; i < vs.length(); i ++ ) {
+                    s[i].materialize();
+                    nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
+                };
+                for ( int i = 0; i < vs.length(); i ++ )
+                    s[i] = (Bag)evalM(bs.nth(i),nenv);
+            };
+            return new Tuple(s);
+        case function(tuple(...params),`tp,`body):
+            String[] as = new String[params.length()];
+            int i = 0;
+            for ( Tree param: params )
+                match param {
+                case `bind(`v,_):
+                    as[i++] = v.toString();
+                };
+            return new Lambda(evalT(as,body,env));
+        case typed(`x,_):
+            return evalE(x,env);
+        case apply(`f,`arg):
+            if (!f.is_variable())
+                return evalF(f,env).eval(evalE(arg,env));
+            MRData fnc = lookup_global_binding(f.toString());
+            if (fnc == null) {
+                String s = Plan.conf.get("mrql.global."+f);
+                if (s != null)
+                    try {
+                        Tree ft = Tree.parse(s);
+                        TopLevel.store(f.toString(),ft);
+                        fnc = evalE(ft,env);
+                        new_global_binding(f.toString(),fnc);
+                    } catch (Exception ex) {
+                        throw new Error(ex);
+                    }
+            };
+            MRData t = evalE(arg,env);
+            if (!(t instanceof Tuple))
+                throw new Error("Expected a tuple in function application: "+t);
+            return ((Lambda)fnc).lambda().eval(t);
+        case trace(`x):
+            MRData z = evalE(x,env);
+            System.err.println("*** "+x+": "+z);
+            return z;
+        case _:
+            try {
+                if (Config.hadoop_mode)
+                    return new MR_dataset(Evaluator.evaluator.eval(e,env,"-"));
+                else return evalS(e,env);
+            } catch (Exception ex) { throw new Error(ex); }
+        };
+        throw new Error("Cannot evaluate the expression: "+e);
+        } catch (Error msg) {
+            if (!Config.trace)
+                throw new Error(msg.getMessage());
+            System.err.println(msg.getMessage());
+            msg.printStackTrace();
+            throw new Error("Evaluation error in: "+print_query(e));
+        } catch (Exception ex) {
+            if (Config.trace) {
+                System.err.println(ex.getMessage());
+                ex.printStackTrace();
+            }
+            throw new Error("Evaluation error in: "+print_query(e));
+        }
+    }
+
+    /** evaluate an MRQL expression in memory without any local variable bindings
+     *  (delegates to the two-argument evalE with a null Environment)
+     * @param e the MRQL expression to evaluate
+     * @return the MRData value of the expression
+     */
+    final static MRData evalE ( final Tree e ) {
+        final Environment no_bindings = null;
+        return evalE(e,no_bindings);
+    }
+
+    /** evaluate MRQL physical operators in memory (returns a Bag);
+     *  this is just another entry point to evalM
+     * @param e the physical plan
+     * @param env the local variable bindings
+     * @return the Bag produced by evalM
+     */
+    final static Bag evalS ( final Tree e, final Environment env ) {
+        final Bag result = evalM(e,env);
+        return result;
+    }
+
+    /** evaluate MRQL physical operators in memory (returns a Bag).
+     *  When Config.trace_execution is set, the plan is printed before the evaluation
+     *  and its result after it, both indented by tab_count, which is increased by 3
+     *  for the duration of the call.
+     * @param e the physical plan
+     * @param env the local variable bindings
+     * @return the Bag computed by evalMM
+     */
+    final static Bag evalM ( final Tree e, final Environment env ) {
+        // remember whether this call increased the indentation, so that exactly
+        // this call decreases it again
+        final boolean traced = Config.trace_execution;
+        if (traced)
+            tab_count += 3;
+        try {
+            if (traced)
+                System.out.println(tabs(tab_count)+print_query(e));
+            final Bag res = evalMM(e,env);
+            if (traced)
+                System.out.println(tabs(tab_count)+"-> "+res);
+            return res;
+        } finally {
+            // restore the indentation even when the evaluation throws; otherwise
+            // a failed evaluation leaves every later trace line shifted to the right
+            if (traced)
+                tab_count -= 3;
+        }
+    }
+
+    /** evaluate MRQL physical operators in memory (returns a Bag).
+     *  Each case of the match below maps one physical operator to the corresponding
+     *  in-memory operation of MapReduceAlgebra; functional arguments are evaluated
+     *  with evalF, dataset arguments with evalM, and plain expressions with evalE.
+     * @param e the physical plan
+     * @param env the local variable bindings
+     * @return the Bag that contains the plan result
+     * @throws Error if the plan cannot be evaluated; unless Config.trace is set,
+     *         only the message of the original Error is kept
+     */
+    final static Bag evalMM ( final Tree e, final Environment env ) {
+        try {
+            match e {
+            case cMap(`f,`s):
+                return MapReduceAlgebra.cmap(evalF(f,env),evalM(s,env));
+            case AggregateMap(`f,`acc,`zero,`s):
+                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                                          evalM(#<cMap(`f,`s)>,env)));
+            case MapReduce(`m,`r,`s,_):
+                return MapReduceAlgebra.mapReduce(
+                                   evalF(m,env),
+                                   evalF(r,env),
+                                   evalM(s,env));
+            case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
+                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                                          evalM(#<MapReduce(`m,`r,`s,false)>,env)));
+            case MapCombineReduce(`m,`c,`r,`s,_):
+                // the combiner c is not used by the in-memory evaluation
+                return MapReduceAlgebra.mapReduce(
+                                   evalF(m,env),
+                                   evalF(r,env),
+                                   evalM(s,env));
+            case MapReduce2(`mx,`my,`r,`x,`y,_):
+                return MapReduceAlgebra.mapReduce2(
+                                evalF(mx,env),
+                                evalF(my,env),
+                                evalF(r,env),
+                                evalM(x,env),
+                                evalM(y,env));
+            case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,_):
+                // the combiner c is not used by the in-memory evaluation
+                return MapReduceAlgebra.mapReduce2(
+                                evalF(mx,env),
+                                evalF(my,env),
+                                evalF(r,env),
+                                evalM(x,env),
+                                evalM(y,env));
+            case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
+                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                                          evalM(#< MapReduce2(`mx,`my,`r,`x,`y,false)>,env)));
+            case MapJoin(`kx,`ky,`r,`x,`y):
+                return MapReduceAlgebra.mapJoin(
+                                evalF(kx,env),
+                                evalF(ky,env),
+                                evalF(r,env),
+                                evalM(x,env),
+                                evalM(y,env));
+            case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
+                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                                          evalM(#<MapJoin(`kx,`ky,`r,`x,`y)>,env)));
+            case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
+                // the last argument o is not used by the in-memory evaluation
+                return MapReduceAlgebra.groupByJoin(
+                                evalF(kx,env),
+                                evalF(ky,env),
+                                evalF(gx,env),
+                                evalF(gy,env),
+                                evalF(m,env),
+                                evalF(c,env),
+                                evalF(r,env),
+                                evalM(x,env),
+                                evalM(y,env));
+            case CrossProduct(`mx,`my,`r,`x,`y):
+                return MapReduceAlgebra.crossProduct(
+                                evalF(mx,env),
+                                evalF(my,env),
+                                evalF(r,env),
+                                evalM(x,env),
+                                evalM(y,env));
+            case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
+                return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+                                                          evalM(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)));
+            case BinarySource(`file,_):
+                return (Bag)MapReduceAlgebra.read_binary(file.stringValue());
+            case BSPSource(`n,BinarySource(`file,_)):
+                return (Bag)MapReduceAlgebra.read_binary((int)((LongLeaf)n).value(),
+                                                         file.stringValue());
+            case BSPSource(`n,ParsedSource(`parser,`file,...args)):
+                // only a literal source number is handled here; otherwise try the next case
+                if (!(n instanceof LongLeaf))
+                    fail;
+                // NOTE(review): if the parser name is not registered, the directory lookup
+                // presumably returns null and newInstance() fails before the null check
+                // below is reached -- confirm
+                Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
+                if (p == null)
+                    throw new Error("Unknown parser: "+parser);
+                return MapReduceAlgebra.parsedSource((int)(((LongLeaf)n).value()),p,
+                                                     ((MR_string)evalE(file,env)).get(),args);
+            case ParsedSource(`parser,`file,...args):
+                // NOTE(review): same remark as above about the position of the null check
+                Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
+                if (p == null)
+                    throw new Error("Unknown parser: "+parser);
+                return MapReduceAlgebra.parsedSource(p,((MR_string)evalE(file,env)).get(),args);
+            case Merge(`x,`y):
+                return evalM(x,env).union(evalM(y,env));
+            case Repeat(lambda(`v,`b),`s,`n):
+                final String vs = v.toString();
+                final Tree body = b;
+                // one repetition step: evaluate the body with v bound to the current value
+                Function loop = new Function() {
+                        final public MRData eval ( final MRData x ) {
+                            return evalM(body,new Environment(vs,x,env));
+                        }
+                    };
+                return MapReduceAlgebra.repeat(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
+            case Closure(lambda(`v,`b),`s,`n):
+                final String vs = v.toString();
+                final Tree body = b;
+                // one closure step: evaluate the body with v bound to the current value
+                Function loop = new Function() {
+                        final public MRData eval ( final MRData x ) {
+                            return evalM(body,new Environment(vs,x,env));
+                        }
+                    };
+                return MapReduceAlgebra.closure(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
+            case Generator(`min,`max,`size):
+                // the size argument is not used by the in-memory evaluation
+                return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
+                                                  ((MR_long)evalE(max,env)).get());
+            case BSPSource(`n,Generator(`min,`max,`size)):
+                return MapReduceAlgebra.generator((int)((LongLeaf)n).value(),
+                                                  ((MR_long)evalE(min,env)).get(),
+                                                  ((MR_long)evalE(max,env)).get());
+            case Dump(`s):
+                Bag bs = (Bag)evalE(s,env);
+                final Iterator<MRData> iter = bs.iterator();
+                // lazily pair every element with the constant key 0
+                return new Bag(new BagIterator() {
+                        public boolean hasNext () {
+                            return iter.hasNext();
+                        }
+                        public MRData next () {
+                            return new Tuple(new MR_int(0),iter.next());
+                        }
+                    });
+            case let(`v,`u,`body):
+                return evalM(body,new Environment(v.toString(),evalE(u,env),env));
+            case apply(`f,`arg):
+                if (!f.is_variable())
+                    // the argument must see the local bindings too, so it is evaluated
+                    // under env (as in the variable case below) rather than under an
+                    // empty environment
+                    return (Bag)evalF(f,env).eval(evalE(arg,env));
+                MRData fnc = lookup_global_binding(f.toString());
+                if (fnc == null)
+                    throw new Error("Unknown function: "+f);
+                MRData t = evalE(arg,env);
+                if (!(t instanceof Tuple))
+                    throw new Error("Expected a tuple in function application: "+t);
+                return (Bag)((Lambda)fnc).lambda().eval(t);
+            case BSPSource(`n,`s):
+                final MR_int i = new MR_int((int)((LongLeaf)n).value());
+                Bag bs = (Bag)evalE(s,env);
+                final Iterator<MRData> iter = bs.iterator();
+                // lazily pair every element with the source number n
+                return new Bag(new BagIterator() {
+                        public boolean hasNext () {
+                            return iter.hasNext();
+                        }
+                        public MRData next () {
+                            return new Tuple(i,iter.next());
+                        }
+                    });
+            case BSP(...):
+                // BSP is evaluated by evalE; a non-Bag result is wrapped into a Bag
+                MRData res = evalE(e,env);
+                if (res instanceof Bag)
+                    return (Bag)res;
+                else return new Bag(res);
+            case `v:
+                if (!v.is_variable())
+                    fail;
+                // local bindings take precedence over global bindings
+                MRData x = variable_lookup(v.toString(),env);
+                if (x != null)
+                    return (Bag)x;
+                x = lookup_global_binding(v.toString());
+                if (x != null)
+                    return (Bag)x;
+                throw new Error("Variable "+v+" is not bound");
+            };
+            throw new Error("Cannot evaluate the plan: "+e);
+        } catch (Error msg) {
+            if (!Config.trace)
+                throw new Error(msg.getMessage());
+            System.err.println(msg.getMessage());
+            msg.printStackTrace();
+            throw new Error("Evaluation error in: "+print_query(e));
+        } catch (Exception ex) {
+            if (Config.trace)
+                ex.printStackTrace();
+            throw new Error("Evaluation error in: "+print_query(e));
+        }
+    }
+
+    /** Replace the variables of e that are bound at run time with their reified values.
+     *  A variable is replaced when it is not a member of local_vars (the parameters of
+     *  the lambdas that enclose it inside e) and it has a value, other than an
+     *  MR_dataset, in env or, failing that, in the global bindings.
+     *  The head of an apply node is left as it is; only its arguments are processed.
+     * @param e the expression to process (may be null)
+     * @param env the local variable bindings
+     * @param local_vars the lambda parameters that are in scope at e
+     * @return the expression with the replaced variables, or e itself if nothing applies
+     */
+    private final static Tree closure ( Tree e, Environment env, Trees local_vars ) {
+        match e {
+        case lambda(`param,`body):
+            // inside the body, the lambda parameter must not be replaced
+            Tree nbody = closure(body,env,local_vars.cons(param));
+            return #<lambda(`param,`nbody)>;
+        case apply(`fn,...args):
+            Trees closed = #[];
+            for ( Tree arg: args )
+                closed = closed.append(closure(arg,env,local_vars));
+            return #<apply(`fn,...closed)>;
+        case `op(...args):
+            Trees closed = #[];
+            for ( Tree arg: args )
+                closed = closed.append(closure(arg,env,local_vars));
+            return #<`op(...closed)>;
+        case null: return null;
+        case `w:
+            if (!w.is_variable() || local_vars.member(w))
+                fail;
+            final String name = w.toString();
+            // a local binding is tried first, then a global one
+            MRData bound = variable_lookup(name,env);
+            if (bound != null && !(bound instanceof MR_dataset))
+                return reify(bound);
+            bound = lookup_global_binding(name);
+            if (bound != null && !(bound instanceof MR_dataset))
+                return reify(bound);
+        };
+        return e;
+    }
+
+    /** Replace the variables of e that have a run-time value with their reified values,
+     *  starting with no lambda parameters in scope
+     * @param e the expression to process
+     * @param env the local variable bindings
+     * @return the processed expression
+     */
+    final static Tree closure ( Tree e, Environment env ) {
+        final Trees no_local_vars = #[];
+        return closure(e,env,no_local_vars);
+    }
+
+    // the type inferred by translate_expression for the query, before any rewriting
+    static Tree query_type;
+    // NOTE(review): not assigned in translate_expression; presumably the plan of the
+    // last query, set by a caller -- confirm
+    static Tree query_plan;
+    // the value of PlanGeneration.is_dataset_expr for the translated query;
+    // set by translate_expression
+    static boolean is_dataset;
+
+    /** translate an MRQL expression e into a physical plan */
+    final static Tree translate_expression ( Tree e ) {
+        try {
+            if (Config.trace)
+                System.out.println("Query at line "+Main.parser.line_pos()+": "+print_query(e));
+            Tree qt = TypeInference.type_inference(e);
+            if (!Config.quiet_execution)
+                System.out.println("Query type: "+print_type(qt));
+            query_type = qt;
+            Tree ne = Normalization.remove_groupby(e);
+            if (Config.trace)
+                System.out.println("After removing group-by:\n"+ne.pretty(0));
+            ne = Simplification.rename(ne);
+            if (Config.trace)
+                System.out.println("After renaming variables:\n"+ne.pretty(0));
+            ne = Simplification.rename(Normalization.normalize_all(ne));
+            if (Config.trace)
+                System.out.println("Normalized query:\n"+ne.pretty(0));
+            type_inference(ne);
+            ne = QueryPlan.best_plan(ne);
+            if (Config.trace)
+                System.out.println("Best plan:\n"+ne.pretty(0));
+            ne = Simplification.rename(Translator.translate_select(ne));
+            if (Config.trace)
+                System.out.println("After removing select-queries:\n"+ne.pretty(0));
+            type_inference(ne);
+            ne = Simplification.simplify_all(ne);
+            if (Config.trace)
+                System.out.println("Algebra expression:\n"+ne.pretty(0));
+            Tree pt = type_inference(ne);
+            if (Config.trace)
+                System.out.println("Algebraic type: "+print_type(pt));
+            ne = AlgebraicOptimization.translate_all(ne);
+            if (Config.trace)
+                System.out.println("Translated expression:\n"+ne.pretty(0));
+            Tree et = TypeInference.type_inference(ne);
+            is_dataset = PlanGeneration.is_dataset_expr(ne);
+            if (Config.trace)
+                System.out.println("Physical plan type: "+print_type(et));
+            repeat_variables = #[];
+            ne = Simplification.simplify_all(ne);
+            Tree plan = PlanGeneration.makePlan(ne);
+            if (Config.bsp_mode) {
+                BSPTranslator.reset();
+                if (Config.trace)
+                    System.out.println("Physical plan:\n"+plan.pretty(0));
+                plan = Materialization.materialize_terms(BSPTranslator.constructBSPplan(plan));
+                if (Config.trace)
+                    System.out.println("BSP plan:\n"+plan.pretty(0));
+                else {
+                    String splan = print_plan(plan,0,false);
+                    if (!splan.equals("") && !Config.quiet_execution)
+                        System.out.println("BSP plan:\n"+splan);
+                }
+            } else {
+                if (Config.hadoop_mode)
+                    plan = PlanGeneration.physical_plan(plan);
+                plan = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(plan));
+                if (Config.trace)
+                    System.out.println("Physical plan:\n"+plan.pretty(0));
+                else {
+                    String splan = print_plan(plan,0,false);
+                    if (!splan.equals("") && !Config.quiet_execution)
+                        System.out.println("Physical plan:\n"+splan);
+                }
+            };
+            if (Config.compile_functional_arguments)
+                plan = Compiler.compile(plan);
+            return plan;
+        } catch (Error x) {
+            if (Config.testing)
+                throw new Error(x);
+            if (!Config.trace && x.toString().endsWith("Type Error"))
+                return null;
+            if (x.getMessage() != null) // system error
+                System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
+            if (Config.trace)
+                x.printStackTrace(System.err);
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Inv.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Inv.java b/core/src/main/java/org/apache/mrql/Inv.java
new file mode 100644
index 0000000..3faa7fe
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Inv.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+
+/** A wrapper around an MRData value that reverses the order of the value:
+ *  two Inv values compare in the opposite way their wrapped values do */
+final public class Inv extends MRData {
+    private MRData value;
+
+    Inv ( MRData v ) { value = v; }
+
+    /** materialize the wrapped value */
+    public void materializeAll () { value.materializeAll(); }
+
+    /** @return the wrapped value */
+    public MRData value () { return value; }
+
+    /** write the MRContainer.INV tag followed by the wrapped value */
+    final public void write ( DataOutput out ) throws IOException {
+        out.writeByte(MRContainer.INV);
+        value.write(out);
+    }
+
+    /** read a value using MRContainer.read and wrap it
+     *  (presumably the INV tag has already been consumed by the caller -- confirm) */
+    final public static Inv read ( DataInput in ) throws IOException {
+        return new Inv(MRContainer.read(in));
+    }
+
+    /** refill the wrapped value from the input */
+    public void readFields ( DataInput in ) throws IOException {
+        value.readFields(in);
+    }
+
+    /** Reverse the sign of a comparison result. Unary minus cannot be used for this,
+     *  because -Integer.MIN_VALUE overflows to Integer.MIN_VALUE, which would leave
+     *  the order unreversed.
+     * @return 1 if n is negative, -1 if n is positive, 0 otherwise
+     */
+    private static int invert ( int n ) {
+        return (n < 0) ? 1 : (n > 0) ? -1 : 0;
+    }
+
+    /** compare the wrapped values in reverse order; x must be an Inv */
+    public int compareTo ( MRData x ) {
+        assert(x instanceof Inv);
+        return invert(value.compareTo(((Inv)x).value));
+    }
+
+    /** compare two serialized values in reverse order, using MRContainer.compare;
+     *  size[0] is increased by 1 (presumably to count the INV tag byte -- confirm) */
+    final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
+        int n = MRContainer.compare(x,xs,xl,y,ys,yl,size);
+        size[0] += 1;
+        return invert(n);
+    }
+
+    /** two Inv values are equal when their wrapped values are equal */
+    public boolean equals ( Object x ) {
+        return x instanceof Inv && value.equals(((Inv)x).value);
+    }
+
+    public int hashCode () { return value.hashCode(); }
+
+    public String toString () { return "inv("+value.toString()+")"; }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/JSON.cup
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JSON.cup b/core/src/main/java/org/apache/mrql/JSON.cup
new file mode 100644
index 0000000..c045437
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/JSON.cup
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java_cup.runtime.*;
+
+// tokens without a value: the literals true/false/null, the punctuation, and the
+// begin/end tokens of objects (O_) and arrays (A_)
+terminal TRUE, FALSE, NULL, COLON, COMMA, O_BEGIN, O_END, A_BEGIN, A_END;
+
+// tokens that carry a value
+terminal String STRING;
+terminal Long INTEGER;
+terminal Double DOUBLE;
+
+// every JSON construct is translated to an MRData value;
+// the members of an object and the elements of an array are collected in a Bag
+non terminal MRData top, json, pair, value;
+non terminal Bag members, elements;
+
+precedence left O_BEGIN, O_END, A_BEGIN, A_END;
+precedence left COMMA;
+precedence left COLON;
+
+start with top;
+
+top ::= json:e                        {: RESULT = e; :}
+    ;
+// an object is a Union with tag 0 and an array is a Union with tag 1;
+// an integer between doubled object delimiters becomes an MR_variable
+json ::= O_BEGIN O_END                {: RESULT = new Union((byte)0,new Bag()); :}
+     |   O_BEGIN members:m O_END      {: RESULT = new Union((byte)0,m); :}
+     |   A_BEGIN A_END                {: RESULT = new Union((byte)1,new Bag()); :}
+     |   A_BEGIN elements:m A_END     {: RESULT = new Union((byte)1,m); :}
+     |   O_BEGIN O_BEGIN INTEGER:n O_END O_END {: RESULT = new MR_variable((int)n.longValue()); :}
+     ;
+// the comma-separated pairs of an object
+members ::= pair:p                    {: RESULT = new Bag(p); :}
+	|   members:m COMMA pair:p    {: RESULT = m.add_element(p); :}
+	;
+// a name:value pair is a Tuple of the name (as an MR_string) and the value
+pair ::= STRING:s COLON value:v          {: RESULT = (new Tuple(2)).set(0,new MR_string(s)).set(1,v); :}
+     ;
+// the comma-separated values of an array
+elements ::= value:v                     {: RESULT = new Bag(v); :}
+	 |   elements:es COMMA value:v   {: RESULT = es.add_element(v); :}
+	 ;
+// Union tags of the primitive values:
+// 2 = string, 3 = integer, 4 = double, 5 = boolean, 6 = null (an empty Tuple)
+value ::= json:e                         {: RESULT = e; :}
+      |   STRING:s			 {: RESULT = new Union((byte)2,new MR_string(s)); :}
+      |   INTEGER:n			 {: RESULT = new Union((byte)3,new MR_long(n.longValue())); :}
+      |   DOUBLE:n			 {: RESULT = new Union((byte)4,new MR_double(n.doubleValue())); :}
+      |   TRUE				 {: RESULT = new Union((byte)5,new MR_bool(true)); :}
+      |   FALSE				 {: RESULT = new Union((byte)5,new MR_bool(false)); :}
+      |   NULL				 {: RESULT = new Union((byte)6,new Tuple(0)); :}
+      ;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/JSON.lex
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JSON.lex b/core/src/main/java/org/apache/mrql/JSON.lex
new file mode 100644
index 0000000..c75d1bb
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/JSON.lex
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java_cup.runtime.Symbol;
+
+
+%%
+%class JSONLex
+%public
+%line
+%char
+%cup
+%eofval{
+  return symbol(jsym.EOF);
+%eofval}
+%{
+  /** @return the text of the current token (yytext) */
+  public String text () { return yytext(); }
+
+  /** @return the current line number: yyline plus one */
+  public int line_pos () { return yyline+1; }
+
+  /** @return the current character position (yychar) */
+  public int char_pos () { return yychar; }
+
+  /** create a parser symbol that has no value
+   * @param s the symbol code (one of the jsym constants)
+   */
+  public static Symbol symbol ( int s ) {
+    return new Symbol(s);
+  }
+
+  /** create a parser symbol that carries a value
+   * @param s the symbol code (one of the jsym constants)
+   * @param o the value of the symbol
+   */
+  public static Symbol symbol ( int s, Object o ) {
+    return new Symbol(s,o);
+  }
+
+%}
+
+INT = [+-]?[0-9]+
+DOUBLE = [+-]?[0-9]+([\.][0-9]+)?([eE][+-]?[0-9]+)?
+
+%%
+
+\"[^\"]*\"	        { return symbol(jsym.STRING,yytext().substring(1,yytext().length()-1)); }
+{INT}		        { return symbol(jsym.INTEGER,new Long(yytext())); }
+{DOUBLE}         	{ return symbol(jsym.DOUBLE,new Double(yytext())); }
+true                    { return symbol(jsym.TRUE); }
+false                   { return symbol(jsym.FALSE); }
+null                    { return symbol(jsym.NULL); }
+\{                      { return symbol(jsym.O_BEGIN); }
+\}                      { return symbol(jsym.O_END); }
+\[                      { return symbol(jsym.A_BEGIN); }
+\]                      { return symbol(jsym.A_END); }
+\,                      { return symbol(jsym.COMMA); }
+\:                      { return symbol(jsym.COLON); }
+[ \t\f]                 { }
+[\r\n]                  { }
+.                       { throw new Error("Illegal character: "+yytext()); }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/JsonFormatParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JsonFormatParser.java b/core/src/main/java/org/apache/mrql/JsonFormatParser.java
new file mode 100644
index 0000000..77aa891
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/JsonFormatParser.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.io.StringReader;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DataOutputBuffer;
+import java_cup.runtime.Symbol;
+
+
+/** The JSON parser */
+public class JsonFormatParser implements Parser {
+    String[] tags;          // split tags
+    JsonSplitter splitter;
+
+    public void initialize ( Trees args ) {
+        try {
+            if (args.length() > 0) {
+                if (!(args.nth(0) instanceof Node)
+                    || !(((Node)args.nth(0)).name().equals("list")
+                         || ((Node)args.nth(0)).name().equals("bag")))
+                    throw new Error("Must provide a bag of synchronization property names to split the JSON source: "+args.nth(0));
+                Trees ts = ((Node)args.nth(0)).children();
+                if (ts.length() == 0)
+                    throw new Error("Expected at least one synchronization tagname in JSON source: "+ts);
+                tags = new String[ts.length()];
+                for ( int i = 0; i < tags.length; i++ )
+                    if (ts.nth(i) instanceof StringLeaf)
+                        tags[i] = ((StringLeaf)(ts.nth(i))).value();
+                    else throw new Error("Expected a synchronization tagname in JSON source: "+ts.nth(i));
+            }
+        } catch (Exception e) {
+            throw new Error(e);
+        }
+    }
+
+    public void open ( String file ) {
+        try {
+            splitter = new JsonSplitter(tags,file,new DataOutputBuffer());
+        } catch (Exception e) {
+            throw new Error(e);
+        }
+    }
+
+    public void open ( FSDataInputStream fsin, long start, long end ) {
+        try {
+            splitter = new JsonSplitter(tags,fsin,start,end,new DataOutputBuffer());
+        } catch (Exception e) {
+            throw new Error(e);
+        }
+    }
+
+    public Tree type () { return new VariableLeaf("JSON"); }
+
+    public String slice () {
+        if (splitter.hasNext()) {
+            DataOutputBuffer b = splitter.next();
+            return new String(b.getData(),0,b.getLength());
+        } else return null;
+    }
+
+    public Bag parse ( String s ) {
+        try {
+            JSONLex scanner = new JSONLex(new StringReader(s));
+            JSONParser parser = new JSONParser(scanner);
+            Symbol sym = parser.parse();
+            return new Bag((MRData)sym.value);
+        } catch (Exception e) {
+            System.err.println(e);
+            return new Bag();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/JsonSplitter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JsonSplitter.java b/core/src/main/java/org/apache/mrql/JsonSplitter.java
new file mode 100644
index 0000000..7017e72
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/JsonSplitter.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java_cup.runtime.Symbol;
+import java.util.Iterator;
+import java.io.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+
+/** Extract the JSON objects tagged by tags from a data split of the input stream (fsin) */
+final public class JsonSplitter implements Iterator<DataOutputBuffer> {
+    boolean in_memory;
+    FSDataInputStream fsin; // for HDFS processing
+    InputStream in;         // for in-memory processing
+    JSONLex scanner;
+    String[] tags;
+    long start;
+    long end;
+    final DataOutputBuffer buffer;
+
+    JsonSplitter ( String[] tags, FSDataInputStream fsin, long start, long end,
+                   DataOutputBuffer buffer ) {
+        in_memory = false;
+        this.tags = tags;
+        this.fsin = fsin;
+        this.end = end;
+        this.buffer = buffer;
+        try {
+            fsin.seek(start);
+            this.start = (start == 0) ? start : sync(start);
+            fsin.seek(this.start);
+            scanner = new JSONLex(fsin);
+        } catch ( IOException e ) {
+            System.err.println("*** Cannot parse the data split: "+fsin);
+        }
+    }
+
+    JsonSplitter ( String[] tags, String file, DataOutputBuffer buffer ) {
+        in_memory = true;
+        try {
+            in = new FileInputStream(file);
+        } catch ( Exception e ) {
+            throw new Error("Cannot open the file: "+file);
+        };
+        this.tags = tags;
+        this.buffer = buffer;
+        scanner = new JSONLex(in);
+    }
+
+    private long sync ( long start ) {
+        try {
+            long first_quote = -1;
+            for ( long offset = 0; ; offset++ ) {
+                char c = (char)fsin.read();
+                if (c == '\"') {
+                    if (first_quote >= 0)
+                        if ((char)fsin.read() == ':')
+                            return start+first_quote;
+                    first_quote = offset;
+                }
+            }
+        } catch (IOException ex) {
+            return (long)0;
+        }
+    }
+
+    public boolean hasNext () {
+        try {
+            if (in_memory || start+scanner.char_pos() < end)
+                if (skip())
+                    return store();
+            return false;
+        } catch (Exception e) {
+            System.err.println(e);
+            return false;
+        }
+    }
+
+    public DataOutputBuffer next () {
+        return buffer;
+    }
+
+    public void remove () { }
+
+    boolean is_start_tag ( String tagname ) {
+        if (tags == null)
+            return true;
+        for (String tag: tags)
+            if (tag.contentEquals(tagname))
+                return true;
+        return false;
+    }
+
+    /** skip until the beginning of a split element */
+    boolean skip () throws IOException {
+        while (true) {
+            Symbol s = scanner.next_token();
+            if (s.sym == jsym.EOF || (!in_memory && start+scanner.char_pos() >= end))
+                return false;
+            if (s.sym == jsym.STRING && is_start_tag((String)s.value)) {
+                String tag = (String)s.value;
+                if (scanner.next_token().sym == jsym.COLON) {
+                    buffer.reset();
+                    buffer.write('{');
+                    buffer.write('\"');
+                    for ( int i = 0; i < tag.length(); i++ )
+                        buffer.write(tag.charAt(i));
+                    buffer.write('\"');
+                    buffer.write(':');
+                    return true;
+                }
+            }
+        }
+    }
+
+    /** store one split element into the buffer; may cross split boundaries */
+    boolean store () throws IOException {
+        int nest = 0;
+        while (true) {
+            Symbol s = scanner.next_token();
+            if (s.sym == jsym.EOF)
+                return false;
+            if (s.sym == jsym.O_BEGIN || s.sym == jsym.A_BEGIN)
+                nest++;
+            else if (s.sym == jsym.O_END || s.sym == jsym.A_END)
+                nest--;
+            String text = scanner.text();
+            for ( int i = 0; i < text.length(); i++ )
+                buffer.write(text.charAt(i));
+            if (nest == 0) {
+                buffer.write('}');
+                return true;
+            }
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Lambda.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Lambda.java b/core/src/main/java/org/apache/mrql/Lambda.java
new file mode 100644
index 0000000..d0cfd7a
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Lambda.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+
+
+/** an anonymous function encapsulated as MRData */
+final public class Lambda extends MRData {
+    private Function lambda;
+
+    public Lambda ( Function f ) { lambda = f; }
+
+    public void materializeAll () {};
+
+    public Function lambda () { return lambda; }
+
+    final public void write ( DataOutput out ) throws IOException {
+        throw new Error("Functions are not serializable");
+    }
+
+    public void readFields ( DataInput in ) throws IOException {
+        throw new Error("Functions are not serializable");
+    }
+
+    public int compareTo ( MRData x ) {
+        throw new Error("Functions cannot be compared");
+    }
+
+    public boolean equals ( Object x ) {
+        throw new Error("Functions cannot be compared");
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/LineParser.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/LineParser.gen b/core/src/main/java/org/apache/mrql/LineParser.gen
new file mode 100644
index 0000000..fb1e068
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/LineParser.gen
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.util.LineReader;
+
+
+/** A parser for line-oriented, character delimited text files (such as CVS) */
+final public class LineParser implements Parser {
+    final static int maxLineLength = 1000;
+    boolean in_memory;
+    FSDataInputStream fsin;     // for HDFS processing
+    LineReader in;
+    BufferedReader buffered_in; // for in-memory processing
+    Text line;
+    long start;
+    long end;
+    long pos;
+    String delimiter;
+    Tree type;
+    byte[] types;    // a vector of basic type ids (see MRContainer in MapReduceData)
+    int type_length;
+
+    static byte[] relational_record ( Tree tp ) {
+        match tp {
+        case record(...al):
+            Trees attrs = #[];
+            byte[] types = new byte[al.length()];
+            for ( int i = 0; i < types.length; i++ )
+                match al.nth(i) {
+                case bind(`v,any):
+                    types[i] = -1;
+                    if (attrs.member(v))
+                        TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
+                    attrs = attrs.append(v);
+                case bind(`v,`t):
+                    if (!t.is_variable())
+                        fail;
+                    types[i] = MRContainer.type_code(t.toString());
+                    if (attrs.member(v))
+                        TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
+                    attrs = attrs.append(v);
+                    if (!MRContainer.basic_type(types[i]))
+                        TypeInference.error("Expected a basic type for a relational attribute: "+t);
+                case `t: TypeInference.error("Expected a basic type for a relational attribute: "
+                                             +TypeInference.print_type(t));
+                };
+            return types;
+        case tuple(...al):
+            byte[] types = new byte[al.length()];
+            for ( int i = 0; i < types.length; i++ )
+                match al.nth(i) {
+                case any:
+                    types[i] = -1;
+                case `t:
+                    if (!t.is_variable())
+                        fail;
+                    types[i] = MRContainer.type_code(t.toString());
+                    if (!MRContainer.basic_type(types[i]))
+                        TypeInference.error("Expected a basic type for a relational attribute: "+t);
+                case `t: TypeInference.error("Expected a basic type for a relational attribute: "
+                                             +TypeInference.print_type(t));
+                };
+            return types;
+        };
+        TypeInference.error("Expected a relational record or a tuple type: "
+                            +TypeInference.print_type(tp));
+        return null;
+    }
+
+    static Tree relational_record_type ( Tree tp ) {
+        match tp {
+        case record(...al):
+            Trees ts = #[];
+            for ( Tree a: al )
+                match a {
+                case bind(_,any): ;
+                case `t: ts = ts.append(t);
+                };
+            return #<record(...ts)>;
+        case tuple(...al):
+            Trees ts = #[];
+            for ( Tree a: al )
+                if (!a.equals(#<any>))
+                    ts = ts.append(a);
+            return #<tuple(...ts)>;
+        };
+        TypeInference.error("Expected a relational record type: "
+                            +TypeInference.print_type(tp));
+        return null;
+    }
+
+    public Tree type () {
+        return relational_record_type(type);
+    }
+
+    public void initialize ( Trees args ) {
+        if (Config.hadoop_mode && Plan.conf == null)
+            Plan.conf = Evaluator.evaluator.new_configuration();
+        if (args.length() != 2)
+            throw new Error("The line parser must have two arguments: "+args);
+        if (!(args.nth(0) instanceof StringLeaf))
+            throw new Error("Expected a delimiter: "+args.nth(0));
+        delimiter = ((StringLeaf)args.nth(0)).value();
+        if (delimiter.length() == 0)
+            throw new Error("Expected a delimiter with at least one character: "+delimiter);
+        type = ((Node)args.nth(1)).children().nth(0);
+        types = relational_record(type);
+        type_length = 0;
+        for ( int i = 0; i < types.length; i++ )
+            if (types[i] >= 0)
+                type_length++;
+        if (type_length < 1)
+            TypeInference.error("A relational record type must have at least one component: "
+                                +TypeInference.print_type(type));
+    }
+
+    public void open ( String file ) {
+        in_memory = true;
+        try {
+            buffered_in = new BufferedReader(new InputStreamReader(new FileInputStream(file)),
+                                             10000);
+        } catch ( Exception e ) {
+            throw new Error("Cannot open the file: "+file);
+        }
+    }
+
+    public void open ( FSDataInputStream fsin, long fstart, long fend ) {
+        in_memory = false;
+        this.fsin = fsin;
+        start = fstart;
+        end = fend;
+        line = new Text();
+        try {
+            if (start != 0) {  // for all but the first data split, skip the first record
+                --start;
+                fsin.seek(start);
+                in = new LineReader(fsin,Plan.conf);
+                start += in.readLine(new Text(),0,(int) Math.min(Integer.MAX_VALUE,end-start));
+            } else in = new LineReader(fsin,Plan.conf);
+            pos = start;
+        } catch ( IOException e ) {
+            System.err.println("*** Cannot parse the data split: "+fsin);
+            this.start = end;
+        }
+    }
+
+    public String slice () {
+        try {
+            if (in_memory)
+                return buffered_in.readLine();
+            while (pos < end) {
+                int newSize = in.readLine(line,maxLineLength,
+                                          Math.max((int)Math.min(Integer.MAX_VALUE,end-pos),
+                                                   maxLineLength));
+                if (newSize == 0)
+                    return null;
+                pos += newSize;
+                if (newSize < maxLineLength)
+                    return line.toString();
+            };
+            return null;
+        } catch ( Exception e ) {
+            System.err.println("*** Cannot slice the text: "+e);
+            return "";
+        }
+    }
+
+    private static MRData parse_value ( String text, byte type ) {
+        switch (type) {
+        case MRContainer.BYTE: return new MR_byte(Byte.parseByte(text));
+        case MRContainer.SHORT: return new MR_short(Short.parseShort(text));
+        case MRContainer.INT: return new MR_int(Integer.parseInt(text));
+        case MRContainer.LONG: return new MR_long(Long.parseLong(text));
+        case MRContainer.FLOAT: return new MR_float(Float.parseFloat(text));
+        case MRContainer.DOUBLE: return new MR_double(Double.parseDouble(text));
+        case MRContainer.CHAR: return new MR_char(text.charAt(0));
+        case MRContainer.STRING: return new MR_string(text);
+        };
+        System.err.println("*** Cannot parse the type "+MRContainer.type_names[type]+" in '"+text+"'");
+        return null;
+    }
+
+    public Bag parse ( String line ) {
+        try {
+            if (line == null)
+                return new Bag();
+            Tuple t = new Tuple(type_length);
+            int loc = 0;
+            int j = 0;
+            for ( int i = 0; i < types.length; i++ ) {
+                int k = line.indexOf(delimiter,loc);
+                if (types[i] >= 0) {
+                    String s = (k > 0) ? line.substring(loc,k) : line.substring(loc);
+                    MRData v = parse_value(s,types[i]);
+                    if (v == null)
+                        return new Bag();
+                    t.set(j++,v);
+                };
+                loc = k+delimiter.length();
+                if (k < 0 && i+1 < types.length) {
+                    System.err.println("*** Incomplete parsed text line: "+line);
+                    return new Bag();
+                }
+            };
+            return new Bag(t);
+        } catch ( Exception e ) {
+            System.err.println("*** Cannot parse the text line: "+line);
+            return new Bag();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/MRContainer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/MRContainer.java b/core/src/main/java/org/apache/mrql/MRContainer.java
new file mode 100644
index 0000000..d72c452
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/MRContainer.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.*;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/** A container for MRData that implements read (the deserializer) */
+final public class MRContainer implements WritableComparable<MRContainer>, Serializable {
+    transient MRData data;
+
+    /** the type tags; the serialized form of every MRData value begins with one of them */
+    public final static byte BOOLEAN = 0, BYTE = 1, SHORT = 2, INT = 3, LONG = 4,
+        FLOAT = 5, DOUBLE = 6, CHAR = 7, STRING = 8, PAIR = 9, TUPLE = 10, BAG = 11,
+        LAZY_BAG = 12, END_OF_LAZY_BAG = 13, UNION = 14, INV = 15, LAMBDA = 16,
+        VARIABLE = 17, TRIPLE = 18, NULL = 19, DATASET = 20, SYNC = 99, MORE_BSP_STEPS = 98;
+
+    /** the type tags in the order of type_names: type_codes[i] is the tag of type_names[i].
+     *  The two arrays must have the same length, since type_code scans all the names */
+    public final static byte[] type_codes
+        = { BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, CHAR, STRING, NULL, PAIR, TRIPLE,
+            TUPLE, BAG, LAZY_BAG, END_OF_LAZY_BAG, UNION, INV, LAMBDA, VARIABLE, SYNC,
+            MORE_BSP_STEPS };
+
+    public final static String[] type_names
+        = { "boolean", "byte", "short", "int", "long", "float", "double", "char", "string",
+            "null", "pair", "triple", "tuple", "bag", "lazy_bag", "end_of_lazy_bag", "union",
+            "inv", "lambda", "variable", "sync", "more_bsp_steps" };
+
+    /** the type tag of the type with the given name, or -1 if there is no such type */
+    public static byte type_code ( String type_name ) {
+        for ( byte i = 0; i < type_names.length; i ++ )
+            if (type_names[i].equals(type_name))
+                return type_codes[i];
+        return -1;
+    }
+
+    /** is this the tag of a basic type (BOOLEAN up to STRING)? */
+    public static boolean basic_type ( byte type_code ) {
+        return type_code >= 0 && type_code <= 8;
+    }
+
+    MRContainer ( MRData d ) { data = d; }
+
+    MRContainer () { data = null; }
+
+    /** the shared value returned by read for the END_OF_LAZY_BAG tag */
+    public final static MRData end_of_lazy_bag = new MR_EOLB();
+
+    MRData data () { return data; }
+
+    public void set ( MRData v ) { data = v; }
+    final public void write ( DataOutput out ) throws IOException { data.write(out); }
+    public void readFields ( DataInput in ) throws IOException { data = read(in); }
+    public int compareTo ( MRContainer x ) { return data.compareTo(x.data); }
+
+    /** two containers are equal if their data are equal, which is consistent with
+     *  compareTo and hashCode; any other object is compared directly with the data */
+    public boolean equals ( Object x ) {
+        if (x instanceof MRContainer)
+            return data.equals(((MRContainer)x).data);
+        return data.equals(x);
+    }
+
+    public int hashCode () { return data.hashCode(); }
+    public String toString () { return data.toString(); }
+
+    /** the deserializer: read the type tag and then the value that corresponds to the tag
+     *  @throws Error if the tag is not recognized */
+    final public static MRData read ( DataInput in ) throws IOException {
+        final byte tag = in.readByte();
+        switch (tag) {
+        case TUPLE: return Tuple.read(in);
+        case NULL: return new Tuple(0);
+        case PAIR: return Tuple.read2(in);
+        case TRIPLE: return Tuple.read3(in);
+        case BAG: return Bag.read(in);
+        case LAZY_BAG: return Bag.lazy_read(in);
+        case END_OF_LAZY_BAG: return end_of_lazy_bag;
+        case UNION: return Union.read(in);
+        case INV: return Inv.read(in);
+        case BOOLEAN: return MR_bool.read(in);
+        case BYTE: return MR_byte.read(in);
+        case SHORT: return MR_short.read(in);
+        case INT: return MR_int.read(in);
+        case LONG: return MR_long.read(in);
+        case FLOAT: return MR_float.read(in);
+        case DOUBLE: return MR_double.read(in);
+        case CHAR: return MR_char.read(in);
+        case STRING: return MR_string.read(in);
+        case SYNC: return new MR_sync();
+        case MORE_BSP_STEPS: return new MR_more_bsp_steps();
+        };
+        throw new Error("Unrecognized MRQL type tag: "+tag);
+    }
+
+    /** compare two serialized values: values with different tags are ordered by their tags;
+     *  otherwise, the comparison is delegated to the class that corresponds to the tag,
+     *  which is given the bytes that follow the tag
+     *  @throws Error if the tag is not recognized */
+    final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
+        if (x[xs] != y[ys])
+            return x[xs] - y[ys];
+        switch (x[xs]) {
+        case TUPLE: return Tuple.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case NULL: return 0;
+        case PAIR: return Tuple.compare2(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case TRIPLE: return Tuple.compare3(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case BAG: return Bag.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case UNION: return Union.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case INV: return Inv.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case BOOLEAN: return MR_bool.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case BYTE: return MR_byte.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case SHORT: return MR_short.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case INT: return MR_int.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case LONG: return MR_long.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case FLOAT: return MR_float.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case DOUBLE: return MR_double.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case CHAR: return MR_char.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case STRING: return MR_string.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case SYNC: return 0;
+        case MORE_BSP_STEPS: return 0;
+        };
+        throw new Error("Unrecognized MRQL type tag: "+x[xs]);
+    }
+
+    /** Java serialization: the data field is transient, so it is written using its own serializer */
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        data.write(out);
+    }
+
+    /** Java deserialization: read the data back using the MRQL deserializer */
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        data = read(in);
+    }
+
+    private void readObjectNoData () throws ObjectStreamException {}
+
+    /** the end-of-lazy-bag marker: it is serialized as the END_OF_LAZY_BAG tag only */
+    final static class MR_EOLB extends MRData {
+        MR_EOLB () {}
+
+        public void materializeAll () {}
+
+        final public void write ( DataOutput out ) throws IOException {
+            out.writeByte(MRContainer.END_OF_LAZY_BAG);
+        }
+
+        public void readFields ( DataInput in ) throws IOException {}
+
+        public int compareTo ( MRData x ) { return 0; }
+
+        public boolean equals ( Object x ) { return x instanceof MR_EOLB; }
+
+        public int hashCode () { return 0; }
+
+        public String toString () {
+            return "end_of_lazy_bag";
+        }
+    }
+}