Posted to commits@mrql.apache.org by fe...@apache.org on 2014/03/13 15:24:35 UTC
[10/26] MRQL-32: Refactoring directory structure for Eclipse
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/Bag.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/Bag.java b/src/main/java/core/Bag.java
deleted file mode 100644
index b092f30..0000000
--- a/src/main/java/core/Bag.java
+++ /dev/null
@@ -1,578 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.util.*;
-import java.io.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.fs.*;
-
-
-/**
- * A sequence of MRData.
- * There are 3 kinds of Bag implementations, which are converted at run-time, when necessary:
- * 1) vector-based (materialized): used for small bags (when size is less than Config.max_materialized_bag);
- * 2) stream-based: can be traversed only once; implemented as Java iterators;
- * 3) spilled to a local file: can be accessed multiple times
- */
-public class Bag extends MRData implements Iterable<MRData> {
- private final static long serialVersionUID = 64629834894869L;
- enum Modes { STREAMED, MATERIALIZED, SPILLED };
- private transient Modes mode;
- private transient ArrayList<MRData> content; // content of a materialized bag
- private transient BagIterator iterator; // iterator for a streamed bag
- private transient boolean consumed; // true, if the stream has already been used
- private transient String path; // local path that contains the spilled bag
- private transient SequenceFile.Writer writer; // the file writer for spilled bags
-
- /**
- * create an empty bag as an ArrayList
- */
- public Bag () {
- mode = Modes.MATERIALIZED;
- content = new ArrayList<MRData>();
- }
-
- /**
- * create an empty bag as an ArrayList with a given capacity
- * @param size initial capacity
- */
- public Bag ( final int size ) {
- mode = Modes.MATERIALIZED;
- content = new ArrayList<MRData>(size);
- }
-
- /**
- * in-memory Bag construction (an ArrayList) initialized with data
- * @param as a vector of MRData to insert in the Bag
- */
- public Bag ( final MRData ...as ) {
- mode = Modes.MATERIALIZED;
- content = new ArrayList<MRData>(as.length);
- for ( MRData a: as )
- content.add(a);
- }
-
- /**
- * in-memory Bag construction (an ArrayList) initialized with data
- * @param as a vector of MRData to insert in the Bag
- */
- public Bag ( final List<MRData> as ) {
- mode = Modes.MATERIALIZED;
- content = new ArrayList<MRData>(as.size());
- for ( MRData a: as )
- content.add(a);
- }
-
- /**
- * lazy construction (stream-based) of a Bag
- * @param i the Iterator that generates the Bag elements
- */
- public Bag ( final BagIterator i ) {
- mode = Modes.STREAMED;
- iterator = i;
- consumed = false;
- }
-
- /** is the Bag stored in an ArrayList? */
- public boolean materialized () {
- return mode == Modes.MATERIALIZED;
- }
-
- /** is the Bag stream-based? */
- public boolean streamed () {
- return mode == Modes.STREAMED;
- }
-
- /** is the Bag spilled into a file? */
- public boolean spilled () {
- return mode == Modes.SPILLED;
- }
-
- /** return the Bag size (this may consume a stream-based Bag) */
- public int size () {
- if (materialized())
- return content.size();
- if (streamed() && consumed)
- throw new Error("*** The collection stream has already been consumed");
- int i = 0;
- for ( MRData e: this )
- i++;
- if (streamed())
- consumed = true;
- return i;
- }
-
- /** trim the ArrayList that caches the Bag */
- public void trim () {
- if (materialized())
- content.trimToSize();
- }
-
- /** get the n'th element of a Bag (this may consume a stream-based Bag)
- * @param n the index
- * @return the n'th element
- */
- public MRData get ( final int n ) {
- if (materialized())
- if (n < size())
- return content.get(n);
- else throw new Error("List index out of range: "+n);
- if (streamed() && consumed)
- throw new Error("*** The collection stream has already been consumed");
- int i = 0;
- for ( MRData e: this )
- if (i++ == n)
- return e;
- if (streamed())
- consumed = true;
- throw new Error("Cannot retrieve the "+n+"th element of a sequence");
- }
-
- /** replace the n'th element of a Bag with a new value
- * @param n the index
- * @param value the new value
- * @return the Bag
- */
- public Bag set ( final int n, final MRData value ) {
- if (!materialized())
- throw new Error("Cannot replace an element of a non-materialized sequence");
- content.set(n,value);
- return this;
- }
-
- /** add a new value to a Bag (cache it in memory if necessary)
- * @param x the new value
- */
- public void add ( final MRData x ) {
- materialize();
- if (!spilled() && Config.hadoop_mode
- && size() >= Config.max_materialized_bag)
- spill();
- if (spilled())
- try {
- if (writer == null) { // writer was closed earlier for reading
- FileSystem fs = FileSystem.getLocal(Plan.conf);
- writer = SequenceFile.createWriter(fs,Plan.conf,new Path(path),
- MRContainer.class,NullWritable.class,
- SequenceFile.CompressionType.NONE);
- System.err.println("*** Appending elements to a spilled Bag: "+path);
- };
- writer.append(new MRContainer(x),NullWritable.get());
- } catch (IOException e) {
- throw new Error("Cannot append an element to a spilled Bag: "+path);
- }
- else content.add(x);
- }
-
- /** add a new value to a Bag (cache it in memory if necessary)
- * @param x the new value
- * @return the Bag
- */
- public Bag add_element ( final MRData x ) {
- add(x);
- return this;
- }
-
- /** add the elements of a Bag to the end of this Bag
- * @param b the Bag whose elements are copied
- * @return the Bag
- */
- public Bag addAll ( final Bag b ) {
- for ( MRData e: b )
- add(e);
- return this;
- }
-
- /** make this Bag empty (cache it in memory if necessary) */
- public void clear () {
- if (materialized())
- content.clear();
- else if (streamed()) {
- if (writer != null)
- try {
- writer.close();
- } catch (IOException ex) {
- throw new Error(ex);
- };
- writer = null;
- path = null;
- mode = Modes.MATERIALIZED;
- content = new ArrayList<MRData>(100);
- };
- mode = Modes.MATERIALIZED;
- content = new ArrayList<MRData>();
- }
-
- /** cache the Bag to an ArrayList when it is absolutely necessary */
- public void materialize () {
- if (materialized() || spilled())
- return;
- Iterator<MRData> iter = iterator();
- mode = Modes.MATERIALIZED;
- writer = null;
- path = null;
- content = new ArrayList<MRData>(100);
- while ( iter.hasNext() )
- add(iter.next());
- if (materialized()) // it may have been spilled
- content.trimToSize();
- iterator = null;
- }
-
- private static Random random_generator = new Random();
-
- private static String new_path ( FileSystem fs ) throws IOException {
- Path p;
- do {
- p = new Path("file://"+Config.tmpDirectory+"/mrql"+(random_generator.nextInt(1000000)));
- } while (p.getFileSystem(Plan.conf).exists(p));
- String path = p.toString();
- Plan.temporary_paths.add(path);
- return path;
- }
-
- /** spill the Bag to a local file */
- private void spill () {
- if (!spilled() && Config.hadoop_mode)
- try {
- if (Plan.conf == null)
- Plan.conf = Evaluator.evaluator.new_configuration();
- final FileSystem fs = FileSystem.getLocal(Plan.conf);
- path = new_path(fs);
- System.err.println("*** Spilling a Bag to a local file: "+path);
- final Path p = new Path(path);
- writer = SequenceFile.createWriter(fs,Plan.conf,p,
- MRContainer.class,NullWritable.class,
- SequenceFile.CompressionType.NONE);
- for ( MRData e: this )
- writer.append(new MRContainer(e),NullWritable.get());
- mode = Modes.SPILLED;
- content = null;
- iterator = null;
- } catch (Exception e) {
- throw new Error("Cannot spill a Bag to a local file");
- }
- }
-
- /**
- * sort the Bag (cache it in memory if necessary).
- * If the Bag was spilled during caching, use external sorting
- */
- public void sort () {
- materialize();
- if (spilled()) // if it was spilled during materialize()
- try { // use external sorting
- if (writer != null)
- writer.close();
- FileSystem fs = FileSystem.getLocal(Plan.conf);
- SequenceFile.Sorter sorter
- = new SequenceFile.Sorter(fs,new Plan.MRContainerKeyComparator(),
- MRContainer.class,NullWritable.class,Plan.conf);
- String out_path = new_path(fs);
- System.err.println("*** Using external sorting on a spilled bag "+path+" -> "+out_path);
- sorter.setMemory(64*1024*1024);
- sorter.sort(new Path(path),new Path(out_path));
- path = out_path;
- writer = null;
- } catch (Exception ex) {
- throw new Error("Cannot sort a spilled bag");
- }
- else Collections.sort(content);
- }
-
- /** return the Bag Iterator */
- public Iterator<MRData> iterator () {
- if (spilled())
- try {
- if (writer != null)
- writer.close();
- writer = null;
- return new BagIterator () {
- final FileSystem fs = FileSystem.getLocal(Plan.conf);
- final SequenceFile.Reader reader = new SequenceFile.Reader(fs,new Path(path),Plan.conf);
- final MRContainer key = new MRContainer();
- final NullWritable value = NullWritable.get();
- MRData data;
- public boolean hasNext () {
- try {
- if (!reader.next(key,value)) {
- reader.close();
- return false;
- };
- data = key.data();
- return true;
- } catch (IOException e) {
- throw new Error("Cannot collect values from a spilled Bag");
- }
- }
- public MRData next () {
- return data;
- }
- };
- } catch (IOException e) {
- throw new Error("Cannot collect values from a spilled Bag");
- }
- else if (materialized())
- return content.iterator();
- else {
- if (consumed) // this should never happen
- throw new Error("*** The collection stream has already been consumed");
- consumed = true;
- return iterator;
- }
- }
-
- /** cache this Bag in memory, materializing all nested Bags at any place and depth */
- public void materializeAll () {
- materialize();
- for (MRData e: this)
- e.materializeAll();
- }
-
- /** concatenate the elements of a given Bag to the elements of this Bag.
- * Does not change either Bag
- * @param s the given Bag
- * @return a new Bag
- */
- public Bag union ( final Bag s ) {
- final Iterator<MRData> i1 = iterator();
- final Iterator<MRData> i2 = s.iterator();
- return new Bag(new BagIterator () {
- boolean first = true;
- public boolean hasNext () {
- if (first)
- if (i1.hasNext())
- return true;
- else {
- first = false;
- return i2.hasNext();
- }
- else return i2.hasNext();
- }
- public MRData next () {
- if (first)
- return i1.next();
- else return i2.next();
- }
- });
- }
-
- /** does this Bag contain an element?
- * Cache this Bag in memory before testing, if necessary
- * @param x the element to find
- */
- public boolean contains ( final MRData x ) {
- if (materialized())
- return content.contains(x);
- if (streamed() && consumed)
- throw new Error("*** The collection stream has already been consumed");
- for ( MRData e: this )
- if (x.equals(e))
- return true;
- if (streamed())
- consumed = true;
- return false;
- }
-
- /** if this Bag is a Map from keys to values (a Bag of (key,value) pairs),
- * find the value with the given key; raise an error if not found
- * @param key the search key
- * @return the value associated with the key
- */
- public MRData map_find ( final MRData key ) {
- if (streamed() && consumed)
- throw new Error("*** The collection stream has already been consumed");
- for ( MRData e: this ) {
- Tuple p = (Tuple) e;
- if (key.equals(p.first()))
- return p.second();
- };
- if (streamed())
- consumed = true;
- throw new Error("key "+key+" not found in map");
- }
-
- /** if this Bag is a Map from keys to values (a Bag of (key,value) pairs),
- * does it contain a given key?
- * @param key the search key
- */
- public boolean map_contains ( final MRData key ) {
- if (streamed() && consumed)
- throw new Error("*** The collection stream has already been consumed");
- for ( MRData e: this )
- if (key.equals(((Tuple)e).first()))
- return true;
- if (streamed())
- consumed = true;
- return false;
- }
-
- /** the output serializer for Bag.
- * Stream-based Bags are serialized lazily (without having to cache the Bag in memory)
- */
- final public void write ( DataOutput out ) throws IOException {
- if (materialized()) {
- out.writeByte(MRContainer.BAG);
- WritableUtils.writeVInt(out,size());
- for ( MRData e: this )
- e.write(out);
- } else {
- out.writeByte(MRContainer.LAZY_BAG);
- for ( MRData e: this )
- e.write(out);
- out.writeByte(MRContainer.END_OF_LAZY_BAG);
- }
- }
-
- /** the input serializer for Bag */
- final public static Bag read ( DataInput in ) throws IOException {
- int n = WritableUtils.readVInt(in);
- Bag bag = new Bag(n);
- for ( int i = 0; i < n; i++ )
- bag.add(MRContainer.read(in));
- return bag;
- }
-
- /** a lazy input serializer for a Bag (it doesn't need to cache a Bag in memory) */
- public static Bag lazy_read ( final DataInput in ) throws IOException {
- Bag bag = new Bag(100);
- MRData data = MRContainer.read(in);
- while (data != MRContainer.end_of_lazy_bag) {
- bag.add(data);
- data = MRContainer.read(in);
- };
- if (bag.materialized())
- bag.content.trimToSize();
- return bag;
- }
-
- /** the input serializer for Bag */
- public void readFields ( DataInput in ) throws IOException {
- int n = WritableUtils.readVInt(in);
- mode = Modes.MATERIALIZED;
- iterator = null;
- path = null;
- writer = null;
- if (content == null)
- content = new ArrayList<MRData>(n);
- else {
- content.clear();
- content.ensureCapacity(n);
- };
- for ( int i = 0; i < n; i++ )
- add(MRContainer.read(in));
- }
-
- private void writeObject ( ObjectOutputStream out ) throws IOException {
- materialize();
- WritableUtils.writeVInt(out,size());
- for ( MRData e: this )
- e.write(out);
- }
-
- private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
- int n = WritableUtils.readVInt(in);
- mode = Modes.MATERIALIZED;
- iterator = null;
- path = null;
- writer = null;
- content = new ArrayList<MRData>(n);
- for ( int i = 0; i < n; i++ )
- add(MRContainer.read(in));
- }
-
- private void readObjectNoData () throws ObjectStreamException { };
-
- /** compare this Bag with a given Bag by comparing their associated elements */
- public int compareTo ( MRData x ) {
- Bag xt = (Bag)x;
- Iterator<MRData> xi = xt.iterator();
- Iterator<MRData> yi = iterator();
- while ( xi.hasNext() && yi.hasNext() ) {
- int c = xi.next().compareTo(yi.next());
- if (c < 0)
- return -1;
- else if (c > 0)
- return 1;
- };
- if (xi.hasNext())
- return -1;
- else if (yi.hasNext())
- return 1;
- else return 0;
- }
-
- /** compare two serialized Bags element by element, directly on their byte representations */
- final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
- try {
- int xn = WritableComparator.readVInt(x,xs);
- int xx = WritableUtils.decodeVIntSize(x[xs]);
- int yn = WritableComparator.readVInt(y,ys);
- int yy = WritableUtils.decodeVIntSize(y[ys]);
- for ( int i = 0; i < xn && i < yn; i++ ) {
- int k = MRContainer.compare(x,xs+xx,xl-xx,y,ys+yy,yl-yy,size);
- if (k != 0)
- return k;
- xx += size[0];
- yy += size[0];
- };
- size[0] = xx+1;
- if (xn > yn)
- return 1;
- if (xn < yn)
- return -1;
- return 0;
- } catch (IOException e) {
- throw new Error(e);
- }
- }
-
- /** is this Bag equal to another Bag (order is important) */
- public boolean equals ( Object x ) {
- if (!(x instanceof Bag))
- return false;
- Bag xt = (Bag) x;
- Iterator<MRData> xi = xt.iterator();
- Iterator<MRData> yi = iterator();
- while ( xi.hasNext() && yi.hasNext() )
- if ( !xi.next().equals(yi.next()) )
- return false;
- return !xi.hasNext() && !yi.hasNext(); // equal only if both are exhausted
- }
-
- /** the hash code of this Bag is the XOR of the hash code of its elements */
- public int hashCode () {
- int h = 127;
- for ( MRData e: this )
- h ^= e.hashCode();
- return Math.abs(h);
- }
-
- /** show the first few Bag elements (controlled by -bag_print) */
- public String toString () {
- materialize();
- StringBuffer b = new StringBuffer("{ ");
- int i = 0;
- for ( MRData e: this )
- if ( i++ < Config.max_bag_size_print )
- b.append(((i>1)?", ":"")+e);
- else return b.append(", ... }").toString();
- return b.append(" }").toString();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/BagIterator.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/BagIterator.java b/src/main/java/core/BagIterator.java
deleted file mode 100644
index 0fd7354..0000000
--- a/src/main/java/core/BagIterator.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.util.Iterator;
-
-/** an Iterator over Bags */
-abstract public class BagIterator implements Iterator<MRData> {
- public void remove () {
- throw new Error("Bag deletions are not permitted");
- }
-}
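Because remove() is forbidden, Bags built from BagIterators compose lazily and without mutation; Bag.union above, for instance, chains two iterators without copying either input. A short usage sketch, under the same assumptions as the sketch above:

    import org.apache.mrql.*;

    class UnionSketch {
        public static void main ( String[] args ) {
            Bag a = new Bag(new MR_int(1),new MR_int(2));
            Bag b = new Bag(new MR_int(3));
            Bag c = a.union(b);  // streamed: elements of b follow those of a
            for ( MRData x: c )  // one traversal; materialize() first to reuse c
                System.out.println(x);
        }
    }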
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/BinaryDataSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/BinaryDataSource.java b/src/main/java/core/BinaryDataSource.java
deleted file mode 100644
index d4338ec..0000000
--- a/src/main/java/core/BinaryDataSource.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.hadoop.conf.Configuration;
-
-
-/** A DataSource used for storing intermediate results and data dumps */
-final public class BinaryDataSource extends DataSource {
- BinaryDataSource ( int source_num, String path, Configuration conf ) {
- super(source_num,path,Evaluator.evaluator.binaryInputFormat(),conf);
- }
-
- BinaryDataSource ( String path, Configuration conf ) {
- super(-1,path,Evaluator.evaluator.binaryInputFormat(),conf);
- }
-
- public String toString () {
- return "Binary"+separator+source_num+separator+path;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/ClassImporter.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/ClassImporter.java b/src/main/java/core/ClassImporter.java
deleted file mode 100644
index 8efc1ae..0000000
--- a/src/main/java/core/ClassImporter.java
+++ /dev/null
@@ -1,202 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.lang.reflect.Method;
-import org.apache.mrql.gen.*;
-import java.util.*;
-
-/** imports external Java methods into MRQL */
-final public class ClassImporter {
- final static boolean trace_imported_methods = false;
-
- final static String[] object_methods
- = { "hashCode", "getClass", "wait", "equals", "toString", "notify", "notifyAll" };
-
- static Vector<MethodInfo> methods = new Vector<MethodInfo>();
-
- public static void load_classes () {
- if (methods == null)
- methods = new Vector<MethodInfo>();
- if (methods.size() == 0) {
- importClass("org.apache.mrql.SystemFunctions");
- //****** import your classes with user-defined functions here
- }
- }
-
- private static boolean object_method ( String s ) {
- for (int i = 0; i < object_methods.length; i++)
- if (object_methods[i].equals(s))
- return true;
- return false;
- }
-
- private static Tree getType ( Class<?> c ) {
- String cn = c.getCanonicalName();
- Class<?>[] inf = c.getInterfaces();
- if (cn.equals("org.apache.mrql.MRData"))
- return new VariableLeaf("any");
- if (cn.startsWith("org.apache.mrql.MR_"))
- return new VariableLeaf(cn.substring(19));
- if (cn.equals("org.apache.mrql.Bag"))
- return new Node("bag",new Trees(new VariableLeaf("any")));
- if (cn.equals("org.apache.mrql.Inv"))
- return new VariableLeaf("any");
- if (cn.equals("org.apache.mrql.Union"))
- return new VariableLeaf("union");
- if (cn.equals("org.apache.mrql.Lambda"))
- return new VariableLeaf("any");
- if (inf.length > 0 && inf[0].getName().equals("org.apache.mrql.MRData"))
- return new VariableLeaf("any");
- throw new Error("Unsupported type in imported method: "+cn);
- }
-
- private static Trees signature ( Method m ) {
- Class<?> co = m.getReturnType();
- Class<?>[] cs = m.getParameterTypes();
- Trees as = new Trees(getType(co));
- for (int i = 0; i < cs.length; i++)
- as = as.append(getType(cs[i]));
- return as;
- }
-
- public static String method_name ( int method_number ) {
- return methods.get(method_number).name;
- }
-
- public static Trees signature ( int method_number ) {
- return methods.get(method_number).signature;
- }
-
- /** import all Java methods from a given Java class */
- public static void importClass ( String class_name ) {
- try {
- Method[] ms = Class.forName(class_name).getMethods();
- Vector<MethodInfo> mv = new Vector<MethodInfo>();
- for (int i = 0; i < ms.length; i++)
- if (!object_method(ms[i].getName()) && ms[i].getModifiers() == 9) // 9 = PUBLIC|STATIC
- try {
- Trees sig = signature(ms[i]);
- MethodInfo m = new MethodInfo(ms[i].getName(),sig,ms[i]);
- mv.add(m);
- methods.add(m);
- } catch ( Exception e ) {
- System.out.println("Warning: method "+ms[i].getName()+" cannot be imported");
- System.out.println(e);
- throw new Error("");
- };
- Collections.sort(methods);
- if (Translator.functions == null)
- Translator.functions = Trees.nil;
- for ( MethodInfo m: methods )
- Translator.functions = Translator.functions.append(new Node(m.name,m.signature));
- if (trace_imported_methods) {
- System.out.print("Importing methods: ");
- for (int i = 0; i < mv.size(); i++ )
- System.out.print(mv.get(i).name+mv.get(i).signature.tail()
- +":"+mv.get(i).signature.head()+" ");
- System.out.println();
- }
- } catch (ClassNotFoundException x) {
- throw new Error("Undefined class: "+class_name);
- }
- }
-
- /** import a Java method with a given name from a given Java class */
- public static void importMethod ( String class_name, String method_name ) {
- try {
- Method[] ms = Class.forName(class_name).getMethods();
- MethodInfo m = null;
- for (int i = 0; i < ms.length; i++)
- if (ms[i].getName().equals(method_name)
- && !object_method(ms[i].getName()) && ms[i].getModifiers() == 9) {
- Trees sig = signature(ms[i]);
- m = new MethodInfo(ms[i].getName(),sig,ms[i]);
- Translator.functions = Translator.functions.append(new Node(ms[i].getName(),sig));
- break;
- };
- if (m == null)
- throw new Error("No such method: "+method_name);
- methods.add(m);
- Collections.sort(methods);
- if (trace_imported_methods)
- System.out.println("Importing method: "+m.name+m.signature.tail()
- +":"+m.signature.head()+" ");
- } catch (ClassNotFoundException x) {
- throw new Error("Undefined class: "+class_name);
- }
- }
-
- public static void print_methods () {
- for (int i = 0; i < methods.size(); i++ ) {
- MethodInfo m = methods.get(i);
- System.out.print(" "+m.name+":"+m.signature.tail()+"->"+m.signature.head());
- };
- }
-
- /** return the method specification of a system method with a given name over some expressions;
- * When the method is overloaded, find the most specific (in terms of arg subtyping)
- * @param method_name the given method name
- * @param args the method expressions
- * @return the method specification
- */
- public static Tree find_method ( String method_name, Trees args ) {
- for (int i = 0; i < methods.size(); i++ ) {
- MethodInfo m = methods.get(i);
- if (m.name.equals(method_name) && TypeInference.subtype(args,m.signature.tail()))
- return m.signature.head();
- };
- return null;
- }
-
- /** return the method number of a system method with a given name over some expressions;
- * When the method is overloaded, find the most specific (in terms of arg subtyping)
- * @param method_name the given method name
- * @param args the method expressions
- * @return the method number
- */
- public static int find_method_number ( String method_name, Trees args ) {
- for (int i = 0; i < methods.size(); i++ ) {
- MethodInfo m = methods.get(i);
- if (m.name.equals(method_name) && TypeInference.subtype(args,m.signature.tail()))
- return i;
- };
- return -1;
- }
-
- /** call a system method with a given number over MRData
- * @param method_number the method number
- * @param args the input arguments
- * @return the result of invoking this method over the args
- */
- public static MRData call ( int method_number, MRData... args ) {
- if (method_number < 0 || method_number >= methods.size())
- throw new Error("Run-time error (unknown method name)");
- MethodInfo m = methods.get(method_number);
- try {
- return (MRData)m.method.invoke(null,(Object[])args);
- } catch (Exception e) {
- Tuple t = new Tuple(args.length);
- for ( int i = 0; i < args.length; i++ )
- t.set(i,args[i]);
- System.err.println("Run-time error in method call: "+m.name+t+" of type "
- +m.signature.tail()+"->"+m.signature.head());
- throw new Error(e.toString());
- }
- }
-}
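To make the import path concrete, here is a sketch of a user-defined function class (the name my.pkg.MyFunctions is hypothetical). As getType and importClass above require, the methods must be public static (getModifiers() == 9) and must use the MRQL wrapper types in their signatures; MR_int is assumed to wrap an int with a get() accessor, as the generated code elsewhere in this patch suggests.

    package my.pkg;
    import org.apache.mrql.*;

    public class MyFunctions {
        // callable from MRQL as plus_one(x) once imported
        public static MR_int plus_one ( MR_int x ) {
            return new MR_int(x.get()+1);
        }
    }

To register it, either import the whole class (e.g. from load_classes) or a single method:

    ClassImporter.importClass("my.pkg.MyFunctions");
    ClassImporter.importMethod("my.pkg.MyFunctions","plus_one");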
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/Compiler.gen
----------------------------------------------------------------------
diff --git a/src/main/java/core/Compiler.gen b/src/main/java/core/Compiler.gen
deleted file mode 100644
index 668cdc8..0000000
--- a/src/main/java/core/Compiler.gen
+++ /dev/null
@@ -1,584 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.*;
-import java.io.*;
-import javax.tools.*;
-import javax.tools.JavaCompiler.CompilationTask;
-import java.lang.reflect.*;
-import java.util.*;
-import java.net.*;
-import java.util.jar.*;
-import org.apache.hadoop.io.WritableComparable;
-
-
-/** compilation of MRQL expressions to Java code and then to Java bytecode */
-final public class Compiler extends Translator {
- static JavaCompiler compiler = ToolProvider.getSystemJavaCompiler();
- static DiagnosticCollector<JavaFileObject> diagnostics = new DiagnosticCollector<JavaFileObject>();
- final static String tmp_dir = "/tmp/mrql_jar_"+System.getProperty("user.name");
- public static String jar_path;
- static Trees in_memory_repeat_vars = #[];
- static int lambda_num = 0;
- static int user_functions_num = 0;
-
- /** a Java source file kept in memory as a string (a compilation unit for the Java compiler) */
- final private static class JavaSourceFromString extends SimpleJavaFileObject {
- final String code;
-
- JavaSourceFromString ( String name, String code ) {
- super(URI.create("string:///org/apache/mrql/" + name.replace('.','/') + Kind.SOURCE.extension),Kind.SOURCE);
- this.code = code;
- }
-
- @Override
- public CharSequence getCharContent ( boolean ignoreEncodingErrors ) {
- return code;
- }
- }
-
- private static String new_lambda_name () {
- return "MRQL_Lambda_"+(lambda_num++);
- }
-
- private static void add2jar ( File source, int offset, JarOutputStream target ) throws IOException {
- if (source.isDirectory()) {
- String name = source.getPath();
- if (name.length() > offset) {
- JarEntry entry = new JarEntry(name.substring(offset));
- entry.setTime(source.lastModified());
- target.putNextEntry(entry);
- target.closeEntry();
- };
- for ( File nestedFile: source.listFiles() )
- add2jar(nestedFile,offset,target);
- } else {
- JarEntry entry = new JarEntry(source.getPath().substring(offset));
- entry.setTime(source.lastModified());
- target.putNextEntry(entry);
- BufferedInputStream in = new BufferedInputStream(new FileInputStream(source));
- byte[] buffer = new byte[1024];
- int count = 1;
- while (count > 0) {
- count = in.read(buffer);
- if (count > 0)
- target.write(buffer,0,count);
- };
- target.closeEntry();
- in.close();
- }
- }
-
- private static void remove ( File file ) throws IOException {
- if (file.isDirectory())
- for ( File nestedFile: file.listFiles() )
- remove(nestedFile);
- file.delete();
- }
-
- public static void clean () throws IOException {
- in_memory_repeat_vars = #[];
- remove(new File(tmp_dir));
- }
-
- final private static Tree compile ( Tree e, StringBuffer out ) throws Exception {
- match e {
- case repeat(lambda(`v,`body),`s,`n):
- in_memory_repeat_vars = in_memory_repeat_vars.cons(v);
- return #<repeat(lambda(`v,`(compile(body,out))),`(compile(s,out)),`n)>;
- case `f(...al):
- if (!plans_with_distributed_lambdas.member(#<`f>))
- fail;
- Trees nl = #[];
- for ( Tree a: al)
- match Interpreter.closure(a,Interpreter.global_env) {
- case lambda(`v,`body):
- String fname = new_lambda_name();
- StringBuffer sb = new StringBuffer(1000);
- sb.append("final class "+fname+" extends Function {\n");
- sb.append(fname+" () {}\n");
- sb.append("final public MRData eval ( final MRData "+v
- +" ) { return "+compileE(body)+"; }\n}\n");
- out.append(sb);
- nl = nl.append(#<compiled(`fname,`a)>);
- case _: nl = nl.append(compile(a,out));
- };
- return #<`f(...nl)>;
- case `f(...al):
- Trees nl = #[];
- for ( Tree a: al)
- nl = nl.append(compile(a,out));
- return #<`f(...nl)>;
- };
- return e;
- }
-
- private static StringBuffer out;
-
- /** compile the functional arguments of the MRQL operators using the Java compiler
- * @param query the expression to compile
- * @return the query with all functional arguments compiled to Java bytecode
- */
- final public static Tree compile ( Tree query ) {
- try {
- user_functions_num = lambda_num++;
- // remove the old jar
- if (jar_path != null)
- remove(new File(jar_path));
- jar_path = tmp_dir+"/mrql_args_"+(new Random().nextInt(1000000))+".jar";
- out = new StringBuffer(1000);
- out.append("package org.apache.mrql;\n");
- Tree nq = compile(query,out);
- StringBuffer sb = new StringBuffer(1000);
- for ( String f: global_functions )
- match global_functions.lookup(f) {
- case function(tuple(...params),`otp,`body):
- sb.append("final public static "+get_MR_type(otp)+" "+f);
- if (params.is_empty())
- sb.append(" ()");
- else {
- match params.head() {
- case bind(`v,`tp):
- sb.append(" ( final "+get_MR_type(tp)+" "+v);
- };
- for ( Tree var: params.tail() )
- match var {
- case bind(`v,`tp):
- sb.append(", final "+get_MR_type(tp)+" "+v);
- }
- sb.append(" ) { return ("+get_MR_type(otp)+")");
- sb.append(compileE(body));
- sb.append("; }\n");
- }
- };
- out.append("final class UserFunctions_"+user_functions_num+" {\n");
- out.append(sb);
- out.append("}\n");
- String code = out.toString();
- //System.out.println(code);
- JavaFileObject file = new JavaSourceFromString("UserFunctions_"+user_functions_num,code);
- Iterable<? extends JavaFileObject> compilationUnits = Arrays.asList(file);
- List<String> optionList = new ArrayList<String>();
- (new File(tmp_dir)).mkdir();
- String dir = tmp_dir+"/classes_"+(new Random().nextInt(1000000));
- File fdir = new File(dir);
- fdir.mkdir();
- ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
- String classpath = jar_path;
- String separator = System.getProperty("path.separator");
- for ( URL url: ((URLClassLoader) classLoader).getURLs() )
- classpath += separator+url.getFile();
- // use hadoop core jar
- classpath += separator + WritableComparable.class.getProtectionDomain().getCodeSource().getLocation().toString();
- optionList.addAll(Arrays.asList("-classpath",classpath));
- optionList.addAll(Arrays.asList("-d",dir));
- CompilationTask task = compiler.getTask(null,null,diagnostics,optionList,null,compilationUnits);
- boolean success = task.call();
- if (!success)
- for ( Diagnostic d: diagnostics.getDiagnostics() )
- System.err.println("*** Compilation error at line "+d.getLineNumber()+" position "
- +d.getColumnNumber()+": "+d.getMessage(Locale.US));
- Manifest manifest = new Manifest();
- manifest.getMainAttributes().put(Attributes.Name.MANIFEST_VERSION,"1.0");
- JarOutputStream target = new JarOutputStream(new FileOutputStream(jar_path),manifest);
- add2jar(new File(dir+"/"),dir.length()+1,target);
- target.close();
- remove(fdir);
- return nq;
- } catch (Exception e) {
- System.err.println("*** Warning: Unable to compile the query:\n"+query);
- if (Config.trace)
- e.printStackTrace(System.err);
- return query;
- }
- }
-
- /** load the Java class of the anonymous function with name lambda_name */
- final public static Function compiled ( ClassLoader cl, String lambda_name ) throws Exception {
- URL[] urls = ((URLClassLoader) cl).getURLs();
- URL[] new_urls = new URL[urls.length+1];
- for ( int i = 0; i < urls.length; i++ )
- new_urls[i+1] = urls[i];
- new_urls[0] = new URL("file://"+jar_path);
- URLClassLoader loader = new URLClassLoader(new_urls,cl);
- Class c = loader.loadClass("org.apache.mrql."+lambda_name);
- Constructor cc = c.getDeclaredConstructors()[0];
- cc.setAccessible(true);
- return (Function)cc.newInstance();
- }
-
- /** The Java type of an MRQL type */
- private static String get_MR_type ( Tree type ) {
- match type {
- case boolean: return "MR_bool";
- case byte: return "MR_byte";
- case short: return "MR_short";
- case int: return "MR_int";
- case long: return "MR_long";
- case float: return "MR_float";
- case double: return "MR_double";
- case char: return "MR_char";
- case string: return "MR_string";
- case union: return "Union";
- case bag(...): return "Bag";
- case list(...): return "Bag";
- };
- return "MRData";
- }
-
- private static Trees remove_duplicates ( Trees al ) {
- if (al.is_empty())
- return al;
- Trees el = remove_duplicates(al.tail());
- if (el.member(al.head()))
- return el;
- else return el.cons(al.head());
- }
-
- /** lambda lifting: generate Java code from an anonymous function */
- private static String compilef ( String v, Tree body ) throws Exception {
- String fname = new_lambda_name();
- Trees free_vars = remove_duplicates(free_variables(body,#[`v]));
- StringBuffer sb = new StringBuffer(1000);
- sb.append("final class "+fname+" extends Function {\n");
- for ( Tree var: free_vars )
- sb.append("MRData "+var+"; ");
- sb.append("\npublic "+fname+" (");
- if (free_vars.is_empty())
- sb.append(") {}\n");
- else {
- sb.append(" MRData "+free_vars.head());
- for ( Tree var: free_vars.tail() )
- sb.append(", MRData "+var);
- sb.append(" ) { ");
- for ( Tree var: free_vars )
- sb.append("this."+var+" = "+var+"; ");
- sb.append("}\n");
- };
- sb.append("final public MRData eval ( final MRData "+v
- +" ) { return "+compileE(body)+"; }\n}\n");
- out.append(sb);
- String s = "new "+fname+"(";
- if (!free_vars.is_empty()) {
- s += free_vars.head();
- for ( Tree var: free_vars.tail() )
- s += ","+var;
- };
- return s+")";
- }
-
- private static String compileF ( Tree fnc ) throws Exception {
- match fnc {
- case lambda(`v,`b):
- return compilef(v.toString(),b);
- case compiled(`f,`lm):
- // recompile the function
- String s = compileF(lm);
- ((Node)fnc).children().head = new VariableLeaf(s.substring(4,s.indexOf("("))); //destructive
- return s;
- case function(tuple(...params),`tp,`body):
- String ret = "new Lambda(new Function () { "
- +"final public MRData eval ( final MRData _x ) { ";
- for ( int i = 0; i < params.length(); i++ )
- match params.nth(i) {
- case bind(`v,_):
- ret += "final MRData "+v+" = ((Tuple)_x).get("+i+"); ";
- };
- return ret+" return "+compileE(body)+"; } })";
- };
- throw new Exception("Ill-formed lambda: "+fnc);
- }
-
- private static String compileEL ( Trees el ) throws Exception {
- if (el.is_empty())
- return "";
- String ret = compileE(el.head());
- for ( Tree a: el.tail() )
- ret += ","+compileE(a);
- return ret;
- }
-
- private static String compileE ( Tree e ) throws Exception {
- if (e == null)
- return "(new MR_byte(0))";
- if (e.equals(#<true>))
- return "(new MR_bool(true))";
- else if (e.equals(#<false>))
- return "(new MR_bool(false))";
- else if (e.equals(#<null>))
- return "(new MR_byte(0))";
- else if (e.is_variable())
- if (in_memory_repeat_vars.member(e))
- return "Interpreter.lookup_global_binding(\""+e.toString()+"\")";
- else return e.toString();
- else if (e.is_long())
- return "(new MR_int("+((LongLeaf)e).value()+"))";
- else if (e.is_double())
- return "(new MR_float("+((DoubleLeaf)e).value()+"))";
- else if (e.is_string())
- return "(new MR_string("+e.toString()+"))";
- match e {
- case callM(and,_,`x,`y):
- return "(new MR_bool(((MR_bool)"+compileE(x)
- +").get() && ((MR_bool)"+compileE(y)+").get()))";
- case callM(or,_,`x,`y):
- return "(new MR_bool(((MR_bool)"+compileE(x)
- +").get() || ((MR_bool)"+compileE(y)+").get()))";
- case callM(not,_,`x):
- return "(new MR_bool(!((MR_bool)"+compileE(x)+").get()))";
- case callM(`f,`n,...args):
- if (!n.is_long())
- fail;
- String ret = "SystemFunctions."+ClassImporter.method_name((int)((LongLeaf)n).value())+"(";
- Trees sig = ClassImporter.signature((int)((LongLeaf)n).value());
- for (int i = 0; i < args.length(); i++)
- ret += ((i > 0) ? ",(" : "(")+get_MR_type(sig.nth(i+1))+")("+compileE(args.nth(i))+")";
- return ret+")";
- case lambda(`v,`body):
- return "new Lambda("+compilef(v.toString(),body)+")";
- case nth(`x,`n):
- return "(((Tuple)("+compileE(x)+")).get("+((LongLeaf)n).value()+"))";
- case setNth(`x,`n,`v,`ret):
- return "(((Tuple)("+compileE(x)+")).set("+((LongLeaf)n).value()+","+compileE(v)+","+compileE(ret)+"))";
- case materialize(`u):
- return "MapReduceAlgebra.materialize("+compileE(u)+")";
- case let(`v,`u,`body):
- return "(new Function () { public MRData eval ( final MRData "+v
- +" ) { if ("+v+" instanceof Bag) ((Bag)"+v+").materialize(); return "
- +compileE(body)+"; }; }).eval("+compileE(u)+")";
- case cmap(`m,`s):
- return "MapReduceAlgebra.cmap("+compileF(m)+",(Bag)("+compileE(s)+"))";
- case filter(`p,`m,`s):
- return "MapReduceAlgebra.filter("+compileF(p)+","+compileF(m)
- +",(Bag)"+compileE(s)+")";
- case map(`m,`s):
- return "MapReduceAlgebra.map("+compileF(m)+",(Bag)"+compileE(s)+")";
- case range(`min,`max):
- return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get(),"
- +"((MR_long)"+compileE(max)+").get())";
- case call(`f,...args):
- return "("+compileF(f)+".eval("+compileE(#<tuple(...args)>)+"))";
- case tuple():
- return "(new Tuple())";
- case tuple(`x):
- return "(new Tuple("+compileE(x)+"))";
- case tuple(`a,...el):
- String ret = "(new Tuple("+compileE(a);
- for ( Tree x: el )
- ret += ","+compileE(x);
- return ret+"))";
- case tagged_union(`n,`u):
- return "(new Union((byte)"+((LongLeaf)n).value()+","+compileE(u)+"))";
- case union_value(`x):
- return "(((Union)"+compileE(x)+").value())";
- case union_tag(`x):
- return "(new MR_int(((Union)"+compileE(x)+").tag()))";
- // used for shortcutting sync in bsp supersteps
- case BAG():
- return "SystemFunctions.bsp_empty_bag";
- case TRUE():
- return "SystemFunctions.bsp_true_value";
- case FALSE():
- return "SystemFunctions.bsp_false_value";
- case `T():
- if (is_collection(T))
- return "(new Bag())";
- else fail
- case `T(e):
- if (is_collection(T))
- return "(new Bag("+compileE(e)+"))";
- else fail
- case `T(`a,...el):
- if (!is_collection(T))
- fail;
- String ret = "(new Bag("+compileE(a);
- for ( Tree x: el )
- ret += ",(MRData)"+compileE(x);
- return ret+"))";
- case if(`c,`x,`y):
- return "((((MR_bool)"+compileE(c)+").get())?"+compileE(x)+":"+compileE(y)+")";
- case synchronize(`peer,`b):
- return "SystemFunctions.synchronize(((MR_string)"+compileE(peer)+"),(MR_bool)"+compileE(b)+")";
- case distribute(`peer,`s):
- return "SystemFunctions.distribute(((MR_string)"+compileE(peer)+"),(Bag)"+compileE(s)+")";
- case mapReduce(`mx,`my,`s,_):
- return "MapReduceAlgebra.mapReduce("+compileF(mx)+","+compileF(my)+",(Bag)("+compileE(s)+"))";
- case mapReduce2(`mx,`my,`r,`x,`y,_):
- return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)+","+compileF(r)
- +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
- case mapJoin(`kx,`ky,`r,`x,`y):
- return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)+","+compileF(r)
- +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
- case join(`kx,`ky,`r,`x,`y):
- return "MapReduceAlgebra.join("+compileF(kx)+","+compileF(ky)+","+compileF(r)
- +",(Bag)("+compileE(x)+"),(Bag)("+compileE(y)+"))";
- case groupBy(`s):
- return "MapReduceAlgebra.groupBy((Bag)("+compileE(s)+"))";
- case index(`x,`n):
- return "SystemFunctions.index((Bag)("+compileE(x)+"),"+compileE(n)+")";
- case range(`x,`i,`j):
- return "SystemFunctions.range((Bag)("+compileE(x)+"),"+compileE(i)+","+compileE(j)+")";
- case map_index(`x,`key):
- return "((Bag)("+compileE(x)+")).map_find("+compileE(key)+")";
- case aggregate(`acc,`zero,`s):
- return "MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)+",(Bag)("+compileE(s)+"))";
- case Aggregate(`acc,`zero,`s):
- return "MapReducePlan.aggregate("+compileF(acc)+","+compileE(zero)+","+compileM(s)+")";
- case mergeGroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
- return "MapReduceAlgebra.mergeGroupByJoin("+compileF(kx)+","+compileF(ky)
- +","+compileF(gx)+","+compileF(gy)+","+compileF(m)+","+compileF(c)
- +","+compileF(r)+",(Bag)"+compileE(x)+",(Bag)"+compileE(y)+")";
- case function(tuple(...params),`tp,`body):
- return compileF(e);
- case typed(`x,_):
- return compileE(x);
- case apply(`f,tuple(...args)):
- if (!f.is_variable())
- fail;
- match global_functions.lookup(f.toString()) {
- case function(tuple(...params),`otp,`body):
- String ret = "UserFunctions_"+user_functions_num+"."+f+"(";
- if (args.is_empty())
- return ret+")";
- for ( int i = 0; i < params.length(); i++ )
- match params.nth(i) {
- case bind(_,`tp):
- ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")"+compileE(args.nth(i));
- };
- return ret+")";
- };
- case apply(`f,`arg):
- if (!f.is_variable())
- fail;
- match global_functions.lookup(f.toString()) {
- case function(tuple(...params),`otp,`body):
- String ac = compileE(arg);
- String ret = "UserFunctions_"+user_functions_num+"."+f+"(";
- for ( int i = 0; i < params.length(); i++ )
- match params.nth(i) {
- case bind(_,`tp):
- ret += ((i==0)?"":",")+"("+get_MR_type(tp)+")((Tuple)"+ac+").get("+i+")";
- };
- return ret+")";
- };
- case apply(`f,`arg):
- if (!f.is_variable())
- return "("+compileF(f)+").eval("+compileE(arg)+")";
- else return "(((Lambda)"+compileE(f)+").lambda().eval("+compileE(arg)+"))";
- case Collect(`s):
- return "Plan.collect("+compileM(s)+")";
- case trace(`x):
- return compileE(x);
- case _:
- return compileM(e);
- };
- throw new Exception("Cannot compile: "+e);
- }
-
- final private static String compileM ( Tree e ) throws Exception {
- match e {
- case cMap(`f,`s):
- return "MapReduceAlgebra.cmap("+compileF(f)+",(Bag)"+compileM(s)+")";
- case AggregateMap(`f,`acc,`zero,`s):
- return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
- +","+compileM(#<cMap(`f,`s)>)+"))";
- case MapReduce(`m,`r,`s,_):
- return "MapReduceAlgebra.mapReduce("+compileF(m)+","
- +compileF(r)+",(Bag)"+compileM(s)+")";
- case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
- return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)
- +","+compileE(zero)+","+compileM(#<MapReduce(`m,`r,`s)>)+"))";
- case MapCombineReduce(`m,`c,`r,`s,_):
- return "MapReduceAlgebra.mapReduce("+compileF(m)
- +","+compileF(r)+",(Bag)"+compileM(s)+")";
- case MapReduce2(`mx,`my,`c,`r,`x,`y,_):
- return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)
- +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
- case MapReduce2(`mx,`my,`r,`x,`y,_):
- return "MapReduceAlgebra.mapReduce2("+compileF(mx)+","+compileF(my)
- +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
- case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
- return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
- +","+compileM(#< MapReduce2(`mx,`my,`r,`x,`y)>)+"))";
- case MapJoin(`kx,`ky,`r,`x,`y):
- return "MapReduceAlgebra.mapJoin("+compileF(kx)+","+compileF(ky)
- +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
- case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
- return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)
- +","+compileE(zero)+","+compileM(#<MapJoin(`kx,`ky,`r,`x,`y)>)+"))";
- case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
- return "MapReduceAlgebra.groupByJoin("+compileF(kx)+","+compileF(ky)
- +","+compileF(gx)+","+compileF(gy)+","+compileF(m)+","+compileF(c)
- +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
- case CrossProduct(`mx,`my,`r,`x,`y):
- return "MapReduceAlgebra.crossProduct("+compileF(mx)+","+compileF(my)
- +","+compileF(r)+",(Bag)"+compileM(x)+",(Bag)"+compileM(y)+")";
- case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
- return "new Bag(MapReduceAlgebra.aggregate("+compileF(acc)+","+compileE(zero)
- +","+compileM(#<CrossProduct(`mx,`my,`r,`x,`y)>)+"))";
- case BSPSource(`n,BinarySource(`file,_)):
- if (Config.hadoop_mode)
- return "Plan.binarySource("+compileE(n)+",((MR_string)"
- +compileE(file)+").get())";
- else return "(Bag)MapReduceAlgebra.read_binary("+compileE(n)+",((MR_string)"
- +compileE(file)+").get())";
- case BinarySource(`file,_):
- if (Config.hadoop_mode)
- return "Plan.binarySource(((MR_string)"+compileE(file)+").get())";
- else return "(Bag)MapReduceAlgebra.read_binary(((MR_string)"+compileE(file)+").get())";
- case BSPSource(`n,ParsedSource(`parser,`file,...args)):
- if (!(n instanceof LongLeaf))
- fail;
- if (!Config.hadoop_mode)
- return "MapReduceAlgebra.parsedSource(((MR_int)"+compileE(n)+").get(),\""
- +parser+"\",((MR_string)"+compileE(file)+").get(),"
- +reify(args)+")";
- Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
- if (p == null)
- throw new Error("Unknown parser: "+parser);
- return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get(),"
- +reify(args)+")";
- case ParsedSource(`parser,`file,...args):
- if (!Config.hadoop_mode)
- return "MapReduceAlgebra.parsedSource(\""+parser+"\",((MR_string)"
- +compileE(file)+").get(),"+reify(args)+")";
- Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
- if (p == null)
- throw new Error("Unknown parser: "+parser);
- return "Plan.parsedSource("+p.getName()+".class,((MR_string)"+compileE(file)+").get(),"
- +reify(args)+")";
- case Merge(`x,`y):
- return "((Bag)"+compileM(x)+").union((Bag)"+compileM(y)+")";
- case Generator(`min,`max,`size):
- return "MapReduceAlgebra.generator(((MR_long)"+compileE(min)+").get(),"
- +"((MR_long)"+compileE(max)+").get())";
- case BSP(`n,`superstep,`state,`o,...as):
- String ds = "";
- for ( Tree a: as )
- ds += ",(Bag)("+compileM(a)+")";
- return "MapReduceAlgebra.BSP("+((LongLeaf)n).value()+","
- +compileF(superstep)+","+compileE(state)+","+o+","
- +"new Bag[]{"+ds.substring(1)+"})";
- case `v:
- if (v.is_variable())
- return v.toString();
- };
- throw new Exception("Cannot compile: "+e);
- }
-}
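For intuition, the Java source that compilef above emits for an MRQL lambda with one free variable looks roughly like the sketch below. The class name comes from new_lambda_name, and the eval body is whatever compileE produces; the placeholder return is only a stand-in for that compiled body.

    // generated for lambda(x, body) with free variable y (illustrative):
    final class MRQL_Lambda_0 extends Function {
        MRData y;
        public MRQL_Lambda_0 ( MRData y ) { this.y = y; }
        final public MRData eval ( final MRData x ) {
            return /* compileE(body), typically a SystemFunctions call */ x;
        }
    }
    // ... and the call site becomes: new MRQL_Lambda_0(y)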
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/Config.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/Config.java b/src/main/java/core/Config.java
deleted file mode 100644
index a957d89..0000000
--- a/src/main/java/core/Config.java
+++ /dev/null
@@ -1,277 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.VariableLeaf;
-import java.util.ArrayList;
-import java.io.FileInputStream;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-
-/** MRQL configuration parameters */
-final public class Config {
- public static boolean loaded = false;
-
- // true for using Hadoop HDFS file-system
- public static boolean hadoop_mode = false;
- // true for local execution (one node)
- public static boolean local_mode = false;
- // true for distributed execution (multiple nodes)
- public static boolean distributed_mode = false;
- // true for Hadoop map-reduce mode
- public static boolean map_reduce_mode = false;
- // true, for BSP mode using Hama
- public static boolean bsp_mode = false;
- // true, for Spark mode
- public static boolean spark_mode = false;
- // if true, it processes the input interactively
- public static boolean interactive = true;
- // compile the MR functional arguments to Java bytecode at run-time
- // (each task-tracker repeats the compilation at the MR setup time)
- public static boolean compile_functional_arguments = true;
- // if true, generates info about all compilation and optimization steps
- public static boolean trace = false;
- // number of worker nodes
- public static int nodes = 2;
- // true, to disable mapJoin
- public static boolean noMapJoin = false;
- // max distributed cache size for MapJoin (fragment-replicate join) in MBs
- public static int mapjoin_size = 50;
- // max entries for in-mapper combiner before they are flushed out
- public static int map_cache_size = 100000;
- // max number of bag elements to print
- public static int max_bag_size_print = 20;
- // max size of a materialized vector before it is spilled to a file
- public static int max_materialized_bag = 500000;
- // max number of incoming messages before a sub-sync()
- public static int bsp_msg_size = Integer.MAX_VALUE;
- // number of elements per mapper to process the range min...max
- public static long range_split_size = 100000;
- // max number of streams to merge simultaneously
- public static int max_merged_streams = 100;
- // the directory for temporary files and spilled bags
- public static String tmpDirectory = "/tmp/mrql_"+System.getProperty("user.name");
- // true, if we want to derive a combine function for MapReduce
- public static boolean use_combiner = true;
- // true, if we can use the rule that fuses a groupBy with a join over the same key
- public static boolean groupJoinOpt = true;
- // true, if we can use the rule that converts a self-join into a simple mapreduce
- public static boolean selfJoinOpt = true;
- // true for run-time trace of plans
- public static boolean trace_execution = false;
- // true for extensive run-time trace of expressions & plans
- public static boolean trace_exp_execution = false;
- // true if you don't want to print statistics
- public static boolean quiet_execution = false;
- // true if this is during testing
- public static boolean testing = false;
- // true to display INFO log messages
- public static boolean info = false;
-
- /** store the configuration parameters */
- public static void write ( Configuration conf ) {
- conf.setBoolean("mrql.hadoop.mode",hadoop_mode);
- conf.setBoolean("mrql.local.mode",local_mode);
- conf.setBoolean("mrql.distributed.mode",distributed_mode);
- conf.setBoolean("mrql.map.reduce.mode",map_reduce_mode);
- conf.setBoolean("mrql.bsp.mode",bsp_mode);
- conf.setBoolean("mrql.spark.mode",spark_mode);
- conf.setBoolean("mrql.interactive",interactive);
- conf.setBoolean("mrql.compile.functional.arguments",compile_functional_arguments);
- conf.setBoolean("mrql.trace",trace);
- conf.setInt("mrql.nodes",nodes);
- conf.setInt("mrql.mapjoin.size",mapjoin_size);
- conf.setInt("mrql.in.mapper.size",map_cache_size);
- conf.setInt("mrql.max.bag.size.print",max_bag_size_print);
- conf.setInt("mrql.max.materialized.bag",max_materialized_bag);
- conf.setInt("mrql.bsp.msg.size",bsp_msg_size);
- conf.setLong("mrql.range.split.size",range_split_size);
- conf.setInt("mrql.max.merged.streams",max_merged_streams);
- conf.set("mrql.tmp.directory",tmpDirectory);
- conf.setBoolean("mrql.use.combiner",use_combiner);
- conf.setBoolean("mrql.group.join.opt",groupJoinOpt);
- conf.setBoolean("mrql.self.join.opt",selfJoinOpt);
- conf.setBoolean("mrql.trace.execution",trace_execution);
- conf.setBoolean("mrql.trace.exp.execution",trace_exp_execution);
- conf.setBoolean("mrql.quiet.execution",quiet_execution);
- conf.setBoolean("mrql.testing",testing);
- conf.setBoolean("mrql.info",info);
- }
-
- /** load the configuration parameters */
- public static void read ( Configuration conf ) {
- if (loaded)
- return;
- loaded = true;
- hadoop_mode = conf.getBoolean("mrql.hadoop.mode",hadoop_mode);
- local_mode = conf.getBoolean("mrql.local.mode",local_mode);
- distributed_mode = conf.getBoolean("mrql.distributed.mode",distributed_mode);
- map_reduce_mode = conf.getBoolean("mrql.map.reduce.mode",map_reduce_mode);
- bsp_mode = conf.getBoolean("mrql.bsp.mode",bsp_mode);
- spark_mode = conf.getBoolean("mrql.spark.mode",spark_mode);
- interactive = conf.getBoolean("mrql.interactive",interactive);
- compile_functional_arguments = conf.getBoolean("mrql.compile.functional.arguments",compile_functional_arguments);
- trace = conf.getBoolean("mrql.trace",trace);
- nodes = conf.getInt("mrql.nodes",nodes);
- mapjoin_size = conf.getInt("mrql.mapjoin.size",mapjoin_size);
- map_cache_size = conf.getInt("mrql.in.mapper.size",map_cache_size);
- max_bag_size_print = conf.getInt("mrql.max.bag.size.print",max_bag_size_print);
- max_materialized_bag = conf.getInt("mrql.max.materialized.bag",max_materialized_bag);
- bsp_msg_size = conf.getInt("mrql.bsp.msg.size",bsp_msg_size);
- range_split_size = conf.getLong("mrql.range.split.size",range_split_size);
- max_merged_streams = conf.getInt("mrql.max.merged.streams",max_merged_streams);
- tmpDirectory = conf.get("mrql.tmp.directory",tmpDirectory);
- use_combiner = conf.getBoolean("mrql.use.combiner",use_combiner);
- groupJoinOpt = conf.getBoolean("mrql.group.join.opt",groupJoinOpt);
- selfJoinOpt = conf.getBoolean("mrql.self.join.opt",selfJoinOpt);
- trace_execution = conf.getBoolean("mrql.trace.execution",trace_execution);
- trace_exp_execution = conf.getBoolean("mrql.trace.exp.execution",trace_exp_execution);
- quiet_execution = conf.getBoolean("mrql.quiet.execution",quiet_execution);
- testing = conf.getBoolean("mrql.testing",testing);
- info = conf.getBoolean("mrql.info",info);
- }
-
- public static ArrayList<String> extra_args = new ArrayList<String>();
-
- /** read configuration parameters from the Main args */
- public static Bag parse_args ( String args[], Configuration conf ) throws Exception {
- int i = 0;
- int iargs = 0;
- extra_args = new ArrayList<String>();
- ClassImporter.load_classes();
- interactive = true;
- while (i < args.length) {
- if (args[i].equals("-local")) {
- local_mode = true;
- i++;
- } else if (args[i].equals("-dist")) {
- distributed_mode = true;
- i++;
- } else if (args[i].equals("-reducers")) {
- if (++i >= args.length)
- throw new Error("Expected number of reductions");
- nodes = Integer.parseInt(args[i]);
- i++;
- } else if (args[i].equals("-bsp")) {
- bsp_mode = true;
- i++;
- } else if (args[i].equals("-spark")) {
- spark_mode = true;
- i++;
- } else if (args[i].equals("-bsp_tasks")) {
- if (++i >= args.length || Integer.parseInt(args[i]) < 1)
- throw new Error("Expected number of bsp tasks > 0");
- nodes = Integer.parseInt(args[i]);
- i++;
- } else if (args[i].equals("-nodes")) {
- if (++i >= args.length || Integer.parseInt(args[i]) < 1)
- throw new Error("Expected number of nodes > 0");
- nodes = Integer.parseInt(args[i]);
- i++;
- } else if (args[i].equals("-bsp_msg_size")) {
- if (++i >= args.length || Integer.parseInt(args[i]) < 10000)
- throw new Error("Expected max number of bsp messages before subsync() >= 10000");
- bsp_msg_size = Integer.parseInt(args[i]);
- i++;
- } else if (args[i].equals("-mapjoin_size")) {
- if (++i >= args.length)
- throw new Error("Expected number of MBs");
- mapjoin_size = Integer.parseInt(args[i]);
- i++;
- } else if (args[i].equals("-cache_size")) {
- if (++i >= args.length)
- throw new Error("Expected number of entries");
- map_cache_size = Integer.parseInt(args[i]);
- i++;
- } else if (args[i].equals("-tmp")) {
- if (++i >= args.length)
- throw new Error("Expected a temporary directory");
- tmpDirectory = args[i];
- i++;
- } else if (args[i].equals("-bag_size")) {
- if (++i >= args.length || Integer.parseInt(args[i]) < 10000)
- throw new Error("Expected max size of materialized bag >= 10000");
- max_materialized_bag = Integer.parseInt(args[i]);
- i++;
- } else if (args[i].equals("-bag_print")) {
- if (++i >= args.length)
- throw new Error("Expected number of bag elements to print");
- max_bag_size_print = Integer.parseInt(args[i]);
- i++;
- } else if (args[i].equals("-split_size")) {
- if (++i >= args.length)
- throw new Error("Expected a split size");
- range_split_size = Long.parseLong(args[i]);
- i++;
- } else if (args[i].equals("-max_merged")) {
- if (++i >= args.length)
- throw new Error("Expected a max number of merged streams");
- max_merged_streams = Integer.parseInt(args[i]);
- i++;
- } else if (args[i].equals("-trace")) {
- trace = true;
- i++;
- } else if (args[i].equals("-C")) {
- compile_functional_arguments = true;
- i++;
- } else if (args[i].equals("-NC")) {
- compile_functional_arguments = false;
- i++;
- } else if (args[i].equals("-P")) {
- trace_execution = true;
- i++;
- } else if (args[i].equals("-quiet")) {
- quiet_execution = true;
- i++;
- } else if (args[i].equals("-info")) {
- info = true;
- i++;
- } else if (args[i].equals("-trace_execution")) {
- trace_execution = true;
- trace_exp_execution = true;
- compile_functional_arguments = false;
- i++;
- } else if (args[i].equals("-methods")) {
- System.out.print("\nImported methods: ");
- ClassImporter.print_methods();
- System.out.println();
- System.out.print("\nAggregations:");
- Translator.print_aggregates();
- System.out.println();
- i++;
- } else if (args[i].charAt(0) == '-')
- throw new Error("Unknown MRQL parameter: "+args[i]);
- else {
- if (interactive) {
- Main.query_file = args[i++];
- interactive = false;
- } else extra_args.add(args[i++]);
- }
- };
- if (hadoop_mode)
- write(conf);
- Plan.conf = conf;
- Bag b = new Bag();
- for ( String s: extra_args )
- b.add(new MR_string(s));
- Interpreter.new_global_binding(new VariableLeaf("args").value(),b);
- return b;
- }
-}
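The write/read pair above round-trips every Config field through the job's Hadoop Configuration, so tasks on remote nodes see the same settings as the client that parsed the command line. A minimal sketch of that round trip (a fragment assumed to run inside a driver method; the values are illustrative):

    Configuration conf = new Configuration();  // org.apache.hadoop.conf
    Config.nodes = 4;                          // set on the client side
    Config.use_combiner = false;
    Config.write(conf);                        // stored as mrql.* properties
    // conf is shipped with the job; on the task side:
    Config.read(conf);                         // restores the static fields
    // note: read() is a no-op after the first call (the loaded flag)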
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/DataSet.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/DataSet.java b/src/main/java/core/DataSet.java
deleted file mode 100644
index efe5646..0000000
--- a/src/main/java/core/DataSet.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.util.List;
-import java.util.ArrayList;
-import org.apache.hadoop.conf.Configuration;
-
-
-/** The domain of the MRQL physical algebra is a set of DataSources */
-public class DataSet {
- public ArrayList<DataSource> source; // multiple sources
- public long counter; // a Hadoop user-defined counter used by the `repeat' operator
- public long records; // total number of dataset records
-
- /** Construct a DataSet that contains one DataSource
- * @param s the given DataSource
- * @param counter a Hadoop user-defined counter used by the `repeat' operator
- * @param records total number of dataset records
- */
- DataSet ( DataSource s, long counter, long records ) {
- source = new ArrayList<DataSource>();
- source.add(s);
- this.counter = counter;
- this.records = records;
- }
-
- /** Construct a DataSet with no DataSources (they can be added later)
- * @param counter a Hadoop user-defined counter used by the `repeat' operator
- * @param records total number of dataset records
- */
- DataSet ( long counter, long records ) {
- source = new ArrayList<DataSource>();
- this.counter = counter;
- this.records = records;
- }
-
- /** add a DataSource to this DataSet */
- public void add ( DataSource s ) {
- source.add(s);
- }
-
- /** merge this DataSet with the given DataSet */
- public void merge ( DataSet ds ) {
- source.addAll(ds.source);
- counter += ds.counter;
- records += ds.records;
- }
-
- /** dataset size in bytes */
- public long size ( Configuration conf ) {
- long n = 0;
- for (DataSource s: source)
- n += s.size(conf);
- return n;
- }
-
- /** return a single DataSource path by merging all the DataSource paths in this DataSet */
- public String merge () {
- Object[] ds = source.toArray();
- String path = ((DataSource)ds[0]).path.toString();
- for ( int i = 1; i < ds.length; i++ )
- path += ","+((DataSource)ds[i]).path;
- return path;
- }
-
- /** return the first num values */
- public List<MRData> take ( int num ) {
- int count = num;
- ArrayList<MRData> res = new ArrayList<MRData>();
- for ( DataSource s: source ) {
- res.addAll(s.take(count));
- count = num-res.size(); // number of values still needed
- if (count <= 0)
- return res;
- };
- return res;
- }
-
- /** accumulate all dataset values */
- public MRData reduce ( MRData zero, Function acc ) {
- MRData res = zero;
- for ( DataSource s: source )
- res = s.reduce(res,acc);
- return res;
- }
-
- public String toString () {
- String p = "<"+counter;
- for (DataSource s: source)
- p += ","+s;
- return p+">";
- }
-}
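A DataSet is just a list of DataSources plus two counters, so its operations either aggregate over the sources (size, take, reduce) or concatenate them (merge). A hedged sketch of typical use (ds1 and ds2 are illustrative names for DataSets produced by evaluating physical plans):

    // given two DataSets ds1 and ds2, e.g. the outputs of two jobs:
    ds1.merge(ds2);                      // ds1 now spans both source lists
    String paths = ds1.merge();          // "path1,path2,..." for use as a job input
    List<MRData> sample = ds1.take(10);  // first 10 records across all sources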
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/DataSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/DataSource.java b/src/main/java/core/DataSource.java
deleted file mode 100644
index 6a0ef4f..0000000
--- a/src/main/java/core/DataSource.java
+++ /dev/null
@@ -1,195 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.*;
-import java.io.IOException;
-import java.util.List;
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.conf.Configuration;
-
-
- /** A DataSource is any input data source, such as a text file, a key/value map, a database, an intermediate file, etc */
-public class DataSource {
- final public static String separator = "%%%";
- public static DataSourceDirectory dataSourceDirectory = new DataSourceDirectory();
- public static ParserDirectory parserDirectory = new ParserDirectory();
- private static boolean loaded = false;
-
- public String path;
- public Class<? extends MRQLFileInputFormat> inputFormat;
- public int source_num;
- public boolean to_be_merged; // if the path is a directory with multiple files, merge them
-
- final static class ParserDirectory extends HashMap<String,Class<? extends Parser>> {
- }
-
- /** A dictionary that maps data source paths to DataSource data.
- * It assumes that each path can only be associated with a single data source format and parser
- */
- final static class DataSourceDirectory extends HashMap<String,DataSource> {
- public void read ( Configuration conf ) {
- clear();
- for ( String s: conf.get("mrql.data.source.directory").split("@@@") ) {
- String[] p = s.split("===");
- put(p[0],DataSource.read(p[1],conf));
- }
- }
-
- public String toString () {
- String s = "";
- for ( String k: keySet() )
- s += "@@@"+k+"==="+get(k);
- if (s.equals(""))
- return s;
- else return s.substring(3);
- }
-
- public DataSource get ( String name ) {
- for ( Map.Entry<String,DataSource> e: entrySet() )
- if (name.startsWith(e.getKey()))
- return e.getValue();
- return null;
- }
-
- public void distribute ( Configuration conf ) {
- conf.set("mrql.data.source.directory",toString());
- }
- }
-
- DataSource () {}
-
- DataSource ( int source_num,
- String path,
- Class<? extends MRQLFileInputFormat> inputFormat,
- Configuration conf ) {
- this.source_num = source_num;
- this.path = path;
- this.inputFormat = inputFormat;
- to_be_merged = false;
- try {
- Path p = new Path(path);
- FileSystem fs = p.getFileSystem(conf);
- String complete_path = fs.getFileStatus(p).getPath().toString();
- //String complete_path = "file:"+path;
- this.path = complete_path;
- dataSourceDirectory.put(this.path,this);
- } catch (IOException e) {
- throw new Error(e);
- }
- }
-
- public static void loadParsers() {
- if (!loaded) {
- DataSource.parserDirectory.put("xml",XMLParser.class);
- DataSource.parserDirectory.put("json",JsonFormatParser.class);
- DataSource.parserDirectory.put("line",LineParser.class);
- loaded = true;
- }
- }
-
- static {
- loadParsers();
- }
-
- private static long size ( Path path, Configuration conf ) throws IOException {
- FileStatus s = path.getFileSystem(conf).getFileStatus(path);
- if (!s.isDir())
- return s.getLen();
- long size = 0;
- for ( FileStatus fs: path.getFileSystem(conf).listStatus(path) )
- size += fs.getLen();
- return size;
- }
-
- /** data set size in bytes */
- public long size ( Configuration conf ) {
- try {
- return size(new Path(path),conf);
- } catch (IOException e) {
- throw new Error(e);
- }
- }
-
- public static DataSource read ( String buffer, Configuration conf ) {
- try {
- String[] s = buffer.split(separator);
- int n = Integer.parseInt(s[1]);
- if (s[0].equals("Binary"))
- return new BinaryDataSource(n,s[2],conf);
- else if (s[0].equals("Generator"))
- return new GeneratorDataSource(n,s[2],conf);
- else if (s[0].equals("Text"))
- return new ParsedDataSource(n,s[3],parserDirectory.get(s[2]),((Node)Tree.parse(s[4])).children(),conf);
- else throw new Error("Unrecognized data source: "+s[0]);
- } catch (Exception e) {
- throw new Error(e);
- }
- }
-
- public static DataSource get ( String path, Configuration conf ) {
- if (dataSourceDirectory.isEmpty())
- dataSourceDirectory.read(conf);
- return dataSourceDirectory.get(path);
- }
-
- public static DataSource getCached ( String remote_path, String local_path, Configuration conf ) {
- DataSource ds = get(remote_path,conf);
- ds.path = local_path;
- dataSourceDirectory.put(local_path,ds);
- return ds;
- }
-
- /** return the first num values */
- public List<MRData> take ( int num ) {
- try {
- ArrayList<MRData> res = new ArrayList<MRData>();
- Iterator<MRData> it = inputFormat.newInstance().materialize(new Path(path)).iterator();
- for ( int i = num; (num < 0 || i > 0) && it.hasNext(); i-- )
- if (Config.hadoop_mode && Config.bsp_mode)
- res.add(((Tuple)it.next()).get(1)); // strip tag in BSP mode
- else res.add(it.next());
- return res;
- } catch (Exception ex) {
- throw new Error(ex);
- }
- }
-
- static Tuple tuple_container = new Tuple(new Tuple(),new Tuple());
-
- /** accumulate all datasource values */
- public MRData reduce ( MRData zero, final Function acc ) {
- try {
- MRData res = zero;
- for ( MRData x: inputFormat.newInstance().materialize(new Path(path)) ) {
- if (Config.hadoop_mode && Config.bsp_mode)
- x = ((Tuple)x).get(1); // strip tag in BSP mode
- tuple_container.set(0,res).set(1,x);
- res = acc.eval(tuple_container);
- };
- return res;
- } catch (Exception ex) {
- throw new Error(ex);
- }
- }
-}
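DataSource.read above reverses a flat string encoding: the fields of one source are joined with the "%%%" separator, and the whole dataSourceDirectory travels in the configuration as path===source entries joined with "@@@". A hedged sketch of the inputs read() dispatches on (the paths are illustrative):

    // "Binary%%%2%%%hdfs://nn/tmp/mrql_user/file1"
    //     -> new BinaryDataSource(2,"hdfs://nn/tmp/mrql_user/file1",conf)
    // "Text%%%0%%%line%%%hdfs://nn/data/log.txt%%%<parse-args tree>"
    //     -> new ParsedDataSource(0,path,LineParser.class,args,conf)
    DataSource ds = DataSource.read(buffer,conf);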
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/src/main/java/core/Environment.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/Environment.java b/src/main/java/core/Environment.java
deleted file mode 100644
index da8dd84..0000000
--- a/src/main/java/core/Environment.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.io.*;
-import org.apache.mrql.gen.Tree;
-
-
-/** the run-time environment for in-memory evaluation (binds variables to MRData) */
-final public class Environment implements Serializable {
- public String name;
- public MRData value;
- public Environment next;
-
- Environment ( String n, MRData v, Environment next ) {
- name = n;
- value = v;
- this.next = next;
- }
-
- private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- }
-
- private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- name = Tree.add(name);
- }
-}
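Environment is an immutable cons list: extending it allocates a single node, and variable lookup walks the next chain. A minimal sketch of the lookup an interpreter would perform over this structure (the helper name is hypothetical; the actual lookup code lives elsewhere in MRQL):

    static MRData lookup ( String name, Environment env ) {
        for ( Environment e = env; e != null; e = e.next )
            if (e.name.equals(name))  // names are interned via Tree.add on deserialization
                return e.value;
        return null;                  // unbound variable
    }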