You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2014/03/13 15:24:38 UTC
[13/26] MRQL-32: Refactoring directory structure for Eclipse
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/BSP/BSPGeneratorInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BSPGeneratorInputFormat.java b/src/main/java/BSP/BSPGeneratorInputFormat.java
deleted file mode 100644
index bdee64e..0000000
--- a/src/main/java/BSP/BSPGeneratorInputFormat.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.Trees;
-import java.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.*;
-
-
-/** the FileInputFormat for data generators: it creates HDFS files, where each file contains
- * an (offset,size) pair that generates the range of values [offset,offset+size] */
-final public class BSPGeneratorInputFormat extends BSPMRQLFileInputFormat {
- public static class GeneratorRecordReader implements RecordReader<MRContainer,MRContainer> {
- final long offset;
- final long size;
- final int source_number;
- final MRData source_num_data;
- long index;
- SequenceFile.Reader reader;
-
- public GeneratorRecordReader ( FileSplit split,
- int source_number,
- BSPJob job ) throws IOException {
- Configuration conf = BSPPlan.getConfiguration(job);
- Path path = split.getPath();
- FileSystem fs = path.getFileSystem(conf);
- reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
- MRContainer key = new MRContainer();
- MRContainer value = new MRContainer();
- reader.next(key,value);
- offset = ((MR_long)((Tuple)(value.data())).first()).get();
- size = ((MR_long)((Tuple)(value.data())).second()).get();
- this.source_number = source_number;
- source_num_data = new MR_int(source_number);
- index = -1;
- }
-
- public MRContainer createKey () {
- return new MRContainer(null);
- }
-
- public MRContainer createValue () {
- return new MRContainer(null);
- }
-
- public boolean next ( MRContainer key, MRContainer value ) throws IOException {
- index++;
- value.set(new Tuple(source_num_data,new MR_long(offset+index)));
- key.set(new MR_long(index));
- return index < size;
- }
-
- public long getPos () throws IOException { return index; }
-
- public void close () throws IOException { reader.close(); }
-
- public float getProgress () throws IOException {
- return index / (float)size;
- }
-
- public void initialize ( InputSplit split, TaskAttemptContext context ) throws IOException { }
- }
-
- public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split, BSPJob job ) throws IOException {
- Configuration conf = BSPPlan.getConfiguration(job);
- String path = ((FileSplit)split).getPath().toString();
- GeneratorDataSource ds = (GeneratorDataSource)DataSource.get(path,conf);
- return new GeneratorRecordReader((FileSplit)split,ds.source_num,job);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/BSP/BSPMRQLFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BSPMRQLFileInputFormat.java b/src/main/java/BSP/BSPMRQLFileInputFormat.java
deleted file mode 100644
index 71518cd..0000000
--- a/src/main/java/BSP/BSPMRQLFileInputFormat.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.io.*;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hama.bsp.*;
-import org.apache.hama.HamaConfiguration;
-
-
-/** A superclass for all MRQL FileInputFormats */
-abstract public class BSPMRQLFileInputFormat extends FileInputFormat<MRContainer,MRContainer> implements MRQLFileInputFormat {
- public BSPMRQLFileInputFormat () {}
-
- /** record reader for map-reduce */
- abstract public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split, BSPJob job ) throws IOException;
-
- /** materialize the input file into a memory Bag */
- public Bag materialize ( final Path file ) throws IOException {
- final BSPJob job = new BSPJob((HamaConfiguration)Plan.conf,MRQLFileInputFormat.class);
- job.setInputPath(file);
- final InputSplit[] splits = getSplits(job,1);
- final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job);
- return new Bag(new BagIterator () {
- RecordReader<MRContainer,MRContainer> reader = rd;
- MRContainer key = reader.createKey();
- MRContainer value = reader.createKey();
- int i = 0;
- public boolean hasNext () {
- try {
- if (reader.next(key,value))
- return true;
- do {
- if (++i >= splits.length)
- return false;
- reader.close();
- reader = getRecordReader(splits[i],job);
- } while (!reader.next(key,value));
- return true;
- } catch (IOException e) {
- throw new Error("Cannot collect values from an intermediate result");
- }
- }
- public MRData next () {
- return value.data();
- }
- });
- }
-
- /** materialize the entire dataset into a Bag
- * @param x the DataSet in HDFS to collect values from
- * @param strip true if you want to stripout the source id (used in BSP sources)
- * @return the Bag that contains the collected values
- */
- public final Bag collect ( final DataSet x, boolean strip ) throws Exception {
- Bag res = new Bag();
- for ( DataSource s: x.source )
- if (s.to_be_merged)
- res = res.union(Plan.merge(s));
- else {
- Path path = new Path(s.path);
- final FileSystem fs = path.getFileSystem(Plan.conf);
- final FileStatus[] ds
- = fs.listStatus(path,
- new PathFilter () {
- public boolean accept ( Path path ) {
- return !path.getName().startsWith("_");
- }
- });
- Bag b = new Bag();
- for ( FileStatus st: ds )
- b = b.union(s.inputFormat.newInstance().materialize(st.getPath()));
- if (strip) {
- // remove source_num
- final Iterator<MRData> iter = b.iterator();
- b = new Bag(new BagIterator() {
- public boolean hasNext () {
- return iter.hasNext();
- }
- public MRData next () {
- return ((Tuple)iter.next()).get(1);
- }
- });
- };
- res = res.union(b);
- };
- return res;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/BSP/BSPParsedInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BSPParsedInputFormat.java b/src/main/java/BSP/BSPParsedInputFormat.java
deleted file mode 100644
index 7da9a2e..0000000
--- a/src/main/java/BSP/BSPParsedInputFormat.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.Trees;
-import java.io.*;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.*;
-
-
-/** A FileInputFormat for text files (CVS, XML, JSON, ...) */
-final public class BSPParsedInputFormat extends BSPMRQLFileInputFormat {
- public static class ParsedRecordReader implements RecordReader<MRContainer,MRContainer> {
- final FSDataInputStream fsin;
- final long start;
- final long end;
- final int source_number;
- final MRData source_num_data;
- Iterator<MRData> result;
- Parser parser;
-
- public ParsedRecordReader ( FileSplit split,
- BSPJob job,
- Class<? extends Parser> parser_class,
- int source_number,
- Trees args ) throws IOException {
- Configuration conf = BSPPlan.getConfiguration(job);
- start = split.getStart();
- end = start + split.getLength();
- Path file = split.getPath();
- FileSystem fs = file.getFileSystem(conf);
- fsin = fs.open(split.getPath());
- try {
- parser = parser_class.newInstance();
- } catch (Exception ex) {
- throw new Error("Unrecognized parser:"+parser_class);
- };
- this.source_number = source_number;
- source_num_data = new MR_int(source_number);
- parser.initialize(args);
- parser.open(fsin,start,end);
- result = null;
- }
-
- public MRContainer createKey () {
- return new MRContainer();
- }
-
- public MRContainer createValue () {
- return new MRContainer();
- }
-
- public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
- while (result == null || !result.hasNext()) {
- String s = parser.slice();
- if (s == null)
- return false;
- result = parser.parse(s).iterator();
- };
- value.set(new Tuple(source_num_data,(MRData)result.next()));
- key.set(new MR_long(fsin.getPos()));
- return true;
- }
-
- public synchronized long getPos () throws IOException { return fsin.getPos(); }
-
- public synchronized void close () throws IOException { fsin.close(); }
-
- public float getProgress () throws IOException {
- if (end == start)
- return 0.0f;
- else return Math.min(1.0f, (getPos() - start) / (float)(end - start));
- }
- }
-
- public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split,
- BSPJob job ) throws IOException {
- Configuration conf = BSPPlan.getConfiguration(job);
- String path = ((FileSplit)split).getPath().toString();
- ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,conf);
- return new ParsedRecordReader((FileSplit)split,job,ds.parser,ds.source_num,(Trees)ds.args);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/BSP/BSPPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/BSPPlan.java b/src/main/java/BSP/BSPPlan.java
deleted file mode 100644
index bca8ab9..0000000
--- a/src/main/java/BSP/BSPPlan.java
+++ /dev/null
@@ -1,507 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.*;
-import java.io.*;
-import java.util.Arrays;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hama.bsp.*;
-import org.apache.hama.bsp.sync.SyncException;
-import org.apache.hama.HamaConfiguration;
-import org.apache.hadoop.conf.Configuration;
-
-
-/** Evaluate a BSP plan using Hama */
-final public class BSPPlan extends Plan {
-
-    /** @return the Hadoop configuration of a BSP job (with Hama 0.6.0, use job.getConfiguration() instead) */
-    static final Configuration getConfiguration ( BSPJob job ) {
-        final Configuration jobConf = job.getConf();
-        return jobConf;
-    }
-
- /** The BSP evaluator */
- final static class BSPop extends BSP<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> {
- // constant output key used when the result does not need to be sorted (see writeLocalResult)
- final static MRContainer null_key = new MRContainer(new MR_byte(0));
- // special message for sub-sync(): the sender has more messages to send after this sync
- final static MRData more_to_come = new MR_sync();
- // control message sent by a peer that is not ready to exit the BSP loop (see synchronize)
- final static MRData more_supersteps = new MR_more_bsp_steps();
-
- // output tag: index of the local_cache bag that holds the BSP result (mrql.output.tag)
- private int source_num;
- private Function superstep_fnc; // superstep function
- private MRData state; // BSP state
- private boolean orderp; // will output be ordered?
- // NOTE(review): not read or assigned anywhere in the visible code -- looks unused
- private MRData source; // BSP input
- private Function acc_fnc; // aggregator
- // null when the BSP result is a bag rather than an aggregation (see setup)
- private MRData acc_result; // aggregation result
- // static: shared by every BSPop instance in this JVM; sorted in setup()
- private static String[] all_peer_names; // all BSP peer names
- // indexed like all_peer_names; entries are filled in by setPeer
- private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer>[] all_peers; // all BSP peers
- // a master peer that coordinates and collects results of partial aggregations
- private String masterTask;
- // buffer for received messages -- regularly in a vector, but can be spilled in a local file
- Bag msg_cache;
- // the cache that holds all local data in memory: one bag per input source tag
- Tuple local_cache;
-
-        /** Look up a peer by name.
-         * @param name the peer name
-         * @return the entry of all_peers at the position of name in all_peer_names
-         *         (null if setPeer was never called for that peer)
-         * @throws Error if no peer has this name
-         */
-        private static BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> getPeer ( String name ) {
-            int i = 0;
-            while (i < all_peer_names.length) {
-                if (all_peer_names[i].equals(name))
-                    return all_peers[i];
-                i++;
-            }
-            throw new Error("Unknown peer: "+name);
-        }
-
- private static void setPeer ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
- String name = peer.getPeerName();
- for ( int i = 0; i < all_peer_names.length; i++ )
- if (all_peer_names[i].equals(name))
- all_peers[i] = peer;
- }
-
-        /** shuffle values to BSP peers based on uniform hashing on key
-         * @param key the message key
-         * @return the name of the peer that must receive the message
-         */
-        private static String shuffle ( MRData key ) {
-            // take the remainder before abs(): Math.abs(Integer.MIN_VALUE) is negative and would give
-            // a negative array index; for every other hash code the selected peer is unchanged
-            return all_peer_names[Math.abs(key.hashCode() % all_peer_names.length)];
-        }
-
-        /** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify)
-         * @param peerName the name of the calling peer
-         * @param mr_exit this peer's exit decision
-         * @return the exit decision agreed by all peers (mr_exit itself outside Hadoop mode)
-         */
-        public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
-            // outside Hadoop mode no peer lookup is needed (all_peer_names is only set in setup)
-            if (!Config.hadoop_mode)
-                return mr_exit;
-            return synchronize(getPeer(peerName.get()),mr_exit);
-        }
-
-        /** to exit a BSP loop, all peers must agree to exit (this is used in BSPTranslate.bspSimplify)
-         * @param peer this BSP peer
-         * @param mr_exit this peer's vote: true to exit the loop
-         * @return bsp_true_value if no peer asked for more supersteps, bsp_false_value otherwise;
-         *         mr_exit itself outside Hadoop mode or when it already is one of these two values
-         */
-        public static MR_bool synchronize ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer, MR_bool mr_exit ) {
-            if (!Config.hadoop_mode)
-                return mr_exit;
-            // these two singletons mean the decision is already known for all peers,
-            // so there is nothing to poll (identity comparison on purpose, not equals)
-            final boolean decided = mr_exit == SystemFunctions.bsp_true_value
-                                    || mr_exit == SystemFunctions.bsp_false_value;
-            if (decided)
-                return mr_exit;
-            try {
-                // only reached when checking the exit condition of repeat/closure
-                boolean exit = mr_exit.get();
-                if (all_peer_names.length > 1) {
-                    // a peer that wants to continue tells every peer to continue
-                    if (!exit)
-                        for ( String p: peer.getAllPeerNames() )
-                            peer.send(p,new MRContainer(more_supersteps));
-                    peer.sync();
-                    // exit only if nobody sent a more_supersteps message
-                    exit = (peer.getNumCurrentMessages() == 0);
-                    peer.clear();
-                }
-                return exit ? SystemFunctions.bsp_true_value : SystemFunctions.bsp_false_value;
-            } catch (Exception ex) {
-                throw new Error(ex);
-            }
-        }
-
-        /** collect a bag from all peers by distributing the local copy s
-         * @param peerName the name of the calling peer
-         * @param s the local values to send to every peer
-         * @return the values received from all peers, skipping those already contained in the result
-         *         (s itself outside Hadoop mode)
-         */
-        public static Bag distribute ( MR_string peerName, Bag s ) {
-            // outside Hadoop mode the local copy is the whole bag; don't look up a peer
-            // (all_peer_names is only set in setup)
-            if (!Config.hadoop_mode)
-                return s;
-            BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer = getPeer(peerName.get());
-            try {
-                for ( MRData e: s )
-                    for ( String p: all_peer_names )
-                        peer.send(p,new MRContainer(e));
-                peer.sync();
-                MRContainer msg;
-                Bag res = new Bag();
-                while ((msg = peer.getCurrentMessage()) != null)
-                    if (!res.contains(msg.data()))   // drop duplicates
-                        res.add(msg.data());
-                peer.clear();
-                return res;
-            } catch (Exception ex) {
-                throw new Error(ex);
-            }
-        }
-
- private void readLocalSources ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
- throws IOException {
- MRContainer key = new MRContainer();
- MRContainer value = new MRContainer();
- while (peer.readNext(key,value)) {
- Tuple p = (Tuple)(value.data());
- ((Bag)local_cache.get(((MR_int)p.first()).get())).add(p.second());
- }
- }
-
- private void writeLocalResult ( Bag result,
- BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
- throws IOException {
- MRContainer key = new MRContainer();
- MRContainer value = new MRContainer();
- for ( MRData v: result )
- if (orderp) { // prepare for sorting
- Tuple t = (Tuple)v;
- key.set(t.get(1));
- value.set(t.get(0));
- peer.write(key,value);
- } else {
- value.set(v);
- peer.write(null_key,value);
- }
- }
-
- /** receive messages from other peers into msg_cache.
- * If Config.bsp_msg_size <= 0 (no buffering), msg_cache becomes a lazy Bag that pulls each
- * message from the peer while it is iterated (getCurrentMessage presumably dequeues, so the
- * bag can likely be traversed only once).
- * Otherwise the messages are appended to msg_cache (send_messages clears it beforehand); while
- * any sender announces more_to_come, this peer joins the sender's sub-sync() and keeps collecting.
- */
- private void receive_messages ( final BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
- throws IOException, SyncException, InterruptedException {
- if (Config.bsp_msg_size <= 0) { // no buffering
- // lazy bag: every hasNext() fetches one message from the peer
- msg_cache = new Bag(new BagIterator() {
- MRContainer msg;
- public boolean hasNext () {
- try {
- return (msg = peer.getCurrentMessage()) != null;
- } catch (Exception ex) {
- throw new Error(ex);
- }
- }
- public MRData next () {
- return msg.data();
- }
- });
- } else {
- boolean expect_more = false; // are we expecting more incoming messages?
- do {
- // just in case this peer did a regular-sync() before the others did a sub-sync()
- expect_more = false;
- MRContainer msg;
- // cache the received messages
- while ((msg = peer.getCurrentMessage()) != null)
- // if at least one peer sends a more_to_come message, then expect_more
- if (msg.data().equals(more_to_come))
- expect_more = true;
- else msg_cache.add(msg.data());
- // match the senders' sub-sync() so that they can deliver the rest of their messages
- if (expect_more)
- peer.sync(); // sub-sync()
- } while (expect_more);
- }
- }
-
- /** send the messages produced by a superstep to peers and then receive the incoming messages.
- * Each element of msgs is a (key,value) tuple: value is sent to the peer chosen by shuffle(key).
- * When Config.bsp_msg_size > 0 the messages are sent in batches: before starting a new batch,
- * this peer tells every other peer more_to_come and performs a sub-sync(), caching whatever it
- * has received in msg_cache. A final regular sync() is followed by receive_messages.
- * NOTE(review): since the counter is post-incremented in the test below, the first batch holds
- * bsp_msg_size+1 messages and later ones bsp_msg_size+2 -- confirm this threshold is intended.
- */
- private void send_messages ( Bag msgs,
- BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
- throws IOException, SyncException, InterruptedException {
- int size = 0;
- // with buffering, msg_cache holds the messages of this superstep only
- if (Config.bsp_msg_size > 0)
- msg_cache.clear();
- for ( MRData m: msgs ) {
- Tuple t = (Tuple)m;
- // if there are too many messages to send, then sub-sync()
- if ( Config.bsp_msg_size > 0 && size++ > Config.bsp_msg_size ) {
- // tell all other peers that there is more to come after sync
- for ( String p: all_peer_names )
- if (!peer.getPeerName().equals(p))
- peer.send(p,new MRContainer(more_to_come));
- peer.sync(); // sub-sync()
- size = 0;
- MRContainer msg;
- // cache the received messages (the other peers' more_to_come markers are dropped)
- while ((msg = peer.getCurrentMessage()) != null)
- if (!msg.data().equals(more_to_come))
- msg_cache.add(msg.data());
- };
- // shuffle messages based on key; needs new MRContainer object
- peer.send(shuffle(t.get(0)),new MRContainer(t.get(1)));
- };
- peer.sync(); // regular-sync()
- receive_messages(peer);
- }
-
- @Override
- public void bsp ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer )
- throws IOException, SyncException, InterruptedException {
- final Tuple stepin = new Tuple(4);
- stepin.set(3,new MR_string(peer.getPeerName()));
- Tuple result;
- boolean skip = false;
- String tabs = "";
- int step = 0;
- boolean exit;
- if (Evaluator.evaluator == null)
- try {
- Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.BSPEvaluator").newInstance();
- } catch (Exception ex) {
- throw new Error(ex);
- };
- readLocalSources(peer);
- setPeer(peer);
- do {
- if (!skip)
- step++;
- if (!skip && Config.trace_execution) {
- tabs = Interpreter.tabs(Interpreter.tab_count);
- System.err.println(tabs+" Superstep "+step+" ["+peer.getPeerName()+"]:");
- System.err.println(tabs+" messages ["+peer.getPeerName()+"]: "+msg_cache);
- System.err.println(tabs+" state ["+peer.getPeerName()+"]: "+state);
- for ( int i = 0; i < local_cache.size(); i++)
- if (local_cache.get(i) instanceof Bag && ((Bag)local_cache.get(i)).size() > 0)
- System.out.println(tabs+" cache ["+peer.getPeerName()+"] "+i+": "+local_cache.get(i));
- };
- stepin.set(0,local_cache);
- stepin.set(1,msg_cache);
- stepin.set(2,state);
- // evaluate one superstep
- result = (Tuple)superstep_fnc.eval(stepin);
- Bag msgs = (Bag)result.get(0);
- exit = ((MR_bool)result.get(2)).get();
- state = result.get(1);
- // shortcuts: if we know for sure that all peers want to exit/continue
- if (result.get(2) == SystemFunctions.bsp_true_value) { // must be ==, not equals
- peer.sync();
- if (Config.trace_execution)
- System.err.println(tabs+" result ["+peer.getPeerName()+"]: "+result);
- break;
- };
- if (result.get(2) == SystemFunctions.bsp_false_value) {
- if (Config.trace_execution)
- System.err.println(tabs+" result ["+peer.getPeerName()+"]: "+result);
- send_messages(msgs,peer);
- skip = false;
- continue;
- };
- // shortcut: skip is true when NONE of the peers sent any messages
- skip = (msgs == SystemFunctions.bsp_empty_bag); // must be ==, not equals
- if (skip)
- continue;
- if (Config.trace_execution)
- System.err.println(tabs+" result ["+peer.getPeerName()+"]: "+result);
- exit = synchronize(peer,(MR_bool)result.get(2)).get();
- send_messages(msgs,peer);
- } while (!exit);
- if (acc_result == null) {
- // the BSP result is a bag that needs to be dumped to the HDFS
- writeLocalResult((Bag)(local_cache.get(source_num)),peer);
- // if there more results, dump them to HDFS
- final MR_long key = new MR_long(0);
- final MRContainer key_container = new MRContainer(key);
- final MRContainer data_container = new MRContainer(new MR_int(0));
- int loc = 0;
- while ( loc < all_peer_names.length && peer.getPeerName().equals(all_peer_names[loc]) )
- loc++;
- Configuration conf = peer.getConfiguration();
- String[] out_paths = conf.get("mrql.output.paths").split(",");
- for ( int i = 1; i < out_paths.length; i++ ) {
- String[] s = out_paths[i].split(":");
- int out_num = Integer.parseInt(s[0]);
- Path path = new Path(s[1]+"/peer"+loc);
- FileSystem fs = path.getFileSystem(conf);
- SequenceFile.Writer writer
- = new SequenceFile.Writer(fs,conf,path,
- MRContainer.class,MRContainer.class);
- long count = 0;
- for ( MRData e: (Bag)(local_cache.get(out_num)) ) {
- key.set(count++);
- data_container.set(e);
- writer.append(key_container,data_container);
- };
- writer.close();
- };
- } else {
- // the BSP result is an aggregation:
- // send the partial results to the master peer
- peer.send(masterTask,new MRContainer(local_cache.get(source_num)));
- peer.sync();
- if (peer.getPeerName().equals(masterTask)) {
- // only the master peer collects the partial aggregations
- MRContainer msg;
- while ((msg = peer.getCurrentMessage()) != null)
- acc_result = acc_fnc.eval(new Tuple(acc_result,msg.data()));
- // write the final aggregation result
- peer.write(null_key,new MRContainer(acc_result));
- }
- }
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public void setup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) {
- try {
- super.setup(peer);
- Configuration conf = peer.getConfiguration();
- Config.read(conf);
- if (Plan.conf == null)
- Plan.conf = conf;
- all_peer_names = peer.getAllPeerNames();
- all_peers = new BSPPeerImpl[all_peer_names.length];
- Arrays.sort(all_peer_names); // is this necessary?
- source_num = conf.getInt("mrql.output.tag",0);
- Tree code = Tree.parse(conf.get("mrql.superstep"));
- superstep_fnc = functional_argument(conf,code);
- code = Tree.parse(conf.get("mrql.initial.state"));
- state = Interpreter.evalE(code);
- if (conf.get("mrql.zero") != null && !conf.get("mrql.zero").equals("")) {
- code = Tree.parse(conf.get("mrql.zero"));
- acc_result = Interpreter.evalE(code);
- code = Tree.parse(conf.get("mrql.accumulator"));
- acc_fnc = functional_argument(conf,code);
- } else acc_result = null;
- orderp = conf.getBoolean("mrql.orderp",false);
- masterTask = all_peer_names[peer.getNumPeers()/2];
- msg_cache = new Bag(1000);
- local_cache = new Tuple(max_input_files);
- for ( int i = 0; i < max_input_files; i++ )
- local_cache.set(i,new Bag());
- } catch (Exception e) {
- e.printStackTrace();
- throw new Error("Cannot setup the Hama BSP job: "+e);
- }
- }
-
-        /** Call clean() (unless in local mode), release the local cache, and run Hama's cleanup;
-         * the last two steps happen even if clean() fails */
-        @Override
-        public void cleanup ( BSPPeer<MRContainer,MRContainer,MRContainer,MRContainer,MRContainer> peer ) throws IOException {
-            try {
-                if (!Config.local_mode)
-                    clean();
-            } finally {
-                local_cache = null;
-                super.cleanup(peer);
-            }
-        }
- }
-
- /** set Hama's min split size and number of BSP tasks (doesn't work with Hama 0.6.0) */
- public static void setupSplits ( BSPJob job, DataSet ds ) throws IOException {
- long[] sizes = new long[ds.source.size()];
- if (sizes.length > Config.nodes)
- throw new Error("Cannot distribute "+sizes.length+" files over "+Config.nodes+" BSP tasks");
- for ( int i = 0; i < sizes.length; i++ )
- sizes[i] = ds.source.get(i).size(Plan.conf);
- long total_size = 0;
- for ( long size: sizes )
- total_size += size;
- long split_size = Math.max(total_size/Config.nodes,100000);
- int tasks = 0;
- do { // adjust split_size
- tasks = 0;
- for ( long size: sizes )
- tasks += (int)Math.ceil(size/(double)split_size);
- if (tasks > Config.nodes)
- split_size = (long)Math.ceil((double)split_size*1.01);
- } while (tasks > Config.nodes);
- job.setNumBspTask(tasks);
- System.err.println("*** Using "+tasks+" BSP tasks (out of a max "+Config.nodes+")."
- +" Each task will handle about "+Math.min(total_size/Config.nodes,split_size)
- +" bytes of input data.");
- job.set("bsp.min.split.size",Long.toString(split_size));
- }
-
-    /** Evaluate a BSP operation that returns a DataSet
-     * @param source_nums output tags; the first one is the main output, the others are extra outputs
-     * @param superstep the superstep function
-     * @param init_state initial state
-     * @param orderp do we need to order the result?
-     * @param source input dataset
-     * @return a new data source that contains the result, or a Tuple of data sources
-     *         (one per output tag) when there are many output tags
-     */
-    public final static MRData BSP ( int[] source_nums, // output tags
-                                     Tree superstep,    // superstep function
-                                     Tree init_state,   // initial state
-                                     boolean orderp,    // do we need to order the result?
-                                     DataSet source     // input dataset
-                                     ) throws Exception {
-        String[] newpaths = new String[source_nums.length];
-        newpaths[0] = new_path(conf);
-        conf.set("mrql.output.paths",source_nums[0]+":"+newpaths[0]);
-        for ( int i = 1; i < source_nums.length; i++ ) {
-            newpaths[i] = new_path(conf);
-            // create the directory of the i-th extra output (was newpaths[1] for every i,
-            // so only the second output directory was ever created)
-            Path path = new Path(newpaths[i]);
-            FileSystem fs = path.getFileSystem(conf);
-            fs.mkdirs(path);
-            conf.set("mrql.output.paths",conf.get("mrql.output.paths")+","+source_nums[i]+":"+newpaths[i]);
-        }
-        conf.set("mrql.superstep",superstep.toString());
-        conf.set("mrql.initial.state",init_state.toString());
-        // an empty zero tells the BSP peers that the result is a bag, not an aggregation
-        conf.set("mrql.zero","");
-        conf.setInt("mrql.output.tag",source_nums[0]);
-        conf.setBoolean("mrql.orderp",orderp);
-        BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
-        setupSplits(job,source);
-        job.setJobName(newpaths[0]);
-        distribute_compiled_arguments(getConfiguration(job));
-        job.setBspClass(BSPop.class);
-        Path outpath = new Path(newpaths[0]);
-        job.setOutputPath(outpath);
-        job.setOutputKeyClass(MRContainer.class);
-        job.setOutputValueClass(MRContainer.class);
-        job.setOutputFormat(SequenceFileOutputFormat.class);
-        job.setInputFormat(MultipleBSPInput.class);
-        FileInputFormat.setInputPaths(job,source.merge());
-        job.waitForCompletion(true);
-        MRData[] s = new MRData[source_nums.length];
-        for ( int i = 0; i < source_nums.length; i++ ) {
-            BinaryDataSource ds = new BinaryDataSource(source_nums[i],newpaths[i],conf);
-            ds.to_be_merged = orderp;
-            s[i] = new MR_dataset(new DataSet(ds,0,3));
-        }
-        return (source_nums.length == 1) ? s[0] : new Tuple(s);
-    }
-
-    /** Evaluate a BSP operation that aggregates the results
-     * @param source_num output tag
-     * @param superstep the superstep function
-     * @param init_state initial state
-     * @param acc_fnc accumulator function
-     * @param zero zero value for the accumulator
-     * @param source input dataset
-     * @return the aggregation result: the first non-null value found in the job output files, or null
-     */
-    public final static MRData BSPaggregate ( int source_num, // output tag
-                                              Tree superstep, // superstep function
-                                              Tree init_state, // initial state
-                                              Tree acc_fnc, // accumulator function
-                                              Tree zero, // zero value for the accumulator
-                                              DataSet source // input dataset
-                                              ) throws Exception {
-        String newpath = new_path(conf);
-        conf.set("mrql.superstep",superstep.toString());
-        conf.set("mrql.initial.state",init_state.toString());
-        conf.set("mrql.accumulator",acc_fnc.toString());
-        conf.set("mrql.zero",zero.toString());
-        conf.setInt("mrql.output.tag",source_num);
-        conf.setBoolean("mrql.orderp",false);
-        BSPJob job = new BSPJob((HamaConfiguration)conf,BSPop.class);
-        setupSplits(job,source);
-        job.setJobName(newpath);
-        distribute_compiled_arguments(getConfiguration(job));
-        job.setBspClass(BSPop.class);
-        Path outpath = new Path(newpath);
-        job.setOutputPath(outpath);
-        job.setOutputKeyClass(MRContainer.class);
-        job.setOutputValueClass(MRContainer.class);
-        job.setOutputFormat(SequenceFileOutputFormat.class);
-        job.setInputFormat(MultipleBSPInput.class);
-        FileInputFormat.setInputPaths(job,source.merge());
-        job.waitForCompletion(true);
-        // only the master peer writes the aggregation result (see BSPop.bsp)
-        FileSystem fs = outpath.getFileSystem(conf);
-        FileStatus[] files = fs.listStatus(outpath);
-        if (files == null)   // some Hadoop versions return null for a missing directory
-            return null;
-        for ( FileStatus file: files ) {
-            MRContainer key = new MRContainer();
-            MRContainer value = new MRContainer();
-            SequenceFile.Reader sreader = new SequenceFile.Reader(fs,file.getPath(),conf);
-            try {
-                sreader.next(key,value);
-            } finally {
-                sreader.close();
-            }
-            if (value.data() != null)
-                return value.data();
-        }
-        return null;
-    }
- }
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/BSP/MultipleBSPInput.java
----------------------------------------------------------------------
diff --git a/src/main/java/BSP/MultipleBSPInput.java b/src/main/java/BSP/MultipleBSPInput.java
deleted file mode 100644
index aa43423..0000000
--- a/src/main/java/BSP/MultipleBSPInput.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.Trees;
-import java.io.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hama.bsp.*;
-
-
-/** A FileInputFormat for multiple files, where each file may be associated with
- * a different FileInputFormat */
-final public class MultipleBSPInput extends BSPMRQLFileInputFormat {
- public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split, BSPJob job ) throws IOException {
- if (Evaluator.evaluator == null)
- try {
- Evaluator.evaluator = (Evaluator)Class.forName("org.apache.mrql.BSPEvaluator").newInstance();
- } catch (Exception ex) {
- throw new Error(ex);
- };
- String path = ((FileSplit)split).getPath().toString();
- Configuration conf = BSPPlan.getConfiguration(job);
- DataSource ds = DataSource.get(path,conf);
- Plan.conf = conf;
- if (ds instanceof ParsedDataSource)
- return new BSPParsedInputFormat.ParsedRecordReader((FileSplit)split,
- job,
- ((ParsedDataSource)ds).parser,
- ds.source_num,
- (Trees)((ParsedDataSource)ds).args);
- else if (ds instanceof BinaryDataSource)
- return new BSPBinaryInputFormat.BinaryInputRecordReader((FileSplit)split,job,ds.source_num);
- else if (ds instanceof GeneratorDataSource)
- return new BSPGeneratorInputFormat.GeneratorRecordReader((FileSplit)split,ds.source_num,job);
- else throw new Error("Unknown data source: "+ds+" for path "+path);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/MapReduce/CrossProductOperation.java
----------------------------------------------------------------------
diff --git a/src/main/java/MapReduce/CrossProductOperation.java b/src/main/java/MapReduce/CrossProductOperation.java
deleted file mode 100644
index a9216cc..0000000
--- a/src/main/java/MapReduce/CrossProductOperation.java
+++ /dev/null
@@ -1,235 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.*;
-import java.io.*;
-import java.net.URI;
-import java.util.List;
-import java.util.Vector;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-import org.apache.hadoop.filecache.DistributedCache;
-
-
-/** The CrossProduct physical operation (similar to block-nested loop) */
-final public class CrossProductOperation extends MapReducePlan {
-
-    /** The mapper for the CrossProduct operation: it caches up to Config.map_cache_size
-     * mapped values of the left input and, each time the cache fills up (and once more at
-     * cleanup), joins the cached values with all values of the right input, which are read
-     * from the distributed cache */
-    private final static class crossProductMapper extends Mapper<MRContainer,MRContainer,MRContainer,MRContainer> {
-        private static String counter;       // a Hadoop user-defined counter used in the repeat operation
-        private static Function reduce_fnc;  // the reduce function
-        private static Function map_fnc;     // the mapper function
-        private final static List<MRData> outer
-            = new Vector<MRData>(Config.map_cache_size);  // bounded cache for the outer (left) values
-        private static MRContainer last_key; // key of the last map call; used when flushing at cleanup
-        private static URI[] uris;
-        private static Path[] local_paths;
-        private static Function acc_fnc;     // aggregator
-        private static MRData result;        // aggregation result (null when not aggregating)
-        private static Tuple pair = new Tuple(2);      // argument of reduce_fnc
-        private static Tuple acc_pair = new Tuple(2);  // argument of acc_fnc; must not share pair,
-                                                       // which is still in use by the reduce loop
-        private static MRContainer container = new MRContainer(new MR_int(0));
-
-        /** emit one output value: fold it into the aggregation result when aggregating,
-         * otherwise write it; in a repeat, value is a (data,condition) tuple and the
-         * counter is incremented when the condition is true */
-        private void write ( MRContainer key, MRData value, Context context )
-                throws IOException, InterruptedException {
-            if (result != null) {  // aggregation
-                acc_pair.set(0,result);
-                acc_pair.set(1,value);
-                result = acc_fnc.eval(acc_pair);
-            } else if (counter.equals("-")) {
-                container.set(value);
-                context.write(key,container);
-            } else {  // increment the repetition counter if the repeat condition is true
-                Tuple t = (Tuple)value;
-                if (((MR_bool)t.second()).get())
-                    context.getCounter("mrql",counter).increment(1);
-                container.set(t.first());
-                context.write(key,container);
-            }
-        }
-
-        /** join every cached outer value with every value of the right input,
-         * emit the results under the given key, and empty the cache */
-        private void flush_outer ( MRContainer key, Context context ) throws Exception {
-            for ( MRData y: cached_data(context.getConfiguration()) ) {
-                pair.set(1,y);
-                for ( MRData z: outer ) {
-                    pair.set(0,z);
-                    for ( MRData v: (Bag)reduce_fnc.eval(pair) )
-                        write(key,v,context);
-                }
-            }
-            outer.clear();
-        }
-
-        @Override
-        public void map ( MRContainer key, MRContainer value, Context context )
-                throws IOException, InterruptedException {
-            try {
-                last_key = key;
-                for ( MRData x: (Bag)map_fnc.eval(value.data()) ) {
-                    // cache x before checking the bound, so that the value that
-                    // fills up the cache is joined too (and not dropped)
-                    outer.add(x);
-                    if (outer.size() >= Config.map_cache_size)
-                        flush_outer(key,context);
-                }
-            } catch (Exception e) {
-                throw new Error("Cannot perform the crossProduct: "+e,e);
-            }
-        }
-
-        /** read all values of the right input from the distributed cache files
-         * (jar files are skipped)
-         * @throws Error if the cache files cannot be read */
-        protected Bag cached_data ( final Configuration conf ) {
-            try {
-                Bag res = new Bag();
-                final FileSystem fs = FileSystem.getLocal(conf);
-                // hadoop 0.20.2 distributed cache doesn't work in stand-alone mode;
-                // compare with the constant first, since the property may be unset
-                final boolean standalone = "local".equals(conf.get("mapred.job.tracker"));
-                for ( int i = 0; i < local_paths.length; i++ ) {
-                    final Path path = (standalone) ? new Path(uris[i].toString()) : local_paths[i];
-                    if (path.getName().endsWith(".jar"))
-                        continue;
-                    res = res.union(new Bag(new BagIterator () {
-                        final SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);
-                        final MRContainer key = new MRContainer(new MR_int(0));
-                        final MRContainer value = new MRContainer(new MR_int(0));
-                        public boolean hasNext () {
-                            try {
-                                boolean more = reader.next(key,value);
-                                if (!more)
-                                    reader.close();
-                                return more;
-                            } catch (IOException e) {
-                                throw new Error("Cannot collect values from distributed cache",e);
-                            }
-                        }
-                        public MRData next () {
-                            return value.data();
-                        }
-                    }));
-                }
-                return res;
-            } catch (Exception e) {
-                throw new Error("Cannot setup the cross product: "+e,e);
-            }
-        }
-
-        @Override
-        protected void setup ( Context context ) throws IOException,InterruptedException {
-            super.setup(context);
-            try {
-                Configuration conf = context.getConfiguration();
-                Config.read(conf);
-                if (Plan.conf == null)
-                    Plan.conf = conf;
-                Tree code = Tree.parse(conf.get("mrql.reducer"));
-                reduce_fnc = functional_argument(conf,code);
-                code = Tree.parse(conf.get("mrql.mapper"));
-                map_fnc = functional_argument(conf,code);
-                // the driver sets mrql.zero to "" when there is no aggregation,
-                // so an empty value must disable aggregation just like a missing one
-                String zero = conf.get("mrql.zero");
-                if (zero != null && !zero.equals("")) {
-                    code = Tree.parse(zero);
-                    result = Interpreter.evalE(code);
-                    code = Tree.parse(conf.get("mrql.accumulator"));
-                    acc_fnc = functional_argument(conf,code);
-                } else result = null;
-                counter = conf.get("mrql.counter");
-                uris = DistributedCache.getCacheFiles(conf);
-                local_paths = DistributedCache.getLocalCacheFiles(conf);
-                outer.clear();
-            } catch (Exception e) {
-                throw new Error("Cannot setup the crossProduct: "+e,e);
-            }
-        }
-
-        @Override
-        protected void cleanup ( Context context ) throws IOException,InterruptedException {
-            // join the values still in the cache
-            if (!outer.isEmpty())
-                try {
-                    flush_outer(last_key,context);
-                } catch (Exception e) {
-                    throw new Error("Cannot cleanup the crossProduct: "+e,e);
-                }
-            outer.clear();
-            if (result != null)  // emit the result of aggregation
-                context.write(new MRContainer(new MR_int(0)),new MRContainer(result));
-            super.cleanup(context);
-        }
-    }
-
-    /** The CrossProduct physical operator (similar to block-nested loop)
-     * @param mx left mapper
-     * @param my right mapper
-     * @param reduce_fnc reducer
-     * @param acc_fnc optional accumulator function
-     * @param zero optional the zero value for the accumulator
-     * @param X the left source
-     * @param Y the right source (stored in distributed cache)
-     * @param stop_counter optional counter used in repeat operation
-     * @return a new data source that contains the result
-     */
-    public final static DataSet crossProduct ( Tree mx,              // left mapper
-                                               Tree my,              // right mapper
-                                               Tree reduce_fnc,      // reducer
-                                               Tree acc_fnc,         // optional accumulator function
-                                               Tree zero,            // optional the zero value for the accumulator
-                                               DataSet X,            // the left source
-                                               DataSet Y,            // the right source (stored in distributed cache)
-                                               String stop_counter ) // optional counter used in repeat operation
-                                 throws Exception {
-        // apply the right mapper to Y; the files of the resulting data set are
-        // shipped to the mappers through the distributed cache below
-        DataSet ds = MapOperation.cMap(my,null,null,Y,"-");
-        String newpath = new_path(conf);
-        conf.set("mrql.reducer",reduce_fnc.toString());
-        conf.set("mrql.mapper",mx.toString());
-        if (zero != null) {
-            conf.set("mrql.accumulator",acc_fnc.toString());
-            conf.set("mrql.zero",zero.toString());
-            conf.set("mapred.min.split.size","268435456");
-        } else conf.set("mrql.zero","");  // an empty zero tells the mapper not to aggregate
-        conf.set("mrql.counter",stop_counter);
-        Job job = new Job(conf,newpath);
-        distribute_compiled_arguments(job.getConfiguration());
-        job.setJarByClass(MapReducePlan.class);
-        job.setOutputKeyClass(MRContainer.class);
-        job.setOutputValueClass(MRContainer.class);
-        job.setOutputFormatClass(SequenceFileOutputFormat.class);
-        // skip files whose names start with "_"
-        PathFilter pf = new PathFilter () { public boolean accept ( Path path ) {
-            return !path.getName().startsWith("_");
-        } };
-        for (DataSource p: ds.source) {
-            Path path = new Path(p.path);
-            for ( FileStatus s: path.getFileSystem(conf).listStatus(path,pf) )
-                DistributedCache.addCacheFile(s.getPath().toUri(),job.getConfiguration());
-        }
-        for (DataSource p: X.source)
-            MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,crossProductMapper.class);
-        FileOutputFormat.setOutputPath(job,new Path(newpath));
-        job.setNumReduceTasks(0);  // map-only job
-        job.waitForCompletion(true);
-        long c = (stop_counter.equals("-")) ? 0
-                 : job.getCounters().findCounter("mrql",stop_counter).getValue();
-        return new DataSet(new BinaryDataSource(newpath,conf),c,outputRecords(job));
-    }
-}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/MapReduce/GroupByJoinPlan.java
----------------------------------------------------------------------
diff --git a/src/main/java/MapReduce/GroupByJoinPlan.java b/src/main/java/MapReduce/GroupByJoinPlan.java
deleted file mode 100644
index 4f93e1e..0000000
--- a/src/main/java/MapReduce/GroupByJoinPlan.java
+++ /dev/null
@@ -1,464 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.*;
-import java.io.*;
-import java.util.*;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.*;
-import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
-import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
-
-
-/**
- * A map-reduce job that captures a join with group-by. Similar to matrix multiplication.<br/>
- * It captures queries of the form:
- * <pre>
- * select r(kx,ky,c(z))
- * from x in X, y in Y, z = mp(x,y)
- * where jx(x) = jy(y)
- * group by (kx,ky): (gx(x),gy(y));
- * </pre>
- * where: mp: map function, r: reduce function, c: combine function,
- * jx: left join key function, jy: right join key function,
- * gx: left group-by function, gy: right group-by function.
- * <br/>
- * Example: matrix multiplication:
- * <pre>
- * select ( sum(z), i, j )
- * from (x,i,k) in X, (y,k,j) in Y, z = x*y
- * group by (i,j);
- * </pre>
- * It uses m*n partitions, so that n/m=|X|/|Y| and a hash table of size |X|/n*|Y|/m can fit in memory M.
- * That is, n = |X|/sqrt(M), m = |Y|/sqrt(M).
- * Each partition generates |X|/n*|Y|/m data. It replicates X n times and Y m times.
- * Uses a hash-table H of size |X|/n*|Y|/m
- * MapReduce pseudo-code:
- * <pre>
- * mapX ( x )
- * for i = 0,n-1
- * emit ( ((hash(gx(x)) % m)+m*i, jx(x), 1), (1,x) )
- *
- * mapY ( y )
- * for i = 0,m-1
- * emit ( ((hash(gy(y)) % n)*m+i, jy(y), 2), (2,y) )
- * </pre>
- * mapper output key: (partition,joinkey,tag), value: (tag,data) <br/>
- * Partitioner: over partition <br/>
- * GroupingComparator: over partition and joinkey <br/>
- * SortComparator: major partition, minor joinkey, sub-minor tag <br/>
- * <pre>
- * reduce ( (p,_,_), s )
- * if p != current_partition
- * flush()
- * current_partition = p
- * read x from s first and store it to xs
- * for each y from the rest of s
- * for each x in xs
- * H[(gx(x),gy(y))] = c( H[(gx(x),gy(y))], mp((x,y)) )
- * </pre>
- * where flush() is: for each ((kx,ky),v) in H: emit r((kx,ky),v)
- */
-final public class GroupByJoinPlan extends Plan {
-
- /** mapper output key: (partition,joinkey,tag) */
- private final static class GroupByJoinKey implements Writable {
- public int partition; // one of n*m
- public byte tag; // 1 or 2
- public MRData key;
-
- GroupByJoinKey () {}
- GroupByJoinKey ( int p, byte t, MRData k ) {
- partition = p;
- tag = t;
- key = k;
- }
-
- public void write ( DataOutput out ) throws IOException {
- out.writeByte(tag);
- WritableUtils.writeVInt(out,partition);
- key.write(out);
- }
-
- public void readFields ( DataInput in ) throws IOException {
- tag = in.readByte();
- partition = WritableUtils.readVInt(in);
- key = MRContainer.read(in);
- }
-
- public String toString () { return "["+partition+","+tag+","+key+"]"; }
- }
-
- /** partition based on key.partition only */
- private final static class GroupByJoinPartitioner extends Partitioner<GroupByJoinKey,MRContainer> {
- final public int getPartition ( GroupByJoinKey key, MRContainer value, int numPartitions ) {
- return key.partition % numPartitions;
- }
- }
-
- /** sorting with major order key.partition, minor key.key, minor key.tag */
- private final static class GroupByJoinSortComparator implements RawComparator<GroupByJoinKey> {
- int[] container_size;
-
- public GroupByJoinSortComparator () {
- container_size = new int[1];
- }
-
- final public int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl ) {
- try {
- int c = WritableComparator.readVInt(x,xs+1)-WritableComparator.readVInt(y,ys+1);
- if (c != 0)
- return c;
- int tsize = 1+WritableUtils.decodeVIntSize(x[xs+1]);
- c = MRContainer.compare(x,xs+tsize,xl-tsize,y,ys+tsize,yl-tsize,container_size);
- if (c != 0)
- return c;
- return x[xs] - y[ys];
- } catch (IOException e) {
- throw new Error(e);
- }
- }
-
- final public int compare ( GroupByJoinKey x, GroupByJoinKey y ) {
- int c = x.partition - y.partition;
- if (c != 0)
- return c;
- c = x.key.compareTo(y.key);
- if (c != 0)
- return c;
- return x.tag - y.tag;
- }
- }
-
- /** grouping by key.partition and key.key */
- private final static class GroupByJoinGroupingComparator implements RawComparator<GroupByJoinKey> {
- int[] container_size;
-
- public GroupByJoinGroupingComparator() {
- container_size = new int[1];
- }
-
- final public int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl ) {
- try {
- int c = WritableComparator.readVInt(x,xs+1)-WritableComparator.readVInt(y,ys+1);
- if (c != 0)
- return c;
- int tsize = 1+WritableUtils.decodeVIntSize(x[xs+1]);
- return MRContainer.compare(x,xs+tsize,xl-tsize,y,ys+tsize,yl-tsize,container_size);
- } catch (IOException e) {
- throw new Error(e);
- }
- }
-
- final public int compare ( GroupByJoinKey x, GroupByJoinKey y ) {
- int c = x.partition - y.partition;
- return (c != 0) ? c : x.key.compareTo(y.key);
- }
- }
-
- /** the left GroupByJoin mapper */
- private final static class MapperLeft extends Mapper<MRContainer,MRContainer,GroupByJoinKey,MRContainer> {
- private static int n, m;
- private static Function left_join_key_fnc;
- private static Function left_groupby_fnc;
- private static GroupByJoinKey ckey = new GroupByJoinKey(0,(byte)1,new MR_int(0));
- private static Tuple tvalue = (new Tuple(2)).set(0,new MR_byte(1));
- private static MRContainer cvalue = new MRContainer(tvalue);
-
- @Override
- public void map ( MRContainer key, MRContainer value, Context context )
- throws IOException, InterruptedException {
- MRData data = value.data();
- for ( int i = 0; i < n; i++ ) {
- ckey.partition = (left_groupby_fnc.eval(data).hashCode() % m)+m*i;
- ckey.key = left_join_key_fnc.eval(data);
- tvalue.set(1,data);
- context.write(ckey,cvalue);
- }
- }
-
- @Override
- protected void setup ( Context context ) throws IOException,InterruptedException {
- super.setup(context);
- try {
- Configuration conf = context.getConfiguration();
- Config.read(conf);
- if (Plan.conf == null)
- Plan.conf = conf;
- Tree code = Tree.parse(conf.get("mrql.join.key.left"));
- left_join_key_fnc = functional_argument(conf,code);
- code = Tree.parse(conf.get("mrql.groupby.left"));
- left_groupby_fnc = functional_argument(conf,code);
- m = conf.getInt("mrql.m",1);
- n = conf.getInt("mrql.n",1);
- } catch (Exception e) {
- throw new Error("Cannot retrieve the left mapper plan");
- }
- }
- }
-
- /** the right GroupByJoin mapper */
- private final static class MapperRight extends Mapper<MRContainer,MRContainer,GroupByJoinKey,MRContainer> {
- private static int n, m;
- private static Function right_join_key_fnc;
- private static Function right_groupby_fnc;
- private static GroupByJoinKey ckey = new GroupByJoinKey(0,(byte)2,new MR_int(0));
- private static Tuple tvalue = (new Tuple(2)).set(0,new MR_byte(2));
- private static MRContainer cvalue = new MRContainer(tvalue);
-
- @Override
- public void map ( MRContainer key, MRContainer value, Context context )
- throws IOException, InterruptedException {
- MRData data = value.data();
- for ( int i = 0; i < m; i++ ) {
- ckey.partition = (right_groupby_fnc.eval(data).hashCode() % n)*m+i;
- ckey.key = right_join_key_fnc.eval(data);
- tvalue.set(1,data);
- context.write(ckey,cvalue);
- }
- }
-
- @Override
- protected void setup ( Context context ) throws IOException,InterruptedException {
- super.setup(context);
- try {
- Configuration conf = context.getConfiguration();
- Config.read(conf);
- if (Plan.conf == null)
- Plan.conf = conf;
- Tree code = Tree.parse(conf.get("mrql.join.key.right"));
- right_join_key_fnc = functional_argument(conf,code);
- code = Tree.parse(conf.get("mrql.groupby.right"));
- right_groupby_fnc = functional_argument(conf,code);
- m = conf.getInt("mrql.m",1);
- n = conf.getInt("mrql.n",1);
- } catch (Exception e) {
- throw new Error("Cannot retrieve the right mapper plan");
- }
- }
- }
-
- /** the GroupByJoin reducer */
- private static class JoinReducer extends Reducer<GroupByJoinKey,MRContainer,MRContainer,MRContainer> {
- private static String counter; // a Hadoop user-defined counter used in the repeat operation
- private static int n, m; // n*m partitioners
- private static Function left_groupby_fnc; // left group-by function
- private static Function right_groupby_fnc;// right group-by function
- private static Function map_fnc; // the map function
- private static Function combine_fnc; // the combine function
- private static Function reduce_fnc; // the reduce function
- private static Bag left = new Bag(); // a cached bag of input fragments from left input
- private static int current_partition = -1;
- private static Hashtable<MRData,MRData> hashTable; // in-reducer combiner
- private static Tuple pair = new Tuple(2);
- private static MRContainer ckey = new MRContainer(new MR_int(0));
- private static MRContainer cvalue = new MRContainer(new MR_int(0));
- private static MRContainer container = new MRContainer(new MR_int(0));
- private static Tuple tkey = new Tuple(2);
- private static Bag tbag = new Bag(2);
-
- private static void write ( MRContainer key, MRData value, Context context )
- throws IOException, InterruptedException {
- if (counter.equals("-")) {
- container.set(value);
- context.write(key,container);
- } else { // increment the repetition counter if the repeat condition is true
- Tuple t = (Tuple)value;
- if (((MR_bool)t.second()).get())
- context.getCounter("mrql",counter).increment(1);
- container.set(t.first());
- context.write(key,container);
- }
- }
-
- private void store ( MRData key, MRData value ) throws IOException {
- MRData old = hashTable.get(key);
- Tuple k = (Tuple)key;
- pair.set(0,key);
- for ( MRData e: (Bag)map_fnc.eval(value) )
- if (old == null)
- hashTable.put(key,e);
- else {
- tbag.clear();
- tbag.add_element(e).add_element(old);
- pair.set(1,tbag);
- for ( MRData z: (Bag)combine_fnc.eval(pair) )
- hashTable.put(key,z); // normally, done once
- }
- }
-
- protected static void flush_table ( Context context ) throws IOException, InterruptedException {
- Enumeration<MRData> en = hashTable.keys();
- while (en.hasMoreElements()) {
- MRData key = en.nextElement();
- MRData value = hashTable.get(key);
- ckey.set(key);
- pair.set(0,key);
- tbag.clear();
- tbag.add_element(value);
- pair.set(1,tbag);
- for ( MRData e: (Bag)reduce_fnc.eval(pair) )
- write(ckey,e,context);
- };
- hashTable.clear();
- }
-
- @Override
- public void reduce ( GroupByJoinKey key, Iterable<MRContainer> values, Context context )
- throws IOException, InterruptedException {
- if (key.partition != current_partition && current_partition > 0) {
- // at the end of a partition, flush the hash table
- flush_table(context);
- current_partition = key.partition;
- };
- left.clear();
- Tuple p = null;
- final Iterator<MRContainer> i = values.iterator();
- // left tuples arrive before right tuples; cache the left values into the left bag
- while (i.hasNext()) {
- p = (Tuple)i.next().data();
- if (((MR_byte)p.first()).get() == 2)
- break;
- left.add(p.second());
- p = null;
- };
- // the previous value was from right
- if (p != null) {
- MRData y = p.second();
- MRData gy = right_groupby_fnc.eval(y);
- // cross product with left (must use new Tuples)
- for ( MRData x: left )
- store(new Tuple(left_groupby_fnc.eval(x),gy),new Tuple(x,y));
- // the rest of values are from right
- while (i.hasNext()) {
- y = ((Tuple)i.next().data()).second();
- gy = right_groupby_fnc.eval(y);
- // cross product with left (must use new Tuples)
- for ( MRData x: left )
- store(new Tuple(left_groupby_fnc.eval(x),gy),new Tuple(x,y));
- }
- }
- }
-
- @Override
- protected void setup ( Context context ) throws IOException,InterruptedException {
- super.setup(context);
- try {
- Configuration conf = context.getConfiguration();
- Config.read(conf);
- if (Plan.conf == null)
- Plan.conf = conf;
- Tree code = Tree.parse(conf.get("mrql.groupby.left"));
- left_groupby_fnc = functional_argument(conf,code);
- code = Tree.parse(conf.get("mrql.groupby.right"));
- right_groupby_fnc = functional_argument(conf,code);
- m = conf.getInt("mrql.m",1);
- n = conf.getInt("mrql.n",1);
- code = Tree.parse(conf.get("mrql.mapper"));
- map_fnc = functional_argument(conf,code);
- code = Tree.parse(conf.get("mrql.combiner"));
- combine_fnc = functional_argument(conf,code);
- code = Tree.parse(conf.get("mrql.reducer"));
- reduce_fnc = functional_argument(conf,code);
- counter = conf.get("mrql.counter");
- hashTable = new Hashtable<MRData,MRData>(Config.map_cache_size);
- } catch (Exception e) {
- throw new Error("Cannot retrieve the reducer plan");
- }
- }
-
- @Override
- protected void cleanup ( Context context ) throws IOException,InterruptedException {
- if (hashTable != null)
- flush_table(context);
- hashTable = null; // garbage-collect it
- super.cleanup(context);
- }
- }
-
- /** the GroupByJoin operation
- * @param left_join_key_fnc left join key function
- * @param right_join_key_fnc right join key function
- * @param left_groupby_fnc left group-by function
- * @param right_groupby_fnc right group-by function
- * @param map_fnc map function
- * @param combine_fnc combine function
- * @param reduce_fnc reduce function
- * @param X left data set
- * @param Y right data set
- * @param num_reducers number of reducers
- * @param n left dimension of the reducer grid
- * @param m right dimension of the reducer grid
- * @param stop_counter optional counter used in repeat operation
- * @return a DataSet that contains the result
- */
- public final static DataSet groupByJoin
- ( Tree left_join_key_fnc, // left join key function
- Tree right_join_key_fnc, // right join key function
- Tree left_groupby_fnc, // left group-by function
- Tree right_groupby_fnc, // right group-by function
- Tree map_fnc, // map function
- Tree combine_fnc, // combine function
- Tree reduce_fnc, // reduce function
- DataSet X, // left data set
- DataSet Y, // right data set
- int num_reducers, // number of reducers
- int n, int m, // dimensions of the reducer grid
- String stop_counter ) // optional counter used in repeat operation
- throws Exception {
- String newpath = new_path(conf);
- conf.set("mrql.join.key.left",left_join_key_fnc.toString());
- conf.set("mrql.join.key.right",right_join_key_fnc.toString());
- conf.set("mrql.groupby.left",left_groupby_fnc.toString());
- conf.set("mrql.groupby.right",right_groupby_fnc.toString());
- conf.setInt("mrql.m",m);
- conf.setInt("mrql.n",n);
- conf.set("mrql.mapper",map_fnc.toString());
- conf.set("mrql.combiner",combine_fnc.toString());
- conf.set("mrql.reducer",reduce_fnc.toString());
- conf.set("mrql.counter",stop_counter);
- Job job = new Job(conf,newpath);
- distribute_compiled_arguments(job.getConfiguration());
- job.setMapOutputKeyClass(GroupByJoinKey.class);
- job.setJarByClass(GroupByJoinPlan.class);
- job.setOutputKeyClass(MRContainer.class);
- job.setOutputValueClass(MRContainer.class);
- job.setPartitionerClass(GroupByJoinPartitioner.class);
- job.setSortComparatorClass(GroupByJoinSortComparator.class);
- job.setGroupingComparatorClass(GroupByJoinGroupingComparator.class);
- job.setOutputFormatClass(SequenceFileOutputFormat.class);
- FileOutputFormat.setOutputPath(job,new Path(newpath));
- for (DataSource p: X.source)
- MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MapperLeft.class);
- for (DataSource p: Y.source)
- MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MapperRight.class);
- job.setReducerClass(JoinReducer.class);
- if (num_reducers > 0)
- job.setNumReduceTasks(num_reducers);
- job.waitForCompletion(true);
- long c = (stop_counter.equals("-")) ? 0
- : job.getCounters().findCounter("mrql",stop_counter).getValue();
- DataSource s = new BinaryDataSource(newpath,conf);
- s.to_be_merged = false;
- return new DataSet(s,c,MapReducePlan.outputRecords(job));
- }
-}