You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2013/09/06 22:57:36 UTC

[18/18] git commit: MRQL-16: correct source files. ASF licenses, and POMs for release

MRQL-16: correct source files. ASF licenses, and POMs for release


Project: http://git-wip-us.apache.org/repos/asf/incubator-mrql/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-mrql/commit/3de9e485
Tree: http://git-wip-us.apache.org/repos/asf/incubator-mrql/tree/3de9e485
Diff: http://git-wip-us.apache.org/repos/asf/incubator-mrql/diff/3de9e485

Branch: refs/heads/master
Commit: 3de9e4853a211fa3ee8e7470c9295d8b12240a73
Parents: 1de63a6
Author: fegaras <fe...@cse.uta.edu>
Authored: Fri Sep 6 15:56:43 2013 -0500
Committer: fegaras <fe...@cse.uta.edu>
Committed: Fri Sep 6 15:56:43 2013 -0500

----------------------------------------------------------------------
 Makefile                                        |    2 +-
 build.xml                                       |    1 +
 pom.xml                                         |    6 +-
 queries/RMAT.mrql                               |   18 +
 queries/dblp-pagerank.mrql                      |   20 +-
 queries/kmeans.mrql                             |   18 +
 queries/kmeans2.mrql                            |   18 +
 queries/pagerank.mrql                           |   18 +
 queries/pagerank2.mrql                          |   18 +
 queries/points.mrql                             |   18 +
 src/main/assembly/bin.xml                       |    1 +
 src/main/java/BSP/BSPPlan.java                  |  840 +++---
 src/main/java/BSP/BinaryInputFormat.java        |   44 +-
 src/main/java/BSP/Evaluator.gen                 |  234 +-
 src/main/java/BSP/GeneratorInputFormat.java     |   90 +-
 src/main/java/BSP/MRQLFileInputFormat.java      |  122 +-
 src/main/java/BSP/MultipleBSPInput.java         |   32 +-
 src/main/java/BSP/ParsedInputFormat.java        |  116 +-
 src/main/java/MapReduce/BinaryInputFormat.java  |   76 +-
 .../java/MapReduce/CrossProductOperation.java   |  348 +--
 src/main/java/MapReduce/Evaluator.gen           |  404 +--
 .../java/MapReduce/GeneratorInputFormat.java    |  118 +-
 src/main/java/MapReduce/GroupByJoinPlan.java    |  614 ++---
 src/main/java/MapReduce/JoinOperation.java      |  604 ++---
 .../java/MapReduce/MRQLFileInputFormat.java     |   16 +-
 src/main/java/MapReduce/MapJoinOperation.java   |  304 +--
 src/main/java/MapReduce/MapOperation.java       |  166 +-
 src/main/java/MapReduce/MapReduceOperation.java |  406 +--
 src/main/java/MapReduce/MapReducePlan.java      |  120 +-
 src/main/java/MapReduce/ParsedInputFormat.java  |  164 +-
 src/main/java/core/AlgebraicOptimization.gen    |  488 ++--
 src/main/java/core/BSPTranslator.gen            | 1246 ++++-----
 src/main/java/core/Bag.java                     |  648 ++---
 src/main/java/core/BagIterator.java             |    2 +-
 src/main/java/core/BinaryDataSource.java        |    6 +-
 src/main/java/core/ClassImporter.java           |  232 +-
 src/main/java/core/Compiler.gen                 |  966 +++----
 src/main/java/core/Config.java                  |  340 +--
 src/main/java/core/DataSet.java                 |   48 +-
 src/main/java/core/DataSource.java              |  166 +-
 src/main/java/core/Environment.java             |   12 +-
 src/main/java/core/GeneratorDataSource.java     |   22 +-
 src/main/java/core/Interpreter.gen              | 1386 +++++-----
 src/main/java/core/Inv.java                     |   20 +-
 src/main/java/core/JsonParser.java              |   82 +-
 src/main/java/core/JsonSplitter.java            |  146 +-
 src/main/java/core/Lambda.java                  |    8 +-
 src/main/java/core/LineParser.gen               |  310 +--
 src/main/java/core/MRContainer.java             |  148 +-
 src/main/java/core/MRData.java                  |    6 +-
 src/main/java/core/MRQL.java                    |   32 +-
 src/main/java/core/MR_bool.java                 |   24 +-
 src/main/java/core/MR_byte.java                 |   22 +-
 src/main/java/core/MR_char.java                 |   20 +-
 src/main/java/core/MR_dataset.java              |   16 +-
 src/main/java/core/MR_double.java               |   26 +-
 src/main/java/core/MR_float.java                |   26 +-
 src/main/java/core/MR_int.java                  |   30 +-
 src/main/java/core/MR_long.java                 |   32 +-
 src/main/java/core/MR_more_bsp_steps.java       |    4 +-
 src/main/java/core/MR_short.java                |   20 +-
 src/main/java/core/MR_string.java               |   28 +-
 src/main/java/core/MR_sync.java                 |    2 +-
 src/main/java/core/MR_variable.java             |    2 +-
 src/main/java/core/Main.java                    |  238 +-
 src/main/java/core/MapReduceAlgebra.java        | 1076 ++++----
 src/main/java/core/Materialization.gen          |  162 +-
 src/main/java/core/MethodInfo.java              |   38 +-
 src/main/java/core/Normalization.gen            |  664 ++---
 src/main/java/core/ParsedDataSource.java        |   46 +-
 src/main/java/core/Plan.java                    |  484 ++--
 src/main/java/core/PlanGeneration.gen           | 1244 ++++-----
 src/main/java/core/Printer.gen                  |  794 +++---
 src/main/java/core/QueryPlan.gen                | 1524 +++++------
 src/main/java/core/Simplification.gen           |  646 ++---
 src/main/java/core/SystemFunctions.java         |  404 +--
 src/main/java/core/Test.java                    |  144 +-
 src/main/java/core/TopLevel.gen                 |  430 +--
 src/main/java/core/Translator.gen               |  474 ++--
 src/main/java/core/Tuple.java                   |  212 +-
 src/main/java/core/TypeInference.gen            | 2468 +++++++++---------
 src/main/java/core/Union.java                   |   40 +-
 src/main/java/core/XMLParser.java               |  102 +-
 src/main/java/core/XMLSplitter.java             |  172 +-
 src/main/java/core/XPathParser.java             | 1170 ++++-----
 src/main/java/core/mrql.cgen                    |  566 ++--
 src/main/java/gen/Condition.java                |   12 +-
 src/main/java/gen/Crypt.java                    |   58 +-
 src/main/java/gen/DoubleLeaf.java               |   16 +-
 src/main/java/gen/LongLeaf.java                 |   16 +-
 src/main/java/gen/Main.java                     |   68 +-
 src/main/java/gen/Meta.java                     |  362 +--
 src/main/java/gen/Node.java                     |   46 +-
 src/main/java/gen/StringLeaf.java               |   12 +-
 src/main/java/gen/SymbolTable.java              |  188 +-
 src/main/java/gen/Tree.java                     |   30 +-
 src/main/java/gen/Trees.java                    |  174 +-
 src/main/java/gen/VariableLeaf.java             |   14 +-
 src/main/java/spark/BinaryInputFormat.java      |   34 +-
 src/main/java/spark/Evaluator.gen               | 1056 ++++----
 src/main/java/spark/GeneratorInputFormat.java   |  124 +-
 src/main/java/spark/MRQLFileInputFormat.java    |   76 +-
 src/main/java/spark/MR_rdd.java                 |    8 +-
 src/main/java/spark/ParsedInputFormat.java      |  106 +-
 src/main/java/spark/RDDDataSource.java          |    6 +-
 tests/queries/core_1.mrql                       |   18 +
 tests/queries/distinct_1.mrql                   |   18 +
 tests/queries/factorization_1.mrql              |   18 +
 tests/queries/group_by_1.mrql                   |   18 +
 tests/queries/group_by_having_1.mrql            |   18 +
 tests/queries/group_by_order_by_1.mrql          |   18 +
 tests/queries/group_by_order_by_2.mrql          |   18 +
 tests/queries/join_group_by_1.mrql              |   18 +
 tests/queries/join_group_by_2.mrql              |   18 +
 tests/queries/join_group_by_3.mrql              |   18 +
 tests/queries/join_order_by_1.mrql              |   18 +
 tests/queries/joins_1.mrql                      |   18 +
 tests/queries/kmeans_1.mrql                     |   18 +
 tests/queries/matrix_1.mrql                     |   18 +
 tests/queries/nested_select_1.mrql              |   18 +
 tests/queries/order_by_1.mrql                   |   18 +
 tests/queries/pagerank_1.mrql                   |   18 +
 tests/queries/relational_join_1.mrql            |   18 +
 tests/queries/total_aggregation_1.mrql          |   18 +
 tests/queries/total_aggregation_2.mrql          |   18 +
 tests/queries/udf_1.mrql                        |   18 +
 tests/queries/user_aggregation_1.mrql           |   18 +
 tests/queries/xml_1.mrql                        |   18 +
 tests/queries/xml_2.mrql                        |   18 +
 129 files changed, 14010 insertions(+), 13448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/Makefile
----------------------------------------------------------------------
diff --git a/Makefile b/Makefile
index 531eac8..767755b 100644
--- a/Makefile
+++ b/Makefile
@@ -65,7 +65,7 @@ common: clean_build gen mrql_parser json_parser
 
 clean_build:
 	@rm -rf classes tmp
-	@mkdir -p classes tmp tests/results tests/results/mr-memory tests/results/bsp-memory tests/results/hadoop tests/results/bsp tests/results/spark
+	@mkdir -p lib classes tmp tests/results tests/results/mr-memory tests/results/bsp-memory tests/results/hadoop tests/results/bsp tests/results/spark
 
 gen:
 	@${JFLEX} src/main/java/gen/gen.lex -d tmp

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/build.xml
----------------------------------------------------------------------
diff --git a/build.xml b/build.xml
index 6482916..1cc4f78 100644
--- a/build.xml
+++ b/build.xml
@@ -124,6 +124,7 @@
 	  description="Clean the build directories">
     <delete dir="classes"/>
     <delete dir="tmp"/>
+    <mkdir dir="lib"/>
     <mkdir dir="classes"/>
     <mkdir dir="tmp"/>
     <mkdir dir="tests/results"/>

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 448bea0..b8709aa 100644
--- a/pom.xml
+++ b/pom.xml
@@ -103,6 +103,7 @@
         <version>2.2</version>
         <configuration>
           <mavenExecutorId>forked-path</mavenExecutorId>
+          <autoVersionSubmodules>true</autoVersionSubmodules>
         </configuration>
       </plugin>
       <plugin>
@@ -123,8 +124,9 @@
 	    <exclude>.gitignore</exclude>
             <exclude>.classpath/**</exclude>
             <exclude>.project/**</exclude>
-            <exclude>tests/**</exclude>
-            <exclude>queries/**</exclude>
+            <exclude>tests/data/**</exclude>
+            <exclude>tests/results/**</exclude>
+            <exclude>tests/error_log.txt</exclude>
             <exclude>**/dependency-reduced-pom.xml</exclude>
             <exclude>**/docs/apidocs/**</exclude>
           </excludes>

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/RMAT.mrql
----------------------------------------------------------------------
diff --git a/queries/RMAT.mrql b/queries/RMAT.mrql
index 771737b..1fb1181 100644
--- a/queries/RMAT.mrql
+++ b/queries/RMAT.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 // Kronecker graph generator parameters:
 a = 0.57;
 b = 0.19;

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/dblp-pagerank.mrql
----------------------------------------------------------------------
diff --git a/queries/dblp-pagerank.mrql b/queries/dblp-pagerank.mrql
index 5cbd672..cd6ae85 100644
--- a/queries/dblp-pagerank.mrql
+++ b/queries/dblp-pagerank.mrql
@@ -1,4 +1,22 @@
-DBLP = source(xml,"/home/fegaras/data/dblp2.xml",{"article","incollection","book","inproceedings"});
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+DBLP = source(xml,args[0],{"article","incollection","book","inproceedings"});
 
 graph = select (key,select text(x) from x in c)
           from a in DBLP,

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/kmeans.mrql
----------------------------------------------------------------------
diff --git a/queries/kmeans.mrql b/queries/kmeans.mrql
index 3934f0c..dd85fd4 100644
--- a/queries/kmeans.mrql
+++ b/queries/kmeans.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 type point = < X: double, Y: double >;
 
 function distance ( x: point, y: point ): double {

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/kmeans2.mrql
----------------------------------------------------------------------
diff --git a/queries/kmeans2.mrql b/queries/kmeans2.mrql
index d34f86c..306a6f6 100644
--- a/queries/kmeans2.mrql
+++ b/queries/kmeans2.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 type point = < X: double, Y: double, Z: double >;
 
 function distance ( x: point, y: point ): double {

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/pagerank.mrql
----------------------------------------------------------------------
diff --git a/queries/pagerank.mrql b/queries/pagerank.mrql
index ce4e46d..ba1707c 100644
--- a/queries/pagerank.mrql
+++ b/queries/pagerank.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 // preprocessing: for each node, group its outgoing links into a bag
 graph = select (key,n#1)
           from n in source(binary,"graph.bin")

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/pagerank2.mrql
----------------------------------------------------------------------
diff --git a/queries/pagerank2.mrql b/queries/pagerank2.mrql
index d28a6fd..9f38aad 100644
--- a/queries/pagerank2.mrql
+++ b/queries/pagerank2.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 // preprocessing: for each node, group its outgoing links into a bag
 graph = select (key,select x.to from x in n)
           from n in source(line,"queries/links.txt",",",type(<id:string,to:string>))

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/queries/points.mrql
----------------------------------------------------------------------
diff --git a/queries/points.mrql b/queries/points.mrql
index b29c81d..155623e 100644
--- a/queries/points.mrql
+++ b/queries/points.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 // generate random points in the 4 squares: [2..4,6..8]*[2..4,6..8]
 // used in kmeans.mrql
 

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/assembly/bin.xml
----------------------------------------------------------------------
diff --git a/src/main/assembly/bin.xml b/src/main/assembly/bin.xml
index aba99d9..0a0192a 100644
--- a/src/main/assembly/bin.xml
+++ b/src/main/assembly/bin.xml
@@ -31,6 +31,7 @@
         <include>LICENSE*</include>
         <include>NOTICE*</include>
         <include>conf/mrql-env.sh</include>
+        <include>**/apidocs/**</include>
       </includes>
       <excludes>
         <exclude>**/*~</exclude>

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/BSP/BSPPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BSPPlan.java b/src/main/java/BSP/BSPPlan.java
index 4ccd95d..0ad30e9 100644
--- a/src/main/java/BSP/BSPPlan.java
+++ b/src/main/java/BSP/BSPPlan.java
@@ -32,364 +32,364 @@ import org.apache.hadoop.conf.Configuration;
 final public class BSPPlan extends Plan {
 
     final static Configuration getConfiguration ( BSPJob job ) {
-	return job.getConf();   // use job.getConfiguration() for Hama 0.6.0
+        return job.getConf();   // use job.getConfiguration() for Hama 0.6.0
     }
 
     /** The BSP evaluator */
     final static class BSPop extends BSP<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> {
-	final static MRContainer null_key = new MRContainer(new MR_byte(0));
-	// special message for sub-sync()
-	final static MRData more_to_come = new MR_sync();
-	final static MRData more_supersteps = new MR_more_bsp_steps();
+        final static MRContainer null_key = new MRContainer(new MR_byte(0));
+        // special message for sub-sync()
+        final static MRData more_to_come = new MR_sync();
+        final static MRData more_supersteps = new MR_more_bsp_steps();
 
-	private int source_num;
-	private Function superstep_fnc;      // superstep function
-	private MRData state;                // BSP state
-	private boolean orderp;              // will output be ordered?
-	private MRData source;               // BSP input
-	private Function acc_fnc;            // aggregator
-	private MRData acc_result;           // aggregation result
-	private static String[] all_peer_names;   // all BSP peer names
-	private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer>[] all_peers;   // all BSP peers
-	// a master peer that coordinates and collects results of partial aggregations
-	private String masterTask;
-	// buffer for received messages -- regularly in a vector, but can be spilled in a local file
-	Bag msg_cache;
-	// the cache that holds all local data in memory
-	Tuple local_cache;
+        private int source_num;
+        private Function superstep_fnc;      // superstep function
+        private MRData state;                // BSP state
+        private boolean orderp;              // will output be ordered?
+        private MRData source;               // BSP input
+        private Function acc_fnc;            // aggregator
+        private MRData acc_result;           // aggregation result
+        private static String[] all_peer_names;   // all BSP peer names
+        private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer>[] all_peers;   // all BSP peers
+        // a master peer that coordinates and collects results of partial aggregations
+        private String masterTask;
+        // buffer for received messages -- regularly in a vector, but can be spilled in a local file
+        Bag msg_cache;
+        // the cache that holds all local data in memory
+        Tuple local_cache;
 
-	private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> getPeer ( String name ) {
-	    for ( int i = 0; i < all_peer_names.length; i++ )
-		if (all_peer_names[i].equals(name))
-		    return all_peers[i];
-	    throw new Error("Unknown peer: "+name);
-	}
+        private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> getPeer ( String name ) {
+            for ( int i = 0; i < all_peer_names.length; i++ )
+                if (all_peer_names[i].equals(name))
+                    return all_peers[i];
+            throw new Error("Unknown peer: "+name);
+        }
 
-	private static void setPeer ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
-	    String name = peer.getPeerName();
-	    for ( int i = 0; i < all_peer_names.length; i++ )
-		if (all_peer_names[i].equals(name))
-		    all_peers[i] = peer;
-	}
+        private static void setPeer ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
+            String name = peer.getPeerName();
+            for ( int i = 0; i < all_peer_names.length; i++ )
+                if (all_peer_names[i].equals(name))
+                    all_peers[i] = peer;
+        }
 
-	/** shuffle values to BSP peers based on uniform hashing on key */
-	private static String shuffle ( MRData key ) {
-	    return all_peer_names[Math.abs(key.hashCode()) % all_peer_names.length];
-	}
+        /** shuffle values to BSP peers based on uniform hashing on key */
+        private static String shuffle ( MRData key ) {
+            return all_peer_names[Math.abs(key.hashCode()) % all_peer_names.length];
+        }
 
-	/** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify) */
-	public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
-	    return synchronize(getPeer(peerName.get()),mr_exit);
-	}
+        /** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify) */
+        public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
+            return synchronize(getPeer(peerName.get()),mr_exit);
+        }
 
-	/** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify) */
-	public static MR_bool synchronize ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer, MR_bool mr_exit ) {
-	    if (!Config.hadoop_mode)
-		return mr_exit;
-	    // shortcut: if we know for sure that all peers want to exit/continue, we don't have to poll
-	    if (mr_exit == SystemFunctions.bsp_true_value          // must be ==, not equals
-		|| mr_exit == SystemFunctions.bsp_false_value)
-		return mr_exit;
-	    try {
-		// this case is only used for checking the exit condition of repeat/closure
-		boolean exit = mr_exit.get();
-		if (all_peer_names.length <= 1)
-		    return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
-		if (!exit)
-		    // this peer is not ready to exit, so no peer should exit
-		    for ( String p: peer.getAllPeerNames() )
-			peer.send(p,new MRContainer(more_supersteps));
-		peer.sync();
-		// now exit is true if no peer sent a "more_supersteps" message
-		exit = peer.getNumCurrentMessages() == 0;
-		peer.clear();
-		return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
-	    } catch (Exception ex) {
-		throw new Error(ex);
-	    }
-	}
+        /** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify) */
+        public static MR_bool synchronize ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer, MR_bool mr_exit ) {
+            if (!Config.hadoop_mode)
+                return mr_exit;
+            // shortcut: if we know for sure that all peers want to exit/continue, we don't have to poll
+            if (mr_exit == SystemFunctions.bsp_true_value          // must be ==, not equals
+                || mr_exit == SystemFunctions.bsp_false_value)
+                return mr_exit;
+            try {
+                // this case is only used for checking the exit condition of repeat/closure
+                boolean exit = mr_exit.get();
+                if (all_peer_names.length <= 1)
+                    return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
+                if (!exit)
+                    // this peer is not ready to exit, so no peer should exit
+                    for ( String p: peer.getAllPeerNames() )
+                        peer.send(p,new MRContainer(more_supersteps));
+                peer.sync();
+                // now exit is true if no peer sent a "more_supersteps" message
+                exit = peer.getNumCurrentMessages() == 0;
+                peer.clear();
+                return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
+            } catch (Exception ex) {
+                throw new Error(ex);
+            }
+        }
 
-	/** collect a bag from all peers by distributing the local copy s */
-	public static Bag distribute ( MR_string peerName, Bag s ) {
-	    BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer = getPeer(peerName.get());
-	    if (!Config.hadoop_mode)
-		return s;
-	    try {
-		for ( MRData e: s )
-		    for ( String p: all_peer_names )
-			peer.send(p,new MRContainer(e));
-		peer.sync();
-		MRContainer msg;
-		Bag res = new Bag();
-		while ((msg = peer.getCurrentMessage()) != null)
-		    if (!res.contains(msg.data()))
-			res.add(msg.data());
-		peer.clear();
-		return res;
-	    } catch (Exception ex) {
-		throw new Error(ex);
-	    }
-	}
+        /** collect a bag from all peers by distributing the local copy s */
+        public static Bag distribute ( MR_string peerName, Bag s ) {
+            BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer = getPeer(peerName.get());
+            if (!Config.hadoop_mode)
+                return s;
+            try {
+                for ( MRData e: s )
+                    for ( String p: all_peer_names )
+                        peer.send(p,new MRContainer(e));
+                peer.sync();
+                MRContainer msg;
+                Bag res = new Bag();
+                while ((msg = peer.getCurrentMessage()) != null)
+                    if (!res.contains(msg.data()))
+                        res.add(msg.data());
+                peer.clear();
+                return res;
+            } catch (Exception ex) {
+                throw new Error(ex);
+            }
+        }
 
-	private void readLocalSources ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
-	        throws IOException {
-	    MRContainer key = new MRContainer();
-	    MRContainer value = new MRContainer();
-	    while (peer.readNext(key,value)) {
-		Tuple p = (Tuple)(value.data());
-		((Bag)local_cache.get(((MR_int)p.first()).get())).add(p.second());
-	    }
-	}
+        private void readLocalSources ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
+                throws IOException {
+            MRContainer key = new MRContainer();
+            MRContainer value = new MRContainer();
+            while (peer.readNext(key,value)) {
+                Tuple p = (Tuple)(value.data());
+                ((Bag)local_cache.get(((MR_int)p.first()).get())).add(p.second());
+            }
+        }
 
-	private void writeLocalResult ( Bag result,
-					BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
-	        throws IOException {
-	    MRContainer key = new MRContainer();
-	    MRContainer value = new MRContainer();
-	    for ( MRData v: result )
-		if (orderp) {       // prepare for sorting
-		    Tuple t = (Tuple)v;
-		    key.set(t.get(1));
-		    value.set(t.get(0));
-		    peer.write(key,value);
-		} else {
-		    value.set(v);
-		    peer.write(null_key,value);
-		}
-	}
+        /** write a result bag to the peer output; if orderp, element t is written as key t.get(1), value t.get(0) */
+        private void writeLocalResult ( Bag result,
+                                        BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) throws IOException {
+            final MRContainer out_key = new MRContainer(), out_value = new MRContainer();
+            for ( MRData element: result ) {
+                if (!orderp) {      // unordered: every value goes under the shared null_key
+                    out_value.set(element);
+                    peer.write(null_key,out_value);
+                    continue;
+                }
+                final Tuple pair = (Tuple)element;      // prepare for sorting
+                out_key.set(pair.get(1));
+                out_value.set(pair.get(0));
+                peer.write(out_key,out_value);
+            }
+        }
 
-	/** receive messages from other peers */
-	private void receive_messages ( final BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
-	        throws IOException, SyncException, InterruptedException {
-	    if (Config.bsp_msg_size <= 0) {    // no buffering
-		msg_cache = new Bag(new BagIterator() {
-			MRContainer msg;
-			public boolean hasNext () {
-			    try {
-				return (msg = peer.getCurrentMessage()) != null;
-			    } catch (Exception ex) {
-				throw new Error(ex);
-			    }
-			} 
-			public MRData next () {
-			    return msg.data();
-			}
-		    });
-	    } else {
-		boolean expect_more = false;   // are we expecting more incoming messages?
-		do {
-		    // just in case this peer did a regular-sync() before the others did a sub-sync()
-		    expect_more = false;
-		    MRContainer msg;
-		    // cache the received messages
-		    while ((msg = peer.getCurrentMessage()) != null)
-			// if at least one peer sends a more_to_come message, then expect_more
-			if (msg.data().equals(more_to_come))
-			    expect_more = true;
-			else msg_cache.add(msg.data());
-		    if (expect_more)
-			peer.sync();   // sub-sync()
-		} while (expect_more);
-	    }
-	}
+        /** receive messages from other peers into msg_cache (an iterator over the peer's queue if unbuffered) */
+        private void receive_messages ( final BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
+                throws IOException, SyncException, InterruptedException {
+            if (Config.bsp_msg_size <= 0) {    // no buffering
+                msg_cache = new Bag(new BagIterator() {
+                        MRContainer msg;    // the message fetched by the latest hasNext() call
+                        public boolean hasNext () {    // NOTE: asks the peer for a new message on every call
+                            try {
+                                return (msg = peer.getCurrentMessage()) != null;
+                            } catch (Exception ex) {
+                                throw new Error(ex);
+                            }
+                        } 
+                        public MRData next () {    // returns the data of the message fetched by hasNext()
+                            return msg.data();
+                        }
+                    });
+            } else {    // buffering: msg_cache is appended to here (send_messages clears it beforehand)
+                boolean expect_more = false;   // are we expecting more incoming messages?
+                do {
+                    // just in case this peer did a regular-sync() before the others did a sub-sync()
+                    expect_more = false;
+                    MRContainer msg;
+                    // cache the received messages
+                    while ((msg = peer.getCurrentMessage()) != null)
+                        // if at least one peer sends a more_to_come message, then expect_more
+                        if (msg.data().equals(more_to_come))
+                            expect_more = true;
+                        else msg_cache.add(msg.data());
+                    if (expect_more)
+                        peer.sync();   // sub-sync()
+                } while (expect_more);    // repeat until a round arrives without a more_to_come marker
+            }
+        }
 
-	/** send the messages produced by a superstep to peers and then receive the replies */
-	private void send_messages ( Bag msgs,
-				     BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
-	        throws IOException, SyncException, InterruptedException {
-	    int size = 0;
-	    if (Config.bsp_msg_size > 0)
-		msg_cache.clear();
-	    for ( MRData m: msgs ) {
-		Tuple t = (Tuple)m;
-		// if there are too many messages to send, then sub-sync()
-		if ( Config.bsp_msg_size > 0 && size++ > Config.bsp_msg_size ) {
-		    // tell all peers that there is more to come after sync
-		    for ( String p: all_peer_names )
-			if (!peer.getPeerName().equals(p))
-			    peer.send(p,new MRContainer(more_to_come));
-		    peer.sync();  // sub-sync()
-		    size = 0;
-		    MRContainer msg;
-		    // cache the received messages
-		    while ((msg = peer.getCurrentMessage()) != null)
-			if (!msg.data().equals(more_to_come))
-			    msg_cache.add(msg.data());
-		};
-		// suffle messages based on key; needs new MRContainer object
-		peer.send(shuffle(t.get(0)),new MRContainer(t.get(1)));
-	    };
-	    peer.sync();   // regular-sync()
-	    receive_messages(peer);
-	}
+        /** send the messages produced by a superstep to peers and then receive the replies */
+        private void send_messages ( Bag msgs,
+                                     BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
+                throws IOException, SyncException, InterruptedException {
+            int size = 0;    // messages sent since the last sync
+            if (Config.bsp_msg_size > 0)    // buffering: start this round with an empty cache
+                msg_cache.clear();
+            for ( MRData m: msgs ) {
+                Tuple t = (Tuple)m;    // t.get(0) is the shuffle key, t.get(1) is the payload to send
+                // if there are too many messages to send, then sub-sync()
+                if ( Config.bsp_msg_size > 0 && size++ > Config.bsp_msg_size ) {
+                    // tell all peers that there is more to come after sync
+                    for ( String p: all_peer_names )
+                        if (!peer.getPeerName().equals(p))
+                            peer.send(p,new MRContainer(more_to_come));
+                    peer.sync();  // sub-sync()
+                    size = 0;
+                    MRContainer msg;
+                    // cache the received messages
+                    while ((msg = peer.getCurrentMessage()) != null)
+                        if (!msg.data().equals(more_to_come))    // drop the markers sent by other peers
+                            msg_cache.add(msg.data());
+                };
+                // shuffle messages based on key; needs new MRContainer object
+                peer.send(shuffle(t.get(0)),new MRContainer(t.get(1)));
+            };
+            peer.sync();   // regular-sync()
+            receive_messages(peer);
+        }
 
-	@Override
-	public void bsp ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
-	       throws IOException, SyncException, InterruptedException {
-	    final Tuple stepin = new Tuple(4);
-	    stepin.set(3,new MR_string(peer.getPeerName()));
-	    Tuple result;
-	    boolean skip = false;
-	    String tabs = "";
-	    int step = 0;
-	    boolean exit;
-	    readLocalSources(peer);
-	    setPeer(peer);
-	    do {
-		if (!skip)
-		    step++;
-		if (!skip && Config.trace_execution) {
-		    tabs = Interpreter.tabs(Interpreter.tab_count);
-		    System.err.println(tabs+"  Superstep "+step+" ["+peer.getPeerName()+"]:");
-		    System.err.println(tabs+"      messages ["+peer.getPeerName()+"]: "+msg_cache);
-		    System.err.println(tabs+"      state ["+peer.getPeerName()+"]: "+state);
-		    for ( int i = 0; i < local_cache.size(); i++)
-			if (local_cache.get(i) instanceof Bag && ((Bag)local_cache.get(i)).size() > 0)
-			    System.out.println(tabs+"      cache ["+peer.getPeerName()+"] "+i+": "+local_cache.get(i));
-		};
-		stepin.set(0,local_cache);
-		stepin.set(1,msg_cache);
-		stepin.set(2,state);
-		// evaluate one superstep
-		result = (Tuple)superstep_fnc.eval(stepin);
-		Bag msgs = (Bag)result.get(0);
-		exit = ((MR_bool)result.get(2)).get();
-		state = result.get(1);
-		// shortcuts: if we know for sure that all peers want to exit/continue
-		if (result.get(2) == SystemFunctions.bsp_true_value) {  // must be ==, not equals
-		    peer.sync();
-		    if (Config.trace_execution)
-			System.err.println(tabs+"      result ["+peer.getPeerName()+"]: "+result);
-		    break;
-		};
-		if (result.get(2) == SystemFunctions.bsp_false_value) {
-		    if (Config.trace_execution)
-			System.err.println(tabs+"      result ["+peer.getPeerName()+"]: "+result);
-		    send_messages(msgs,peer);
-		    skip = false;
-		    continue;
-		};
-		// shortcut: skip is true when NONE of the peers sent any messages
-		skip = (msgs == SystemFunctions.bsp_empty_bag);  // must be ==, not equals
-		if (skip)
-		    continue;
-		if (Config.trace_execution)
-		    System.err.println(tabs+"      result ["+peer.getPeerName()+"]: "+result);
-		exit = synchronize(peer,(MR_bool)result.get(2)).get();
-		send_messages(msgs,peer);
-	    } while (!exit);
-	    if (acc_result == null) {
-		// the BSP result is a bag that needs to be dumped to the HDFS
-		writeLocalResult((Bag)(local_cache.get(source_num)),peer);
-		// if there more results, dump them to HDFS
-		final MR_long key = new MR_long(0);
-		final MRContainer key_container = new MRContainer(key);
-		final MRContainer data_container = new MRContainer(new MR_int(0));
-		int loc = 0;
-		while ( loc < all_peer_names.length && peer.getPeerName().equals(all_peer_names[loc]) )
-		    loc++;
-		Configuration conf = peer.getConfiguration();
-		String[] out_paths = conf.get("mrql.output.paths").split(",");
-		for ( int i = 1; i < out_paths.length; i++ ) {
-		    String[] s = out_paths[i].split(":");
-		    int out_num = Integer.parseInt(s[0]);
-		    Path path = new Path(s[1]+"/peer"+loc);
-		    FileSystem fs = path.getFileSystem(conf);
-		    SequenceFile.Writer writer
-			= new SequenceFile.Writer(fs,conf,path,
-						  MRContainer.class,MRContainer.class);
-		    long count = 0;
-		    for ( MRData e: (Bag)(local_cache.get(out_num)) ) {
-			key.set(count++);
-			data_container.set(e);
-			writer.append(key_container,data_container);
-		    };
-		    writer.close();
-		};
-	    } else {
-		// the BSP result is an aggregation:
-		//     send the partial results to the master peer
-		peer.send(masterTask,new MRContainer(local_cache.get(source_num)));
-		peer.sync();
-		if (peer.getPeerName().equals(masterTask)) {
-		    // only the master peer collects the partial aggregations
-		    MRContainer msg;
-		    while ((msg = peer.getCurrentMessage()) != null)
-			acc_result = acc_fnc.eval(new Tuple(acc_result,msg.data()));
-		    // write the final aggregation result
-		    peer.write(null_key,new MRContainer(acc_result));
-	       }
-	    }
-	}
+        /** run supersteps until exit is signalled, then write out (or aggregate) this peer's results */
+        @Override
+        public void bsp ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
+               throws IOException, SyncException, InterruptedException {
+            final Tuple stepin = new Tuple(4);   // superstep input: (local_cache, msg_cache, state, peer name)
+            stepin.set(3,new MR_string(peer.getPeerName()));
+            Tuple result;
+            boolean skip = false;
+            String tabs = "";
+            int step = 0;
+            boolean exit;
+            readLocalSources(peer);
+            setPeer(peer);
+            do {
+                if (!skip)
+                    step++;
+                if (!skip && Config.trace_execution) {
+                    tabs = Interpreter.tabs(Interpreter.tab_count);
+                    System.err.println(tabs+"  Superstep "+step+" ["+peer.getPeerName()+"]:");
+                    System.err.println(tabs+"      messages ["+peer.getPeerName()+"]: "+msg_cache);
+                    System.err.println(tabs+"      state ["+peer.getPeerName()+"]: "+state);
+                    for ( int i = 0; i < local_cache.size(); i++)
+                        if (local_cache.get(i) instanceof Bag && ((Bag)local_cache.get(i)).size() > 0)
+                            System.out.println(tabs+"      cache ["+peer.getPeerName()+"] "+i+": "+local_cache.get(i));
+                };
+                stepin.set(0,local_cache);
+                stepin.set(1,msg_cache);
+                stepin.set(2,state);
+                // evaluate one superstep
+                result = (Tuple)superstep_fnc.eval(stepin);   // result is (messages, new state, exit flag)
+                Bag msgs = (Bag)result.get(0);
+                exit = ((MR_bool)result.get(2)).get();
+                state = result.get(1);
+                // shortcuts: if we know for sure that all peers want to exit/continue
+                if (result.get(2) == SystemFunctions.bsp_true_value) {  // must be ==, not equals
+                    peer.sync();
+                    if (Config.trace_execution)
+                        System.err.println(tabs+"      result ["+peer.getPeerName()+"]: "+result);
+                    break;
+                };
+                if (result.get(2) == SystemFunctions.bsp_false_value) {
+                    if (Config.trace_execution)
+                        System.err.println(tabs+"      result ["+peer.getPeerName()+"]: "+result);
+                    send_messages(msgs,peer);
+                    skip = false;
+                    continue;
+                };
+                // shortcut: skip is true when NONE of the peers sent any messages
+                skip = (msgs == SystemFunctions.bsp_empty_bag);  // must be ==, not equals
+                if (skip)
+                    continue;
+                if (Config.trace_execution)
+                    System.err.println(tabs+"      result ["+peer.getPeerName()+"]: "+result);
+                exit = synchronize(peer,(MR_bool)result.get(2)).get();   // replaces the local exit flag
+                send_messages(msgs,peer);
+            } while (!exit);
+            if (acc_result == null) {
+                // the BSP result is a bag that needs to be dumped to the HDFS
+                writeLocalResult((Bag)(local_cache.get(source_num)),peer);
+                // if there are more results, dump them to HDFS
+                final MR_long key = new MR_long(0);
+                final MRContainer key_container = new MRContainer(key);
+                final MRContainer data_container = new MRContainer(new MR_int(0));
+                // loc = index of this peer in all_peer_names, so that every peer writes its own file;
+                // the loop must advance while the names DIFFER (equals() made most peers share peer0)
+                int loc = 0;
+                while ( loc < all_peer_names.length && !peer.getPeerName().equals(all_peer_names[loc]) )
+                    loc++;
+                Configuration conf = peer.getConfiguration();
+                String[] out_paths = conf.get("mrql.output.paths").split(",");
+                for ( int i = 1; i < out_paths.length; i++ ) {   // entry 0 was written by writeLocalResult
+                    String[] s = out_paths[i].split(":");        // each entry is "tag:path"
+                    int out_num = Integer.parseInt(s[0]);
+                    Path path = new Path(s[1]+"/peer"+loc);
+                    FileSystem fs = path.getFileSystem(conf);
+                    SequenceFile.Writer writer
+                        = new SequenceFile.Writer(fs,conf,path,MRContainer.class,MRContainer.class);
+                    long count = 0;
+                    for ( MRData e: (Bag)(local_cache.get(out_num)) ) {
+                        key.set(count++);
+                        data_container.set(e);
+                        writer.append(key_container,data_container);
+                    };
+                    writer.close();
+                };
+            } else {
+                // the BSP result is an aggregation:
+                //     send the partial results to the master peer
+                peer.send(masterTask,new MRContainer(local_cache.get(source_num)));
+                peer.sync();
+                if (peer.getPeerName().equals(masterTask)) {
+                    // only the master peer collects the partial aggregations
+                    MRContainer msg;
+                    while ((msg = peer.getCurrentMessage()) != null)
+                        acc_result = acc_fnc.eval(new Tuple(acc_result,msg.data()));
+                    // write the final aggregation result
+                    peer.write(null_key,new MRContainer(acc_result));
+               }
+            }
+        }
 
-	@Override
-	@SuppressWarnings("unchecked")
-	public void setup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
-	    try {
-		super.setup(peer);
-		Configuration conf = peer.getConfiguration();
-		Config.read(conf);
-		if (Plan.conf == null)
-		    Plan.conf = conf;
-		all_peer_names = peer.getAllPeerNames();
-		all_peers = new BSPPeerImpl[all_peer_names.length];
-		Arrays.sort(all_peer_names);  // is this necessary?
-		source_num = conf.getInt("mrql.output.tag",0);
-		Tree code = Tree.parse(conf.get("mrql.superstep"));
-		superstep_fnc = functional_argument(conf,code);
-		code = Tree.parse(conf.get("mrql.initial.state"));
-		state = Interpreter.evalE(code);
-		if (conf.get("mrql.zero") != null && !conf.get("mrql.zero").equals("")) {
-		    code = Tree.parse(conf.get("mrql.zero"));
-		    acc_result = Interpreter.evalE(code);
-		    code = Tree.parse(conf.get("mrql.accumulator"));
-		    acc_fnc = functional_argument(conf,code);
-		} else acc_result = null;
-		orderp = conf.getBoolean("mrql.orderp",false);
-		masterTask = all_peer_names[peer.getNumPeers()/2];
-		msg_cache = new Bag(1000);
-		local_cache = new Tuple(max_input_files);
-		for ( int i = 0; i < max_input_files; i++ )
-		    local_cache.set(i,new Bag());
-	    } catch (Exception e) {
-		e.printStackTrace();
-		throw new Error("Cannot setup the Hama BSP job: "+e);
-	    }
-	}
+        /** initialize this BSP task from the job configuration (superstep, initial state, accumulator, caches) */
+        @Override
+        @SuppressWarnings("unchecked")
+        public void setup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
+            try {
+                super.setup(peer);
+                Configuration conf = peer.getConfiguration();
+                Config.read(conf);
+                if (Plan.conf == null) Plan.conf = conf;
+                all_peer_names = peer.getAllPeerNames();
+                all_peers = new BSPPeerImpl[all_peer_names.length];
+                Arrays.sort(all_peer_names);  // is this necessary?
+                source_num = conf.getInt("mrql.output.tag",0);
+                Tree code = Tree.parse(conf.get("mrql.superstep"));
+                superstep_fnc = functional_argument(conf,code);
+                code = Tree.parse(conf.get("mrql.initial.state"));
+                state = Interpreter.evalE(code);
+                // a non-empty mrql.zero means that the job is an aggregation (see BSPaggregate)
+                if (conf.get("mrql.zero") != null && !conf.get("mrql.zero").equals("")) {
+                    code = Tree.parse(conf.get("mrql.zero"));
+                    acc_result = Interpreter.evalE(code);
+                    code = Tree.parse(conf.get("mrql.accumulator"));
+                    acc_fnc = functional_argument(conf,code);
+                } else acc_result = null;
+                orderp = conf.getBoolean("mrql.orderp",false);
+                masterTask = all_peer_names[peer.getNumPeers()/2];   // the middle peer of the sorted names
+                msg_cache = new Bag(1000);
+                local_cache = new Tuple(max_input_files);
+                for ( int i = 0; i < max_input_files; i++ )
+                    local_cache.set(i,new Bag());
+            } catch (Exception e) {
+                e.printStackTrace();
+                throw new Error("Cannot setup the Hama BSP job: "+e,e);   // keep the cause and its stack trace
+            }
+        }
 
-	@Override
-	public void cleanup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) throws IOException {
-	    if (!Config.local_mode)
-		clean();
-	    local_cache = null;
-	    super.cleanup(peer);
-	}
+        /** call clean() unless running in local mode, drop the local cache, then run the inherited cleanup */
+        @Override
+        public void cleanup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) throws IOException {
+            if (!Config.local_mode) clean();
+            local_cache = null;
+            super.cleanup(peer);
+        }
     }
 
     /** set Hama's min split size and number of BSP tasks (doesn't work with Hama 0.6.0) */
     public static void setupSplits ( BSPJob job, DataSet ds ) throws IOException {
-	long[] sizes = new long[ds.source.size()];
-	if (sizes.length > Config.nodes)
-	    throw new Error("Cannot distribute "+sizes.length+" files over "+Config.nodes+" BSP tasks");
-	for ( int i = 0; i < sizes.length; i++ )
-	    sizes[i] = ds.source.get(i).size(Plan.conf);
-	long total_size = 0;
-	for ( long size: sizes )
-	    total_size += size;
-	long split_size = Math.max(total_size/Config.nodes,100000);
-	int tasks = 0;
-	do {  // adjust split_size
-	    tasks = 0;
-	    for ( long size: sizes )
-		tasks += (int)Math.ceil(size/(double)split_size);
-	    if (tasks > Config.nodes)
-		split_size = (long)Math.ceil((double)split_size*1.01);
-	} while (tasks > Config.nodes);
-	job.setNumBspTask(tasks);
-	System.err.println("*** Using "+tasks+" BSP tasks (out of a max "+Config.nodes+")."
-			   +" Each task will handle about "+Math.min(total_size/Config.nodes,split_size)
-			   +" bytes of input data.");
-	job.set("bsp.min.split.size",Long.toString(split_size));
+        final int file_count = ds.source.size();
+        if (file_count > Config.nodes)
+            throw new Error("Cannot distribute "+file_count+" files over "+Config.nodes+" BSP tasks");
+        final long[] sizes = new long[file_count];
+        long total_size = 0;
+        for ( int i = 0; i < file_count; i++ ) {   // measure each input source and add up the total
+            sizes[i] = ds.source.get(i).size(Plan.conf);
+            total_size += sizes[i];
+        }
+        long split_size = Math.max(total_size/Config.nodes,100000);
+        int tasks = 0;
+        while (true) {   // grow split_size by 1% at a time until the splits fit in Config.nodes tasks
+            tasks = 0;
+            for ( long size: sizes )
+                tasks += (int)Math.ceil(size/(double)split_size);
+            if (tasks <= Config.nodes) break;
+            split_size = (long)Math.ceil((double)split_size*1.01);
+        }
+        job.setNumBspTask(tasks);
+        System.err.println("*** Using "+tasks+" BSP tasks (out of a max "+Config.nodes+")."+" Each task will handle about "
+                           +Math.min(total_size/Config.nodes,split_size)+" bytes of input data.");
+        job.set("bsp.min.split.size",Long.toString(split_size));
     }
 
     /** Evaluate a BSP operation that returns a DataSet
@@ -401,52 +401,52 @@ final public class BSPPlan extends Plan {
      * @return a new data source that contains the result
      */
     public final static MRData BSP ( int[] source_nums, // output tags
-				     Tree superstep,    // superstep function
-				     Tree init_state,   // initial state
-				     boolean orderp,    // do we need to order the result?
-				     DataSet source     // input dataset
-				     ) throws Exception {
-	String[] newpaths = new String[source_nums.length];
-	newpaths[0] = new_path(conf);
-	conf.set("mrql.output.paths",source_nums[0]+":"+newpaths[0]);
-	for ( int i = 1; i < source_nums.length; i++ ) {
-	    newpaths[i] = new_path(conf);
-	    Path path = new Path(newpaths[1]);
-	    FileSystem fs = path.getFileSystem(conf);
-	    fs.mkdirs(path);
-	    conf.set("mrql.output.paths",conf.get("mrql.output.paths")+","+source_nums[i]+":"+newpaths[i]);
-	};
-	conf.set("mrql.superstep",superstep.toString());
-	conf.set("mrql.initial.state",init_state.toString());
-	conf.set("mrql.zero","");
-	conf.setInt("mrql.output.tag",source_nums[0]);
-	conf.setBoolean("mrql.orderp",orderp);
-	BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
-	setupSplits(job,source);
-	job.setJobName(newpaths[0]);
-	distribute_compiled_arguments(getConfiguration(job));
-	job.setBspClass(BSPop.class);
-	Path outpath = new Path(newpaths[0]);
-	job.setOutputPath(outpath);
-	job.setOutputKeyClass(MRContainer.class);
-	job.setOutputValueClass(MRContainer.class);
-	job.setOutputFormat(SequenceFileOutputFormat.class);
-	job.setInputFormat(MultipleBSPInput.class);
-	FileInputFormat.setInputPaths(job,source.merge());
-	job.waitForCompletion(true);
-	if (source_nums.length == 1) {
-	    BinaryDataSource ds = new BinaryDataSource(source_nums[0],newpaths[0],conf);
-	    ds.to_be_merged = orderp;
-	    return new MR_dataset(new DataSet(ds,0,3));
-	} else {
-	    MRData[] s = new MRData[source_nums.length];
-	    for ( int i = 0; i < source_nums.length; i++ ) {
-		BinaryDataSource ds = new BinaryDataSource(source_nums[i],newpaths[i],conf);
-		ds.to_be_merged = orderp;
-		s[i] = new MR_dataset(new DataSet(ds,0,3));
-	    };
-	    return new Tuple(s);
-	}
+                                     Tree superstep,    // superstep function
+                                     Tree init_state,   // initial state
+                                     boolean orderp,    // do we need to order the result?
+                                     DataSet source     // input dataset
+                                     ) throws Exception {
+        String[] newpaths = new String[source_nums.length];   // one fresh output path per output tag
+        newpaths[0] = new_path(conf);
+        conf.set("mrql.output.paths",source_nums[0]+":"+newpaths[0]);   // comma-separated "tag:path" entries
+        for ( int i = 1; i < source_nums.length; i++ ) {
+            newpaths[i] = new_path(conf);
+            Path path = new Path(newpaths[i]);   // create the directory of THIS output (was always newpaths[1])
+            FileSystem fs = path.getFileSystem(conf);
+            fs.mkdirs(path);
+            conf.set("mrql.output.paths",conf.get("mrql.output.paths")+","+source_nums[i]+":"+newpaths[i]);
+        };
+        conf.set("mrql.superstep",superstep.toString());
+        conf.set("mrql.initial.state",init_state.toString());
+        conf.set("mrql.zero","");   // an empty zero tells BSPop.setup that this is not an aggregation
+        conf.setInt("mrql.output.tag",source_nums[0]);
+        conf.setBoolean("mrql.orderp",orderp);
+        BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
+        setupSplits(job,source);
+        job.setJobName(newpaths[0]);
+        distribute_compiled_arguments(getConfiguration(job));
+        job.setBspClass(BSPop.class);
+        Path outpath = new Path(newpaths[0]);   // the job output holds the first result only
+        job.setOutputPath(outpath);
+        job.setOutputKeyClass(MRContainer.class);
+        job.setOutputValueClass(MRContainer.class);
+        job.setOutputFormat(SequenceFileOutputFormat.class);
+        job.setInputFormat(MultipleBSPInput.class);
+        FileInputFormat.setInputPaths(job,source.merge());
+        job.waitForCompletion(true);
+        if (source_nums.length == 1) {   // a single output: return the dataset itself
+            BinaryDataSource ds = new BinaryDataSource(source_nums[0],newpaths[0],conf);
+            ds.to_be_merged = orderp;
+            return new MR_dataset(new DataSet(ds,0,3));
+        } else {                         // many outputs: return a tuple with one dataset per output tag
+            MRData[] s = new MRData[source_nums.length];
+            for ( int i = 0; i < source_nums.length; i++ ) {
+                BinaryDataSource ds = new BinaryDataSource(source_nums[i],newpaths[i],conf);
+                ds.to_be_merged = orderp;
+                s[i] = new MR_dataset(new DataSet(ds,0,3));
+            };
+            return new Tuple(s);
+        }
     }
 
     /** Evaluate a BSP operation that aggregates the results
@@ -459,43 +459,43 @@ final public class BSPPlan extends Plan {
      * @return the aggregation result
      */
     public final static MRData BSPaggregate ( int source_num,   // output tag
-					      Tree superstep,   // superstep function
-					      Tree init_state,  // initial state
-					      Tree acc_fnc,     // accumulator function
-					      Tree zero,        // zero value for the accumulator
-					      DataSet source    // input dataset
-					      ) throws Exception {
-	String newpath = new_path(conf);
-	conf.set("mrql.superstep",superstep.toString());
-	conf.set("mrql.initial.state",init_state.toString());
-	conf.set("mrql.accumulator",acc_fnc.toString());
-	conf.set("mrql.zero",zero.toString());
-	conf.setInt("mrql.output.tag",source_num);
-	conf.setBoolean("mrql.orderp",false);
-	BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
-	setupSplits(job,source);
-	job.setJobName(newpath);
-	distribute_compiled_arguments(getConfiguration(job));
-	job.setBspClass(BSPop.class);
-	Path outpath = new Path(newpath);
-	job.setOutputPath(outpath);
-	job.setOutputKeyClass(MRContainer.class);
-	job.setOutputValueClass(MRContainer.class);
-	job.setOutputFormat(SequenceFileOutputFormat.class);
-	job.setInputFormat(MultipleBSPInput.class);
-	FileInputFormat.setInputPaths(job,source.merge());
-	job.waitForCompletion(true);
-	FileSystem fs = outpath.getFileSystem(conf);
-	FileStatus[] files = fs.listStatus(outpath);
-	for ( int i = 0; i < files.length; i++ ) {
-	    SequenceFile.Reader sreader = new SequenceFile.Reader(fs,files[i].getPath(),conf);
-	    MRContainer key = new MRContainer();
-	    MRContainer value = new MRContainer();
-	    sreader.next(key,value);
-	    sreader.close();
-	    if (value.data() != null)
-		return value.data();
-	};
-	return null;
+                                              Tree superstep,   // superstep function
+                                              Tree init_state,  // initial state
+                                              Tree acc_fnc,     // accumulator function
+                                              Tree zero,        // zero value for the accumulator
+                                              DataSet source    // input dataset
+                                              ) throws Exception {
+        String newpath = new_path(conf);
+        conf.set("mrql.superstep",superstep.toString());
+        conf.set("mrql.initial.state",init_state.toString());
+        conf.set("mrql.accumulator",acc_fnc.toString());
+        conf.set("mrql.zero",zero.toString());   // a non-empty zero makes BSPop.setup load the accumulator
+        conf.setInt("mrql.output.tag",source_num);
+        conf.setBoolean("mrql.orderp",false);
+        BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
+        setupSplits(job,source);
+        job.setJobName(newpath);
+        distribute_compiled_arguments(getConfiguration(job));
+        job.setBspClass(BSPop.class);
+        Path outpath = new Path(newpath);
+        job.setOutputPath(outpath);
+        job.setOutputKeyClass(MRContainer.class);
+        job.setOutputValueClass(MRContainer.class);
+        job.setOutputFormat(SequenceFileOutputFormat.class);
+        job.setInputFormat(MultipleBSPInput.class);
+        FileInputFormat.setInputPaths(job,source.merge());
+        job.waitForCompletion(true);
+        FileSystem fs = outpath.getFileSystem(conf);
+        for ( FileStatus file: fs.listStatus(outpath) ) {   // return the first record found in the output files
+            MRContainer key = new MRContainer(), value = new MRContainer();
+            SequenceFile.Reader sreader = new SequenceFile.Reader(fs,file.getPath(),conf);
+            try {
+                sreader.next(key,value);
+            } finally {
+                sreader.close();   // close the reader even when next() fails
+            }
+            if (value.data() != null) return value.data();
+        };
+        return null;   // none of the output files had a record
     }
  }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/BSP/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BinaryInputFormat.java b/src/main/java/BSP/BinaryInputFormat.java
index 27f62ee..7f2229a 100644
--- a/src/main/java/BSP/BinaryInputFormat.java
+++ b/src/main/java/BSP/BinaryInputFormat.java
@@ -29,32 +29,32 @@ import org.apache.hama.HamaConfiguration;
 /** Input format for hadoop sequence files */
 final public class BinaryInputFormat extends MRQLFileInputFormat {
     public static class BinaryInputRecordReader extends SequenceFileRecordReader<MRContainer,MRContainer> {
-	final MRContainer result = new MRContainer();
-	final MRData source_num_data;
-	final int source_number;
+        final MRContainer result = new MRContainer();   // reusable buffer for the record read by super.next()
+        final MRData source_num_data;                   // source_number wrapped as an MR_int, used to tag records
+        final int source_number;                        // the source number given to the constructor
 
-	public BinaryInputRecordReader ( FileSplit split,
-					 BSPJob job,
-					 int source_number ) throws IOException {
-	    super(BSPPlan.getConfiguration(job),split);
-	    this.source_number = source_number;
-	    source_num_data = new MR_int(source_number);
-	}
+        /** open a reader on the split and remember the source number used to tag its records */
+        public BinaryInputRecordReader ( FileSplit split, BSPJob job, int source_number )
+                throws IOException {
+            super(BSPPlan.getConfiguration(job),split);
+            source_num_data = new MR_int(source_number);
+            this.source_number = source_number;
+        }
 
-	@Override
-	public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
-		boolean b = super.next(key,result);
-		value.set(new Tuple(source_num_data,result.data()));
-		return b;
-	}
+        @Override   // read the next record into result and hand it out as the tuple (source number, record)
+        public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+            final boolean has_record = super.next(key,result);
+            value.set(new Tuple(source_num_data,result.data()));   // value is set whatever super.next returned
+            return has_record;
+        }
     }
 
     public RecordReader<MRContainer,MRContainer>
-	      getRecordReader ( InputSplit split,
-				BSPJob job ) throws IOException {
-	Configuration conf = BSPPlan.getConfiguration(job);
-	String path = ((FileSplit)split).getPath().toString();
-	BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,conf);
-	return new BinaryInputRecordReader((FileSplit)split,job,ds.source_num);
+              getRecordReader ( InputSplit split, BSPJob job ) throws IOException {
+        // look up the data source for the split's path and tag the reader with its source number
+        final Configuration job_conf = BSPPlan.getConfiguration(job);
+        final FileSplit file_split = (FileSplit)split;
+        final BinaryDataSource source = (BinaryDataSource)DataSource.get(file_split.getPath().toString(),job_conf);
+        return new BinaryInputRecordReader(file_split,job,source.source_num);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/BSP/Evaluator.gen
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/Evaluator.gen b/src/main/java/BSP/Evaluator.gen
index 5331ab0..c0aabb6 100644
--- a/src/main/java/BSP/Evaluator.gen
+++ b/src/main/java/BSP/Evaluator.gen
@@ -29,8 +29,8 @@ final public class Evaluator extends Interpreter {
 
     /** initialize the BSP evaluator */
     final static void init ( Configuration conf ) {
-	Config.bsp_mode = true;
-	if (Config.hadoop_mode)
+        Config.bsp_mode = true;
+        if (Config.hadoop_mode)
             if (Config.local_mode) {
                 conf.set("bsp.master.address","local");
                 conf.set("hama.zookeeper.quorum","localhost");
@@ -52,7 +52,7 @@ final public class Evaluator extends Interpreter {
     }
 
     final static Configuration new_configuration () {
-	return new HamaConfiguration();
+        return new HamaConfiguration();
     }
 
     public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
@@ -73,30 +73,30 @@ final public class Evaluator extends Interpreter {
      * @return a DataSet (stored in HDFS)
      */
     final static MRData bsp ( Tree plan, Environment env ) throws Exception {
-	match plan {
-	case BSP(tuple(...ns),`superstep,`init_state,`o,...S):
-	    int[] os = new int[ns.length()];
-	    for ( int i = 0; i < os.length; i++ )
-		os[i] = (int)((LongLeaf)ns.nth(i)).value();
-	    DataSet ds = eval(S.head(),env);
-	    for ( Tree s: S.tail() )
-		ds.merge(eval(s,env));
-	    return BSPPlan.BSP(os,
-			       closure(superstep,env),
-			       init_state,
-			       o.equals(#<true>),
-			       ds);
-	case BSP(`n,`superstep,`init_state,`o,...S):
-	    DataSet ds = eval(S.head(),env);
-	    for ( Tree s: S.tail() )
-		ds.merge(eval(s,env));
-	    return BSPPlan.BSP(new int[]{(int)((LongLeaf)n).value()},
-			       closure(superstep,env),
-			       init_state,
-			       o.equals(#<true>),
-			       ds);
-	}
-	throw new Error("Cannot perform the BSP plan: "+plan);
+        match plan {
+        case BSP(tuple(...ns),`superstep,`init_state,`o,...S):
+            int[] os = new int[ns.length()];
+            for ( int i = 0; i < os.length; i++ )
+                os[i] = (int)((LongLeaf)ns.nth(i)).value();
+            DataSet ds = eval(S.head(),env);
+            for ( Tree s: S.tail() )
+                ds.merge(eval(s,env));
+            return BSPPlan.BSP(os,
+                               closure(superstep,env),
+                               init_state,
+                               o.equals(#<true>),
+                               ds);
+        case BSP(`n,`superstep,`init_state,`o,...S):
+            DataSet ds = eval(S.head(),env);
+            for ( Tree s: S.tail() )
+                ds.merge(eval(s,env));
+            return BSPPlan.BSP(new int[]{(int)((LongLeaf)n).value()},
+                               closure(superstep,env),
+                               init_state,
+                               o.equals(#<true>),
+                               ds);
+        }
+        throw new Error("Cannot perform the BSP plan: "+plan);
     }
 
     /** The Aggregate physical operator
@@ -107,32 +107,32 @@ final public class Evaluator extends Interpreter {
      * @return the aggregation result of type T
      */
     final static MRData aggregate ( Tree acc_fnc,
-				    Tree zero,
-				    Tree plan,
-				    Environment env ) throws Exception {
-	match plan {
-	case BSP(`n,`superstep,`init_state,`o,...S):
-	    DataSet ds = eval(S.head(),env,"-");
-	    for ( Tree s: S.tail() )
-		ds.merge(eval(s,env,"-"));
-	    return BSPPlan.BSPaggregate((int)((LongLeaf)n).value(),
-					closure(superstep,env),
-					init_state,
-					closure(acc_fnc,env),
-					zero,
-					ds);
-	}
-	throw new Error("Cannot perform the aggregation: "+plan);
+                                    Tree zero,
+                                    Tree plan,
+                                    Environment env ) throws Exception {
+        match plan {
+        case BSP(`n,`superstep,`init_state,`o,...S):
+            DataSet ds = eval(S.head(),env,"-");
+            for ( Tree s: S.tail() )
+                ds.merge(eval(s,env,"-"));
+            return BSPPlan.BSPaggregate((int)((LongLeaf)n).value(),
+                                        closure(superstep,env),
+                                        init_state,
+                                        closure(acc_fnc,env),
+                                        zero,
+                                        ds);
+        }
+        throw new Error("Cannot perform the aggregation: "+plan);
     }
 
     final static Tuple loop ( Tree e, Environment env ) {
-	throw new Error("The BSP Loop was supposed to be translated to a BSP task");
+        throw new Error("The BSP Loop was supposed to be translated to a BSP task");
     }
 
     final static DataSet eval ( final Tree e,
-				final Environment env,
-				final String counter ) {
-	return eval(e,env);
+                                final Environment env,
+                                final String counter ) {
+        return eval(e,env);
     }
 
     /** Evaluate MRQL BSP physical operators using Hama (returns a DataSet)
@@ -142,20 +142,20 @@ final public class Evaluator extends Interpreter {
      * @return a DataSet (stored in HDFS)
      */
     final static DataSet eval ( final Tree e, final Environment env ) {
-	if (Config.trace_execution) {
-	    tab_count += 3;
-	    System.out.println(tabs(tab_count)+print_query(e));
-	};
-	DataSet res = evalD(e,env);
-	if (Config.trace_execution) 
-	    try {
-		System.out.println(tabs(tab_count)
-				   +"-> "+Plan.collect(res,false));
-		tab_count -= 3;
-	    } catch (Exception ex) {
-		throw new Error("Cannot collect the operator output: "+e);
-	    };
-	return res;
+        if (Config.trace_execution) {
+            tab_count += 3;
+            System.out.println(tabs(tab_count)+print_query(e));
+        };
+        DataSet res = evalD(e,env);
+        if (Config.trace_execution) 
+            try {
+                System.out.println(tabs(tab_count)
+                                   +"-> "+Plan.collect(res,false));
+                tab_count -= 3;
+            } catch (Exception ex) {
+                throw new Error("Cannot collect the operator output: "+e);
+            };
+        return res;
     }
 
     /** Evaluate MRQL BSP physical operators using Hama (returns a DataSet)
@@ -164,61 +164,61 @@ final public class Evaluator extends Interpreter {
      * @return a DataSet (stored in HDFS)
      */
     final static DataSet evalD ( final Tree e, final Environment env ) {
-	try {
-	    match e {
-	    case BSPSource(`n,BinarySource(`file,_)):
-		return Plan.binarySource((int)((LongLeaf)n).value(),file.stringValue());
-	    case BSPSource(`n,ParsedSource(`parser,`file,...args)):
-		Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
-		if (p == null)
-		    throw new Error("Unknown parser: "+parser);
-		return Plan.parsedSource((int)((LongLeaf)n).value(),
-					 p,((MR_string)evalE(file,env)).get(),args);
-	    case BSPSource(`n,Generator(`min,`max,`size)):
-		return Plan.generator((int)((LongLeaf)n).value(),
-				      ((MR_long)evalE(min,env)).get(),
-				      ((MR_long)evalE(max,env)).get(),
-				      ((MR_long)evalE(size,env)).get());
-	    case Merge(`x,`y):
-		return Plan.merge(eval(x,env),eval(y,env));
-	    case Dump(`s):
-		return Plan.fileCache((Bag)evalE(s,env));
-	    case apply(`f,`arg):
-		if (!f.is_variable())
-		    return ((MR_dataset)evalF(f,env).eval(evalE(arg))).dataset();
-		MRData fnc = variable_lookup(f.toString(),global_env);
-		if (fnc == null)
-		    throw new Error("Unknown function: "+f);
-		MRData t = evalE(arg,env);
-		if (!(t instanceof Tuple))
-		    throw new Error("Expected a tuple in function application: "+t);
-		return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
-	    case `v:
-		if (!v.is_variable())
-		    fail;
-		MRData x = variable_lookup(v.toString(),env);
-		if (x != null)
-		    if (x instanceof MR_dataset)
-			return ((MR_dataset)x).dataset();
-		x = variable_lookup(v.toString(),global_env);
-		if (x != null)
-		    if (x instanceof MR_dataset)
-			return ((MR_dataset)x).dataset();
-		throw new Error("Variable "+v+" is not bound");
-	    };
-	    MRData d = bsp(e,env);
-	    if (d instanceof MR_dataset)
-		return ((MR_dataset)d).dataset();
-	    throw new Error("Cannot evaluate the BSP plan: "+e);
-	} catch (Error msg) {
-	    if (!Config.trace)
-		throw new Error(msg.getMessage());
-	    System.err.println(msg.getMessage());
-	    throw new Error("Evaluation error in: "+print_query(e));
-	} catch (Exception ex) {
-	    System.err.println(ex.getMessage());
-	    ex.printStackTrace();
-	    throw new Error("Evaluation error in: "+print_query(e));
-	}
+        try {
+            match e {
+            case BSPSource(`n,BinarySource(`file,_)):
+                return Plan.binarySource((int)((LongLeaf)n).value(),file.stringValue());
+            case BSPSource(`n,ParsedSource(`parser,`file,...args)):
+                Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+                if (p == null)
+                    throw new Error("Unknown parser: "+parser);
+                return Plan.parsedSource((int)((LongLeaf)n).value(),
+                                         p,((MR_string)evalE(file,env)).get(),args);
+            case BSPSource(`n,Generator(`min,`max,`size)):
+                return Plan.generator((int)((LongLeaf)n).value(),
+                                      ((MR_long)evalE(min,env)).get(),
+                                      ((MR_long)evalE(max,env)).get(),
+                                      ((MR_long)evalE(size,env)).get());
+            case Merge(`x,`y):
+                return Plan.merge(eval(x,env),eval(y,env));
+            case Dump(`s):
+                return Plan.fileCache((Bag)evalE(s,env));
+            case apply(`f,`arg):
+                if (!f.is_variable())
+                    return ((MR_dataset)evalF(f,env).eval(evalE(arg))).dataset();
+                MRData fnc = variable_lookup(f.toString(),global_env);
+                if (fnc == null)
+                    throw new Error("Unknown function: "+f);
+                MRData t = evalE(arg,env);
+                if (!(t instanceof Tuple))
+                    throw new Error("Expected a tuple in function application: "+t);
+                return ((MR_dataset)((Lambda)fnc).lambda().eval(t)).dataset();
+            case `v:
+                if (!v.is_variable())
+                    fail;
+                MRData x = variable_lookup(v.toString(),env);
+                if (x != null)
+                    if (x instanceof MR_dataset)
+                        return ((MR_dataset)x).dataset();
+                x = variable_lookup(v.toString(),global_env);
+                if (x != null)
+                    if (x instanceof MR_dataset)
+                        return ((MR_dataset)x).dataset();
+                throw new Error("Variable "+v+" is not bound");
+            };
+            MRData d = bsp(e,env);
+            if (d instanceof MR_dataset)
+                return ((MR_dataset)d).dataset();
+            throw new Error("Cannot evaluate the BSP plan: "+e);
+        } catch (Error msg) {
+            if (!Config.trace)
+                throw new Error(msg.getMessage());
+            System.err.println(msg.getMessage());
+            throw new Error("Evaluation error in: "+print_query(e));
+        } catch (Exception ex) {
+            System.err.println(ex.getMessage());
+            ex.printStackTrace();
+            throw new Error("Evaluation error in: "+print_query(e));
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/BSP/GeneratorInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/GeneratorInputFormat.java b/src/main/java/BSP/GeneratorInputFormat.java
index cb13a83..ac55964 100644
--- a/src/main/java/BSP/GeneratorInputFormat.java
+++ b/src/main/java/BSP/GeneratorInputFormat.java
@@ -29,61 +29,61 @@ import org.apache.hama.bsp.*;
  *  an (offset,size) pair that generates the range of values [offset,offset+size] */
 final public class GeneratorInputFormat extends MRQLFileInputFormat {
     public static class GeneratorRecordReader implements RecordReader<MRContainer,MRContainer> {
-	final long offset;
-	final long size;
-	final int source_number;
-	final MRData source_num_data;
-	long index;
-	SequenceFile.Reader reader;
+        final long offset;
+        final long size;
+        final int source_number;
+        final MRData source_num_data;
+        long index;
+        SequenceFile.Reader reader;
 
-	public GeneratorRecordReader ( FileSplit split,
-				       int source_number,
-				       BSPJob job ) throws IOException {
-	    Configuration conf = BSPPlan.getConfiguration(job);
-	    Path path = split.getPath();
-	    FileSystem fs = path.getFileSystem(conf);
-	    reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
-	    MRContainer key = new MRContainer();
-	    MRContainer value = new MRContainer();
-	    reader.next(key,value);
-	    offset = ((MR_long)((Tuple)(value.data())).first()).get();
-	    size = ((MR_long)((Tuple)(value.data())).second()).get();
-	    this.source_number = source_number;
-	    source_num_data = new MR_int(source_number);
-	    index = -1;
-	}
+        public GeneratorRecordReader ( FileSplit split,
+                                       int source_number,
+                                       BSPJob job ) throws IOException {
+            Configuration conf = BSPPlan.getConfiguration(job);
+            Path path = split.getPath();
+            FileSystem fs = path.getFileSystem(conf);
+            reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
+            MRContainer key = new MRContainer();
+            MRContainer value = new MRContainer();
+            reader.next(key,value);
+            offset = ((MR_long)((Tuple)(value.data())).first()).get();
+            size = ((MR_long)((Tuple)(value.data())).second()).get();
+            this.source_number = source_number;
+            source_num_data = new MR_int(source_number);
+            index = -1;
+        }
 
-	public MRContainer createKey () {
-	    return new MRContainer(null);
-	}
+        public MRContainer createKey () {
+            return new MRContainer(null);
+        }
 
-	public MRContainer createValue () {
-	    return new MRContainer(null);
-	}
+        public MRContainer createValue () {
+            return new MRContainer(null);
+        }
 
-	public boolean next ( MRContainer key, MRContainer value ) throws IOException {
-	    index++;
-	    value.set(new Tuple(source_num_data,new MR_long(offset+index)));
-	    key.set(new MR_long(index));
-	    return index < size;
-	}
+        public boolean next ( MRContainer key, MRContainer value ) throws IOException {
+            index++;
+            value.set(new Tuple(source_num_data,new MR_long(offset+index)));
+            key.set(new MR_long(index));
+            return index < size;
+        }
 
-	public long getPos () throws IOException { return index; }
+        public long getPos () throws IOException { return index; }
 
-	public void close () throws IOException { reader.close(); }
+        public void close () throws IOException { reader.close(); }
 
-	public float getProgress () throws IOException {
-	    return index / (float)size;
-	}
+        public float getProgress () throws IOException {
+            return index / (float)size;
+        }
 
-	public void initialize ( InputSplit split, TaskAttemptContext context ) throws IOException { }
+        public void initialize ( InputSplit split, TaskAttemptContext context ) throws IOException { }
     }
 
     public RecordReader<MRContainer,MRContainer>
-	        getRecordReader ( InputSplit split, BSPJob job ) throws IOException {
-	Configuration conf = BSPPlan.getConfiguration(job);
-	String path = ((FileSplit)split).getPath().toString();
-	GeneratorDataSource ds = (GeneratorDataSource)DataSource.get(path,conf);
-	return new GeneratorRecordReader((FileSplit)split,ds.source_num,job);
+                getRecordReader ( InputSplit split, BSPJob job ) throws IOException {
+        Configuration conf = BSPPlan.getConfiguration(job);
+        String path = ((FileSplit)split).getPath().toString();
+        GeneratorDataSource ds = (GeneratorDataSource)DataSource.get(path,conf);
+        return new GeneratorRecordReader((FileSplit)split,ds.source_num,job);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/BSP/MRQLFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/MRQLFileInputFormat.java b/src/main/java/BSP/MRQLFileInputFormat.java
index c7a7abf..73c1fbb 100644
--- a/src/main/java/BSP/MRQLFileInputFormat.java
+++ b/src/main/java/BSP/MRQLFileInputFormat.java
@@ -31,38 +31,38 @@ abstract public class MRQLFileInputFormat extends FileInputFormat<MRContainer,MR
 
     /** record reader for map-reduce */
     abstract public RecordReader<MRContainer,MRContainer>
-	getRecordReader ( InputSplit split, BSPJob job ) throws IOException;
+        getRecordReader ( InputSplit split, BSPJob job ) throws IOException;
 
     /** materialize the input file into a memory Bag */
     public Bag materialize ( final Path file ) throws Exception {
-	final BSPJob job = new BSPJob((HamaConfiguration)Plan.conf,MRQLFileInputFormat.class);
-	job.setInputPath(file);
-	final InputSplit[] splits = getSplits(job,1);
-	final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job);
-	return new Bag(new BagIterator () {
-		RecordReader<MRContainer,MRContainer> reader = rd;
-		MRContainer key = reader.createKey();
-		MRContainer value = reader.createKey();
-		int i = 0;
-		public boolean hasNext () {
-		    try {
-			if (reader.next(key,value))
-			    return true;
-			do {
-			    if (++i >= splits.length)
-				return false;
-			    reader.close();
-			    reader = getRecordReader(splits[i],job);
-			} while (!reader.next(key,value));
-			return true;
-		    } catch (IOException e) {
-			throw new Error("Cannot collect values from an intermediate result");
-		    }
-		}
-		public MRData next () {
-		    return value.data();
-		}
-	    });
+        final BSPJob job = new BSPJob((HamaConfiguration)Plan.conf,MRQLFileInputFormat.class);
+        job.setInputPath(file);
+        final InputSplit[] splits = getSplits(job,1);
+        final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job);
+        return new Bag(new BagIterator () {
+                RecordReader<MRContainer,MRContainer> reader = rd;
+                MRContainer key = reader.createKey();
+                MRContainer value = reader.createKey();
+                int i = 0;
+                public boolean hasNext () {
+                    try {
+                        if (reader.next(key,value))
+                            return true;
+                        do {
+                            if (++i >= splits.length)
+                                return false;
+                            reader.close();
+                            reader = getRecordReader(splits[i],job);
+                        } while (!reader.next(key,value));
+                        return true;
+                    } catch (IOException e) {
+                        throw new Error("Cannot collect values from an intermediate result");
+                    }
+                }
+                public MRData next () {
+                    return value.data();
+                }
+            });
     }
 
     /** materialize the entire dataset into a Bag
@@ -71,37 +71,37 @@ abstract public class MRQLFileInputFormat extends FileInputFormat<MRContainer,MR
      * @return the Bag that contains the collected values
      */
     public final static Bag collect ( final DataSet x, boolean strip ) throws Exception {
-	Bag res = new Bag();
-	for ( DataSource s: x.source )
-	    if (s.to_be_merged)
-		res = res.union(Plan.merge(s));
-	    else {
-		Path path = new Path(s.path);
-		final FileSystem fs = path.getFileSystem(Plan.conf);
-		final FileStatus[] ds
-		    = fs.listStatus(path,
-				    new PathFilter () {
-			                public boolean accept ( Path path ) {
-					    return !path.getName().startsWith("_");
-					}
-				    });
-		Bag b = new Bag();
-		for ( FileStatus st: ds )
-		    b = b.union(s.inputFormat.newInstance().materialize(st.getPath()));
-		if (strip) {
-		    // remove source_num
-		    final Iterator<MRData> iter = b.iterator();
-		    b = new Bag(new BagIterator() {
-			    public boolean hasNext () {
-				return iter.hasNext();
-			    }
-			    public MRData next () {
-				return ((Tuple)iter.next()).get(1);
-			    }
-			});
-		};
-		res = res.union(b);
-	    };
-	return res;
+        Bag res = new Bag();
+        for ( DataSource s: x.source )
+            if (s.to_be_merged)
+                res = res.union(Plan.merge(s));
+            else {
+                Path path = new Path(s.path);
+                final FileSystem fs = path.getFileSystem(Plan.conf);
+                final FileStatus[] ds
+                    = fs.listStatus(path,
+                                    new PathFilter () {
+                                        public boolean accept ( Path path ) {
+                                            return !path.getName().startsWith("_");
+                                        }
+                                    });
+                Bag b = new Bag();
+                for ( FileStatus st: ds )
+                    b = b.union(s.inputFormat.newInstance().materialize(st.getPath()));
+                if (strip) {
+                    // remove source_num
+                    final Iterator<MRData> iter = b.iterator();
+                    b = new Bag(new BagIterator() {
+                            public boolean hasNext () {
+                                return iter.hasNext();
+                            }
+                            public MRData next () {
+                                return ((Tuple)iter.next()).get(1);
+                            }
+                        });
+                };
+                res = res.union(b);
+            };
+        return res;
     }
 }