You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2014/03/13 15:24:49 UTC
[24/26] MRQL-32: Refactoring directory structure for Eclipse
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Bag.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Bag.java b/core/src/main/java/org/apache/mrql/Bag.java
new file mode 100644
index 0000000..b092f30
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Bag.java
@@ -0,0 +1,578 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.*;
+import java.io.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+
+
+/**
+ * A sequence of MRData.
+ * There are 3 kinds of Bag implementations, which are converted at run-time, when necessary:
+ * 1) vector-based (materialized): used for small bags (when size is less than Config.max_materialized_bag);
+ * 2) stream-based: can be traversed only once; implemented as Java iterators;
+ * 3) spilled to a local file: can be accessed multiple times
+ */
+public class Bag extends MRData implements Iterable<MRData> {
+ private final static long serialVersionUID = 64629834894869L;
+ // the three run-time representations of a Bag
+ enum Modes { STREAMED, MATERIALIZED, SPILLED };
+ private transient Modes mode; // the current representation of this Bag
+ private transient ArrayList<MRData> content; // content of a materialized bag
+ private transient BagIterator iterator; // iterator for a streamed bag
+ private transient boolean consumed; // true, if the stream has already been used
+ private transient String path; // local path that contains the spilled bag
+ private transient SequenceFile.Writer writer; // the file writer for spilled bags
+
+ /**
+ * create an empty bag as an ArrayList
+ */
+ public Bag () {
+ mode = Modes.MATERIALIZED;
+ content = new ArrayList<MRData>();
+ }
+
+ /**
+ * create an empty bag as an ArrayList with a given capacity
+ * @param size initial capacity
+ */
+ public Bag ( final int size ) {
+ mode = Modes.MATERIALIZED;
+ content = new ArrayList<MRData>(size);
+ }
+
+ /**
+ * in-memory Bag construction (an ArrayList) initialized with data
+ * @param as a vector of MRData to insert in the Bag
+ */
+ public Bag ( final MRData ...as ) {
+ mode = Modes.MATERIALIZED;
+ content = new ArrayList<MRData>(as.length);
+ for ( MRData a: as )
+ content.add(a);
+ }
+
+ /**
+ * in-memory Bag construction (an ArrayList) initialized with data
+ * @param as a vector of MRData to insert in the Bag
+ */
+ public Bag ( final List<MRData> as ) {
+ mode = Modes.MATERIALIZED;
+ content = new ArrayList<MRData>(as.size());
+ for ( MRData a: as )
+ content.add(a);
+ }
+
+ /**
+ * lazy construction (stream-based) of a Bag
+ * @param i the Iterator that generates the Bag elements
+ */
+ public Bag ( final BagIterator i ) {
+ mode = Modes.STREAMED;
+ iterator = i;
+ consumed = false;
+ }
+
+    /** @return true if the Bag elements are held in an ArrayList */
+    public boolean materialized () {
+        return Modes.MATERIALIZED.equals(mode);
+    }
+
+    /** @return true if the Bag elements are generated by an iterator */
+    public boolean streamed () {
+        return Modes.STREAMED.equals(mode);
+    }
+
+    /** @return true if the Bag elements are stored in a local file */
+    public boolean spilled () {
+        return Modes.SPILLED.equals(mode);
+    }
+
+    /** return the number of Bag elements.
+     * A materialized Bag answers from its ArrayList; any other Bag is counted
+     * by traversing it, which marks a stream-based Bag as consumed
+     */
+    public int size () {
+        if (materialized())
+            return content.size();
+        final boolean from_stream = streamed();
+        if (from_stream && consumed)
+            throw new Error("*** The collection stream has already been consumed");
+        int count = 0;
+        final Iterator<MRData> elements = iterator();
+        while ( elements.hasNext() ) {
+            elements.next();
+            count++;
+        }
+        if (from_stream)
+            consumed = true;
+        return count;
+    }
+
+    /** shrink the capacity of the backing ArrayList to the Bag size (materialized Bags only) */
+    public void trim () {
+        if (!materialized())
+            return;
+        content.trimToSize();
+    }
+
+ /** get the n'th element of a Bag (cache it in memory if necessary)
+ * @param n the index
+ * @return the n'th element
+ */
+ public MRData get ( final int n ) {
+ if (materialized())
+ if (n < size())
+ return content.get(n);
+ else throw new Error("List index out of range: "+n);
+ if (streamed() && consumed)
+ throw new Error("*** The collection stream has already been consumed");
+ int i = 0;
+ for ( MRData e: this )
+ if (i++ == n)
+ return e;
+ if (streamed())
+ consumed = true;
+ throw new Error("Cannot retrieve the "+n+"th element of a sequence");
+ }
+
+    /** overwrite the n'th element of a materialized Bag
+     * @param n the index
+     * @param value the new value
+     * @return this Bag
+     */
+    public Bag set ( final int n, final MRData value ) {
+        if (materialized()) {
+            content.set(n,value);
+            return this;
+        }
+        throw new Error("Cannot replace an element of a non-materialized sequence");
+    }
+
+ /** add a new value to a Bag (cache it in memory if necessary)
+ * @param x the new value
+ */
+ public void add ( final MRData x ) {
+ materialize();
+ if (!spilled() && Config.hadoop_mode
+ && size() >= Config.max_materialized_bag)
+ spill();
+ if (spilled())
+ try {
+ if (writer == null) { // writer was closed earlier for reading
+ FileSystem fs = FileSystem.getLocal(Plan.conf);
+ writer = SequenceFile.createWriter(fs,Plan.conf,new Path(path),
+ MRContainer.class,NullWritable.class,
+ SequenceFile.CompressionType.NONE);
+ System.err.println("*** Appending elements to a spilled Bag: "+path);
+ };
+ writer.append(new MRContainer(x),NullWritable.get());
+ } catch (IOException e) {
+ throw new Error("Cannot append an element to a spilled Bag: "+path);
+ }
+ else content.add(x);
+ }
+
+    /** same as add, but returns the Bag so that insertions can be chained
+     * @param x the new value
+     * @return this Bag
+     */
+    public Bag add_element ( final MRData x ) {
+        this.add(x);
+        return this;
+    }
+
+    /** append every element of another Bag to this Bag, in traversal order
+     * @param b the Bag whose elements are copied
+     * @return this Bag
+     */
+    public Bag addAll ( final Bag b ) {
+        final Iterator<MRData> source = b.iterator();
+        while ( source.hasNext() )
+            add(source.next());
+        return this;
+    }
+
+ /** make this Bag empty (cache it in memory if necessary) */
+ public void clear () {
+ if (materialized())
+ content.clear();
+ else if (streamed()) {
+ if (writer != null)
+ try {
+ writer.close();
+ } catch (IOException ex) {
+ throw new Error(ex);
+ };
+ writer = null;
+ path = null;
+ mode = Modes.MATERIALIZED;
+ content = new ArrayList<MRData>(100);
+ };
+ mode = Modes.MATERIALIZED;
+ content = new ArrayList<MRData>();
+ }
+
+ /** cache a stream-based Bag in memory; a Bag that is already materialized or spilled is left as is.
+ * The elements are inserted with add(), so the Bag may end up spilled instead of materialized
+ */
+ public void materialize () {
+ if (materialized() || spilled())
+ return;
+ // get the stream iterator before switching mode: iterator() depends on the current mode
+ Iterator<MRData> iter = iterator();
+ mode = Modes.MATERIALIZED;
+ writer = null;
+ path = null;
+ content = new ArrayList<MRData>(100);
+ while ( iter.hasNext() )
+ add(iter.next());
+ if (materialized()) // it may have been spilled
+ content.trimToSize();
+ // the stream is not needed any more
+ iterator = null;
+ }
+
+ private static Random random_generator = new Random();
+
+ private static String new_path ( FileSystem fs ) throws IOException {
+ Path p;
+ do {
+ p = new Path("file://"+Config.tmpDirectory+"/mrql"+(random_generator.nextInt(1000000)));
+ } while (p.getFileSystem(Plan.conf).exists(p));
+ String path = p.toString();
+ Plan.temporary_paths.add(path);
+ return path;
+ }
+
+    /** spill the Bag to a local file.
+     * Does nothing if the Bag is already spilled or if Config.hadoop_mode is off.
+     * All current elements are written to a new sequence file and the in-memory
+     * content is dropped; the writer is left open for further insertions
+     * @throws Error (with the original failure as its cause) if the Bag cannot be spilled
+     */
+    private void spill () {
+        if (spilled() || !Config.hadoop_mode)
+            return;
+        try {
+            if (Plan.conf == null)
+                Plan.conf = Evaluator.evaluator.new_configuration();
+            final FileSystem fs = FileSystem.getLocal(Plan.conf);
+            path = new_path(fs);
+            System.err.println("*** Spilling a Bag to a local file: "+path);
+            writer = SequenceFile.createWriter(fs,Plan.conf,new Path(path),
+                                               MRContainer.class,NullWritable.class,
+                                               SequenceFile.CompressionType.NONE);
+            for ( MRData e: this )
+                writer.append(new MRContainer(e),NullWritable.get());
+            // switch mode only after every element has been written
+            mode = Modes.SPILLED;
+            content = null;
+            iterator = null;
+        } catch (Exception e) {
+            // keep the original failure as the cause so that it is not lost
+            throw new Error("Cannot spill a Bag to a local file",e);
+        }
+    }
+
+ /**
+ * sort the Bag (cache it in memory if necessary).
+ * If the Bag was spilled during caching, use external sorting
+ */
+ public void sort () {
+ materialize();
+ if (spilled()) // if it was spilled during materialize()
+ try { // use external sorting
+ if (writer != null)
+ writer.close();
+ FileSystem fs = FileSystem.getLocal(Plan.conf);
+ SequenceFile.Sorter sorter
+ = new SequenceFile.Sorter(fs,new Plan.MRContainerKeyComparator(),
+ MRContainer.class,NullWritable.class,Plan.conf);
+ String out_path = new_path(fs);
+ System.err.println("*** Using external sorting on a spilled bag "+path+" -> "+out_path);
+ sorter.setMemory(64*1024*1024);
+ sorter.sort(new Path(path),new Path(out_path));
+ path = out_path;
+ writer = null;
+ } catch (Exception ex) {
+ throw new Error("Cannot sort a spilled bag");
+ }
+ else Collections.sort(content);
+ }
+
+ /** return the Bag Iterator */
+ public Iterator<MRData> iterator () {
+ if (spilled())
+ try {
+ if (writer != null)
+ writer.close();
+ writer = null;
+ return new BagIterator () {
+ final FileSystem fs = FileSystem.getLocal(Plan.conf);
+ final SequenceFile.Reader reader = new SequenceFile.Reader(fs,new Path(path),Plan.conf);
+ final MRContainer key = new MRContainer();
+ final NullWritable value = NullWritable.get();
+ MRData data;
+ public boolean hasNext () {
+ try {
+ if (!reader.next(key,value)) {
+ reader.close();
+ return false;
+ };
+ data = key.data();
+ return true;
+ } catch (IOException e) {
+ throw new Error("Cannot collect values from a spilled Bag");
+ }
+ }
+ public MRData next () {
+ return data;
+ }
+ };
+ } catch (IOException e) {
+ throw new Error("Cannot collect values from a spilled Bag");
+ }
+ else if (materialized())
+ return content.iterator();
+ else {
+ if (consumed) // this should never happen
+ throw new Error("*** The collection stream has already been consumed");
+ consumed = true;
+ return iterator;
+ }
+ }
+
+    /** cache this Bag and then, recursively, each of its elements */
+    public void materializeAll () {
+        materialize();
+        final Iterator<MRData> elements = iterator();
+        while ( elements.hasNext() )
+            elements.next().materializeAll();
+    }
+
+ /** concatenate the elements of a given Bag to the elements of this Bag.
+ * Does not change either Bag
+ * @param s the given Bag
+ * @return a new Bag
+ */
+ public Bag union ( final Bag s ) {
+ final Iterator<MRData> i1 = iterator();
+ final Iterator<MRData> i2 = s.iterator();
+ return new Bag(new BagIterator () {
+ boolean first = true;
+ public boolean hasNext () {
+ if (first)
+ if (i1.hasNext())
+ return true;
+ else {
+ first = false;
+ return i2.hasNext();
+ }
+ else return i2.hasNext();
+ }
+ public MRData next () {
+ if (first)
+ return i1.next();
+ else return i2.next();
+ }
+ });
+ }
+
+    /** does this Bag contain an element?
+     * A materialized Bag is searched in memory; any other Bag is traversed,
+     * which marks a stream-based Bag as consumed
+     * @param x the element to find
+     * @return true if some element is equal to x
+     */
+    public boolean contains ( final MRData x ) {
+        if (materialized())
+            return content.contains(x);
+        if (streamed() && consumed)
+            throw new Error("*** The collection stream has already been consumed");
+        final Iterator<MRData> elements = iterator();
+        while ( elements.hasNext() ) {
+            final MRData candidate = elements.next();
+            if (x.equals(candidate))
+                return true;
+        }
+        if (streamed())
+            consumed = true;
+        return false;
+    }
+
+    /** treat this Bag as a map (a Bag of (key,value) Tuples) and look up a key
+     * @param key the search key
+     * @return the second component of the first Tuple whose first component equals the key
+     * @throws Error if no Tuple has this key or if the stream has already been consumed
+     */
+    public MRData map_find ( final MRData key ) {
+        if (streamed() && consumed)
+            throw new Error("*** The collection stream has already been consumed");
+        final Iterator<MRData> pairs = iterator();
+        while ( pairs.hasNext() ) {
+            final Tuple pair = (Tuple) pairs.next();
+            if (key.equals(pair.first()))
+                return pair.second();
+        }
+        if (streamed())
+            consumed = true;
+        throw new Error("key "+key+" not found in map");
+    }
+
+    /** treat this Bag as a map (a Bag of (key,value) Tuples) and test for a key
+     * @param key the search key
+     * @return true if the first component of some Tuple equals the key
+     */
+    public boolean map_contains ( final MRData key ) {
+        if (streamed() && consumed)
+            throw new Error("*** The collection stream has already been consumed");
+        final Iterator<MRData> pairs = iterator();
+        while ( pairs.hasNext() ) {
+            final Tuple pair = (Tuple) pairs.next();
+            if (key.equals(pair.first()))
+                return true;
+        }
+        if (streamed())
+            consumed = true;
+        return false;
+    }
+
+    /** the output serializer for Bag.
+     * A materialized Bag is written as a BAG tag, its size, and its elements.
+     * Any other Bag is written as a LAZY_BAG tag and its elements, closed by an
+     * END_OF_LAZY_BAG tag, so its size does not have to be known in advance
+     */
+    final public void write ( DataOutput out ) throws IOException {
+        if (!materialized()) {
+            out.writeByte(MRContainer.LAZY_BAG);
+            for ( MRData element: this )
+                element.write(out);
+            out.writeByte(MRContainer.END_OF_LAZY_BAG);
+            return;
+        }
+        out.writeByte(MRContainer.BAG);
+        WritableUtils.writeVInt(out,size());
+        for ( MRData element: this )
+            element.write(out);
+    }
+
+    /** the input serializer for Bag: reads the element count and then that many elements */
+    final public static Bag read ( DataInput in ) throws IOException {
+        final int count = WritableUtils.readVInt(in);
+        final Bag result = new Bag(count);
+        int remaining = count;
+        while ( remaining-- > 0 )
+            result.add(MRContainer.read(in));
+        return result;
+    }
+
+    /** the input serializer for a Bag that was written without its size:
+     * reads elements until the MRContainer.end_of_lazy_bag marker is found
+     */
+    public static Bag lazy_read ( final DataInput in ) throws IOException {
+        final Bag result = new Bag(100);
+        for ( MRData item = MRContainer.read(in);
+              item != MRContainer.end_of_lazy_bag;
+              item = MRContainer.read(in) )
+            result.add(item);
+        // the Bag may have been spilled by add
+        if (result.materialized())
+            result.content.trimToSize();
+        return result;
+    }
+
+ /** the input serializer for Bag */
+ public void readFields ( DataInput in ) throws IOException {
+ int n = WritableUtils.readVInt(in);
+ mode = Modes.MATERIALIZED;
+ iterator = null;
+ path = null;
+ writer = null;
+ if (content == null)
+ content = new ArrayList<MRData>(n);
+ else {
+ content.clear();
+ content.ensureCapacity(n);
+ };
+ for ( int i = 0; i < n; i++ )
+ add(MRContainer.read(in));
+ }
+
+ private void writeObject ( ObjectOutputStream out ) throws IOException {
+ materialize();
+ WritableUtils.writeVInt(out,size());
+ for ( MRData e: this )
+ e.write(out);
+ }
+
+ private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+ int n = WritableUtils.readVInt(in);
+ mode = Modes.MATERIALIZED;
+ iterator = null;
+ path = null;
+ writer = null;
+ content = new ArrayList<MRData>(n);
+ for ( int i = 0; i < n; i++ )
+ add(MRContainer.read(in));
+ }
+
+ /** Java serialization hook; intentionally does nothing */
+ private void readObjectNoData () throws ObjectStreamException { };
+
+ /** compare this Bag with a given Bag by comparing their associated elements */
+ public int compareTo ( MRData x ) {
+ Bag xt = (Bag)x;
+ Iterator<MRData> xi = xt.iterator();
+ Iterator<MRData> yi = iterator();
+ while ( xi.hasNext() && yi.hasNext() ) {
+ int c = xi.next().compareTo(yi.next());
+ if (c < 0)
+ return -1;
+ else if (c > 0)
+ return 1;
+ };
+ if (xi.hasNext())
+ return -1;
+ else if (yi.hasNext())
+ return 1;
+ else return 0;
+ }
+
+ /** compare two serialized Bags directly on their bytes, element by element.
+ * Each Bag is expected to start with its element count encoded as a variable-length int
+ * @param x the buffer that holds the first Bag
+ * @param xs the offset of the first Bag in x
+ * @param xl the length of the first Bag in bytes
+ * @param y the buffer that holds the second Bag
+ * @param ys the offset of the second Bag in y
+ * @param yl the length of the second Bag in bytes
+ * @param size size[0] is read after each element comparison and set to xx+1 before a normal return
+ * @return the result of the first non-zero element comparison; otherwise 1, -1, or 0
+ * if the first Bag has more, fewer, or the same number of elements
+ */
+ final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
+ try {
+ // xn, yn: the element counts; xx, yy: the bytes consumed so far (initially the count itself)
+ int xn = WritableComparator.readVInt(x,xs);
+ int xx = WritableUtils.decodeVIntSize(x[xs]);
+ int yn = WritableComparator.readVInt(y,ys);
+ int yy = WritableUtils.decodeVIntSize(y[ys]);
+ for ( int i = 0; i < xn && i < yn; i++ ) {
+ int k = MRContainer.compare(x,xs+xx,xl-xx,y,ys+yy,yl-yy,size);
+ if (k != 0)
+ return k;
+ // NOTE(review): presumably size[0] is the byte length of the element just compared,
+ // the same on both sides because the elements are equal -- confirm in MRContainer.compare
+ xx += size[0];
+ yy += size[0];
+ };
+ // NOTE(review): the +1 presumably accounts for the one-byte type tag written before the Bag -- confirm
+ size[0] = xx+1;
+ if (xn > yn)
+ return 1;
+ if (xn < yn)
+ return -1;
+ return 0;
+ } catch (IOException e) {
+ throw new Error(e);
+ }
+ }
+
+ /** is this Bag equal to another Bag (order is important) */
+ public boolean equals ( Object x ) {
+ if (!(x instanceof Bag))
+ return false;
+ Bag xt = (Bag) x;
+ Iterator<MRData> xi = xt.iterator();
+ Iterator<MRData> yi = iterator();
+ while ( xi.hasNext() && yi.hasNext() )
+ if ( !xi.next().equals(yi.next()) )
+ return false;
+ return xi.hasNext() || yi.hasNext();
+ }
+
+    /** the hash code of this Bag: 127 XORed with the hash codes of its elements,
+     * mapped to a non-negative number. The traversal consumes a stream-based Bag
+     * @return a non-negative hash code
+     */
+    public int hashCode () {
+        int h = 127;
+        for ( MRData e: this )
+            h ^= e.hashCode();
+        // Math.abs(Integer.MIN_VALUE) is still negative, so map that one value to 0
+        return (h == Integer.MIN_VALUE) ? 0 : Math.abs(h);
+    }
+
+ /** show the first few Bag elements (controlled by -bag_print) */
+ public String toString () {
+ materialize();
+ StringBuffer b = new StringBuffer("{ ");
+ int i = 0;
+ for ( MRData e: this )
+ if ( i++ < Config.max_bag_size_print )
+ b.append(((i>1)?", ":"")+e);
+ else return b.append(", ... }").toString();
+ return b.append(" }").toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/BagIterator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/BagIterator.java b/core/src/main/java/org/apache/mrql/BagIterator.java
new file mode 100644
index 0000000..0fd7354
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/BagIterator.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.Iterator;
+
+/** the base class of the iterators that generate the elements of a Bag; removal is not supported */
+abstract public class BagIterator implements Iterator<MRData> {
+    /** always fails: a Bag cannot be modified through its iterator */
+    @Override
+    public void remove () {
+        throw new Error("Bag deletions are not permitted");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/BinaryDataSource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/BinaryDataSource.java b/core/src/main/java/org/apache/mrql/BinaryDataSource.java
new file mode 100644
index 0000000..d4338ec
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/BinaryDataSource.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.hadoop.conf.Configuration;
+
+
+/** A DataSource in the binary input format, used for storing intermediate results and data dumps */
+final public class BinaryDataSource extends DataSource {
+    /** a binary data source with a given source number */
+    BinaryDataSource ( int source_num, String path, Configuration conf ) {
+        super(source_num,path,Evaluator.evaluator.binaryInputFormat(),conf);
+    }
+
+    /** a binary data source with source number -1 */
+    BinaryDataSource ( String path, Configuration conf ) {
+        this(-1,path,conf);
+    }
+
+    /** the printed form is the word Binary, the source number, and the path, joined by the separator */
+    public String toString () {
+        final String prefix = "Binary"+separator;
+        return prefix+source_num+separator+path;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/ClassImporter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/ClassImporter.java b/core/src/main/java/org/apache/mrql/ClassImporter.java
new file mode 100644
index 0000000..8efc1ae
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/ClassImporter.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.lang.reflect.Method;
+import org.apache.mrql.gen.*;
+import java.util.*;
+
+/** imports external Java methods into MRQL */
+final public class ClassImporter {
+ // when true, importClass and importMethod print the methods they import
+ final static boolean trace_imported_methods = false;
+
+ // names of java.lang.Object methods, which are never imported
+ final static String[] object_methods
+ = { "hashCode", "getClass", "wait", "equals", "toString", "notify", "notifyAll" };
+
+ // all imported methods; a method number is an index into this vector
+ static Vector<MethodInfo> methods = new Vector<MethodInfo>();
+
+    /** import the methods of org.apache.mrql.SystemFunctions, unless some methods are already imported */
+    public static void load_classes () {
+        if (methods == null)
+            methods = new Vector<MethodInfo>();
+        if (!methods.isEmpty())
+            return;
+        importClass("org.apache.mrql.SystemFunctions");
+        //****** import your classes with user-defined functions here
+    }
+
+    /** is s the name of one of the java.lang.Object methods listed in object_methods? */
+    private static boolean object_method ( String s ) {
+        for ( String name: object_methods )
+            if (name.equals(s))
+                return true;
+        return false;
+    }
+
+    /** map the Java class of a method argument or result to an MRQL type
+     * @param c the Java class
+     * @return the MRQL type: the name after the MR_ prefix for the MR_ classes, bag(any) for Bag,
+     *         union for Union, and any for MRData, Inv, Lambda, and classes that directly
+     *         implement an interface named org.apache.mrql.MRData
+     * @throws Error if the class has no MRQL counterpart
+     */
+    private static Tree getType ( Class<?> c ) {
+        String cn = c.getCanonicalName();
+        // anonymous and local classes have no canonical name
+        if (cn == null)
+            throw new Error("Unsupported type in imported method: "+c.getName());
+        if (cn.equals("org.apache.mrql.MRData"))
+            return new VariableLeaf("any");
+        if (cn.startsWith("org.apache.mrql.MR_"))
+            return new VariableLeaf(cn.substring(19));   // 19 is the length of the prefix
+        if (cn.equals("org.apache.mrql.Bag"))
+            return new Node("bag",new Trees(new VariableLeaf("any")));
+        if (cn.equals("org.apache.mrql.Inv"))
+            return new VariableLeaf("any");
+        if (cn.equals("org.apache.mrql.Union"))
+            return new VariableLeaf("union");
+        if (cn.equals("org.apache.mrql.Lambda"))
+            return new VariableLeaf("any");
+        // compare interface names: the original compared a Class object with a String
+        // using equals, which is always false
+        for ( Class<?> inf: c.getInterfaces() )
+            if ("org.apache.mrql.MRData".equals(inf.getCanonicalName()))
+                return new VariableLeaf("any");
+        throw new Error("Unsupported type in imported method: "+cn);
+    }
+
+    /** the MRQL signature of a Java method: the result type followed by the parameter types */
+    private static Trees signature ( Method m ) {
+        Trees types = new Trees(getType(m.getReturnType()));
+        for ( Class<?> parameter: m.getParameterTypes() )
+            types = types.append(getType(parameter));
+        return types;
+    }
+
+    /** the name of the imported method with a given method number */
+    public static String method_name ( int method_number ) {
+        final MethodInfo info = methods.get(method_number);
+        return info.name;
+    }
+
+    /** the signature (result type followed by parameter types) of the imported method with a given method number */
+    public static Trees signature ( int method_number ) {
+        final MethodInfo info = methods.get(method_number);
+        return info.signature;
+    }
+
+ /** import all Java methods from a given Java class */
+ public static void importClass ( String class_name ) {
+ try {
+ Method[] ms = Class.forName(class_name).getMethods();
+ Vector<MethodInfo> mv = new Vector<MethodInfo>();
+ for (int i = 0; i < ms.length; i++)
+ if (!object_method(ms[i].getName()) && ms[i].getModifiers() == 9)
+ try {
+ Trees sig = signature(ms[i]);
+ MethodInfo m = new MethodInfo(ms[i].getName(),sig,ms[i]);
+ mv.add(m);
+ methods.add(m);
+ } catch ( Exception e ) {
+ System.out.println("Warning: method "+ms[i].getName()+" cannot be imported");
+ System.out.println(e);
+ throw new Error("");
+ };
+ Collections.sort(methods);
+ if (Translator.functions == null)
+ Translator.functions = Trees.nil;
+ for ( MethodInfo m: methods )
+ Translator.functions = Translator.functions.append(new Node(m.name,m.signature));
+ if (trace_imported_methods) {
+ System.out.print("Importing methods: ");
+ for (int i = 0; i < mv.size(); i++ )
+ System.out.print(mv.get(i).name+mv.get(i).signature.tail()
+ +":"+mv.get(i).signature.head()+" ");
+ System.out.println();
+ }
+ } catch (ClassNotFoundException x) {
+ throw new Error("Undefined class: "+class_name);
+ }
+ }
+
+    /** import a Java method with a given name from a given Java class.
+     * The first method with this name whose modifiers are exactly public static is added
+     * to the method table, which is kept sorted, and is declared in Translator.functions
+     * @param class_name the name of the Java class
+     * @param method_name the name of the method
+     * @throws Error if the class is undefined or has no such method
+     */
+    public static void importMethod ( String class_name, String method_name ) {
+        try {
+            Method[] ms = Class.forName(class_name).getMethods();
+            MethodInfo m = null;
+            // same guard as in importClass: this method may be the first import
+            if (Translator.functions == null)
+                Translator.functions = Trees.nil;
+            for (int i = 0; i < ms.length; i++)
+                // 9 is Modifier.PUBLIC | Modifier.STATIC
+                if (ms[i].getName().equals(method_name)
+                    && !object_method(ms[i].getName()) && ms[i].getModifiers() == 9) {
+                    Trees sig = signature(ms[i]);
+                    m = new MethodInfo(ms[i].getName(),sig,ms[i]);
+                    Translator.functions = Translator.functions.append(new Node(ms[i].getName(),sig));
+                    break;
+                };
+            if (m == null)
+                throw new Error("No such method: "+method_name);
+            methods.add(m);
+            Collections.sort(methods);
+            if (trace_imported_methods)
+                System.out.println("Importing method: "+m.name+m.signature.tail()
+                                   +":"+m.signature.head()+"  ");
+        } catch (ClassNotFoundException x) {
+            throw new Error("Undefined class: "+class_name);
+        }
+    }
+
+    /** print the name, the parameter types, and the result type of every imported method */
+    public static void print_methods () {
+        for ( MethodInfo m: methods )
+            System.out.print(" "+m.name+":"+m.signature.tail()+"->"+m.signature.head());
+    }
+
+    /** return the result type of the system method with a given name that accepts some arguments.
+     * The method table is searched in order and the first method with this name whose
+     * parameter types are supertypes of the arguments is chosen
+     * @param method_name the given method name
+     * @param args the method expressions
+     * @return the result type of the chosen method, or null if there is none
+     */
+    public static Tree find_method ( String method_name, Trees args ) {
+        for ( MethodInfo candidate: methods ) {
+            if (!candidate.name.equals(method_name))
+                continue;
+            if (TypeInference.subtype(args,candidate.signature.tail()))
+                return candidate.signature.head();
+        }
+        return null;
+    }
+
+ /** return the method number of a system method with a given name over some expressions;
+ * When the method is overloaded, find the most specific (in terms of arg subtyping)
+ * @param method_name the given method name
+ * @param args the method expressions
+ * @return the method number
+ */
+ public static int find_method_number ( String method_name, Trees args ) {
+ for (int i = 0; i < methods.size(); i++ ) {
+ MethodInfo m = methods.get(i);
+ if (m.name.equals(method_name) && TypeInference.subtype(args,m.signature.tail()))
+ return i;
+ };
+ return -1;
+ }
+
+ /** call a system method with a given number over MRData
+ * @param method_number the method number
+ * @param args in input arguments
+ * @return the result of invoking this method over the args
+ */
+ public static MRData call ( int method_number, MRData... args ) {
+ if (method_number < 0 || method_number >= methods.size())
+ throw new Error("Run-time error (unknown method name)");
+ MethodInfo m = methods.get(method_number);
+ try {
+ return (MRData)m.method.invoke(null,(Object[])args);
+ } catch (Exception e) {
+ Tuple t = new Tuple(args.length);
+ for ( int i = 0; i < args.length; i++ )
+ t.set(i,args[i]);
+ System.err.println("Run-time error in method call: "+m.name+t+" of type "
+ +m.signature.tail()+"->"+m.signature.head());
+ throw new Error(e.toString());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Compiler.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Compiler.gen b/core/src/main/java/org/apache/mrql/Compiler.gen
new file mode 100644
index 0000000..668cdc8
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Compiler.gen
@@ -0,0 +1,584 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.io.*;
+import javax.tools.*;
+import javax.tools.JavaCompiler.CompilationTask;
+import java.lang.reflect.*;
+import java.util.*;
+import java.net.*;
+import java.util.jar.*;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/** compilation of MRQL expressions to Java code and then to Java bytecode */
+final public class Compiler extends Translator {
+ static JavaCompiler compiler = ToolProvider.getSystemJavaCompiler(); // compiles the generated source; NOTE(review): getSystemJavaCompiler() returns null when no compiler is provided (JRE only) and this is not checked
+ static DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>(); // collects the compilation errors that are printed when the compilation is unsuccessful
+ final static String tmp_dir = "/tmp/mrql_jar_"+System.getProperty("user.name"); // per-user directory that holds the generated class files and jars
+ public static String jar_path; // the jar with the classes of the last compiled query (null before the first compilation)
+ static Trees in_memory_repeat_vars = #[]; // the variables bound by repeat loops; the generated code reads them with Interpreter.lookup_global_binding
+ static int lambda_num = 0; // counter used for generating unique class names
+ static int user_functions_num = 0; // the number N of the current UserFunctions_N class
+
+ /** An in-memory Java source file: it wraps the text of the generated classes so that
+  *  it can be given to the Java compiler as a compilation unit */
+ final private static class JavaSourceFromString extends SimpleJavaFileObject {
+     final String code;
+
+     JavaSourceFromString ( String name, String code ) {
+         super(source_uri(name),Kind.SOURCE);
+         this.code = code;
+     }
+
+     /** the pseudo URI that identifies the source of the class with the given name */
+     private static URI source_uri ( String name ) {
+         final String path = name.replace('.','/');
+         return URI.create("string:///org/apache/mrql/"+path+Kind.SOURCE.extension);
+     }
+
+     @Override
+     public CharSequence getCharContent ( boolean ignoreEncodingErrors ) {
+         return code;
+     }
+ }
+
+ private static String new_lambda_name () {
+ return "MRQL_Lambda_"+(lambda_num++);
+ }
+
+ /** Add a file or, recursively, a directory with all its contents to a jar.
+  * @param source the file or directory to add
+  * @param offset the number of leading characters to drop from each file path to get
+  *               the name of its jar entry
+  * @param target the jar to write to
+  * @throws IOException if a directory cannot be listed, a file cannot be read, or the
+  *                     jar cannot be written
+  */
+ private static void add2jar ( File source, int offset, JarOutputStream target ) throws IOException {
+     if (source.isDirectory()) {
+         String name = source.getPath();
+         // the root directory itself (whose path is not longer than the offset) gets no entry
+         if (name.length() > offset) {
+             JarEntry entry = new JarEntry(name.substring(offset));
+             entry.setTime(source.lastModified());
+             target.putNextEntry(entry);
+             target.closeEntry();
+         };
+         // listFiles() returns null (not an empty array) when the directory cannot be read
+         File[] nested = source.listFiles();
+         if (nested == null)
+             throw new IOException("Cannot list the directory "+source);
+         for ( File nestedFile: nested )
+             add2jar(nestedFile,offset,target);
+     } else {
+         JarEntry entry = new JarEntry(source.getPath().substring(offset));
+         entry.setTime(source.lastModified());
+         target.putNextEntry(entry);
+         BufferedInputStream in = new BufferedInputStream(new FileInputStream(source));
+         try {
+             byte[] buffer = new byte[1024];
+             int count;
+             while ((count = in.read(buffer)) > 0)
+                 target.write(buffer,0,count);
+         } finally {
+             // release the file even if the copy is interrupted by an error
+             in.close();
+         };
+         target.closeEntry();
+     }
+ }
+
+ /** Delete a file or, recursively, a directory with all its contents.
+  * This is best effort: a file that cannot be deleted and a directory that cannot be
+  * listed are silently left in place.
+  * @param file the file or directory to delete
+  */
+ private static void remove ( File file ) throws IOException {
+     if (file.isDirectory()) {
+         // listFiles() returns null (not an empty array) when the directory cannot be read
+         File[] nested = file.listFiles();
+         if (nested != null)
+             for ( File nestedFile: nested )
+                 remove(nestedFile);
+     };
+     file.delete();
+ }
+
+ /** forget the repeat variables and delete the temporary directory of the generated code */
+ public static void clean () throws IOException {
+     final File generated = new File(tmp_dir);
+     in_memory_repeat_vars = #[];
+     remove(generated);
+ }
+
+ final private static Tree compile ( Tree e, StringBuffer out ) throws Exception { // rewrites e: for each lambda passed directly to a plan listed in plans_with_distributed_lambdas, the source of a Function class is appended to out and the lambda is replaced by compiled(class name,lambda)
+ match e {
+ case repeat(lambda(`v,`body),`s,`n):
+ in_memory_repeat_vars = in_memory_repeat_vars.cons(v); // remember the loop variable: compileE generates a lookup of the global binding for such variables
+ return #<repeat(lambda(`v,`(compile(body,out))),`(compile(s,out)),`n)>;
+ case `f(...al):
+ if (!plans_with_distributed_lambdas.member(#<`f>))
+ fail; // not such a plan: presumably continues with the next alternative, which only recurses over the arguments
+ Trees nl = #[];
+ for ( Tree a: al)
+ match Interpreter.closure(a,Interpreter.global_env) {
+ case lambda(`v,`body):
+ String fname = new_lambda_name(); // one generated class per functional argument
+ StringBuffer sb = new StringBuffer(1000);
+ sb.append("final class "+fname+" extends Function {\n");
+ sb.append(fname+" () {}\n");
+ sb.append("final public MRData eval ( final MRData "+v
+ +" ) { return "+compileE(body)+"; }\n}\n");
+ out.append(sb); // appended after compileE(body), so the classes lifted from the body come first in the source
+ nl = nl.append(#<compiled(`fname,`a)>);
+ case _: nl = nl.append(compile(a,out)); // not a lambda: compile the argument recursively
+ };
+ return #<`f(...nl)>;
+ case `f(...al):
+ Trees nl = #[];
+ for ( Tree a: al)
+ nl = nl.append(compile(a,out));
+ return #<`f(...nl)>;
+ };
+ return e; // a leaf is returned as is
+ }
+
+ private static StringBuffer out; // the Java source generated for the current query: created by compile(Tree) and extended by compilef with the lifted lambda classes
+
+ /** compile the functional arguments of the MRQL operators using the Java compiler.
+  * The generated source contains one Function class per compiled lambda and a class
+  * UserFunctions_N with one static method per global function; the resulting class
+  * files are packed in a new jar (see jar_path) that replaces the jar of the previous query.
+  * @param query the expression to compile
+  * @return the query with all functional arguments compiled to Java bytecode, or the
+  *         unchanged query (after printing a warning) if an exception is raised
+  */
+ final public static Tree compile ( Tree query ) {
+     try {
+         user_functions_num = lambda_num++;
+         // remove the old jar
+         if (jar_path != null)
+             remove(new File(jar_path));
+         jar_path = tmp_dir+"/mrql_args_"+(new Random().nextInt(1000000))+".jar";
+         out = new StringBuffer(1000);
+         out.append("package org.apache.mrql;\n");
+         Tree nq = compile(query,out);
+         StringBuffer sb = new StringBuffer(1000);
+         for ( String f: global_functions )
+             match global_functions.lookup(f) {
+             case function(tuple(...params),`otp,`body):
+                 sb.append("final public static "+get_MR_type(otp)+" "+f);
+                 if (params.is_empty())
+                     sb.append(" ()");
+                 else {
+                     match params.head() {
+                     case bind(`v,`tp):
+                         sb.append(" ( final "+get_MR_type(tp)+" "+v);
+                     };
+                     for ( Tree var: params.tail() )
+                         match var {
+                         case bind(`v,`tp):
+                             sb.append(", final "+get_MR_type(tp)+" "+v);
+                         }
+                     sb.append(" )");
+                 }
+                 // the method body is generated for every function, including the
+                 // parameterless ones (a header without a body is not valid Java)
+                 sb.append(" { return ("+get_MR_type(otp)+")");
+                 sb.append(compileE(body));
+                 sb.append("; }\n");
+             };
+         out.append("final class UserFunctions_"+user_functions_num+" {\n");
+         out.append(sb);
+         out.append("}\n");
+         String code = out.toString();
+         JavaFileObject file = new JavaSourceFromString("UserFunctions_"+user_functions_num,code);
+         Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file);
+         List<String> optionList = new ArrayList<String>();
+         (new File(tmp_dir)).mkdir();
+         // the class files go to a fresh directory, which is removed after they are packed
+         String dir = tmp_dir+"/classes_"+(new Random().nextInt(1000000));
+         File fdir = new File(dir);
+         fdir.mkdir();
+         ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
+         String classpath = jar_path;
+         String separator = System.getProperty("path.separator");
+         for ( URL url: ((URLClassLoader) classLoader).getURLs() )
+             classpath += separator+url.getFile();
+         // use hadoop core jar
+         classpath += separator + WritableComparable.class.getProtectionDomain().getCodeSource().getLocation().toString();
+         optionList.addAll(Arrays.asList("-classpath",classpath));
+         optionList.addAll(Arrays.asList("-d",dir));
+         CompilationTask task = compiler.getTask(null,null,diagnostics,optionList,null,compilationUnits);
+         boolean success = task.call();
+         if (!success)
+             for ( Diagnostic d: diagnostics.getDiagnostics() )
+                 System.err.println("*** Compilation error at line "+d.getLineNumber()+" position "
+                                    +d.getColumnNumber()+": "+d.getMessage(Locale.US));
+         Manifest manifest = new Manifest();
+         manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION,"1.0");
+         JarOutputStream target = new JarOutputStream(new FileOutputStream(jar_path),manifest);
+         try {
+             add2jar(new File(dir+"/"),dir.length()+1,target);
+         } finally {
+             // close the jar file even if packing the classes raises an error
+             target.close();
+         };
+         remove(fdir);
+         return nq;
+     } catch (Exception e) {
+         System.err.println("*** Warning: Unable to compile the query:\n"+query);
+         if (Config.trace)
+             e.printStackTrace(System.err);
+         return query;
+     }
+ }
+
+ /** Load the class generated for the anonymous function with name lambda_name and
+  * create an instance of it.
+  * The class is searched first in the current jar (jar_path) and then in the URLs of
+  * the given class loader, which must be a URLClassLoader.
+  * @param cl the parent class loader
+  * @param lambda_name the simple name of the generated class
+  * @return a new instance of the generated class
+  */
+ final public static Function compiled ( ClassLoader cl, String lambda_name ) throws Exception {
+     final URL[] parent_urls = ((URLClassLoader) cl).getURLs();
+     // search path = the generated jar followed by the URLs of the parent loader
+     final URL[] search_path = new URL[parent_urls.length+1];
+     System.arraycopy(parent_urls,0,search_path,1,parent_urls.length);
+     search_path[0] = new URL("file://"+jar_path);
+     final Class c = new URLClassLoader(search_path,cl).loadClass("org.apache.mrql."+lambda_name);
+     // the generated classes are not public: use their first declared constructor
+     final Constructor cc = c.getDeclaredConstructors()[0];
+     cc.setAccessible(true);
+     return (Function)cc.newInstance();
+ }
+
+ /** The name of the Java class used in the generated code for the values of an MRQL type (MRData when the type is not one of those listed) */
+ private static String get_MR_type ( Tree type ) {
+ match type {
+ case boolean: return "MR_bool";
+ case byte: return "MR_byte";
+ case short: return "MR_short";
+ case int: return "MR_int";
+ case long: return "MR_long";
+ case float: return "MR_float";
+ case double: return "MR_double";
+ case char: return "MR_char";
+ case string: return "MR_string";
+ case union: return "Union";
+ case bag(...): return "Bag";
+ case list(...): return "Bag"; // lists are represented by the Bag class too
+ };
+ return "MRData"; // any other type
+ }
+
+ private static Trees remove_duplicates ( Trees al ) {
+ if (al.is_empty())
+ return al;
+ Trees el = remove_duplicates(al.tail());
+ if (el.member(al.head()))
+ return el;
+ else return el.cons(al.head());
+ }
+
+ /** lambda lifting: generate Java code from an anonymous function */
+ private static String compilef ( String v, Tree body ) throws Exception {
+ String fname = new_lambda_name();
+ Trees free_vars = remove_duplicates(free_variables(body,#[`v]));
+ StringBuffer sb = new StringBuffer(1000);
+ sb.append("final class "+fname+" extends Function {\n");
+ for ( Tree var: free_vars )
+ sb.append("MRData "+var+"; ");
+ sb.append("\npublic "+fname+" (");
+ if (free_vars.is_empty())
+ sb.append(") {}\n");
+ else {
+ sb.append(" MRData "+free_vars.head());
+ for ( Tree var: free_vars.tail() )
+ sb.append(", MRData "+var);
+ sb.append(" ) { ");
+ for ( Tree var: free_vars )
+ sb.append("this."+var+" = "+var+"; ");
+ sb.append("}\n");
+ };
+ sb.append("final public MRData eval ( final MRData "+v
+ +" ) { return "+compileE(body)+"; }\n}\n");
+ out.append(sb);
+ String s = "new "+fname+"(";
+ if (!free_vars.is_empty()) {
+ s += free_vars.head();
+ for ( Tree var: free_vars.tail() )
+ s += ","+var;
+ };
+ return s+")";
+ }
+
+ private static String compileF ( Tree fnc ) throws Exception { // generate the Java expression that constructs the function object of a lambda, of an already compiled lambda, or of a function with multiple parameters
+ match fnc {
+ case lambda(`v,`b):
+ return compilef(v.toString(),b);
+ case compiled(`f,`lm):
+ // recompile the function
+ String s = compileF(lm);
+ ((Node)fnc).children().head = new VariableLeaf(s.substring(4,s.indexOf("("))); //destructive: the class name stored in the compiled node is replaced by the new one, taken from s between the leading "new " and the first parenthesis
+ return s;
+ case function(tuple(...params),`tp,`body):
+ String ret = "new Lambda(new Function () { "
+ +"final public MRData eval ( final MRData _x ) { ";
+ for ( int i = 0; i < params.length(); i++ )
+ match params.nth(i) {
+ case bind(`v,_):
+ ret += "final MRData "+v+" = ((Tuple)_x).get("+i+"); "; // the i-th parameter is the i-th component of the argument tuple
+ };
+ return ret+" return "+compileE(body)+"; } })";
+ };
+ throw new Exception("Ill-formed lambda: "+fnc);
+ }
+
+ private static String compileEL ( Trees el ) throws Exception {
+ if (el.is_empty())
+ return "";
+ String ret = compileE(el.head());
+ for ( Tree a: el.tail() )
+ ret += ","+compileE(a);
+ return ret;
+ }
+
+ /** Generate the Java code that evaluates an MRQL expression in memory.
+  * Literals become MRData constructors, operators become calls to MapReduceAlgebra and
+  * SystemFunctions, lambdas are lifted to classes, and whatever is not recognized here
+  * is passed to compileM.
+  * @param e the expression to compile (null is compiled as the null value)
+  * @return the text of a Java expression
+  * @throws Exception if the expression cannot be compiled
+  */
+ private static String compileE ( Tree e ) throws Exception {
+     if (e == null)
+         return "(new MR_byte(0))";
+     if (e.equals(#<true>))
+         return "(new MR_bool(true))";
+     else if (e.equals(#<false>))
+         return "(new MR_bool(false))";
+     else if (e.equals(#<null>))
+         return "(new MR_byte(0))";
+     else if (e.is_variable())
+         // the variables of in-memory repeat loops are looked up as global bindings
+         if (in_memory_repeat_vars.member(e))
+             return "Interpreter.lookup_global_binding(\""+e.toString()+"\")";
+         else return e.toString();
+     else if (e.is_long())
+         return "(new MR_int("+((LongLeaf)e).value()+"))";
+     else if (e.is_double())
+         return "(new MR_float("+((DoubleLeaf)e).value()+"))";
+     else if (e.is_string())
+         return "(new MR_string("+e.toString()+"))";
+     match e {
+     case callM(and,_,`x,`y):
+         return "(new MR_bool(((MR_bool)"+compileE(x)
+             +").get() && ((MR_bool)"+compileE(y)+").get()))";
+     case callM(or,_,`x,`y):
+         return "(new MR_bool(((MR_bool)"+compileE(x)
+             +").get() || ((MR_bool)"+compileE(y)+").get()))";
+     case callM(not,_,`x):
+         return "(new MR_bool(!((MR_bool)"+compileE(x)+").get()))";
+     case callM(`f,`n,...args):
+         if (!n.is_long())
+             fail;
+         // a direct call to the system method with number n; each argument is cast
+         // to the parameter type found in the method signature
+         String ret = "SystemFunctions."+ClassImporter.method_name((int)((LongLeaf)n).value())+"(";
+         Trees sig = ClassImporter.signature((int)((LongLeaf)n).value());
+         for (int i = 0; i < args.length(); i++)
+             ret += ((i > 0) ? ",(" : "(")+get_MR_type(sig.nth(i+1))+")("+compileE(args.nth(i))+")";
+         return ret+")";
+     case lambda(`v,`body):
+         return "new Lambda("+compilef(v.toString(),body)+")";
+     case nth(`x,`n):
+         return "(((Tuple)("+compileE(x)+")).get("+((LongLeaf)n).value()+"))";
+     case setNth(`x,`n,`v,`ret):
+         return "(((Tuple)("+compileE(x)+")).set("+((LongLeaf)n).value()+","+compileE(v)+","+compileE(ret)+"))";
+     case materialize(`u):
+         return "MapReduceAlgebra.materialize("+compileE(u)+")";
+     case let(`v,`u,`body):
+         // a let is an application of an anonymous function that materializes a bag argument
+         return "(new Function () { public MRData eval ( final MRData "+v
+             +" ) { if ("+v+" instanceof Bag) ((Bag)"+v+").materialize(); return "
+             +compileE(body)+"; }; }).eval("+compileE(u)+")";
+     case cmap(`m,`s):
+         return "MapReduceAlgebra.cmap("+compileF(m)+",(Bag)("+compileE(s)+"))";
+     case filter(`p,`m,`s):
+         return "MapReduceAlgebra.filter("+compileF(p)+","+compileF(m)
+             +",(Bag)"+compileE(s)+")";
+     case map(`m,`s):
+         return "MapReduceAlgebra.map("+compileF(m)+",(Bag)"+compileE(s)+")";
+     case range(`min,`max):
+         return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get(),"
+             +"((MR_long)"+compileE(max)+").get())";
+     case call(`f,...args):
+         return "("+compileF(f)+".eval("+compileE(#<tuple(...args)>)+"))";
+     case tuple():
+         return "(new Tuple())";
+     case tuple(`x):
+         return "(new Tuple("+compileE(x)+"))";
+     case tuple(`a,...el):
+         String ret = "(new Tuple("+compileE(a);
+         for ( Tree x: el )
+             ret += ","+compileE(x);
+         return ret+"))";
+     case tagged_union(`n,`u):
+         return "(new Union((byte)"+((LongLeaf)n).value()+","+compileE(u)+"))";
+     case union_value(`x):
+         return "(((Union)"+compileE(x)+").value())";
+     case union_tag(`x):
+         return "(new MR_int(((Union)"+compileE(x)+").tag()))";
+     // used for shortcutting sync in bsp supersteps
+     case BAG():
+         return "SystemFunctions.bsp_empty_bag";
+     case TRUE():
+         return "SystemFunctions.bsp_true_value";
+     case FALSE():
+         return "SystemFunctions.bsp_false_value";
+     case `T():
+         if (is_collection(T))
+             return "(new Bag())";
+         else fail
+     case `T(`x):
+         // the single element is bound to x (it was the constant e, which accepted
+         // only a variable named e and then compiled the whole expression again)
+         if (is_collection(T))
+             return "(new Bag("+compileE(x)+"))";
+         else fail
+     case `T(`a,...el):
+         if (!is_collection(T))
+             fail;
+         String ret = "(new Bag("+compileE(a);
+         for ( Tree x: el )
+             ret += ",(MRData)"+compileE(x);
+         return ret+"))";
+     case if(`c,`x,`y):
+         return "((((MR_bool)"+compileE(c)+").get())?"+compileE(x)+":"+compileE(y)+")";
+     case synchronize(`peer,`b):
+         return "SystemFunctions.synchronize(((MR_string)"+compileE(peer)+"),(MR_bool)"+compileE(b)+")";
+     case distribute(`peer,`s):
+         return "SystemFunctions.distribute(((MR_string)"+compileE(peer)+"),(Bag)"+compileE(s)+")";
+     case mapReduce(`mx,`my,`s,_):
+         return "MapReduceAlgebra.mapReduce("+compileF(mx)+","+compileF(my)+",(Bag)("+compileE(s)+"))";
+     case mapReduce2(`mx,`my,`r,`x,`y,_):
+         return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)+","+compileF(r)
+             +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
+     case mapJoin(`kx,`ky,`r,`x,`y):
+         return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)+","+compileF(r)
+             +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
+     case join(`kx,`ky,`r,`x,`y):
+         return "MapReduceAlgebra.join("+compileF(kx)+","+compileF(ky)+","+compileF(r)
+             +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
+     case groupBy(`s):
+         return "MapReduceAlgebra.groupBy((Bag)("+compileE(s)+"))";
+     case index(`x,`n):
+         return "SystemFunctions.index((Bag)("+compileE(x)+"),"+compileE(n)+")";
+     case range(`x,`i,`j):
+         return "SystemFunctions.range((Bag)("+compileE(x)+"),"+compileE(i)+","+compileE(j)+")";
+     case map_index(`x,`key):
+         return "((Bag)("+compileE(x)+")).map_find("+compileE(key)+")";
+     case aggregate(`acc,`zero,`s):
+         return "MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)+",(Bag)("+compileE(s)+"))";
+     case Aggregate(`acc,`zero,`s):
+         return "MapReducePlan.aggregate("+compileF(acc)+","+compileE(zero)+","+compileM(s)+")";
+     case mergeGroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
+         return "MapReduceAlgebra.mergeGroupByJoin("+compileF(kx)+","+compileF(ky)
+             +","+compileF(gx)+","+compileF(gy)+","+compileF(m)+","+compileF(c)
+             +","+compileF(r)+",(Bag)"+compileE(x)+",(Bag)"+compileE(y)+")";
+     case function(tuple(...params),`tp,`body):
+         return compileF(e);
+     case typed(`x,_):
+         return compileE(x);
+     case apply(`f,tuple(...args)):
+         if (!f.is_variable())
+             fail;
+         // a call to a global function with explicit arguments: a direct call to
+         // the static method generated in the UserFunctions class
+         match global_functions.lookup(f.toString()) {
+         case function(tuple(...params),`otp,`body):
+             String ret = "UserFunctions_"+user_functions_num+"."+f+"(";
+             if (args.is_empty())
+                 return ret+")";
+             for ( int i = 0; i < params.length(); i++ )
+                 match params.nth(i) {
+                 case bind(_,`tp):
+                     ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")"+compileE(args.nth(i));
+                 };
+             return ret+")";
+         };
+     case apply(`f,`arg):
+         if (!f.is_variable())
+             fail;
+         // a call to a global function over an argument that is not an explicit
+         // tuple: the parameters are extracted from the argument at run time
+         match global_functions.lookup(f.toString()) {
+         case function(tuple(...params),`otp,`body):
+             String ac = compileE(arg);
+             String ret = "UserFunctions_"+user_functions_num+"."+f+"(";
+             for ( int i = 0; i < params.length(); i++ )
+                 match params.nth(i) {
+                 case bind(_,`tp):
+                     ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")((Tuple)"+ac+").get("+i+")";
+                 };
+             return ret+")";
+         };
+     case apply(`f,`arg):
+         if (!f.is_variable())
+             return "("+compileF(f)+").eval("+compileE(arg)+")";
+         else return "(((Lambda)"+compileE(f)+").lambda().eval("+compileE(arg)+"))";
+     case Collect(`s):
+         return "Plan.collect("+compileM(s)+")";
+     case trace(`x):
+         return compileE(x);
+     case _:
+         return compileM(e);
+     };
+     throw new Exception("Cannot compile: "+e);
+ }
+
+ final private static String compileM ( Tree e ) throws Exception { // generate the Java code that evaluates a physical plan using the MapReduceAlgebra methods (the data sources use Plan in hadoop mode)
+ match e {
+ case cMap(`f,`s):
+ return "MapReduceAlgebra.cmap("+compileF(f)+",(Bag)"+compileM(s)+")";
+ case AggregateMap(`f,`acc,`zero,`s):
+ return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
+ +","+compileM(#<cMap(`f,`s)>)+"))";
+ case MapReduce(`m,`r,`s,_):
+ return "MapReduceAlgebra.mapReduce("+compileF(m)+","
+ +compileF(r)+",(Bag)"+compileM(s)+")";
+ case MapAggregateReduce(`m,`r,`acc,`zero,`s,_): // NOTE(review): the MapReduce term constructed below has 3 arguments while the MapReduce pattern above expects 4 - confirm that the recursive call can succeed
+ return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)
+ +","+compileE(zero)+","+compileM(#<MapReduce(`m,`r,`s)>)+"))";
+ case MapCombineReduce(`m,`c,`r,`s,_): // the combiner c is not used here
+ return "MapReduceAlgebra.mapReduce("+compileF(m)
+ +","+compileF(r)+",(Bag)"+compileM(s)+")";
+ case MapReduce2(`mx,`my,`c,`r,`x,`y,_): // the combiner c is not used here
+ return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)
+ +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
+ case MapReduce2(`mx,`my,`r,`x,`y,_):
+ return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)
+ +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
+ case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_): // NOTE(review): the MapReduce2 term constructed below has 5 arguments while the MapReduce2 patterns above expect 6 or 7 - confirm that the recursive call can succeed
+ return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
+ +","+compileM(#< MapReduce2(`mx,`my,`r,`x,`y)>)+"))";
+ case MapJoin(`kx,`ky,`r,`x,`y):
+ return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)
+ +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
+ case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
+ return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)
+ +","+compileE(zero)+","+compileM(#<MapJoin(`kx,`ky,`r,`x,`y)>)+"))";
+ case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
+ return "MapReduceAlgebra.groupByJoin("+compileF(kx)+","+compileF(ky)
+ +","+compileF(gx)+","+compileF(gy)+","+compileF(m)+","+compileF(c)
+ +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
+ case CrossProduct(`mx,`my,`r,`x,`y):
+ return "MapReduceAlgebra.crossProduct("+compileF(mx)+","+compileF(my)
+ +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
+ case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
+ return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
+ +","+compileM(#<CrossProduct(`mx,`my,`r,`x,`y)>)+"))";
+ case BSPSource(`n,BinarySource(`file,_)):
+ if (Config.hadoop_mode) // the source is read with Plan in hadoop mode and with MapReduceAlgebra otherwise
+ return "Plan.binarySource("+compileE(n)+",((MR_string)"
+ +compileE(file)+").get())";
+ else return "(Bag)MapReduceAlgebra.read_binary("+compileE(n)+",((MR_string)"
+ +compileE(file)+").get())";
+ case BinarySource(`file,_):
+ if (Config.hadoop_mode)
+ return "Plan.binarySource(((MR_string)"+compileE(file)+").get())";
+ else return "(Bag)MapReduceAlgebra.read_binary(((MR_string)"+compileE(file)+").get())";
+ case BSPSource(`n,ParsedSource(`parser,`file,...args)):
+ if (!(n instanceof LongLeaf))
+ fail;
+ if (!Config.hadoop_mode)
+ return "MapReduceAlgebra.parsedSource(((MR_int)"+compileE(n)+").get(),\""
+ +parser+"\",((MR_string)"+compileE(file)+").get(),"
+ +reify(args)+")";
+ Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString()); // in hadoop mode the parser name is resolved to its class at code generation time
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get(),"
+ +reify(args)+")";
+ case ParsedSource(`parser,`file,...args):
+ if (!Config.hadoop_mode)
+ return "MapReduceAlgebra.parsedSource(\""+parser+"\",((MR_string)"
+ +compileE(file)+").get(),"+reify(args)+")";
+ Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get(),"
+ +reify(args)+")";
+ case Merge(`x,`y):
+ return "((Bag)"+compileM(x)+").union((Bag)"+compileM(y)+")";
+ case Generator(`min,`max,`size): // the size is not used here
+ return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get(),"
+ +"((MR_long)"+compileE(max)+").get())";
+ case BSP(`n,`superstep,`state,`o,...as):
+ String ds = "";
+ for ( Tree a: as )
+ ds += ",(Bag)("+compileM(a)+")";
+ return "MapReduceAlgebra.BSP("+((LongLeaf)n).value()+","
+ +compileF(superstep)+","+compileE(state)+","+o+","
+ +"new Bag[]{"+ds.substring(1)+"})"; // substring(1) drops the leading comma; NOTE(review): it throws an exception when there are no input datasets
+ case `v:
+ if (v.is_variable())
+ return v.toString();
+ };
+ throw new Exception("Cannot compile: "+e);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Config.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Config.java b/core/src/main/java/org/apache/mrql/Config.java
new file mode 100644
index 0000000..a957d89
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Config.java
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.VariableLeaf;
+import java.util.ArrayList;
+import java.io.FileInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/** MRQL configuration parameters */
+final public class Config {
+ public static boolean loaded = false; // set by read(), which loads the parameters only once
+
+ // true for using Hadoop HDFS file-system
+ public static boolean hadoop_mode = false;
+ // true for local execution (one node)
+ public static boolean local_mode = false;
+ // true for distributed execution (set by the -dist option)
+ public static boolean distributed_mode = false;
+ // true for Hadoop map-reduce mode
+ public static boolean map_reduce_mode = false;
+ // true, for BSP mode using Hama
+ public static boolean bsp_mode = false;
+ // true, for Spark mode
+ public static boolean spark_mode = false;
+ // if true, it processes the input interactively
+ public static boolean interactive = true;
+ // compile the MR functional arguments to Java bytecode at run-time
+ // (each task-tracker repeats the compilation at the MR setup time)
+ public static boolean compile_functional_arguments = true;
+ // if true, generates info about all compilation and optimization steps
+ public static boolean trace = false;
+ // number of worker nodes
+ public static int nodes = 2;
+ // true, to disable mapJoin
+ public static boolean noMapJoin = false;
+ // max distributed cache size for MapJoin (fragment-replicate join) in MBs
+ public static int mapjoin_size = 50;
+ // max entries for in-mapper combiner before they are flushed out
+ public static int map_cache_size = 100000;
+ // max number of bag elements to print
+ public static int max_bag_size_print = 20;
+ // max size of a materialized vector before it is spilled to a file:
+ public static int max_materialized_bag = 500000;
+ // max number of incoming messages before a sub-sync()
+ public static int bsp_msg_size = Integer.MAX_VALUE;
+ // number of elements per mapper to process the range min...max
+ public static long range_split_size = 100000;
+ // max number of streams to merge simultaneously
+ public static int max_merged_streams = 100;
+ // the directory for temporary files and spilled bags
+ public static String tmpDirectory = "/tmp/mrql_"+System.getProperty("user.name");
+ // true, if we want to derive a combine function for MapReduce
+ public static boolean use_combiner = true;
+ // true, if we can use the rule that fuses a groupBy with a join over the same key
+ public static boolean groupJoinOpt = true;
+ // true, if we can use the rule that converts a self-join into a simple mapreduce
+ public static boolean selfJoinOpt = true;
+ // true for run-time trace of plans
+ public static boolean trace_execution = false;
+ // true for extensive run-time trace of expressions & plans
+ public static boolean trace_exp_execution = false;
+ // true if you don't want to print statistics
+ public static boolean quiet_execution = false;
+ // true if this is during testing
+ public static boolean testing = false;
+ // true to display INFO log messages
+ public static boolean info = false;
+
+ /** Store the current values of the configuration parameters in a Hadoop configuration
+  * under the mrql.* keys (noMapJoin is not stored).
+  * @param conf the configuration to update
+  */
+ public static void write ( Configuration conf ) {
+     // execution modes
+     conf.setBoolean("mrql.hadoop.mode",hadoop_mode);
+     conf.setBoolean("mrql.local.mode",local_mode);
+     conf.setBoolean("mrql.distributed.mode",distributed_mode);
+     conf.setBoolean("mrql.map.reduce.mode",map_reduce_mode);
+     conf.setBoolean("mrql.bsp.mode",bsp_mode);
+     conf.setBoolean("mrql.spark.mode",spark_mode);
+     // front-end behavior
+     conf.setBoolean("mrql.interactive",interactive);
+     conf.setBoolean("mrql.compile.functional.arguments",compile_functional_arguments);
+     // sizes, limits, and locations
+     conf.setInt("mrql.nodes",nodes);
+     conf.setInt("mrql.mapjoin.size",mapjoin_size);
+     conf.setInt("mrql.in.mapper.size",map_cache_size);
+     conf.setInt("mrql.max.bag.size.print",max_bag_size_print);
+     conf.setInt("mrql.max.materialized.bag",max_materialized_bag);
+     conf.setInt("mrql.bsp.msg.size",bsp_msg_size);
+     conf.setLong("mrql.range.split.size",range_split_size);
+     conf.setInt("mrql.max.merged.streams",max_merged_streams);
+     conf.set("mrql.tmp.directory",tmpDirectory);
+     // optimizations
+     conf.setBoolean("mrql.use.combiner",use_combiner);
+     conf.setBoolean("mrql.group.join.opt",groupJoinOpt);
+     conf.setBoolean("mrql.self.join.opt",selfJoinOpt);
+     // tracing and reporting
+     conf.setBoolean("mrql.trace",trace);
+     conf.setBoolean("mrql.trace.execution",trace_execution);
+     conf.setBoolean("mrql.trace.exp.execution",trace_exp_execution);
+     conf.setBoolean("mrql.quiet.execution",quiet_execution);
+     conf.setBoolean("mrql.testing",testing);
+     conf.setBoolean("mrql.info",info);
+ }
+
+ /** Load the configuration parameters from a Hadoop configuration.
+  * The parameters are loaded only once: any call after the first one does nothing.
+  * A parameter that is missing from the configuration keeps its current value.
+  * @param conf the configuration to read from
+  */
+ public static void read ( Configuration conf ) {
+     if (loaded)
+         return;
+     loaded = true;
+     hadoop_mode = conf.getBoolean("mrql.hadoop.mode",hadoop_mode);
+     local_mode = conf.getBoolean("mrql.local.mode",local_mode);
+     distributed_mode = conf.getBoolean("mrql.distributed.mode",distributed_mode);
+     map_reduce_mode = conf.getBoolean("mrql.map.reduce.mode",map_reduce_mode);
+     bsp_mode = conf.getBoolean("mrql.bsp.mode",bsp_mode);
+     spark_mode = conf.getBoolean("mrql.spark.mode",spark_mode);
+     interactive = conf.getBoolean("mrql.interactive",interactive);
+     compile_functional_arguments = conf.getBoolean("mrql.compile.functional.arguments",compile_functional_arguments);
+     trace = conf.getBoolean("mrql.trace",trace);
+     nodes = conf.getInt("mrql.nodes",nodes);
+     mapjoin_size = conf.getInt("mrql.mapjoin.size",mapjoin_size);
+     map_cache_size = conf.getInt("mrql.in.mapper.size",map_cache_size);
+     max_bag_size_print = conf.getInt("mrql.max.bag.size.print",max_bag_size_print);
+     max_materialized_bag = conf.getInt("mrql.max.materialized.bag",max_materialized_bag);
+     bsp_msg_size = conf.getInt("mrql.bsp.msg.size",bsp_msg_size);
+     range_split_size = conf.getLong("mrql.range.split.size",range_split_size);
+     max_merged_streams = conf.getInt("mrql.max.merged.streams",max_merged_streams);
+     // like every other parameter, keep the current directory when the key is missing
+     // (without a default value the directory would become null)
+     tmpDirectory = conf.get("mrql.tmp.directory",tmpDirectory);
+     use_combiner = conf.getBoolean("mrql.use.combiner",use_combiner);
+     groupJoinOpt = conf.getBoolean("mrql.group.join.opt",groupJoinOpt);
+     selfJoinOpt = conf.getBoolean("mrql.self.join.opt",selfJoinOpt);
+     trace_execution = conf.getBoolean("mrql.trace.execution",trace_execution);
+     trace_exp_execution = conf.getBoolean("mrql.trace.exp.execution",trace_exp_execution);
+     quiet_execution = conf.getBoolean("mrql.quiet.execution",quiet_execution);
+     testing = conf.getBoolean("mrql.testing",testing);
+     info = conf.getBoolean("mrql.info",info);
+ }
+
+ public static ArrayList<String> extra_args = new ArrayList<String>(); // the command-line arguments that follow the query file; parse_args binds them to the global variable args
+
+ /** Read the configuration parameters from the Main args.
+  * An argument that starts with '-' is an option (some options take a value in the next
+  * argument); the first other argument is the query file, which turns off the
+  * interactive mode, and the remaining ones are collected in extra_args.
+  * @param args the command-line arguments
+  * @param conf the Hadoop configuration; the parameters are stored in it in hadoop mode
+  *             and it becomes the configuration of the plans
+  * @return the bag of the extra arguments, which is also bound to the global variable args
+  * @throws Error if an option is unknown, or if the value of an option is missing or
+  *         smaller than the minimum expected for that option
+  * @throws NumberFormatException if a numeric value cannot be parsed
+  */
+ public static Bag parse_args ( String args[], Configuration conf ) throws Exception {
+     int i = 0;
+     int iargs = 0;
+     extra_args = new ArrayList<String>();
+     ClassImporter.load_classes();
+     interactive = true;
+     while (i < args.length) {
+         if (args[i].equals("-local")) {
+             local_mode = true;
+             i++;
+         } else if (args[i].equals("-dist")) {
+             distributed_mode = true;
+             i++;
+         } else if (args[i].equals("-reducers")) {
+             if (++i >= args.length)
+                 throw new Error("Expected number of reductions");
+             nodes = Integer.parseInt(args[i]);
+             i++;
+         } else if (args[i].equals("-bsp")) {
+             bsp_mode = true;
+             i++;
+         } else if (args[i].equals("-spark")) {
+             spark_mode = true;
+             i++;
+         } else if (args[i].equals("-bsp_tasks")) {
+             // the value must exist and be in range: with && the range was never
+             // checked and a missing value caused an index out of bounds
+             if (++i >= args.length || Integer.parseInt(args[i]) < 1)
+                 throw new Error("Expected max number of bsp tasks > 1");
+             nodes = Integer.parseInt(args[i]);
+             i++;
+         } else if (args[i].equals("-nodes")) {
+             if (++i >= args.length || Integer.parseInt(args[i]) < 1)
+                 throw new Error("Expected number of nodes > 1");
+             nodes = Integer.parseInt(args[i]);
+             i++;
+         } else if (args[i].equals("-bsp_msg_size")) {
+             if (++i >= args.length || Integer.parseInt(args[i]) < 10000)
+                 throw new Error("Expected max number of bsp messages before subsync() > 10000");
+             bsp_msg_size = Integer.parseInt(args[i]);
+             i++;
+         } else if (args[i].equals("-mapjoin_size")) {
+             if (++i >= args.length)
+                 throw new Error("Expected number of MBs");
+             mapjoin_size = Integer.parseInt(args[i]);
+             i++;
+         } else if (args[i].equals("-cache_size")) {
+             if (++i >= args.length)
+                 throw new Error("Expected number of entries");
+             map_cache_size = Integer.parseInt(args[i]);
+             i++;
+         } else if (args[i].equals("-tmp")) {
+             if (++i >= args.length)
+                 throw new Error("Expected a temporary directory");
+             tmpDirectory = args[i];
+             i++;
+         } else if (args[i].equals("-bag_size")) {
+             if (++i >= args.length || Integer.parseInt(args[i]) < 10000)
+                 throw new Error("Expected max size of materialized bag > 10000");
+             max_materialized_bag = Integer.parseInt(args[i]);
+             i++;
+         } else if (args[i].equals("-bag_print")) {
+             if (++i >= args.length)
+                 throw new Error("Expected number of bag elements to print");
+             max_bag_size_print = Integer.parseInt(args[i]);
+             i++;
+         } else if (args[i].equals("-split_size")) {
+             if (++i >= args.length)
+                 throw new Error("Expected a split size");
+             range_split_size = Long.parseLong(args[i]);
+             i++;
+         } else if (args[i].equals("-max_merged")) {
+             if (++i >= args.length)
+                 throw new Error("Expected a max number of merged streams");
+             max_merged_streams = Integer.parseInt(args[i]);
+             i++;
+         } else if (args[i].equals("-trace")) {
+             trace = true;
+             i++;
+         } else if (args[i].equals("-C")) {
+             compile_functional_arguments = true;
+             i++;
+         } else if (args[i].equals("-NC")) {
+             compile_functional_arguments = false;
+             i++;
+         } else if (args[i].equals("-P")) {
+             trace_execution = true;
+             i++;
+         } else if (args[i].equals("-quiet")) {
+             quiet_execution = true;
+             i++;
+         } else if (args[i].equals("-info")) {
+             info = true;
+             i++;
+         } else if (args[i].equals("-trace_execution")) {
+             // the expressions can be traced only when they are interpreted
+             trace_execution = true;
+             trace_exp_execution = true;
+             compile_functional_arguments = false;
+             i++;
+         } else if (args[i].equals("-methods")) {
+             System.out.print("\nImported methods: ");
+             ClassImporter.print_methods();
+             System.out.println();
+             System.out.print("\nAggregations:");
+             Translator.print_aggregates();
+             System.out.println();
+             i++;
+         } else if (args[i].startsWith("-"))
+             // startsWith (unlike charAt(0)) is also safe for an empty argument
+             throw new Error("Unknown MRQL parameter: "+args[i]);
+         else {
+             // the first plain argument is the query file; the rest are extra arguments
+             if (interactive) {
+                 Main.query_file = args[i++];
+                 interactive = false;
+             } else extra_args.add(args[i++]);
+         }
+     };
+     if (hadoop_mode)
+         write(conf);
+     Plan.conf = conf;
+     Bag b = new Bag();
+     for ( String s: extra_args )
+         b.add(new MR_string(s));
+     Interpreter.new_global_binding(new VariableLeaf("args").value(),b);
+     return b;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/DataSet.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/DataSet.java b/core/src/main/java/org/apache/mrql/DataSet.java
new file mode 100644
index 0000000..efe5646
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/DataSet.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.util.List;
+import java.util.ArrayList;
+import org.apache.hadoop.conf.Configuration;
+
+
+/** The domain of the MRQL physical algebra is a set of DataSources.
+ *  A DataSet holds zero or more DataSources together with a counter
+ *  and the total number of records.
+ */
+public class DataSet {
+    public ArrayList<DataSource> source;  // multiple sources
+    public long counter;  // a Hadoop user-defined counter used by the `repeat' operator
+    public long records;  // total number of dataset records
+
+    /** Construct a DataSet that contains one DataSource
+     * @param s the given DataSource
+     * @param counter a Hadoop user-defined counter used by the `repeat' operator
+     * @param records total number of dataset records
+     */
+    DataSet ( DataSource s, long counter, long records ) {
+        source = new ArrayList<DataSource>();
+        source.add(s);
+        this.counter = counter;
+        this.records = records;
+    }
+
+    /** Construct an empty set of DataSources (sources are added later with add or merge)
+     * @param counter a Hadoop user-defined counter used by the `repeat' operator
+     * @param records total number of dataset records
+     */
+    DataSet ( long counter, long records ) {
+        source = new ArrayList<DataSource>();
+        this.counter = counter;
+        this.records = records;
+    }
+
+    /** add a DataSource to this DataSet
+     * @param s the DataSource to append to the list of sources
+     */
+    public void add ( DataSource s ) {
+        source.add(s);
+    }
+
+    /** merge this DataSet with the given DataSet: the sources of ds are appended
+     *  to this DataSet and its counter and record count are added to the local ones
+     * @param ds the DataSet to merge into this one (ds itself is not modified)
+     */
+    public void merge ( DataSet ds ) {
+        source.addAll(ds.source);
+        counter += ds.counter;
+        records += ds.records;
+    }
+
+    /** dataset size in bytes
+     * @param conf the configuration passed to each DataSource to compute its size
+     * @return the sum of the sizes of all DataSources (0 for an empty DataSet)
+     */
+    public long size ( Configuration conf ) {
+        long n = 0;
+        for ( DataSource s: source )
+            n += s.size(conf);
+        return n;
+    }
+
+    /** return a single DataSource path by merging all the DataSource paths in this DataSet
+     * @return the paths of all DataSources separated by commas, in source order;
+     *         the empty string if this DataSet has no DataSources
+     */
+    public String merge () {
+        StringBuilder path = new StringBuilder();
+        for ( int i = 0; i < source.size(); i++ ) {
+            if (i > 0)
+                path.append(',');
+            path.append(source.get(i).path);
+        }
+        return path.toString();
+    }
+
+    /** return the first num values, drawing from the DataSources in order
+     * @param num the maximum number of values to return; a negative num means
+     *        no limit, which is the convention used by DataSource.take
+     * @return at most num values (all the values of all sources if num is negative)
+     */
+    public List<MRData> take ( int num ) {
+        ArrayList<MRData> res = new ArrayList<MRData>();
+        for ( DataSource s: source ) {
+            if (num < 0) {
+                // no limit: collect everything from every source
+                res.addAll(s.take(num));
+                continue;
+            }
+            // what is still missing is always measured against the original request,
+            // because res.size() is cumulative over all the sources visited so far
+            int remaining = num-res.size();
+            if (remaining <= 0)
+                break;
+            res.addAll(s.take(remaining));
+        }
+        return res;
+    }
+
+    /** accumulate all dataset values
+     * @param zero the initial value of the accumulation
+     * @param acc the accumulator function, applied by each DataSource in turn
+     * @return the result of threading the accumulated value through all DataSources
+     *         in source order (zero for an empty DataSet)
+     */
+    public MRData reduce ( MRData zero, Function acc ) {
+        MRData res = zero;
+        for ( DataSource s: source )
+            res = s.reduce(res,acc);
+        return res;
+    }
+
+    /** @return a string of the form &lt;counter,source1,...,sourceN&gt; */
+    @Override
+    public String toString () {
+        StringBuilder p = new StringBuilder("<").append(counter);
+        for ( DataSource s: source )
+            p.append(',').append(s);
+        return p.append('>').toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/DataSource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/DataSource.java b/core/src/main/java/org/apache/mrql/DataSource.java
new file mode 100644
index 0000000..6a0ef4f
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/DataSource.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.io.IOException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Iterator;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+
+
+/** A DataSource is any input data source, such as a text file, a key/value map, a database, an intermediate file, etc */
+public class DataSource {
+    /** separates the fields of a serialized DataSource (see read) */
+    final public static String separator = "%%%";
+    /** the directory of all known DataSources, keyed by path */
+    public static DataSourceDirectory dataSourceDirectory = new DataSourceDirectory();
+    /** maps a parser name (such as "xml", "json", "line") to its Parser class */
+    public static ParserDirectory parserDirectory = new ParserDirectory();
+    private static boolean loaded = false;  // true after loadParsers has registered the parsers
+
+    public String path;
+    public Class<? extends MRQLFileInputFormat> inputFormat;
+    public int source_num;
+    public boolean to_be_merged;    // if the path is a directory with multiple files, merge them
+
+    /** A dictionary that maps parser names to Parser classes */
+    final static class ParserDirectory extends HashMap<String,Class<? extends Parser>> {
+    }
+
+    /** A dictionary that maps data source paths to DataSource data.
+     * It assumes that each path can only be associated with a single data source format and parser
+     */
+    final static class DataSourceDirectory extends HashMap<String,DataSource> {
+        /** replace the contents of this directory with the entries stored in the
+         *  configuration property mrql.data.source.directory (written by distribute).
+         *  The directory is left empty if the property is unset or empty.
+         * @param conf the configuration to read from
+         */
+        public void read ( Configuration conf ) {
+            clear();
+            String encoded = conf.get("mrql.data.source.directory");
+            // an empty directory is serialized as "" by toString, and the property
+            // may not have been set at all: in both cases there is nothing to load
+            if (encoded == null || encoded.equals(""))
+                return;
+            for ( String s: encoded.split("@@@") ) {
+                String[] p = s.split("===");
+                put(p[0],DataSource.read(p[1],conf));
+            }
+        }
+
+        /** serialize the directory as key===value entries separated by @@@
+         * @return the serialized directory (the empty string for an empty directory)
+         */
+        public String toString () {
+            StringBuilder s = new StringBuilder();
+            // use the entry value directly: get(String) below does prefix matching
+            // and could return the DataSource of a different (shorter) key
+            for ( Map.Entry<String,DataSource> e: entrySet() ) {
+                if (s.length() > 0)
+                    s.append("@@@");
+                s.append(e.getKey()).append("===").append(e.getValue());
+            }
+            return s.toString();
+        }
+
+        /** find the DataSource whose path is a prefix of the given name.
+         *  This is an overload of HashMap.get(Object), selected when the argument
+         *  has static type String. If multiple keys are prefixes of the name, the
+         *  longest one wins, so that the result does not depend on the map order.
+         * @param name a path that starts with the path of a registered DataSource
+         * @return the matching DataSource, or null if no key is a prefix of name
+         */
+        public DataSource get ( String name ) {
+            DataSource best = null;
+            int best_length = -1;
+            for ( Map.Entry<String,DataSource> e: entrySet() ) {
+                String key = e.getKey();
+                if (key.length() > best_length && name.startsWith(key)) {
+                    best = e.getValue();
+                    best_length = key.length();
+                }
+            }
+            return best;
+        }
+
+        /** store the serialized directory in the configuration property
+         *  mrql.data.source.directory so that it can be restored with read
+         * @param conf the configuration to write to
+         */
+        public void distribute ( Configuration conf ) {
+            conf.set("mrql.data.source.directory",toString());
+        }
+    }
+
+    DataSource () {}
+
+    /** Construct a DataSource and register it in the dataSourceDirectory under
+     *  the path reported by the file system for the given path
+     * @param source_num the number of this source
+     * @param path the path of the data; it is replaced by the path returned by the file status
+     * @param inputFormat the input format used to materialize the data
+     * @param conf the configuration used to locate the file system of the path
+     * @throws Error if the file status of the path cannot be retrieved
+     */
+    DataSource ( int source_num,
+                 String path,
+                 Class<? extends MRQLFileInputFormat> inputFormat,
+                 Configuration conf ) {
+        this.source_num = source_num;
+        this.path = path;
+        this.inputFormat = inputFormat;
+        to_be_merged = false;
+        try {
+            Path p = new Path(path);
+            FileSystem fs = p.getFileSystem(conf);
+            String complete_path = fs.getFileStatus(p).getPath().toString();
+            this.path = complete_path;
+            dataSourceDirectory.put(this.path,this);
+        } catch (IOException e) {
+            throw new Error(e);
+        }
+    }
+
+    /** register the xml, json, and line parsers in the parserDirectory (done only once) */
+    public static void loadParsers() {
+        if (!loaded) {
+            DataSource.parserDirectory.put("xml",XMLParser.class);
+            DataSource.parserDirectory.put("json",JsonFormatParser.class);
+            DataSource.parserDirectory.put("line",LineParser.class);
+            loaded = true;
+        }
+    }
+
+    static {
+        loadParsers();
+    }
+
+    /** the size of a file, or the total size of the direct children of a directory
+     *  (subdirectories are not traversed)
+     */
+    private static long size ( Path path, Configuration conf ) throws IOException {
+        FileStatus s = path.getFileSystem(conf).getFileStatus(path);
+        if (!s.isDir())
+            return s.getLen();
+        long size = 0;
+        for ( FileStatus fs: path.getFileSystem(conf).listStatus(path) )
+            size += fs.getLen();
+        return size;
+    }
+
+    /** data set size in bytes
+     * @param conf the configuration used to locate the file system of the path
+     * @throws Error if the file status cannot be retrieved
+     */
+    public long size ( Configuration conf ) {
+        try {
+            return size(new Path(path),conf);
+        } catch (IOException e) {
+            throw new Error(e);
+        }
+    }
+
+    /** reconstruct a DataSource from its serialized form, which is a list of fields
+     *  separated by the separator: the kind of the source (Binary, Generator, or Text),
+     *  the source number, and then the kind-specific fields. For Text, the third field
+     *  is the parser name looked up in the parserDirectory and the fifth field is parsed
+     *  as a Tree whose children are passed to the ParsedDataSource.
+     * @param buffer the serialized DataSource
+     * @param conf the configuration passed to the constructed DataSource
+     * @throws Error if the kind is not recognized or the buffer cannot be decoded
+     */
+    public static DataSource read ( String buffer, Configuration conf ) {
+        try {
+            String[] s = buffer.split(separator);
+            int n = Integer.parseInt(s[1]);
+            if (s[0].equals("Binary"))
+                return new BinaryDataSource(n,s[2],conf);
+            else if (s[0].equals("Generator"))
+                return new GeneratorDataSource(n,s[2],conf);
+            else if (s[0].equals("Text"))
+                return new ParsedDataSource(n,s[3],parserDirectory.get(s[2]),((Node)Tree.parse(s[4])).children(),conf);
+            else throw new Error("Unrecognized data source: "+s[0]);
+        } catch (Exception e) {
+            throw new Error(e);
+        }
+    }
+
+    /** look up the DataSource of a path; if the dataSourceDirectory is empty,
+     *  it is first loaded from the configuration
+     * @param path a path that starts with the path of a registered DataSource
+     * @param conf the configuration from which an empty directory is loaded
+     * @return the matching DataSource, or null if there is none
+     */
+    public static DataSource get ( String path, Configuration conf ) {
+        if (dataSourceDirectory.isEmpty())
+            dataSourceDirectory.read(conf);
+        return dataSourceDirectory.get(path);
+    }
+
+    /** look up the DataSource of remote_path, redirect its path to local_path,
+     *  and register it in the dataSourceDirectory under local_path too.
+     *  NOTE(review): fails with a NullPointerException if remote_path matches
+     *  no registered DataSource, since get returns null in that case.
+     * @param remote_path the path under which the DataSource is currently registered
+     * @param local_path the new path of the DataSource
+     * @param conf the configuration from which an empty directory is loaded
+     * @return the DataSource, which now points to local_path
+     */
+    public static DataSource getCached ( String remote_path, String local_path, Configuration conf ) {
+        DataSource ds = get(remote_path,conf);
+        ds.path = local_path;
+        dataSourceDirectory.put(local_path,ds);
+        return ds;
+    }
+
+    /** return the first num values
+     * @param num the maximum number of values to return; a negative num means no limit
+     * @return at most num values read from the path using the inputFormat
+     * @throws Error if the data cannot be materialized
+     */
+    public List<MRData> take ( int num ) {
+        try {
+            ArrayList<MRData> res = new ArrayList<MRData>();
+            Iterator<MRData> it = inputFormat.newInstance().materialize(new Path(path)).iterator();
+            for ( int i = num; (num < 0 || i > 0) && it.hasNext(); i-- )
+                if (Config.hadoop_mode && Config.bsp_mode)
+                    res.add(((Tuple)it.next()).get(1));  // strip tag in BSP mode
+                else res.add(it.next());
+            return res;
+        } catch (Exception ex) {
+            throw new Error(ex);
+        }
+    }
+
+    // a single pair shared by all calls to reduce: it is overwritten at each step,
+    // so reduce must not be called concurrently from multiple threads
+    static Tuple tuple_container = new Tuple(new Tuple(),new Tuple());
+
+    /** accumulate all datasource values
+     * @param zero the initial value of the accumulation
+     * @param acc the accumulator function, applied to the pair (accumulated value, next value)
+     * @return the final accumulated value (zero if there are no values)
+     * @throws Error if the data cannot be materialized or the accumulation fails
+     */
+    public MRData reduce ( MRData zero, final Function acc ) {
+        try {
+            MRData res = zero;
+            for ( MRData x: inputFormat.newInstance().materialize(new Path(path)) ) {
+                if (Config.hadoop_mode && Config.bsp_mode)
+                    x = ((Tuple)x).get(1); // strip tag in BSP mode
+                tuple_container.set(0,res).set(1,x);
+                res = acc.eval(tuple_container);
+            };
+            return res;
+        } catch (Exception ex) {
+            throw new Error(ex);
+        }
+    }
+}