Posted to commits@mrql.apache.org by fe...@apache.org on 2014/03/13 15:24:38 UTC

[13/26] MRQL-32: Refactoring directory structure for Eclipse

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/BSP/BSPGeneratorInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BSPGeneratorInputFormat.java b/src/main/java/BSP/BSPGeneratorInputFormat.java
deleted file mode 100644
index bdee64e..0000000
--- a/src/main/java/BSP/BSPGeneratorInputFormat.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.Trees;
-import java.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.*;
-
-
-/** the FileInputFormat for data generators: it creates HDFS files, where each file contains
- *  an (offset,size) pair that generates the range of values [offset,offset+size) */
-final public class BSPGeneratorInputFormat extends BSPMRQLFileInputFormat {
-    public static class GeneratorRecordReader implements RecordReader<MRContainer,MRContainer> {
-        final long offset;
-        final long size;
-        final int source_number;
-        final MRData source_num_data;
-        long index;
-        SequenceFile.Reader reader;
-
-        public GeneratorRecordReader ( FileSplit split,
-                                       int source_number,
-                                       BSPJob job ) throws IOException {
-            Configuration conf = BSPPlan.getConfiguration(job);
-            Path path = split.getPath();
-            FileSystem fs = path.getFileSystem(conf);
-            reader = new SequenceFile.Reader(fs,path,conf);
-            MRContainer key = new MRContainer();
-            MRContainer value = new MRContainer();
-            reader.next(key,value);
-            offset = ((MR_long)((Tuple)(value.data())).first()).get();
-            size = ((MR_long)((Tuple)(value.data())).second()).get();
-            this.source_number = source_number;
-            source_num_data = new MR_int(source_number);
-            index = -1;
-        }
-
-        public MRContainer createKey () {
-            return new MRContainer(null);
-        }
-
-        public MRContainer createValue () {
-            return new MRContainer(null);
-        }
-
-        public boolean next ( MRContainer key, MRContainer value ) throws IOException {
-            index++;
-            value.set(new Tuple(source_num_data,new MR_long(offset+index)));
-            key.set(new MR_long(index));
-            return index < size;
-        }
-
-        public long getPos () throws IOException { return index; }
-
-        public void close () throws IOException { reader.close(); }
-
-        public float getProgress () throws IOException {
-            return index / (float)size;
-        }
-
-        public void initialize ( InputSplit split, TaskAttemptContext context ) throws IOException { }
-    }
-
-    public RecordReader<MRContainer,MRContainer>
-                getRecordReader ( InputSplit split, BSPJob job ) throws IOException {
-        Configuration conf = BSPPlan.getConfiguration(job);
-        String path = ((FileSplit)split).getPath().toString();
-        GeneratorDataSource ds = (GeneratorDataSource)DataSource.get(path,conf);
-        return new GeneratorRecordReader((FileSplit)split,ds.source_num,job);
-    }
-}
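
The record reader above boils down to a simple counter: each generator file
holds a single (offset,size) pair, and the reader emits size consecutive long
values starting at offset, each wrapped as a (source_num, value) tuple. A
self-contained sketch of that enumeration (RangeGenerator is a hypothetical
name used only for illustration):

    import java.util.NoSuchElementException;

    // Mirrors GeneratorRecordReader.next(): one (offset,size) pair expands
    // into the values offset, offset+1, ..., offset+size-1.
    final class RangeGenerator {
        private final long offset, size;
        private long i = 0;

        RangeGenerator ( long offset, long size ) {
            this.offset = offset;
            this.size = size;
        }

        boolean hasNext () { return i < size; }

        long next () {
            if (!hasNext())
                throw new NoSuchElementException();
            return offset + i++;   // the reader wraps this as (source_num, MR_long)
        }
    }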

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/BSP/BSPMRQLFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BSPMRQLFileInputFormat.java b/src/main/java/BSP/BSPMRQLFileInputFormat.java
deleted file mode 100644
index 71518cd..0000000
--- a/src/main/java/BSP/BSPMRQLFileInputFormat.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.io.*;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hama.bsp.*;
-import org.apache.hama.HamaConfiguration;
-
-
-/** A superclass for all MRQL FileInputFormats */
-abstract public class BSPMRQLFileInputFormat extends FileInputFormat<MRContainer,MRContainer> implements MRQLFileInputFormat {
-    public BSPMRQLFileInputFormat () {}
-
-    /** record reader for map-reduce */
-    abstract public RecordReader<MRContainer,MRContainer>
-        getRecordReader ( InputSplit split, BSPJob job ) throws IOException;
-
-    /** materialize the input file into a memory Bag */
-    public Bag materialize ( final Path file ) throws IOException {
-        final BSPJob job = new BSPJob((HamaConfiguration)Plan.conf,MRQLFileInputFormat.class);
-        job.setInputPath(file);
-        final InputSplit[] splits = getSplits(job,1);
-        final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job);
-        return new Bag(new BagIterator () {
-                RecordReader<MRContainer,MRContainer> reader = rd;
-                MRContainer key = reader.createKey();
-                MRContainer value = reader.createValue();
-                int i = 0;
-                public boolean hasNext () {
-                    try {
-                        if (reader.next(key,value))
-                            return true;
-                        do {
-                            if (++i >= splits.length)
-                                return false;
-                            reader.close();
-                            reader = getRecordReader(splits[i],job);
-                        } while (!reader.next(key,value));
-                        return true;
-                    } catch (IOException e) {
-                        throw new Error("Cannot collect values from an intermediate result");
-                    }
-                }
-                public MRData next () {
-                    return value.data();
-                }
-            });
-    }
-
-    /** materialize the entire dataset into a Bag
-     * @param x the DataSet in HDFS to collect values from
-     * @param strip true if you want to strip out the source id (used in BSP sources)
-     * @return the Bag that contains the collected values
-     */
-    public final Bag collect ( final DataSet x, boolean strip ) throws Exception {
-        Bag res = new Bag();
-        for ( DataSource s: x.source )
-            if (s.to_be_merged)
-                res = res.union(Plan.merge(s));
-            else {
-                Path path = new Path(s.path);
-                final FileSystem fs = path.getFileSystem(Plan.conf);
-                final FileStatus[] ds
-                    = fs.listStatus(path,
-                                    new PathFilter () {
-                                        public boolean accept ( Path path ) {
-                                            return !path.getName().startsWith("_");
-                                        }
-                                    });
-                Bag b = new Bag();
-                for ( FileStatus st: ds )
-                    b = b.union(s.inputFormat.newInstance().materialize(st.getPath()));
-                if (strip) {
-                    // remove source_num
-                    final Iterator<MRData> iter = b.iterator();
-                    b = new Bag(new BagIterator() {
-                            public boolean hasNext () {
-                                return iter.hasNext();
-                            }
-                            public MRData next () {
-                                return ((Tuple)iter.next()).get(1);
-                            }
-                        });
-                };
-                res = res.union(b);
-            };
-        return res;
-    }
-}
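
The strip option in collect() above undoes the tagging that BSP sources add:
every value arrives as a (source_num, value) tuple, and stripping keeps only
the value component. A minimal sketch of that per-element transformation, with
hypothetical stand-in types in place of MRQL's Tuple and Bag:

    import java.util.*;

    // StripSketch is illustrative only; collect(x,true) does the equivalent of
    // strip() below, where ((Tuple)iter.next()).get(1) plays the getValue() role.
    final class StripSketch {
        static <V> List<V> strip ( List<Map.Entry<Integer,V>> tagged ) {
            List<V> out = new ArrayList<V>();
            for ( Map.Entry<Integer,V> e: tagged )
                out.add(e.getValue());   // drop the source id, keep the payload
            return out;
        }
    }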

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/BSP/BSPParsedInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BSPParsedInputFormat.java b/src/main/java/BSP/BSPParsedInputFormat.java
deleted file mode 100644
index 7da9a2e..0000000
--- a/src/main/java/BSP/BSPParsedInputFormat.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.Trees;
-import java.io.*;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.*;
-
-
-/** A FileInputFormat for text files (CSV, XML, JSON, ...) */
-final public class BSPParsedInputFormat extends BSPMRQLFileInputFormat {
-    public static class ParsedRecordReader implements RecordReader<MRContainer,MRContainer> {
-        final FSDataInputStream fsin;
-        final long start;
-        final long end;
-        final int source_number;
-        final MRData source_num_data;
-        Iterator<MRData> result;
-        Parser parser;
-
-        public ParsedRecordReader ( FileSplit split,
-                                    BSPJob job,
-                                    Class<? extends Parser> parser_class,
-                                    int source_number,
-                                    Trees args ) throws IOException {
-            Configuration conf = BSPPlan.getConfiguration(job);
-            start = split.getStart();
-            end = start + split.getLength();
-            Path file = split.getPath();
-            FileSystem fs = file.getFileSystem(conf);
-            fsin = fs.open(split.getPath());
-            try {
-                parser = parser_class.newInstance();
-            } catch (Exception ex) {
-                throw new Error("Unrecognized parser:"+parser_class);
-            };
-            this.source_number = source_number;
-            source_num_data = new MR_int(source_number);
-            parser.initialize(args);
-            parser.open(fsin,start,end);
-            result = null;
-        }
-
-        public MRContainer createKey () {
-            return new MRContainer();
-        }
-
-        public MRContainer createValue () {
-            return new MRContainer();
-        }
-
-        public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
-            while (result == null || !result.hasNext()) {
-                String s = parser.slice();
-                if (s == null)
-                    return false;
-                result = parser.parse(s).iterator();
-            };
-            value.set(new Tuple(source_num_data,(MRData)result.next()));
-            key.set(new MR_long(fsin.getPos()));
-            return true;
-        }
-
-        public synchronized long getPos () throws IOException { return fsin.getPos(); }
-
-        public synchronized void close () throws IOException { fsin.close(); }
-
-        public float getProgress () throws IOException {
-            if (end == start)
-                return 0.0f;
-            else return Math.min(1.0f, (getPos() - start) / (float)(end - start));
-        }
-    }
-
-    public RecordReader<MRContainer,MRContainer>
-              getRecordReader ( InputSplit split,
-                                BSPJob job ) throws IOException {
-        Configuration conf = BSPPlan.getConfiguration(job);
-        String path = ((FileSplit)split).getPath().toString();
-        ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,conf);
-        return new ParsedRecordReader((FileSplit)split,job,ds.parser,ds.source_num,(Trees)ds.args);
-    }
-}
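
ParsedRecordReader.next() above follows a pull pattern: slice() cuts the next
parse unit out of the split, parse() turns it into zero or more values, and the
reader refills whenever the current batch runs dry. A self-contained sketch of
the same loop (PullReader and SliceParser are hypothetical names):

    import java.util.*;

    final class PullReader {
        interface SliceParser {
            String slice();                // next parse unit, or null at end of split
            List<String> parse(String s);  // zero or more values per unit
        }

        private final SliceParser parser;
        private Iterator<String> batch = null;

        PullReader ( SliceParser p ) { parser = p; }

        // same shape as ParsedRecordReader.next(): refill until a value is
        // available or the input is exhausted
        String next () {
            while (batch == null || !batch.hasNext()) {
                String s = parser.slice();
                if (s == null)
                    return null;
                batch = parser.parse(s).iterator();
            }
            return batch.next();
        }
    }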

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/BSP/BSPPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BSPPlan.java b/src/main/java/BSP/BSPPlan.java
deleted file mode 100644
index bca8ab9..0000000
--- a/src/main/java/BSP/BSPPlan.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.*;
-import java.io.*;
-import java.util.Arrays;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hama.bsp.*;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hadoop.conf.Configuration;
-
-
-/** Evaluate a BSP plan using Hama */
-final public class BSPPlan extends Plan {
-
-    final static Configuration getConfiguration ( BSPJob job ) {
-        return job.getConf();   // use job.getConfiguration() for Hama 0.6.0
-    }
-
-    /** The BSP evaluator */
-    final static class BSPop extends BSP<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> {
-        final static MRContainer null_key = new MRContainer(new MR_byte(0));
-        // special message for sub-sync()
-        final static MRData more_to_come = new MR_sync();
-        final static MRData more_supersteps = new MR_more_bsp_steps();
-
-        private int source_num;
-        private Function superstep_fnc;      // superstep function
-        private MRData state;                // BSP state
-        private boolean orderp;              // will output be ordered?
-        private MRData source;               // BSP input
-        private Function acc_fnc;            // aggregator
-        private MRData acc_result;           // aggregation result
-        private static String[] all_peer_names;   // all BSP peer names
-        private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer>[] all_peers;   // all BSP peers
-        // a master peer that coordinates and collects results of partial aggregations
-        private String masterTask;
-        // buffer for received messages -- normally kept in a vector, but can be spilled to a local file
-        Bag msg_cache;
-        // the cache that holds all local data in memory
-        Tuple local_cache;
-
-        private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> getPeer ( String name ) {
-            for ( int i = 0; i < all_peer_names.length; i++ )
-                if (all_peer_names[i].equals(name))
-                    return all_peers[i];
-            throw new Error("Unknown peer: "+name);
-        }
-
-        private static void setPeer ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
-            String name = peer.getPeerName();
-            for ( int i = 0; i < all_peer_names.length; i++ )
-                if (all_peer_names[i].equals(name))
-                    all_peers[i] = peer;
-        }
-
-        /** shuffle values to BSP peers based on uniform hashing on key */
-        private static String shuffle ( MRData key ) {
-            return all_peer_names[Math.abs(key.hashCode()) % all_peer_names.length];
-        }
-
-        /** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify) */
-        public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
-            return synchronize(getPeer(peerName.get()),mr_exit);
-        }
-
-        /** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify) */
-        public static MR_bool synchronize ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer, MR_bool mr_exit ) {
-            if (!Config.hadoop_mode)
-                return mr_exit;
-            // shortcut: if we know for sure that all peers want to exit/continue, we don't have to poll
-            if (mr_exit == SystemFunctions.bsp_true_value          // must be ==, not equals
-                || mr_exit == SystemFunctions.bsp_false_value)
-                return mr_exit;
-            try {
-                // this case is only used for checking the exit condition of repeat/closure
-                boolean exit = mr_exit.get();
-                if (all_peer_names.length <= 1)
-                    return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
-                if (!exit)
-                    // this peer is not ready to exit, so no peer should exit
-                    for ( String p: peer.getAllPeerNames() )
-                        peer.send(p,new MRContainer(more_supersteps));
-                peer.sync();
-                // now exit is true if no peer sent a "more_supersteps" message
-                exit = peer.getNumCurrentMessages() == 0;
-                peer.clear();
-                return (exit) ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
-            } catch (Exception ex) {
-                throw new Error(ex);
-            }
-        }
-
-        /** collect a bag from all peers by distributing the local copy s */
-        public static Bag distribute ( MR_string peerName, Bag s ) {
-            BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer = getPeer(peerName.get());
-            if (!Config.hadoop_mode)
-                return s;
-            try {
-                for ( MRData e: s )
-                    for ( String p: all_peer_names )
-                        peer.send(p,new MRContainer(e));
-                peer.sync();
-                MRContainer msg;
-                Bag res = new Bag();
-                while ((msg = peer.getCurrentMessage()) != null)
-                    if (!res.contains(msg.data()))
-                        res.add(msg.data());
-                peer.clear();
-                return res;
-            } catch (Exception ex) {
-                throw new Error(ex);
-            }
-        }
-
-        private void readLocalSources ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
-                throws IOException {
-            MRContainer key = new MRContainer();
-            MRContainer value = new MRContainer();
-            while (peer.readNext(key,value)) {
-                Tuple p = (Tuple)(value.data());
-                ((Bag)local_cache.get(((MR_int)p.first()).get())).add(p.second());
-            }
-        }
-
-        private void writeLocalResult ( Bag result,
-                                        BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
-                throws IOException {
-            MRContainer key = new MRContainer();
-            MRContainer value = new MRContainer();
-            for ( MRData v: result )
-                if (orderp) {       // prepare for sorting
-                    Tuple t = (Tuple)v;
-                    key.set(t.get(1));
-                    value.set(t.get(0));
-                    peer.write(key,value);
-                } else {
-                    value.set(v);
-                    peer.write(null_key,value);
-                }
-        }
-
-        /** receive messages from other peers */
-        private void receive_messages ( final BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
-                throws IOException, SyncException, InterruptedException {
-            if (Config.bsp_msg_size <= 0) {    // no buffering
-                msg_cache = new Bag(new BagIterator() {
-                        MRContainer msg;
-                        public boolean hasNext () {
-                            try {
-                                return (msg = peer.getCurrentMessage()) != null;
-                            } catch (Exception ex) {
-                                throw new Error(ex);
-                            }
-                        } 
-                        public MRData next () {
-                            return msg.data();
-                        }
-                    });
-            } else {
-                boolean expect_more = false;   // are we expecting more incoming messages?
-                do {
-                    // just in case this peer did a regular-sync() before the others did a sub-sync()
-                    expect_more = false;
-                    MRContainer msg;
-                    // cache the received messages
-                    while ((msg = peer.getCurrentMessage()) != null)
-                        // if at least one peer sends a more_to_come message, then expect_more
-                        if (msg.data().equals(more_to_come))
-                            expect_more = true;
-                        else msg_cache.add(msg.data());
-                    if (expect_more)
-                        peer.sync();   // sub-sync()
-                } while (expect_more);
-            }
-        }
-
-        /** send the messages produced by a superstep to peers and then receive the replies */
-        private void send_messages ( Bag msgs,
-                                     BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
-                throws IOException, SyncException, InterruptedException {
-            int size = 0;
-            if (Config.bsp_msg_size > 0)
-                msg_cache.clear();
-            for ( MRData m: msgs ) {
-                Tuple t = (Tuple)m;
-                // if there are too many messages to send, then sub-sync()
-                if ( Config.bsp_msg_size > 0 && size++ > Config.bsp_msg_size ) {
-                    // tell all peers that there is more to come after sync
-                    for ( String p: all_peer_names )
-                        if (!peer.getPeerName().equals(p))
-                            peer.send(p,new MRContainer(more_to_come));
-                    peer.sync();  // sub-sync()
-                    size = 0;
-                    MRContainer msg;
-                    // cache the received messages
-                    while ((msg = peer.getCurrentMessage()) != null)
-                        if (!msg.data().equals(more_to_come))
-                            msg_cache.add(msg.data());
-                };
-                // shuffle messages based on key; needs a new MRContainer object
-                peer.send(shuffle(t.get(0)),new MRContainer(t.get(1)));
-            };
-            peer.sync();   // regular-sync()
-            receive_messages(peer);
-        }
-
-        @Override
-        public void bsp ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
-               throws IOException, SyncException, InterruptedException {
-            final Tuple stepin = new Tuple(4);
-            stepin.set(3,new MR_string(peer.getPeerName()));
-            Tuple result;
-            boolean skip = false;
-            String tabs = "";
-            int step = 0;
-            boolean exit;
-            if (Evaluator.evaluator == null)
-                try {
-                    Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.BSPEvaluator").newInstance();
-                } catch (Exception ex) {
-                    throw new Error(ex);
-                };
-            readLocalSources(peer);
-            setPeer(peer);
-            do {
-                if (!skip)
-                    step++;
-                if (!skip && Config.trace_execution) {
-                    tabs = Interpreter.tabs(Interpreter.tab_count);
-                    System.err.println(tabs+"  Superstep "+step+" ["+peer.getPeerName()+"]:");
-                    System.err.println(tabs+"      messages ["+peer.getPeerName()+"]: "+msg_cache);
-                    System.err.println(tabs+"      state ["+peer.getPeerName()+"]: "+state);
-                    for ( int i = 0; i < local_cache.size(); i++)
-                        if (local_cache.get(i) instanceof Bag && ((Bag)local_cache.get(i)).size() > 0)
-                            System.out.println(tabs+"      cache ["+peer.getPeerName()+"] "+i+": "+local_cache.get(i));
-                };
-                stepin.set(0,local_cache);
-                stepin.set(1,msg_cache);
-                stepin.set(2,state);
-                // evaluate one superstep
-                result = (Tuple)superstep_fnc.eval(stepin);
-                Bag msgs = (Bag)result.get(0);
-                exit = ((MR_bool)result.get(2)).get();
-                state = result.get(1);
-                // shortcuts: if we know for sure that all peers want to exit/continue
-                if (result.get(2) == SystemFunctions.bsp_true_value) {  // must be ==, not equals
-                    peer.sync();
-                    if (Config.trace_execution)
-                        System.err.println(tabs+"      result ["+peer.getPeerName()+"]: "+result);
-                    break;
-                };
-                if (result.get(2) == SystemFunctions.bsp_false_value) {
-                    if (Config.trace_execution)
-                        System.err.println(tabs+"      result ["+peer.getPeerName()+"]: "+result);
-                    send_messages(msgs,peer);
-                    skip = false;
-                    continue;
-                };
-                // shortcut: skip is true when NONE of the peers sent any messages
-                skip = (msgs == SystemFunctions.bsp_empty_bag);  // must be ==, not equals
-                if (skip)
-                    continue;
-                if (Config.trace_execution)
-                    System.err.println(tabs+"      result ["+peer.getPeerName()+"]: "+result);
-                exit = synchronize(peer,(MR_bool)result.get(2)).get();
-                send_messages(msgs,peer);
-            } while (!exit);
-            if (acc_result == null) {
-                // the BSP result is a bag that needs to be dumped to HDFS
-                writeLocalResult((Bag)(local_cache.get(source_num)),peer);
-                // if there are more results, dump them to HDFS
-                final MR_long key = new MR_long(0);
-                final MRContainer key_container = new MRContainer(key);
-                final MRContainer data_container = new MRContainer(new MR_int(0));
-                int loc = 0;
-                while ( loc < all_peer_names.length && peer.getPeerName().equals(all_peer_names[loc]) )
-                    loc++;
-                Configuration conf = peer.getConfiguration();
-                String[] out_paths = conf.get("mrql.output.paths").split(",");
-                for ( int i = 1; i < out_paths.length; i++ ) {
-                    String[] s = out_paths[i].split(":");
-                    int out_num = Integer.parseInt(s[0]);
-                    Path path = new Path(s[1]+"/peer"+loc);
-                    FileSystem fs = path.getFileSystem(conf);
-                    SequenceFile.Writer writer
-                        = new SequenceFile.Writer(fs,conf,path,
-                                                  MRContainer.class,MRContainer.class);
-                    long count = 0;
-                    for ( MRData e: (Bag)(local_cache.get(out_num)) ) {
-                        key.set(count++);
-                        data_container.set(e);
-                        writer.append(key_container,data_container);
-                    };
-                    writer.close();
-                };
-            } else {
-                // the BSP result is an aggregation:
-                //     send the partial results to the master peer
-                peer.send(masterTask,new MRContainer(local_cache.get(source_num)));
-                peer.sync();
-                if (peer.getPeerName().equals(masterTask)) {
-                    // only the master peer collects the partial aggregations
-                    MRContainer msg;
-                    while ((msg = peer.getCurrentMessage()) != null)
-                        acc_result = acc_fnc.eval(new Tuple(acc_result,msg.data()));
-                    // write the final aggregation result
-                    peer.write(null_key,new MRContainer(acc_result));
-               }
-            }
-        }
-
-        @Override
-        @SuppressWarnings("unchecked")
-        public void setup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
-            try {
-                super.setup(peer);
-                Configuration conf = peer.getConfiguration();
-                Config.read(conf);
-                if (Plan.conf == null)
-                    Plan.conf = conf;
-                all_peer_names = peer.getAllPeerNames();
-                all_peers = new BSPPeerImpl[all_peer_names.length];
-                Arrays.sort(all_peer_names);  // is this necessary?
-                source_num = conf.getInt("mrql.output.tag",0);
-                Tree code = Tree.parse(conf.get("mrql.superstep"));
-                superstep_fnc = functional_argument(conf,code);
-                code = Tree.parse(conf.get("mrql.initial.state"));
-                state = Interpreter.evalE(code);
-                if (conf.get("mrql.zero") != null && !conf.get("mrql.zero").equals("")) {
-                    code = Tree.parse(conf.get("mrql.zero"));
-                    acc_result = Interpreter.evalE(code);
-                    code = Tree.parse(conf.get("mrql.accumulator"));
-                    acc_fnc = functional_argument(conf,code);
-                } else acc_result = null;
-                orderp = conf.getBoolean("mrql.orderp",false);
-                masterTask = all_peer_names[peer.getNumPeers()/2];
-                msg_cache = new Bag(1000);
-                local_cache = new Tuple(max_input_files);
-                for ( int i = 0; i < max_input_files; i++ )
-                    local_cache.set(i,new Bag());
-            } catch (Exception e) {
-                e.printStackTrace();
-                throw new Error("Cannot setup the Hama BSP job: "+e);
-            }
-        }
-
-        @Override
-        public void cleanup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) throws IOException {
-            if (!Config.local_mode)
-                clean();
-            local_cache = null;
-            super.cleanup(peer);
-        }
-    }
-
-    /** set Hama's min split size and number of BSP tasks (doesn't work with Hama 0.6.0) */
-    public static void setupSplits ( BSPJob job, DataSet ds ) throws IOException {
-        long[] sizes = new long[ds.source.size()];
-        if (sizes.length > Config.nodes)
-            throw new Error("Cannot distribute "+sizes.length+" files over "+Config.nodes+" BSP tasks");
-        for ( int i = 0; i < sizes.length; i++ )
-            sizes[i] = ds.source.get(i).size(Plan.conf);
-        long total_size = 0;
-        for ( long size: sizes )
-            total_size += size;
-        long split_size = Math.max(total_size/Config.nodes,100000);
-        int tasks = 0;
-        do {  // adjust split_size
-            tasks = 0;
-            for ( long size: sizes )
-                tasks += (int)Math.ceil(size/(double)split_size);
-            if (tasks > Config.nodes)
-                split_size = (long)Math.ceil((double)split_size*1.01);
-        } while (tasks > Config.nodes);
-        job.setNumBspTask(tasks);
-        System.err.println("*** Using "+tasks+" BSP tasks (out of a max "+Config.nodes+")."
-                           +" Each task will handle about "+Math.min(total_size/Config.nodes,split_size)
-                           +" bytes of input data.");
-        job.set("bsp.min.split.size",Long.toString(split_size));
-    }
-
-    /** Evaluate a BSP operation that returns a DataSet
-     * @param source_nums   output tags
-     * @param superstep     the superstep function
-     * @param init_state    initial state
-     * @param orderp        do we need to order the result?
-     * @param source        input dataset
-     * @return a new data source that contains the result
-     */
-    public final static MRData BSP ( int[] source_nums, // output tags
-                                     Tree superstep,    // superstep function
-                                     Tree init_state,   // initial state
-                                     boolean orderp,    // do we need to order the result?
-                                     DataSet source     // input dataset
-                                     ) throws Exception {
-        String[] newpaths = new String[source_nums.length];
-        newpaths[0] = new_path(conf);
-        conf.set("mrql.output.paths",source_nums[0]+":"+newpaths[0]);
-        for ( int i = 1; i < source_nums.length; i++ ) {
-            newpaths[i] = new_path(conf);
-            Path path = new Path(newpaths[i]);
-            FileSystem fs = path.getFileSystem(conf);
-            fs.mkdirs(path);
-            conf.set("mrql.output.paths",conf.get("mrql.output.paths")+","+source_nums[i]+":"+newpaths[i]);
-        };
-        conf.set("mrql.superstep",superstep.toString());
-        conf.set("mrql.initial.state",init_state.toString());
-        conf.set("mrql.zero","");
-        conf.setInt("mrql.output.tag",source_nums[0]);
-        conf.setBoolean("mrql.orderp",orderp);
-        BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
-        setupSplits(job,source);
-        job.setJobName(newpaths[0]);
-        distribute_compiled_arguments(getConfiguration(job));
-        job.setBspClass(BSPop.class);
-        Path outpath = new Path(newpaths[0]);
-        job.setOutputPath(outpath);
-        job.setOutputKeyClass(MRContainer.class);
-        job.setOutputValueClass(MRContainer.class);
-        job.setOutputFormat(SequenceFileOutputFormat.class);
-        job.setInputFormat(MultipleBSPInput.class);
-        FileInputFormat.setInputPaths(job,source.merge());
-        job.waitForCompletion(true);
-        if (source_nums.length == 1) {
-            BinaryDataSource ds = new BinaryDataSource(source_nums[0],newpaths[0],conf);
-            ds.to_be_merged = orderp;
-            return new MR_dataset(new DataSet(ds,0,3));
-        } else {
-            MRData[] s = new MRData[source_nums.length];
-            for ( int i = 0; i < source_nums.length; i++ ) {
-                BinaryDataSource ds = new BinaryDataSource(source_nums[i],newpaths[i],conf);
-                ds.to_be_merged = orderp;
-                s[i] = new MR_dataset(new DataSet(ds,0,3));
-            };
-            return new Tuple(s);
-        }
-    }
-
-    /** Evaluate a BSP operation that aggregates the results
-     * @param source_num    output tag
-     * @param superstep     the superstep function
-     * @param init_state    initial state
-     * @param acc_fnc       accumulator function
-     * @param zero          zero value for the accumulator
-     * @param source        input dataset
-     * @return the aggregation result
-     */
-    public final static MRData BSPaggregate ( int source_num,   // output tag
-                                              Tree superstep,   // superstep function
-                                              Tree init_state,  // initial state
-                                              Tree acc_fnc,     // accumulator function
-                                              Tree zero,        // zero value for the accumulator
-                                              DataSet source    // input dataset
-                                              ) throws Exception {
-        String newpath = new_path(conf);
-        conf.set("mrql.superstep",superstep.toString());
-        conf.set("mrql.initial.state",init_state.toString());
-        conf.set("mrql.accumulator",acc_fnc.toString());
-        conf.set("mrql.zero",zero.toString());
-        conf.setInt("mrql.output.tag",source_num);
-        conf.setBoolean("mrql.orderp",false);
-        BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
-        setupSplits(job,source);
-        job.setJobName(newpath);
-        distribute_compiled_arguments(getConfiguration(job));
-        job.setBspClass(BSPop.class);
-        Path outpath = new Path(newpath);
-        job.setOutputPath(outpath);
-        job.setOutputKeyClass(MRContainer.class);
-        job.setOutputValueClass(MRContainer.class);
-        job.setOutputFormat(SequenceFileOutputFormat.class);
-        job.setInputFormat(MultipleBSPInput.class);
-        FileInputFormat.setInputPaths(job,source.merge());
-        job.waitForCompletion(true);
-        FileSystem fs = outpath.getFileSystem(conf);
-        FileStatus[] files = fs.listStatus(outpath);
-        for ( int i = 0; i < files.length; i++ ) {
-            SequenceFile.Reader sreader = new SequenceFile.Reader(fs,files[i].getPath(),conf);
-            MRContainer key = new MRContainer();
-            MRContainer value = new MRContainer();
-            sreader.next(key,value);
-            sreader.close();
-            if (value.data() != null)
-                return value.data();
-        };
-        return null;
-    }
- }
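
The split sizing in setupSplits() above is worth reading in isolation: it
starts from an even division of the total input and grows the split size by 1%
until the per-file task counts sum to at most Config.nodes. A standalone sketch
(SplitSizing is a hypothetical wrapper around the same arithmetic):

    // Grow the split size until ceil(size_i/split), summed over all input
    // files, fits within the available BSP tasks.
    final class SplitSizing {
        static long splitSize ( long[] sizes, int nodes ) {
            long total = 0;
            for ( long s: sizes )
                total += s;
            long split = Math.max(total/nodes,100000);
            int tasks;
            do {
                tasks = 0;
                for ( long s: sizes )
                    tasks += (int)Math.ceil(s/(double)split);
                if (tasks > nodes)
                    split = (long)Math.ceil(split*1.01);
            } while (tasks > nodes);
            return split;
        }
    }

For example, with files of 300000 and 100000 bytes and nodes = 4, the split
size starts at max(400000/4, 100000) = 100000 and yields 3 + 1 = 4 tasks,
which already fits, so no adjustment round is needed.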

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/BSP/MultipleBSPInput.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/MultipleBSPInput.java b/src/main/java/BSP/MultipleBSPInput.java
deleted file mode 100644
index aa43423..0000000
--- a/src/main/java/BSP/MultipleBSPInput.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.Trees;
-import java.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.*;
-
-
-/** A FileInputFormat for multiple files, where each file may be associated with
- *   a different FileInputFormat */
-final public class MultipleBSPInput extends BSPMRQLFileInputFormat {
-    public RecordReader<MRContainer,MRContainer>
-              getRecordReader ( InputSplit split, BSPJob job ) throws IOException {
-        if (Evaluator.evaluator == null)
-            try {
-                Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.BSPEvaluator").newInstance();
-            } catch (Exception ex) {
-                throw new Error(ex);
-            };
-        String path = ((FileSplit)split).getPath().toString();
-        Configuration conf = BSPPlan.getConfiguration(job);
-        DataSource ds = DataSource.get(path,conf);
-        Plan.conf = conf;
-        if (ds instanceof ParsedDataSource)
-            return new BSPParsedInputFormat.ParsedRecordReader((FileSplit)split,
-                                                               job,
-                                                               ((ParsedDataSource)ds).parser,
-                                                               ds.source_num,
-                                                               (Trees)((ParsedDataSource)ds).args);
-        else if (ds instanceof BinaryDataSource)
-            return new BSPBinaryInputFormat.BinaryInputRecordReader((FileSplit)split,job,ds.source_num);
-        else if (ds instanceof GeneratorDataSource)
-            return new BSPGeneratorInputFormat.GeneratorRecordReader((FileSplit)split,ds.source_num,job);
-        else throw new Error("Unknown data source: "+ds+" for path "+path);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/MapReduce/CrossProductOperation.java
----------------------------------------------------------------------
diff --git a/src/main/java/MapReduce/CrossProductOperation.java b/src/main/java/MapReduce/CrossProductOperation.java
deleted file mode 100644
index a9216cc..0000000
--- a/src/main/java/MapReduce/CrossProductOperation.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.*;
-import java.io.*;
-import java.net.URI;
-import java.util.List;
-import java.util.Vector;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.filecache.DistributedCache;
-
-
-/** The CrossProduct physical operation (similar to block-nested loop) */
-final public class CrossProductOperation extends MapReducePlan {
-
-    /** The mapper for the CrossProduct operation */
-    private final static class crossProductMapper extends Mapper<MRContainer,MRContainer,MRContainer,MRContainer> {
-        private static String counter;       // a Hadoop user-defined counter used in the repeat operation
-        private static Function reduce_fnc;  // the reduce function
-        private static Function map_fnc;     // the mapper function
-        private static DataSet cached_dataset;
-        private final static List<MRData> outer
-            = new Vector<MRData>(Config.map_cache_size);  // fixed-size cache for the outer
-        private static int index;
-        private static MRContainer last_key;
-        private static URI[] uris;
-        private static Path[] local_paths;
-        private static Function acc_fnc;     // aggregator
-        private static MRData result;        // aggregation result
-        private static Tuple pair = new Tuple(2);
-        private static MRContainer container = new MRContainer(new MR_int(0));
-
-        private void write ( MRContainer key, MRData value, Context context )
-                     throws IOException, InterruptedException {
-            if (result != null) {  // aggregation
-                pair.set(0,result);
-                pair.set(1,value);
-                result = acc_fnc.eval(pair);
-            } else if (counter.equals("-")) {
-                container.set(value);
-                context.write(key,container);
-            } else {     // increment the repetition counter if the repeat condition is true
-                Tuple t = (Tuple)value;
-                if (((MR_bool)t.second()).get())
-                    context.getCounter("mrql",counter).increment(1);
-                container.set(t.first());
-                context.write(key,container);
-            }
-        }
-
-        @Override
-        public void map ( MRContainer key, MRContainer value, Context context )
-                    throws IOException, InterruptedException {
-            try {
-                last_key = key;
-                for ( MRData x: (Bag)map_fnc.eval(value.data()) )
-                    if (index++ == Config.map_cache_size) {
-                        for ( MRData y: cached_data(context.getConfiguration()) ) {
-                            pair.set(1,y);
-                            for ( MRData z: outer ) {
-                                pair.set(0,z);
-                                for ( MRData v: (Bag)reduce_fnc.eval(pair) )
-                                    write(key,v,context);
-                            }
-                        };
-                        index = 0;
-                        outer.clear();
-                    } else outer.add(x);
-            } catch (Exception e) {
-                throw new Error("Cannot perform the crossProduct: "+e);
-            }
-        }
-
-        protected Bag cached_data ( final Configuration conf ) {
-            try {
-                Bag res = new Bag();
-                final FileSystem fs = FileSystem.getLocal(conf);
-                for ( int i = 0; i < local_paths.length; i++ ) {
-                    // hadoop 0.20.2 distributed cache doesn't work in stand-alone
-                    final Path path = (conf.get("mapred.job.tracker").equals("local"))
-                                      ? new Path(uris[i].toString())
-                                      : local_paths[i];
-                    if (path.getName().endsWith(".jar"))
-                        continue;
-                    res = res.union(new Bag(new BagIterator () {
-                            final SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);
-                            final MRContainer key = new MRContainer(new MR_int(0));
-                            final MRContainer value = new MRContainer(new MR_int(0));
-                            public boolean hasNext () {
-                                try {
-                                    boolean done = reader.next(key,value);
-                                    if (!done)
-                                        reader.close();
-                                    return done;
-                                } catch (IOException e) {
-                                    throw new Error("Cannot collect values from distributed cache");
-                                }
-                            }
-                            public MRData next () {
-                                return value.data();
-                            }
-                        }));
-                };
-                return res;
-            } catch (Exception e) {
-                throw new Error("Cannot setup the cross product: "+e);
-            }
-        }
-
-        @Override
-        protected void setup ( Context context ) throws IOException,InterruptedException {
-            super.setup(context);
-            try {
-                Configuration conf = context.getConfiguration();
-                Config.read(conf);
-                if (Plan.conf == null)
-                    Plan.conf = conf;
-                Tree code = Tree.parse(conf.get("mrql.reducer"));
-                reduce_fnc = functional_argument(conf,code);
-                code = Tree.parse(conf.get("mrql.mapper"));
-                map_fnc = functional_argument(conf,code);
-                if (conf.get("mrql.zero") != null) {
-                    code = Tree.parse(conf.get("mrql.zero"));
-                    result = Interpreter.evalE(code);
-                    code = Tree.parse(conf.get("mrql.accumulator"));
-                    acc_fnc = functional_argument(conf,code);
-                } else result = null;
-                counter = conf.get("mrql.counter");
-                uris = DistributedCache.getCacheFiles(conf);
-                local_paths = DistributedCache.getLocalCacheFiles(conf);
-                index = 0;
-            } catch (Exception e) {
-                throw new Error("Cannot setup the crossProduct: "+e);
-            }
-        }
-
-        @Override
-        protected void cleanup ( Context context ) throws IOException,InterruptedException {
-            if (index > 0)
-                try {
-                    for ( MRData y: cached_data(context.getConfiguration()) ) {
-                        pair.set(1,y);
-                        for ( MRData z: outer ) {
-                            pair.set(0,z);
-                            for ( MRData v: (Bag)reduce_fnc.eval(pair) )
-                                write(last_key,v,context);
-                        }
-                    };
-                } catch (Exception e) {
-                    throw new Error("Cannot cleanup the crossProduct: "+e);
-                };
-            index = 0;
-            outer.clear();
-            if (result != null)  // emit the result of aggregation
-                context.write(new MRContainer(new MR_int(0)),new MRContainer(result));
-            super.cleanup(context);
-        }
-    }
-
-    /** The CrossProduct physical operator (similar to block-nested loop)
-     * @param mx              left mapper
-     * @param my              right mapper
-     * @param reduce_fnc      reducer
-     * @param acc_fnc         optional accumulator function
-     * @param zero            optional zero value for the accumulator
-     * @param X               the left source
-     * @param Y               the right source (stored in distributed cache)
-     * @param stop_counter    optional counter used in repeat operation
-     * @return a new data source that contains the result
-     */
-    public final static DataSet crossProduct ( Tree mx,              // left mapper
-                                               Tree my,              // right mapper
-                                               Tree reduce_fnc,      // reducer
-                                               Tree acc_fnc,         // optional accumulator function
-                                               Tree zero,            // optional zero value for the accumulator
-                                               DataSet X,            // the left source
-                                               DataSet Y,            // the right source (stored in distributed cache)
-                                               String stop_counter ) // optional counter used in repeat operation
-                                 throws Exception {
-        DataSet ds = MapOperation.cMap(my,null,null,Y,"-");
-        String newpath = new_path(conf);
-        conf.set("mrql.reducer",reduce_fnc.toString());
-        conf.set("mrql.mapper",mx.toString());
-        if (zero != null) {
-            conf.set("mrql.accumulator",acc_fnc.toString());
-            conf.set("mrql.zero",zero.toString());
-            conf.set("mapred.min.split.size","268435456");
-        } else conf.set("mrql.zero","");
-        conf.set("mrql.counter",stop_counter);
-        Job job = new Job(conf,newpath);
-        distribute_compiled_arguments(job.getConfiguration());
-        job.setJarByClass(MapReducePlan.class);
-        job.setOutputKeyClass(MRContainer.class);
-        job.setOutputValueClass(MRContainer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        PathFilter pf = new PathFilter () { public boolean accept ( Path path ) {
-                                                return !path.getName().startsWith("_");
-                                            } };
-        for (DataSource p: ds.source) {
-            Path path = new Path(p.path);
-            for ( FileStatus s: path.getFileSystem(conf).listStatus(path,pf) )
-                DistributedCache.addCacheFile(s.getPath().toUri(),job.getConfiguration());
-        };
-        for (DataSource p: X.source)
-            MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,crossProductMapper.class);
-        FileOutputFormat.setOutputPath(job,new Path(newpath));
-        job.setNumReduceTasks(0);
-        job.waitForCompletion(true);
-        long c = (stop_counter.equals("-")) ? 0
-                 : job.getCounters().findCounter("mrql",stop_counter).getValue();
-        return new DataSet(new BinaryDataSource(newpath,conf),c,outputRecords(job));
-    }
-}
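
The mapper above is a block-nested loop: it buffers up to Config.map_cache_size
outer values, and whenever the buffer fills (or at cleanup) it pairs every
cached inner value from the distributed cache with the buffered block. A
minimal sketch of the control flow, with hypothetical names and plain pairs in
place of the reduce function:

    import java.util.*;

    final class BlockNestedLoop {
        static <X,Y> List<Map.Entry<X,Y>> cross ( Iterable<X> outer, List<Y> inner,
                                                  int blockSize ) {
            List<Map.Entry<X,Y>> out = new ArrayList<Map.Entry<X,Y>>();
            List<X> block = new ArrayList<X>(blockSize);
            for ( X x: outer ) {
                block.add(x);
                if (block.size() == blockSize) {   // map(): flush a full block
                    flush(block,inner,out);
                    block.clear();
                }
            }
            flush(block,inner,out);                // cleanup(): last partial block
            return out;
        }

        private static <X,Y> void flush ( List<X> block, List<Y> inner,
                                          List<Map.Entry<X,Y>> out ) {
            for ( Y y: inner )
                for ( X x: block )
                    out.add(new AbstractMap.SimpleEntry<X,Y>(x,y));
        }
    }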

http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/MapReduce/GroupByJoinPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/MapReduce/GroupByJoinPlan.java b/src/main/java/MapReduce/GroupByJoinPlan.java
deleted file mode 100644
index 4f93e1e..0000000
--- a/src/main/java/MapReduce/GroupByJoinPlan.java
+++ /dev/null
@@ -1,464 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.*;
-import java.io.*;
-import java.util.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
-
-/**
- *   A map-reduce job that captures a join with group-by. Similar to matrix multiplication.<br/>
- *   It captures queries of the form:
- * <pre>
- *      select r(kx,ky,c(z))
- *        from x in X, y in Y, z = mp(x,y)
- *       where jx(x) = jy(y)
- *       group by (kx,ky): (gx(x),gy(y));
- * </pre>
- *   where: mp: map function, r: reduce function, c: combine function,
- *          jx: left join key function, jy: right join key function,
- *          gx: left group-by function, gy: right group-by function.
- * <br/>
- *   Example: matrix multiplication:
- * <pre>
- *      select ( sum(z), i, j )
- *        from (x,i,k) in X, (y,k,j) in Y, z = x*y
- *       group by (i,j);
- * </pre>
- *   It uses n*m partitions, chosen so that n/m = |X|/|Y| and a hash table of size
- *   (|X|/n)*(|Y|/m) fits in a memory of size M; that is, n = |X|/sqrt(M) and m = |Y|/sqrt(M).
- *   Each partition handles (|X|/n)*(|Y|/m) data; X is replicated n times and Y is replicated m times.
- *   The reducer keeps an in-memory hash table H of size (|X|/n)*(|Y|/m).
- *   MapReduce pseudo-code:
- * <pre>
- *   mapX ( x )
- *     for i = 0,n-1
- *        emit ( ((hash(gx(x)) % m)+m*i, jx(x), 1), (1,x) )
- *
- *   mapY ( y )
- *     for i = 0,m-1
- *        emit ( ((hash(gy(y)) % n)*m+i, jy(y), 2), (2,y) )
- * </pre>
- *   mapper output key: (partition,joinkey,tag),  value: (tag,data) <br/>
- *   Partitioner: over partition                                    <br/>
- *   GroupingComparator: over partition and joinkey                 <br/>
- *   SortComparator: major partition, minor joinkey, sub-minor tag  <br/>
- * <pre>
- *   reduce ( (p,_,_), s )
- *     if p != current_partition
- *        flush()
- *        current_partition = p
- *     read x from s first and store it to xs
- *     for each y from the rest of s
- *        for each x in xs
- *            H[(gx(x),gy(y))] = c( H[(gx(x),gy(y))], mp((x,y)) )
- * </pre>
- *   where flush() is: for each ((kx,ky),v) in H: emit r((kx,ky),v)
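- * <br/>
- *   A worked example of the partition grid (the numbers are chosen only for illustration):
- *   with n = 2 and m = 3 there are n*m = 6 partitions, numbered 0..5. For a left record x
- *   with hash(gx(x)) % m = 1, mapX emits x to partitions 1+3*i for i = 0,1, that is, to
- *   partitions {1,4}; for a right record y with hash(gy(y)) % n = 1, mapY emits y to
- *   partitions 1*3+i for i = 0,1,2, that is, to partitions {3,4,5}. The two sets intersect
- *   in exactly one partition, 4 = (hash(gy(y)) % n)*m + (hash(gx(x)) % m), so each matching
- *   (x,y) pair is joined exactly once.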
- */
-final public class GroupByJoinPlan extends Plan {
-
-    /** mapper output key: (partition,joinkey,tag) */
-    private final static class GroupByJoinKey implements Writable {
-        public int partition;  // one of n*m
-        public byte tag;       // 1 or 2
-        public MRData key;
-
-        GroupByJoinKey () {}
-        GroupByJoinKey  ( int p, byte t, MRData k ) {
-            partition = p;
-            tag = t;
-            key = k;
-        }
-
-        public void write ( DataOutput out ) throws IOException {
-            out.writeByte(tag);
-            WritableUtils.writeVInt(out,partition);
-            key.write(out);
-        }
-
-        public void readFields ( DataInput in ) throws IOException {
-            tag = in.readByte();
-            partition = WritableUtils.readVInt(in);
-            key = MRContainer.read(in);
-        }
-
-        public String toString () { return "["+partition+","+tag+","+key+"]"; }
-    }
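-
-    /* Serialization note: write() above lays a key out as [tag byte][vint partition][key bytes];
-       the raw comparators below depend on exactly this layout: they read the partition vint
-       at offset 1, then compare the key, and (for the sort comparator) compare the tag byte
-       at offset 0 last. */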
-
-    /** partition based on key.partition only */
-    private final static class GroupByJoinPartitioner extends Partitioner<GroupByJoinKey,MRContainer> {
-        final public int getPartition ( GroupByJoinKey key, MRContainer value, int numPartitions ) {
-            return key.partition % numPartitions;
-        }
-    }
-
-    /** sorting with major order key.partition, minor key.key, minor key.tag */
-    private final static class GroupByJoinSortComparator implements RawComparator<GroupByJoinKey> {
-        int[] container_size;
-
-        public GroupByJoinSortComparator () {
-            container_size = new int[1];
-        }
-
-        final public int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl ) {
-            try {
-                int c = WritableComparator.readVInt(x,xs+1)-WritableComparator.readVInt(y,ys+1);
-                if (c != 0)
-                    return c;
-                int tsize = 1+WritableUtils.decodeVIntSize(x[xs+1]);
-                c = MRContainer.compare(x,xs+tsize,xl-tsize,y,ys+tsize,yl-tsize,container_size);
-                if (c != 0)
-                    return c;
-                return x[xs] - y[ys];
-            } catch (IOException e) {
-                throw new Error(e);
-            }
-        }
-
-        final public int compare ( GroupByJoinKey x, GroupByJoinKey y ) {
-            int c = x.partition - y.partition;
-            if (c != 0)
-                return c;
-            c = x.key.compareTo(y.key);
-            if (c != 0)
-                return c;
-            return x.tag - y.tag;
-        }
-    }
-
-    /** grouping by key.partition and key.key */
-    private final static class GroupByJoinGroupingComparator implements RawComparator<GroupByJoinKey> {
-        int[] container_size;
-
-        public GroupByJoinGroupingComparator() {
-            container_size = new int[1];
-        }
-
-        final public int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl ) {
-            try {
-                int c = WritableComparator.readVInt(x,xs+1)-WritableComparator.readVInt(y,ys+1);
-                if (c != 0)
-                    return c;
-                int tsize = 1+WritableUtils.decodeVIntSize(x[xs+1]);
-                return MRContainer.compare(x,xs+tsize,xl-tsize,y,ys+tsize,yl-tsize,container_size);
-            } catch (IOException e) {
-                throw new Error(e);
-            }
-        }
-
-        final public int compare ( GroupByJoinKey x, GroupByJoinKey y ) {
-            int c = x.partition - y.partition;
-            return (c != 0) ? c : x.key.compareTo(y.key);
-        }
-    }
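-
-    /* The grouping comparator deliberately ignores the tag byte, so the left (tag 1) and
-       right (tag 2) records that share a (partition,joinkey) reach the same reduce() call,
-       while the sort comparator orders the left records before the right ones; this is
-       what allows the reducer to cache the left values in a bag before streaming through
-       the right values. */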
-
-    /** the left GroupByJoin mapper */
-    private final static class MapperLeft extends Mapper<MRContainer,MRContainer,GroupByJoinKey,MRContainer> {
-        private static int n, m;
-        private static Function left_join_key_fnc;
-        private static Function left_groupby_fnc;
-        private static GroupByJoinKey ckey = new GroupByJoinKey(0,(byte)1,new MR_int(0));
-        private static Tuple tvalue = (new Tuple(2)).set(0,new MR_byte(1));
-        private static MRContainer cvalue = new MRContainer(tvalue);
-
-        @Override
-        public void map ( MRContainer key, MRContainer value, Context context )
-                    throws IOException, InterruptedException {
-            MRData data = value.data();
-            for ( int i = 0; i < n; i++ ) {
-                // mask the sign bit so that a negative hashCode() cannot yield a negative partition
-                ckey.partition = ((left_groupby_fnc.eval(data).hashCode() & Integer.MAX_VALUE) % m)+m*i;
-                ckey.key = left_join_key_fnc.eval(data);
-                tvalue.set(1,data);
-                context.write(ckey,cvalue);
-            }
-        }
-
-        @Override
-        protected void setup ( Context context ) throws IOException,InterruptedException {
-            super.setup(context);
-            try {
-                Configuration conf = context.getConfiguration();
-                Config.read(conf);
-                if (Plan.conf == null)
-                    Plan.conf = conf;
-                Tree code = Tree.parse(conf.get("mrql.join.key.left"));
-                left_join_key_fnc = functional_argument(conf,code);
-                code = Tree.parse(conf.get("mrql.groupby.left"));
-                left_groupby_fnc = functional_argument(conf,code);
-                m = conf.getInt("mrql.m",1);
-                n = conf.getInt("mrql.n",1);
-            } catch (Exception e) {
-                throw new Error("Cannot retrieve the left mapper plan");
-            }
-        }
-    }
-
-    /** the right GroupByJoin mapper */
-    private final static class MapperRight extends Mapper<MRContainer,MRContainer,GroupByJoinKey,MRContainer> {
-        private static int n, m;
-        private static Function right_join_key_fnc;
-        private static Function right_groupby_fnc;
-        private static GroupByJoinKey ckey = new GroupByJoinKey(0,(byte)2,new MR_int(0));
-        private static Tuple tvalue = (new Tuple(2)).set(0,new MR_byte(2));
-        private static MRContainer cvalue = new MRContainer(tvalue);
-
-        @Override
-        public void map ( MRContainer key, MRContainer value, Context context )
-                    throws IOException, InterruptedException {
-            MRData data = value.data();
-            for ( int i = 0; i < m; i++ ) {
-                // mask the sign bit so that a negative hashCode() cannot yield a negative partition
-                ckey.partition = ((right_groupby_fnc.eval(data).hashCode() & Integer.MAX_VALUE) % n)*m+i;
-                ckey.key = right_join_key_fnc.eval(data);
-                tvalue.set(1,data);
-                context.write(ckey,cvalue);
-            }
-        }
-
-        @Override
-        protected void setup ( Context context ) throws IOException,InterruptedException {
-            super.setup(context);
-            try {
-                Configuration conf = context.getConfiguration();
-                Config.read(conf);
-                if (Plan.conf == null)
-                    Plan.conf = conf;
-                Tree code = Tree.parse(conf.get("mrql.join.key.right"));
-                right_join_key_fnc = functional_argument(conf,code);
-                code = Tree.parse(conf.get("mrql.groupby.right"));
-                right_groupby_fnc = functional_argument(conf,code);
-                m = conf.getInt("mrql.m",1);
-                n = conf.getInt("mrql.n",1);
-            } catch (Exception e) {
-                throw new Error("Cannot retrieve the right mapper plan");
-            }
-        }
-    }
-
-    /** the GroupByJoin reducer */
-    private static class JoinReducer extends Reducer<GroupByJoinKey,MRContainer,MRContainer,MRContainer> {
-        private static String counter;            // a Hadoop user-defined counter used in the repeat operation
-        private static int n, m;                  // n*m partitioners
-        private static Function left_groupby_fnc; // left group-by function
-        private static Function right_groupby_fnc;// right group-by function
-        private static Function map_fnc;          // the map function
-        private static Function combine_fnc;      // the combine function
-        private static Function reduce_fnc;       // the reduce function
-        private static Bag left = new Bag();      // a cached bag of input fragments from left input
-        private static int current_partition = -1;
-        private static Hashtable<MRData,MRData> hashTable;  // in-reducer combiner
-        private static Tuple pair = new Tuple(2);
-        private static MRContainer ckey = new MRContainer(new MR_int(0));
-        private static MRContainer container = new MRContainer(new MR_int(0));
-        private static Bag tbag = new Bag(2);
-
-        private static void write ( MRContainer key, MRData value, Context context )
-                       throws IOException, InterruptedException {
-            if (counter.equals("-")) {
-                container.set(value);
-                context.write(key,container);
-            } else {     // increment the repetition counter if the repeat condition is true
-                Tuple t = (Tuple)value;
-                if (((MR_bool)t.second()).get())
-                    context.getCounter("mrql",counter).increment(1);
-                container.set(t.first());
-                context.write(key,container);
-            }
-        }
-
-        private void store ( MRData key, MRData value ) throws IOException {
-            MRData old = hashTable.get(key);
-            pair.set(0,key);
-            for ( MRData e: (Bag)map_fnc.eval(value) )
-                if (old == null) {
-                    hashTable.put(key,e);
-                    old = e;
-                } else {
-                    tbag.clear();
-                    tbag.add_element(e).add_element(old);
-                    pair.set(1,tbag);
-                    for ( MRData z: (Bag)combine_fnc.eval(pair) ) {
-                        hashTable.put(key,z);  // normally, done once
-                        old = z;               // so a later e combines with the new value
-                    }
-                }
-        }
-
-        protected static void flush_table ( Context context ) throws IOException, InterruptedException {
-            Enumeration<MRData> en = hashTable.keys();
-            while (en.hasMoreElements()) {
-                MRData key = en.nextElement();
-                MRData value = hashTable.get(key);
-                ckey.set(key);
-                pair.set(0,key);
-                tbag.clear();
-                tbag.add_element(value);
-                pair.set(1,tbag);
-                for ( MRData e: (Bag)reduce_fnc.eval(pair) )
-                    write(ckey,e,context);
-            };
-            hashTable.clear();
-        }
-
-        @Override
-        public void reduce ( GroupByJoinKey key, Iterable<MRContainer> values, Context context )
-                    throws IOException, InterruptedException {
-            if (key.partition != current_partition && current_partition > 0) {
-                // at the end of a partition, flush the hash table
-                flush_table(context);
-                current_partition = key.partition;
-            };
-            left.clear();
-            Tuple p = null;
-            final Iterator<MRContainer> i = values.iterator();
-            // left tuples arrive before right tuples; cache the left values into the left bag
-            while (i.hasNext()) {
-                p = (Tuple)i.next().data();
-                if (((MR_byte)p.first()).get() == 2)
-                    break;
-                left.add(p.second());
-                p = null;
-            };
-            // the previous value was from right
-            if (p != null) {
-                MRData y = p.second();
-                MRData gy = right_groupby_fnc.eval(y);
-                // cross product with left (must use new Tuples)
-                for ( MRData x: left )
-                    store(new Tuple(left_groupby_fnc.eval(x),gy),new Tuple(x,y));
-                // the rest of values are from right
-                while (i.hasNext()) {
-                    y = ((Tuple)i.next().data()).second();
-                    gy = right_groupby_fnc.eval(y);
-                    // cross product with left (must use new Tuples)
-                    for ( MRData x: left )
-                        store(new Tuple(left_groupby_fnc.eval(x),gy),new Tuple(x,y));
-                }
-            }
-        }
-
-        @Override
-        protected void setup ( Context context ) throws IOException,InterruptedException {
-            super.setup(context);
-            try {
-                Configuration conf = context.getConfiguration();
-                Config.read(conf);
-                if (Plan.conf == null)
-                    Plan.conf = conf;
-                Tree code = Tree.parse(conf.get("mrql.groupby.left"));
-                left_groupby_fnc = functional_argument(conf,code);
-                code = Tree.parse(conf.get("mrql.groupby.right"));
-                right_groupby_fnc = functional_argument(conf,code);
-                m = conf.getInt("mrql.m",1);
-                n = conf.getInt("mrql.n",1);
-                code = Tree.parse(conf.get("mrql.mapper"));
-                map_fnc = functional_argument(conf,code);
-                code = Tree.parse(conf.get("mrql.combiner"));
-                combine_fnc = functional_argument(conf,code);
-                code = Tree.parse(conf.get("mrql.reducer"));
-                reduce_fnc = functional_argument(conf,code);
-                counter = conf.get("mrql.counter");
-                hashTable = new Hashtable<MRData,MRData>(Config.map_cache_size);
-            } catch (Exception e) {
-                throw new Error("Cannot retrieve the reducer plan");
-            }
-        }
-
-        @Override
-        protected void cleanup ( Context context ) throws IOException,InterruptedException {
-            if (hashTable != null)
-                flush_table(context);
-            hashTable = null; // garbage-collect it
-            super.cleanup(context);
-        }
-    }
-
-    /** the GroupByJoin operation
-     * @param left_join_key_fnc   left join key function
-     * @param right_join_key_fnc  right join key function
-     * @param left_groupby_fnc    left group-by function
-     * @param right_groupby_fnc   right group-by function
-     * @param map_fnc             map function
-     * @param combine_fnc         combine function
-     * @param reduce_fnc          reduce function
-     * @param X                   left data set
-     * @param Y                   right data set
-     * @param num_reducers        number of reducers
-     * @param n                   left dimension of the reducer grid
-     * @param m                   right dimension of the reducer grid
-     * @param stop_counter        optional counter used in repeat operation
-     * @return a DataSet that contains the result
-     */
-    public final static DataSet groupByJoin
-                 ( Tree left_join_key_fnc,       // left join key function
-                   Tree right_join_key_fnc,      // right join key function
-                   Tree left_groupby_fnc,        // left group-by function
-                   Tree right_groupby_fnc,       // right group-by function
-                   Tree map_fnc,                 // map function
-                   Tree combine_fnc,             // combine function
-                   Tree reduce_fnc,              // reduce function
-                   DataSet X,                    // left data set
-                   DataSet Y,                    // right data set
-                   int num_reducers,             // number of reducers
-                   int n, int m,                 // dimensions of the reducer grid
-                   String stop_counter )         // optional counter used in repeat operation
-              throws Exception {
-        String newpath = new_path(conf);
-        conf.set("mrql.join.key.left",left_join_key_fnc.toString());
-        conf.set("mrql.join.key.right",right_join_key_fnc.toString());
-        conf.set("mrql.groupby.left",left_groupby_fnc.toString());
-        conf.set("mrql.groupby.right",right_groupby_fnc.toString());
-        conf.setInt("mrql.m",m);
-        conf.setInt("mrql.n",n);
-        conf.set("mrql.mapper",map_fnc.toString());
-        conf.set("mrql.combiner",combine_fnc.toString());
-        conf.set("mrql.reducer",reduce_fnc.toString());
-        conf.set("mrql.counter",stop_counter);
-        Job job = new Job(conf,newpath);
-        distribute_compiled_arguments(job.getConfiguration());
-        job.setMapOutputKeyClass(GroupByJoinKey.class);
-        job.setJarByClass(GroupByJoinPlan.class);
-        job.setOutputKeyClass(MRContainer.class);
-        job.setOutputValueClass(MRContainer.class);
-        job.setPartitionerClass(GroupByJoinPartitioner.class);
-        job.setSortComparatorClass(GroupByJoinSortComparator.class);
-        job.setGroupingComparatorClass(GroupByJoinGroupingComparator.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        FileOutputFormat.setOutputPath(job,new Path(newpath));
-        for (DataSource p: X.source)
-            MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MapperLeft.class);
-        for (DataSource p: Y.source)
-            MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MapperRight.class);
-        job.setReducerClass(JoinReducer.class);
-        if (num_reducers > 0)
-            job.setNumReduceTasks(num_reducers);
-        job.waitForCompletion(true);
-        long c = (stop_counter.equals("-")) ? 0
-                 : job.getCounters().findCounter("mrql",stop_counter).getValue();
-        DataSource s = new BinaryDataSource(newpath,conf);
-        s.to_be_merged = false;
-        return new DataSet(s,c,MapReducePlan.outputRecords(job));
-    }
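-
-    /* A minimal sketch of a call to groupByJoin for the matrix-multiplication example in
-       the class comment above; the argument names below are hypothetical placeholders for
-       the compiled plan terms that MRQL would pass in, not identifiers that exist in this
-       code base:
-
-         DataSet result = groupByJoin(
-             jx, jy,         // join keys: k of (x,i,k) and k of (y,k,j)
-             gx, gy,         // group-by keys: i and j
-             mp,             // map function: z = x*y
-             c,              // combine function: partial sums of z
-             r,              // reduce function: emits ( sum(z), i, j )
-             X, Y,           // the two input data sets
-             num_reducers,   // number of reduce tasks
-             n, m,           // reducer grid, e.g. n = |X|/sqrt(M), m = |Y|/sqrt(M)
-             "-");           // "-" means there is no repeat-loop stop counter
-    */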
-}