You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2013/09/06 22:57:20 UTC
[02/18] MRQL-16: correct source files. ASF licenses,
and POMs for release
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/gen/Trees.java
----------------------------------------------------------------------
diff --git a/src/main/java/gen/Trees.java b/src/main/java/gen/Trees.java
index 79b70c3..86f65ca 100644
--- a/src/main/java/gen/Trees.java
+++ b/src/main/java/gen/Trees.java
@@ -29,9 +29,9 @@ final class TreeIterator implements Iterator<Tree> {
public boolean hasNext () { return trees.tail != null; }
public Tree next () {
- Tree res = trees.head;
- trees = trees.tail;
- return res;
+ Tree res = trees.head;
+ trees = trees.tail;
+ return res;
}
public void remove () { trees = trees.tail; }
@@ -44,151 +44,151 @@ final public class Trees implements Iterable<Tree>, Serializable {
public Trees tail;
public Trees ( Tree head, Trees tail ) {
- if (tail == null)
- throw new Error("Gen: an empty list of nodes must be nil, not null");
- this.head = head;
- this.tail = tail;
+ if (tail == null)
+ throw new Error("Gen: an empty list of nodes must be nil, not null");
+ this.head = head;
+ this.tail = tail;
}
public Trees () {
- head = null;
- tail = null;
+ head = null;
+ tail = null;
}
public final static Trees nil = new Trees();
public Trees ( Tree head ) {
- this.head = head;
- tail = nil;
+ this.head = head;
+ tail = nil;
}
public Tree head () {
- if (tail == null)
- throw new Error("Gen: tried to retrieve the head of an empty list of nodes");
- return head;
+ if (tail == null)
+ throw new Error("Gen: tried to retrieve the head of an empty list of nodes");
+ return head;
}
public Trees tail () {
- if (tail == null)
- throw new Error("Gen: tried to retrieve the tail of an empty list of nodes");
- return tail;
+ if (tail == null)
+ throw new Error("Gen: tried to retrieve the tail of an empty list of nodes");
+ return tail;
}
public boolean is_empty () {
- return (tail == null);
+ return (tail == null);
}
/* number of nodes */
public int length () {
- int n = 0;
- for (Trees r = this; !r.is_empty(); r = r.tail)
- n += 1;
- return n;
+ int n = 0;
+ for (Trees r = this; !r.is_empty(); r = r.tail)
+ n += 1;
+ return n;
}
/* put an Tree e at the beginning of the nodes */
public Trees cons ( Tree e ) {
- return new Trees(e,this);
+ return new Trees(e,this);
}
/* put an Tree e at the end of the nodes */
public Trees append ( Tree e ) {
- if (is_empty())
- return new Trees(e);
- else {
- Trees temp = new Trees(e,new Trees(e));
- Trees res = temp;
- for (Trees r = this; !r.is_empty(); r = r.tail) {
- temp.tail = temp.tail.cons(r.head);
- temp = temp.tail;
- };
- return res.tail;
- }
+ if (is_empty())
+ return new Trees(e);
+ else {
+ Trees temp = new Trees(e,new Trees(e));
+ Trees res = temp;
+ for (Trees r = this; !r.is_empty(); r = r.tail) {
+ temp.tail = temp.tail.cons(r.head);
+ temp = temp.tail;
+ };
+ return res.tail;
+ }
}
/* append two lists of nodes */
public Trees append ( Trees s ) {
- if (is_empty())
- return s;
- else if (s.is_empty())
- return this;
- else {
- Trees temp = s.cons(s.head);
- Trees res = temp;
- for (Trees r = this; !r.is_empty(); r = r.tail)
- { temp.tail = temp.tail.cons(r.head);
- temp = temp.tail;
- }
- return res.tail;
- }
+ if (is_empty())
+ return s;
+ else if (s.is_empty())
+ return this;
+ else {
+ Trees temp = s.cons(s.head);
+ Trees res = temp;
+ for (Trees r = this; !r.is_empty(); r = r.tail)
+ { temp.tail = temp.tail.cons(r.head);
+ temp = temp.tail;
+ }
+ return res.tail;
+ }
}
/* reverse the order of nodes */
public Trees reverse () {
- Trees res = nil;
- for (Trees r = this; !r.is_empty(); r = r.tail)
- res = res.cons(r.head);
- return res;
+ Trees res = nil;
+ for (Trees r = this; !r.is_empty(); r = r.tail)
+ res = res.cons(r.head);
+ return res;
}
/* is e one of the nodes? */
public boolean member ( Tree e ) {
- for (Trees r = this; !r.is_empty(); r = r.tail)
- if (r.head.equals(e))
- return true;
- return false;
+ for (Trees r = this; !r.is_empty(); r = r.tail)
+ if (r.head.equals(e))
+ return true;
+ return false;
}
/* return the nth node */
public Tree nth ( int n ) {
- Trees r = this;
- for (int i = 0; !r.is_empty() && i < n; r = r.tail(), i++)
- ;
- if (r.is_empty())
- throw new Error("Gen: tried to retrieve a nonexistent nth element from a list of nodes");
- else return r.head;
+ Trees r = this;
+ for (int i = 0; !r.is_empty() && i < n; r = r.tail(), i++)
+ ;
+ if (r.is_empty())
+ throw new Error("Gen: tried to retrieve a nonexistent nth element from a list of nodes");
+ else return r.head;
}
/* deep equality */
public boolean equals ( Trees s ) {
- Trees n = this;
- Trees m = s;
- for(; n.tail != null && m.tail != null; n = n.tail, m = m.tail )
- if (!n.head.equals(m.head))
- return false;
- return (m.tail == null) && (n.tail == null);
+ Trees n = this;
+ Trees m = s;
+ for(; n.tail != null && m.tail != null; n = n.tail, m = m.tail )
+ if (!n.head.equals(m.head))
+ return false;
+ return (m.tail == null) && (n.tail == null);
}
protected int size () {
- int n = 1;
- for (Trees r = this; !r.is_empty(); r = r.tail)
- n += r.head.size()+1;
- return n;
+ int n = 1;
+ for (Trees r = this; !r.is_empty(); r = r.tail)
+ n += r.head.size()+1;
+ return n;
}
public Iterator<Tree> iterator () { return new TreeIterator(this); }
/* print the nodes */
public String toString () {
- if (is_empty())
- return "()";
- String s = "(" + head;
- for (Trees r = tail; !r.is_empty(); r = r.tail)
- s = s + "," + r.head;
- return s + ")";
+ if (is_empty())
+ return "()";
+ String s = "(" + head;
+ for (Trees r = tail; !r.is_empty(); r = r.tail)
+ s = s + "," + r.head;
+ return s + ")";
}
/* pretty-print the nodes */
public String pretty ( int position ) {
- if (is_empty() || (position+size() <= screen_size))
- return toString();
- String s = "(" + head.pretty(position+1);
- for (Trees r=tail; !r.is_empty(); r=r.tail) {
- s = s + ",\n";
- for (int i=0; i<position+1; i++)
- s = s + " ";
- s = s + r.head.pretty(position+1);
- };
- return s + ")";
+ if (is_empty() || (position+size() <= screen_size))
+ return toString();
+ String s = "(" + head.pretty(position+1);
+ for (Trees r=tail; !r.is_empty(); r=r.tail) {
+ s = s + ",\n";
+ for (int i=0; i<position+1; i++)
+ s = s + " ";
+ s = s + r.head.pretty(position+1);
+ };
+ return s + ")";
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/gen/VariableLeaf.java
----------------------------------------------------------------------
diff --git a/src/main/java/gen/VariableLeaf.java b/src/main/java/gen/VariableLeaf.java
index 624e8bc..c4c0ed9 100644
--- a/src/main/java/gen/VariableLeaf.java
+++ b/src/main/java/gen/VariableLeaf.java
@@ -24,15 +24,15 @@ final public class VariableLeaf extends Tree {
public String value;
public VariableLeaf ( String s ) {
- super();
- value = Tree.add(s);
+ super();
+ value = Tree.add(s);
}
final public String value () { return value; }
public boolean equals ( Tree e ) {
- return (e instanceof VariableLeaf)
- && value == ((VariableLeaf) e).value;
+ return (e instanceof VariableLeaf)
+ && value == ((VariableLeaf) e).value;
}
protected int size () { return value.length(); }
@@ -42,11 +42,11 @@ final public class VariableLeaf extends Tree {
public String pretty ( int position ) { return value; }
private void writeObject(ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
+ out.defaultWriteObject();
}
private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- value = Tree.add(value);
+ in.defaultReadObject();
+ value = Tree.add(value);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/BinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/BinaryInputFormat.java b/src/main/java/spark/BinaryInputFormat.java
index 337e045..ad364bb 100644
--- a/src/main/java/spark/BinaryInputFormat.java
+++ b/src/main/java/spark/BinaryInputFormat.java
@@ -26,28 +26,28 @@ import org.apache.hadoop.mapred.*;
/** Input format for hadoop sequence files */
final public class BinaryInputFormat extends MRQLFileInputFormat {
public static class BinaryInputRecordReader extends SequenceFileRecordReader<MRContainer,MRContainer> {
- final MRContainer result = new MRContainer();
+ final MRContainer result = new MRContainer();
- public BinaryInputRecordReader ( FileSplit split,
- JobConf job ) throws IOException {
- super(job,split);
- }
+ public BinaryInputRecordReader ( FileSplit split,
+ JobConf job ) throws IOException {
+ super(job,split);
+ }
- @Override
- public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
- boolean b = super.next(key,result);
- value.set(result.data());
- return b;
- }
+ @Override
+ public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+ boolean b = super.next(key,result);
+ value.set(result.data());
+ return b;
+ }
}
@Override
public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split,
- JobConf job,
- Reporter reporter ) throws IOException {
- String path = ((FileSplit)split).getPath().toString();
- BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,job);
- return new BinaryInputRecordReader((FileSplit)split,job);
+ getRecordReader ( InputSplit split,
+ JobConf job,
+ Reporter reporter ) throws IOException {
+ String path = ((FileSplit)split).getPath().toString();
+ BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,job);
+ return new BinaryInputRecordReader((FileSplit)split,job);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/Evaluator.gen
----------------------------------------------------------------------
diff --git a/src/main/java/spark/Evaluator.gen b/src/main/java/spark/Evaluator.gen
index be7287a..30905b1 100644
--- a/src/main/java/spark/Evaluator.gen
+++ b/src/main/java/spark/Evaluator.gen
@@ -49,94 +49,94 @@ final public class Evaluator extends Interpreter implements Serializable {
/** initialize the Spark evaluator */
final static void init ( Configuration conf ) throws IOException {
- Config.spark_mode = true;
- if (Config.hadoop_mode && Config.local_mode)
- spark_context = new JavaSparkContext("local["+Config.nodes+"]",
- "MRQL Local Spark Evaluator");
- else if (Config.hadoop_mode) {
- HashMap<String,String> env = new HashMap<String,String>();
- data_source_directory = System.getenv("FS_DEFAULT_NAME")+"/"+Plan.new_path(conf);
- env.put("mrql.data.source.directory",data_source_directory);
- spark_context = new JavaSparkContext(System.getenv("SPARK_MASTER"),
- "MRQL Spark Evaluator",
- System.getenv("SPARK_HOME"),
- new String[]{mrql_jar},
- env);
- Plan.conf = spark_context.hadoopConfiguration();
- FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
- }
+ Config.spark_mode = true;
+ if (Config.hadoop_mode && Config.local_mode)
+ spark_context = new JavaSparkContext("local["+Config.nodes+"]",
+ "MRQL Local Spark Evaluator");
+ else if (Config.hadoop_mode) {
+ HashMap<String,String> env = new HashMap<String,String>();
+ data_source_directory = System.getenv("FS_DEFAULT_NAME")+"/"+Plan.new_path(conf);
+ env.put("mrql.data.source.directory",data_source_directory);
+ spark_context = new JavaSparkContext(System.getenv("SPARK_MASTER"),
+ "MRQL Spark Evaluator",
+ System.getenv("SPARK_HOME"),
+ new String[]{mrql_jar},
+ env);
+ Plan.conf = spark_context.hadoopConfiguration();
+ FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
+ }
}
/** shutdown the Spark evaluator */
final static void shutdown ( Configuration conf ) {
- spark_context.stop();
- spark_context = null;
- System.clearProperty("spark.driver.port");
+ spark_context.stop();
+ spark_context = null;
+ System.clearProperty("spark.driver.port");
}
final static void initialize_query () {
- spark_context.clearJars();
- spark_context.addJar(mrql_jar);
- Plan.distribute_compiled_arguments(Plan.conf);
- if (Config.compile_functional_arguments)
- spark_context.addJar(Plan.conf.get("mrql.jar.path"));
+ spark_context.clearJars();
+ spark_context.addJar(mrql_jar);
+ Plan.distribute_compiled_arguments(Plan.conf);
+ if (Config.compile_functional_arguments)
+ spark_context.addJar(Plan.conf.get("mrql.jar.path"));
}
final static Configuration new_configuration () {
- return new Configuration();
+ return new Configuration();
}
public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
- throw new Error("You can only synchronize BSP tasks");
+ throw new Error("You can only synchronize BSP tasks");
}
public static Bag distribute ( MR_string peerName, Bag s ) {
- throw new Error("You can only distribute bags among BSP tasks");
+ throw new Error("You can only distribute bags among BSP tasks");
}
final static Bag collect ( final DataSet x ) throws Exception {
- return Plan.collect(x);
+ return Plan.collect(x);
}
final static MRData bsp ( Tree plan, Environment env ) throws Exception {
- throw new Error("You can not run a BSP task in Spark mode");
+ throw new Error("You can not run a BSP task in Spark mode");
}
/** used by the master to send parsing details (eg, record types) to workers */
public static void dump_source_dir () throws IOException {
- if (Config.local_mode)
- return;
- DataSource.dataSourceDirectory.distribute(Plan.conf);
- Path path = new Path(data_source_directory);
- FileSystem fs = path.getFileSystem(Plan.conf);
- PrintStream ps = new PrintStream(fs.create(path,true));
- ps.println(Plan.conf.get("mrql.data.source.directory"));
- ps.close();
+ if (Config.local_mode)
+ return;
+ DataSource.dataSourceDirectory.distribute(Plan.conf);
+ Path path = new Path(data_source_directory);
+ FileSystem fs = path.getFileSystem(Plan.conf);
+ PrintStream ps = new PrintStream(fs.create(path,true));
+ ps.println(Plan.conf.get("mrql.data.source.directory"));
+ ps.close();
}
/** executed by a worker when reading parsed input (see ParsedInputFormat) */
public static void load_source_dir () throws IOException {
- if (Config.local_mode)
- return;
- if (Plan.conf == null)
- Plan.conf = new_configuration();
- // the name of the file that contains the source directory details is passed
- // to workers through the HashMap environment in the JavaSparkContext
- Path path = new Path(System.getenv("mrql.data.source.directory"));
- FileSystem fs = path.getFileSystem(Plan.conf);
- BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
- Plan.conf.set("mrql.data.source.directory",ftp.readLine());
- DataSource.dataSourceDirectory.read(Plan.conf);
- ftp.close();
+ if (Config.local_mode)
+ return;
+ if (Plan.conf == null)
+ Plan.conf = new_configuration();
+ // the name of the file that contains the source directory details is passed
+ // to workers through the HashMap environment in the JavaSparkContext
+ Path path = new Path(System.getenv("mrql.data.source.directory"));
+ FileSystem fs = path.getFileSystem(Plan.conf);
+ BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
+ Plan.conf.set("mrql.data.source.directory",ftp.readLine());
+ DataSource.dataSourceDirectory.read(Plan.conf);
+ ftp.close();
}
private static Function2<MRData,MRData,MRData> accumulator ( Tree acc_fnc, Environment env ) {
- final org.apache.mrql.Function f = evalF(acc_fnc,env);
- return new Function2<MRData,MRData,MRData>() {
- public MRData call ( MRData x, MRData y ) {
- return f.eval(new Tuple(x,y));
- }
- };
+ final org.apache.mrql.Function f = evalF(acc_fnc,env);
+ return new Function2<MRData,MRData,MRData>() {
+ public MRData call ( MRData x, MRData y ) {
+ return f.eval(new Tuple(x,y));
+ }
+ };
}
/** The Aggregate physical operator
@@ -147,69 +147,69 @@ final public class Evaluator extends Interpreter implements Serializable {
* @return the aggregation result of type T
*/
final static MRData aggregate ( Tree acc_fnc,
- Tree zero,
- Tree plan,
- Environment env ) throws Exception {
- Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env);
- MRData z = evalE(zero,env);
- match plan {
- case AggregateMap(`m,`acc,_,`s):
- return evalD(#<cMap(`m,`s)>,env)
- .aggregate(z,accumulator(acc,env),f2);
- case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
- if (acc.equals(#<null>))
- fail;
- return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
- .aggregate(z,accumulator(acc,env),f2);
- case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
- if (acc.equals(#<null>))
- fail;
- return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
- .aggregate(z,accumulator(acc,env),f2);
- case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
- if (acc.equals(#<null>))
- fail;
- return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
- .aggregate(z,accumulator(acc,env),f2);
- case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
- if (acc.equals(#<null>))
- fail;
- return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
- .aggregate(z,accumulator(acc,env),f2);
- };
- throw new Error("Unrecognized aggregation: "+plan);
+ Tree zero,
+ Tree plan,
+ Environment env ) throws Exception {
+ Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env);
+ MRData z = evalE(zero,env);
+ match plan {
+ case AggregateMap(`m,`acc,_,`s):
+ return evalD(#<cMap(`m,`s)>,env)
+ .aggregate(z,accumulator(acc,env),f2);
+ case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
+ if (acc.equals(#<null>))
+ fail;
+ return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
+ .aggregate(z,accumulator(acc,env),f2);
+ case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
+ if (acc.equals(#<null>))
+ fail;
+ return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
+ .aggregate(z,accumulator(acc,env),f2);
+ case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
+ if (acc.equals(#<null>))
+ fail;
+ return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
+ .aggregate(z,accumulator(acc,env),f2);
+ case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
+ if (acc.equals(#<null>))
+ fail;
+ return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
+ .aggregate(z,accumulator(acc,env),f2);
+ };
+ throw new Error("Unrecognized aggregation: "+plan);
}
/** Evaluate a loop a fixed # of times */
final static Tuple loop ( Tree e, Environment env ) throws Exception {
- match e {
- case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
- int limit = ((MR_int)evalE(num,env)).get();
- MR_rdd[] s = new MR_rdd[vs.length()];
- for ( int i = 0; i < vs.length(); i++ )
- s[i] = new MR_rdd(eval(ss.nth(i),env));
- for ( int n = 0; n < limit; n++ ) {
- Environment nenv = env;
- for ( int i = 0; i < vs.length(); i ++ )
- nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
- for ( int i = 0; i < vs.length(); i ++ )
- s[i] = new MR_rdd(eval(bs.nth(i),nenv));
- };
- return new Tuple(s);
- };
- throw new Error("Wrong Loop format");
+ match e {
+ case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
+ int limit = ((MR_int)evalE(num,env)).get();
+ MR_rdd[] s = new MR_rdd[vs.length()];
+ for ( int i = 0; i < vs.length(); i++ )
+ s[i] = new MR_rdd(eval(ss.nth(i),env));
+ for ( int n = 0; n < limit; n++ ) {
+ Environment nenv = env;
+ for ( int i = 0; i < vs.length(); i ++ )
+ nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
+ for ( int i = 0; i < vs.length(); i ++ )
+ s[i] = new MR_rdd(eval(bs.nth(i),nenv));
+ };
+ return new Tuple(s);
+ };
+ throw new Error("Wrong Loop format");
}
private static Bag bag ( final List<MRData> s ) {
- final Iterator<MRData> i = s.iterator();
- return new Bag(new BagIterator() {
- public MRData next () {
- return i.next();
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- });
+ final Iterator<MRData> i = s.iterator();
+ return new Bag(new BagIterator() {
+ public MRData next () {
+ return i.next();
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ });
}
final static TaskContext context = new TaskContext(0,0,0,null);
@@ -219,32 +219,32 @@ final public class Evaluator extends Interpreter implements Serializable {
* @return a lazy bag that contains all RDD elements
*/
public static Bag bag ( final JavaRDD<MRData> rdd ) {
- final JavaRDD<MRData> rd = rdd.cache();
- rd.count(); // force the evaluation of all pending Spark operations
- final List<Partition> ps = rd.splits();
- return new Bag(new BagIterator() {
- Iterator<MRData> i = null;
- int c = 0;
- public MRData next () {
- return i.next();
- }
- public boolean hasNext () {
- do {
- if (i != null && i.hasNext())
- return true;
- if (c >= ps.size())
- return false;
- i = rdd.iterator(ps.get(c++),context);
- } while (true);
- }
- });
+ final JavaRDD<MRData> rd = rdd.cache();
+ rd.count(); // force the evaluation of all pending Spark operations
+ final List<Partition> ps = rd.splits();
+ return new Bag(new BagIterator() {
+ Iterator<MRData> i = null;
+ int c = 0;
+ public MRData next () {
+ return i.next();
+ }
+ public boolean hasNext () {
+ do {
+ if (i != null && i.hasNext())
+ return true;
+ if (c >= ps.size())
+ return false;
+ i = rdd.iterator(ps.get(c++),context);
+ } while (true);
+ }
+ });
}
private final static Function<MRData,MRData> get_first
- = new Function<MRData,MRData>() {
- public MRData call ( MRData value ) {
- return ((Tuple)value).first();
- };
+ = new Function<MRData,MRData>() {
+ public MRData call ( MRData value ) {
+ return ((Tuple)value).first();
+ };
};
/** Evaluate an MRQL physical plan using Spark
@@ -254,23 +254,23 @@ final public class Evaluator extends Interpreter implements Serializable {
* @return a DataSet
*/
final static DataSet eval ( final Tree e,
- final Environment env,
- final String counter ) {
- JavaRDD<MRData> rd = eval(e,env);
- int count = 0;
- if (!counter.equals("-")) {
- final Accumulator<Integer> c = spark_context.intAccumulator(0);
- rd = rd.cache();
- rd.foreach(new VoidFunction<MRData>() {
- public void call ( MRData value ) {
- if (((MR_bool)((Tuple)value).second()).get())
- c.add(1); // count the true results (the results that need another loop step)
- }
- });
- count = c.value();
- rd = rd.map(get_first);
- };
- return new DataSet(new RDDDataSource(rd),count,0);
+ final Environment env,
+ final String counter ) {
+ JavaRDD<MRData> rd = eval(e,env);
+ int count = 0;
+ if (!counter.equals("-")) {
+ final Accumulator<Integer> c = spark_context.intAccumulator(0);
+ rd = rd.cache();
+ rd.foreach(new VoidFunction<MRData>() {
+ public void call ( MRData value ) {
+ if (((MR_bool)((Tuple)value).second()).get())
+ c.add(1); // count the true results (the results that need another loop step)
+ }
+ });
+ count = c.value();
+ rd = rd.map(get_first);
+ };
+ return new DataSet(new RDDDataSource(rd),count,0);
}
/** Evaluate an MRQL physical plan using Spark and print tracing info
@@ -279,136 +279,136 @@ final public class Evaluator extends Interpreter implements Serializable {
* @return a Spark RDD
*/
final static JavaRDD<MRData> eval ( final Tree e, final Environment env ) {
- if (Config.trace_execution) {
- tab_count += 3;
- System.out.println(tabs(tab_count)+print_query(e));
- };
- final JavaRDD<MRData> res = evalD(e,env);
- if (Config.trace_execution)
- try {
- System.out.println(tabs(tab_count)+"-> "+res.collect());
- tab_count -= 3;
- } catch (Exception ex) {
- throw new Error("Cannot collect the operator output: "+e);
- };
- return res;
+ if (Config.trace_execution) {
+ tab_count += 3;
+ System.out.println(tabs(tab_count)+print_query(e));
+ };
+ final JavaRDD<MRData> res = evalD(e,env);
+ if (Config.trace_execution)
+ try {
+ System.out.println(tabs(tab_count)+"-> "+res.collect());
+ tab_count -= 3;
+ } catch (Exception ex) {
+ throw new Error("Cannot collect the operator output: "+e);
+ };
+ return res;
}
/* convert an MRQL lambda to a Spark Function */
private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
- final org.apache.mrql.Function f = evalF(fnc,env);
- return new FlatMapFunction<MRData,MRData>() {
- public Iterable<MRData> call ( MRData value ) {
- return (Bag)f.eval(value);
- }
- };
+ final org.apache.mrql.Function f = evalF(fnc,env);
+ return new FlatMapFunction<MRData,MRData>() {
+ public Iterable<MRData> call ( MRData value ) {
+ return (Bag)f.eval(value);
+ }
+ };
}
/* group-by s and then reduce by fnc; if o is true, sort after group-by */
private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) {
- match o {
- case true: // the result must be sorted
- return s.groupBy(get_first)
- .sortByKey()
- .map(new Function<Tuple2<MRData,List<MRData>>,MRData>() {
- public MRData call ( Tuple2<MRData,List<MRData>> value ) {
- return new Tuple(value._1,bag(value._2));
- }
- })
- .flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
- public MRData call ( MRData value ) {
- return ((Tuple)value).second();
- }
- });
- };
- return s.groupBy(get_first)
- .map(new Function<Tuple2<MRData,List<MRData>>,MRData> () {
- public MRData call ( Tuple2<MRData,List<MRData>> value ) {
- final Iterator<MRData> i = value._2.iterator();
- return new Tuple(value._1,new Bag(new BagIterator() {
- public MRData next () {
- return ((Tuple)i.next()).second();
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- }));
- }
- })
- .flatMap(cmap_fnc(fnc,env));
+ match o {
+ case true: // the result must be sorted
+ return s.groupBy(get_first)
+ .sortByKey()
+ .map(new Function<Tuple2<MRData,List<MRData>>,MRData>() {
+ public MRData call ( Tuple2<MRData,List<MRData>> value ) {
+ return new Tuple(value._1,bag(value._2));
+ }
+ })
+ .flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
+ public MRData call ( MRData value ) {
+ return ((Tuple)value).second();
+ }
+ });
+ };
+ return s.groupBy(get_first)
+ .map(new Function<Tuple2<MRData,List<MRData>>,MRData> () {
+ public MRData call ( Tuple2<MRData,List<MRData>> value ) {
+ final Iterator<MRData> i = value._2.iterator();
+ return new Tuple(value._1,new Bag(new BagIterator() {
+ public MRData next () {
+ return ((Tuple)i.next()).second();
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ }));
+ }
+ })
+ .flatMap(cmap_fnc(fnc,env));
}
private static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
- final Environment env = Interpreter.global_env;
- return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
- // need to pass the global bindings (the in-memory repeat vars) to workers
- boolean first = true;
- public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
- if (first) {
- // need to pass the global bindings (the in-memory repeat vars) to workers
- Interpreter.set_global_bindings(env);
- first = false;
- };
- return value._2.data();
- }
- });
+ final Environment env = Interpreter.global_env;
+ return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
+ // need to pass the global bindings (the in-memory repeat vars) to workers
+ boolean first = true;
+ public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
+ if (first) {
+ // need to pass the global bindings (the in-memory repeat vars) to workers
+ Interpreter.set_global_bindings(env);
+ first = false;
+ };
+ return value._2.data();
+ }
+ });
}
private static Iterable<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
- return new Iterable<Tuple2<MRData,MRData>>() {
- public Iterator<Tuple2<MRData,MRData>> iterator() {
- return new Iterator<Tuple2<MRData,MRData>> () {
- public Tuple2<MRData,MRData> next () {
- Tuple data = (Tuple)i.next();
- return new Tuple2<MRData,MRData>(data.first(),data.second());
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- public void remove () {}
- };
- }
- };
+ return new Iterable<Tuple2<MRData,MRData>>() {
+ public Iterator<Tuple2<MRData,MRData>> iterator() {
+ return new Iterator<Tuple2<MRData,MRData>> () {
+ public Tuple2<MRData,MRData> next () {
+ Tuple data = (Tuple)i.next();
+ return new Tuple2<MRData,MRData>(data.first(),data.second());
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ public void remove () {}
+ };
+ }
+ };
}
private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
- return new FlatMapFunction<Iterator<MRData>,MRData>() {
- public Iterable<MRData> call ( final Iterator<MRData> i ) {
- return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
- public MRData eval ( MRData x ) {
- final MRData key = ((Tuple)x).first();
- final Iterator<MRData> it = ((Bag)f.eval(x)).iterator();
- return new Bag(new BagIterator() {
- public MRData next () {
- return new Tuple(key,it.next());
- }
- public boolean hasNext () {
- return it.hasNext();
- }
- });
- }},
- MapReduceAlgebra.groupBy(new Bag(new BagIterator() {
- public MRData next () {
- return i.next();
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- })));
- }
- };
+ return new FlatMapFunction<Iterator<MRData>,MRData>() {
+ public Iterable<MRData> call ( final Iterator<MRData> i ) {
+ return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
+ public MRData eval ( MRData x ) {
+ final MRData key = ((Tuple)x).first();
+ final Iterator<MRData> it = ((Bag)f.eval(x)).iterator();
+ return new Bag(new BagIterator() {
+ public MRData next () {
+ return new Tuple(key,it.next());
+ }
+ public boolean hasNext () {
+ return it.hasNext();
+ }
+ });
+ }},
+ MapReduceAlgebra.groupBy(new Bag(new BagIterator() {
+ public MRData next () {
+ return i.next();
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ })));
+ }
+ };
}
private static Hashtable<MRData,Bag> make_built_table ( List<Tuple2<MRData,MRData>> values ) {
- Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
- for ( Tuple2<MRData,MRData> t: values ) {
- Bag entries = built_table.get(t._1);
- built_table.put(t._1,
- (entries == null)
- ? (new Bag(t._2))
- : entries.add_element(t._2));
- };
- return built_table;
+ Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
+ for ( Tuple2<MRData,MRData> t: values ) {
+ Bag entries = built_table.get(t._1);
+ built_table.put(t._1,
+ (entries == null)
+ ? (new Bag(t._2))
+ : entries.add_element(t._2));
+ };
+ return built_table;
}
/** Evaluate MRQL physical operators using Spark
@@ -417,272 +417,272 @@ final public class Evaluator extends Interpreter implements Serializable {
* @return a Spark RDD
*/
final static JavaRDD<MRData> evalD ( final Tree e, final Environment env ) {
- try {
- match e {
- case MapAggregateReduce(`m,`r,null,_,`s,`o):
- return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
- case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
- return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
- case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
- return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
- case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
- return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
- case cMap(`f,`s):
- return eval(s,env).flatMap(cmap_fnc(f,env));
- case MapReduce(`m,`r,`s,`o):
- return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
- case MapCombineReduce(`m,`c,`r,`s,`o):
- return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
- .mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
- case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
- return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
- case CrossProduct(`mx,`my,`r,`x,`y):
- final org.apache.mrql.Function fr = evalF(r,env);
- return eval(x,env)
- .flatMap(cmap_fnc(mx,env))
- .cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
- .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
- public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
- return (Bag)fr.eval(new Tuple(value._1,value._2));
- }
- });
- case MapReduce2(`mx,`my,`r,`x,`y,`o):
- final org.apache.mrql.Function fx = evalF(mx,env);
- final org.apache.mrql.Function fy = evalF(my,env);
- final org.apache.mrql.Function fr = evalF(r,env);
- JavaPairRDD<MRData,MRData> xs
- = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
- return joinIterator(((Bag)fx.eval(value)).iterator());
- }
- });
- JavaPairRDD<MRData,MRData> ys
- = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
- return joinIterator(((Bag)fy.eval(value)).iterator());
- }
- });
- return xs.cogroup(ys)
- .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
- public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
- return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
- }
- });
- case GroupByJoin(`kx,`ky,`gx,`gy,`mp,`c,`r,`x,`y,`o):
- final int n = 10;
- final int m = 10;
- final org.apache.mrql.Function fkx = evalF(kx,env);
- final org.apache.mrql.Function fky = evalF(ky,env);
- final org.apache.mrql.Function fgx = evalF(gx,env);
- final org.apache.mrql.Function fgy = evalF(gy,env);
- final org.apache.mrql.Function fm = evalF(mp,env);
- final org.apache.mrql.Function fc = evalF(c,env);
- final org.apache.mrql.Function fr = evalF(r,env);
- JavaPairRDD<MRData,MRData> xs
- = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
- return new Iterable<Tuple2<MRData,MRData>>() {
- public Iterator<Tuple2<MRData,MRData>> iterator() {
- return new Iterator<Tuple2<MRData,MRData>>() {
- int i = 0;
- public Tuple2<MRData,MRData> next () {
- MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
- i++;
- return new Tuple2<MRData,MRData>(key,new Tuple(fkx.eval(value),value));
- }
- public boolean hasNext () {
- return i < n;
- }
- public void remove () {}
- };
- }
- };
- }
- });
- JavaPairRDD<MRData,MRData> ys
- = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
- return new Iterable<Tuple2<MRData,MRData>>() {
- public Iterator<Tuple2<MRData,MRData>> iterator() {
- return new Iterator<Tuple2<MRData,MRData>>() {
- int i = 0;
- public Tuple2<MRData,MRData> next () {
- MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+i);
- i++;
- return new Tuple2<MRData,MRData>(key,new Tuple(fky.eval(value),value));
- }
- public boolean hasNext () {
- return i < m;
- }
- public void remove () {}
- };
- }
- };
- }
- });
- return xs.cogroup(ys)
- .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
- public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
- final Iterator<MRData> i = (MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fm,fc,fr,
- bag(value._2._1),bag(value._2._2))).iterator();
- return new Iterable<MRData>() {
- public Iterator<MRData> iterator() {
- return new Iterator<MRData>() {
- public MRData next () {
- return i.next();
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- public void remove () {}
- };
- }
- };
- }
- });
- case MapJoin(`mx,`my,`r,`x,`y):
- final org.apache.mrql.Function fx = evalF(mx,env);
- final org.apache.mrql.Function fy = evalF(my,env);
- final org.apache.mrql.Function fr = evalF(r,env);
- final Broadcast<List<Tuple2<MRData,MRData>>> ys
- = spark_context.broadcast(eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
- return joinIterator(((Bag)fy.eval(value)).iterator());
- }
- }).collect());
- return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
- final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
- public Iterable<MRData> call ( MRData value ) {
- final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
- return new Iterable<MRData>() {
- public Iterator<MRData> iterator() {
- return new Iterator<MRData>() {
- Tuple p;
- Iterator<MRData> ix = null;
- public MRData next () {
- return ix.next();
- }
- public boolean hasNext () {
- if (ix != null && ix.hasNext())
- return true;
- while (i.hasNext()) {
- p = (Tuple)i.next();
- MRData pd = built_table.get(p.first());
- if (pd == null)
- continue;
- Bag bb = ((Bag)fr.eval(new Tuple(p.second(),pd)));
- ix = bb.iterator();
- if (ix.hasNext())
- return true;
- };
- return false;
- }
- public void remove () {}
- };
- }
- };
- }
- });
- case BinarySource(`file,_):
- String path = ((MR_string)evalE(file,env)).get();
- new BinaryDataSource(path,Plan.conf);
- return containerData(spark_context.sequenceFile(file.stringValue(),
- MRContainer.class,MRContainer.class,
- Config.nodes));
- case ParsedSource(`parser,`file,...args):
- String path = ((MR_string)evalE(file,env)).get();
- Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
- if (p == null)
- throw new Error("Unknown parser: "+parser);
- new ParsedDataSource(path,p,args,Plan.conf);
- dump_source_dir();
- return containerData(spark_context.hadoopFile(path,ParsedInputFormat.class,
- MRContainer.class,MRContainer.class,
- Config.nodes));
- case Merge(`x,`y):
- return eval(x,env).union(eval(y,env));
- case Repeat(lambda(`v,`b),`s,`n):
- int max_num = ((MR_int)evalE(n,env)).get();
- JavaRDD<MRData> rd;
- JavaRDD<MRData> res = eval(s,env).cache();
- int i = 0;
- boolean cont = true;
- do {
- rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
- res = rd.map(get_first).cache();
- Integer true_results
- = rd.aggregate(new Integer(0),
- new Function2<Integer,MRData,Integer>() {
- public Integer call ( Integer x, MRData y ) {
- return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x;
- }
- },
- new Function2<Integer,Integer,Integer>() {
- public Integer call ( Integer x, Integer y ) { return x+y; }
- });
- i++;
- cont = true_results > 0 && i <= max_num;
- System.err.println("*** Repeat #"+i+": "+true_results+" true results");
- } while (cont);
- return res;
- case Closure(lambda(`v,`b),`s,`m):
- int max_num = ((MR_int)evalE(m,env)).get();
- JavaRDD<MRData> res = eval(s,env).cache();
- long n = 0;
- long old = 0;
- int i = 0;
- boolean cont = true;
- do {
- res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
- old = n;
- n = res.count();
- i++;
- System.err.println("*** Repeat #"+i+": "+(old-n)+" new records");
- } while (old < n && i <= max_num);
- return res;
- case Generator(`min,`max,`size):
- DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
- ((MR_long)evalE(max,env)).get(),
- ((MR_long)evalE(size,env)).get());
- JavaRDD<MRData> rd = null;
- for ( DataSource d: ds.source )
- if (rd == null)
- rd = containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
- MRContainer.class,MRContainer.class,1));
- else rd = rd.union(containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
- MRContainer.class,MRContainer.class,1)));
- return rd;
- case let(`v,`u,`body):
- return eval(body,new Environment(v.toString(),evalE(u,env),env));
- case Let(`v,`u,`body):
- return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env)),env));
- case If(`c,`x,`y):
- if (((MR_bool)evalE(c,env)).get())
- return eval(x,env);
- else return eval(y,env);
- case `v:
- if (!v.is_variable())
- fail;
- MRData x = variable_lookup(v.toString(),env);
- if (x != null)
- if (x instanceof MR_rdd)
- return ((MR_rdd)x).rdd();
- x = variable_lookup(v.toString(),global_env);
- if (x != null)
- if (x instanceof MR_rdd)
- return ((MR_rdd)x).rdd();
- throw new Error("Variable "+v+" is not bound");
- };
- throw new Error("Cannot evaluate the Spark plan: "+e);
- } catch (Error msg) {
- if (!Config.trace)
- throw new Error(msg.getMessage());
- System.err.println(msg.getMessage());
- throw new Error("Evaluation error in: "+print_query(e));
- } catch (Exception ex) {
- System.err.println(ex.getMessage());
- ex.printStackTrace();
- throw new Error("Evaluation error in: "+print_query(e));
- }
+ try {
+ match e {
+ case MapAggregateReduce(`m,`r,null,_,`s,`o):
+ return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
+ case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
+ return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
+ case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
+ return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
+ case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
+ return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
+ case cMap(`f,`s):
+ return eval(s,env).flatMap(cmap_fnc(f,env));
+ case MapReduce(`m,`r,`s,`o):
+ return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
+ case MapCombineReduce(`m,`c,`r,`s,`o):
+ return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
+ .mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
+ case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
+ return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
+ case CrossProduct(`mx,`my,`r,`x,`y):
+ final org.apache.mrql.Function fr = evalF(r,env);
+ return eval(x,env)
+ .flatMap(cmap_fnc(mx,env))
+ .cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
+ .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
+ public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
+ return (Bag)fr.eval(new Tuple(value._1,value._2));
+ }
+ });
+ case MapReduce2(`mx,`my,`r,`x,`y,`o):
+ final org.apache.mrql.Function fx = evalF(mx,env);
+ final org.apache.mrql.Function fy = evalF(my,env);
+ final org.apache.mrql.Function fr = evalF(r,env);
+ JavaPairRDD<MRData,MRData> xs
+ = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+ return joinIterator(((Bag)fx.eval(value)).iterator());
+ }
+ });
+ JavaPairRDD<MRData,MRData> ys
+ = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+ return joinIterator(((Bag)fy.eval(value)).iterator());
+ }
+ });
+ return xs.cogroup(ys)
+ .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
+ public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
+ return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
+ }
+ });
+ case GroupByJoin(`kx,`ky,`gx,`gy,`mp,`c,`r,`x,`y,`o):
+ final int n = 10;
+ final int m = 10;
+ final org.apache.mrql.Function fkx = evalF(kx,env);
+ final org.apache.mrql.Function fky = evalF(ky,env);
+ final org.apache.mrql.Function fgx = evalF(gx,env);
+ final org.apache.mrql.Function fgy = evalF(gy,env);
+ final org.apache.mrql.Function fm = evalF(mp,env);
+ final org.apache.mrql.Function fc = evalF(c,env);
+ final org.apache.mrql.Function fr = evalF(r,env);
+ JavaPairRDD<MRData,MRData> xs
+ = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
+ return new Iterable<Tuple2<MRData,MRData>>() {
+ public Iterator<Tuple2<MRData,MRData>> iterator() {
+ return new Iterator<Tuple2<MRData,MRData>>() {
+ int i = 0;
+ public Tuple2<MRData,MRData> next () {
+ MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
+ i++;
+ return new Tuple2<MRData,MRData>(key,new Tuple(fkx.eval(value),value));
+ }
+ public boolean hasNext () {
+ return i < n;
+ }
+ public void remove () {}
+ };
+ }
+ };
+ }
+ });
+ JavaPairRDD<MRData,MRData> ys
+ = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
+ return new Iterable<Tuple2<MRData,MRData>>() {
+ public Iterator<Tuple2<MRData,MRData>> iterator() {
+ return new Iterator<Tuple2<MRData,MRData>>() {
+ int i = 0;
+ public Tuple2<MRData,MRData> next () {
+ MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+i);
+ i++;
+ return new Tuple2<MRData,MRData>(key,new Tuple(fky.eval(value),value));
+ }
+ public boolean hasNext () {
+ return i < m;
+ }
+ public void remove () {}
+ };
+ }
+ };
+ }
+ });
+ return xs.cogroup(ys)
+ .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
+ public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
+ final Iterator<MRData> i = (MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fm,fc,fr,
+ bag(value._2._1),bag(value._2._2))).iterator();
+ return new Iterable<MRData>() {
+ public Iterator<MRData> iterator() {
+ return new Iterator<MRData>() {
+ public MRData next () {
+ return i.next();
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ public void remove () {}
+ };
+ }
+ };
+ }
+ });
+ case MapJoin(`mx,`my,`r,`x,`y):
+ final org.apache.mrql.Function fx = evalF(mx,env);
+ final org.apache.mrql.Function fy = evalF(my,env);
+ final org.apache.mrql.Function fr = evalF(r,env);
+ final Broadcast<List<Tuple2<MRData,MRData>>> ys
+ = spark_context.broadcast(eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+ return joinIterator(((Bag)fy.eval(value)).iterator());
+ }
+ }).collect());
+ return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
+ final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
+ public Iterable<MRData> call ( MRData value ) {
+ final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
+ return new Iterable<MRData>() {
+ public Iterator<MRData> iterator() {
+ return new Iterator<MRData>() {
+ Tuple p;
+ Iterator<MRData> ix = null;
+ public MRData next () {
+ return ix.next();
+ }
+ public boolean hasNext () {
+ if (ix != null && ix.hasNext())
+ return true;
+ while (i.hasNext()) {
+ p = (Tuple)i.next();
+ MRData pd = built_table.get(p.first());
+ if (pd == null)
+ continue;
+ Bag bb = ((Bag)fr.eval(new Tuple(p.second(),pd)));
+ ix = bb.iterator();
+ if (ix.hasNext())
+ return true;
+ };
+ return false;
+ }
+ public void remove () {}
+ };
+ }
+ };
+ }
+ });
+ case BinarySource(`file,_):
+ String path = ((MR_string)evalE(file,env)).get();
+ new BinaryDataSource(path,Plan.conf);
+ return containerData(spark_context.sequenceFile(file.stringValue(),
+ MRContainer.class,MRContainer.class,
+ Config.nodes));
+ case ParsedSource(`parser,`file,...args):
+ String path = ((MR_string)evalE(file,env)).get();
+ Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ new ParsedDataSource(path,p,args,Plan.conf);
+ dump_source_dir();
+ return containerData(spark_context.hadoopFile(path,ParsedInputFormat.class,
+ MRContainer.class,MRContainer.class,
+ Config.nodes));
+ case Merge(`x,`y):
+ return eval(x,env).union(eval(y,env));
+ case Repeat(lambda(`v,`b),`s,`n):
+ int max_num = ((MR_int)evalE(n,env)).get();
+ JavaRDD<MRData> rd;
+ JavaRDD<MRData> res = eval(s,env).cache();
+ int i = 0;
+ boolean cont = true;
+ do {
+ rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
+ res = rd.map(get_first).cache();
+ Integer true_results
+ = rd.aggregate(new Integer(0),
+ new Function2<Integer,MRData,Integer>() {
+ public Integer call ( Integer x, MRData y ) {
+ return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x;
+ }
+ },
+ new Function2<Integer,Integer,Integer>() {
+ public Integer call ( Integer x, Integer y ) { return x+y; }
+ });
+ i++;
+ cont = true_results > 0 && i <= max_num;
+ System.err.println("*** Repeat #"+i+": "+true_results+" true results");
+ } while (cont);
+ return res;
+ case Closure(lambda(`v,`b),`s,`m):
+ int max_num = ((MR_int)evalE(m,env)).get();
+ JavaRDD<MRData> res = eval(s,env).cache();
+ long n = 0;
+ long old = 0;
+ int i = 0;
+ boolean cont = true;
+ do {
+ res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
+ old = n;
+ n = res.count();
+ i++;
+ System.err.println("*** Repeat #"+i+": "+(old-n)+" new records");
+ } while (old < n && i <= max_num);
+ return res;
+ case Generator(`min,`max,`size):
+ DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
+ ((MR_long)evalE(max,env)).get(),
+ ((MR_long)evalE(size,env)).get());
+ JavaRDD<MRData> rd = null;
+ for ( DataSource d: ds.source )
+ if (rd == null)
+ rd = containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
+ MRContainer.class,MRContainer.class,1));
+ else rd = rd.union(containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
+ MRContainer.class,MRContainer.class,1)));
+ return rd;
+ case let(`v,`u,`body):
+ return eval(body,new Environment(v.toString(),evalE(u,env),env));
+ case Let(`v,`u,`body):
+ return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env)),env));
+ case If(`c,`x,`y):
+ if (((MR_bool)evalE(c,env)).get())
+ return eval(x,env);
+ else return eval(y,env);
+ case `v:
+ if (!v.is_variable())
+ fail;
+ MRData x = variable_lookup(v.toString(),env);
+ if (x != null)
+ if (x instanceof MR_rdd)
+ return ((MR_rdd)x).rdd();
+ x = variable_lookup(v.toString(),global_env);
+ if (x != null)
+ if (x instanceof MR_rdd)
+ return ((MR_rdd)x).rdd();
+ throw new Error("Variable "+v+" is not bound");
+ };
+ throw new Error("Cannot evaluate the Spark plan: "+e);
+ } catch (Error msg) {
+ if (!Config.trace)
+ throw new Error(msg.getMessage());
+ System.err.println(msg.getMessage());
+ throw new Error("Evaluation error in: "+print_query(e));
+ } catch (Exception ex) {
+ System.err.println(ex.getMessage());
+ ex.printStackTrace();
+ throw new Error("Evaluation error in: "+print_query(e));
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/GeneratorInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/GeneratorInputFormat.java b/src/main/java/spark/GeneratorInputFormat.java
index 61d7c34..c9e6066 100644
--- a/src/main/java/spark/GeneratorInputFormat.java
+++ b/src/main/java/spark/GeneratorInputFormat.java
@@ -29,52 +29,52 @@ import org.apache.hadoop.mapred.*;
* an (offset,size) pair that generates the range of values [offset,offset+size] */
final public class GeneratorInputFormat extends MRQLFileInputFormat {
public static class GeneratorRecordReader implements RecordReader<MRContainer,MRContainer> {
- final long offset;
- final long size;
- long index;
- SequenceFile.Reader reader;
+ final long offset;
+ final long size;
+ long index;
+ SequenceFile.Reader reader;
- public GeneratorRecordReader ( FileSplit split,
- Configuration conf ) throws IOException {
- Path path = split.getPath();
- FileSystem fs = path.getFileSystem(conf);
- reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
- MRContainer key = new MRContainer();
- MRContainer value = new MRContainer();
- reader.next(key,value);
- offset = ((MR_long)((Tuple)(value.data())).first()).get();
- size = ((MR_long)((Tuple)(value.data())).second()).get();
- index = -1;
- }
+ public GeneratorRecordReader ( FileSplit split,
+ Configuration conf ) throws IOException {
+ Path path = split.getPath();
+ FileSystem fs = path.getFileSystem(conf);
+ reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
+ MRContainer key = new MRContainer();
+ MRContainer value = new MRContainer();
+ reader.next(key,value);
+ offset = ((MR_long)((Tuple)(value.data())).first()).get();
+ size = ((MR_long)((Tuple)(value.data())).second()).get();
+ index = -1;
+ }
- public MRContainer createKey () {
- return new MRContainer(null);
- }
+ public MRContainer createKey () {
+ return new MRContainer(null);
+ }
- public MRContainer createValue () {
- return new MRContainer(null);
- }
+ public MRContainer createValue () {
+ return new MRContainer(null);
+ }
- public boolean next ( MRContainer key, MRContainer value ) throws IOException {
- index++;
- value.set(new MR_long(offset+index));
- key.set(new MR_long(index));
- return index < size;
- }
+ public boolean next ( MRContainer key, MRContainer value ) throws IOException {
+ index++;
+ value.set(new MR_long(offset+index));
+ key.set(new MR_long(index));
+ return index < size;
+ }
- public long getPos () throws IOException { return index; }
+ public long getPos () throws IOException { return index; }
- public void close () throws IOException { reader.close(); }
+ public void close () throws IOException { reader.close(); }
- public float getProgress () throws IOException {
- return index / (float)size;
- }
+ public float getProgress () throws IOException {
+ return index / (float)size;
+ }
}
public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException {
- return (RecordReader<MRContainer,MRContainer>)
- new GeneratorRecordReader((FileSplit)split,job);
+ getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException {
+ return (RecordReader<MRContainer,MRContainer>)
+ new GeneratorRecordReader((FileSplit)split,job);
}
/** Insert all results from the generators stored in path into a Bag.
@@ -83,31 +83,31 @@ final public class GeneratorInputFormat extends MRQLFileInputFormat {
* @return a Bag that contains all data
*/
public Bag materialize ( final Path path ) throws IOException {
- Configuration conf = Plan.conf;
- FileSystem fs = path.getFileSystem(conf);
- final SequenceFile.Reader reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
- final MRContainer key = new MRContainer();
- final MRContainer value = new MRContainer();
- return new Bag(new BagIterator () {
- long offset = 0;
- long size = 0;
- long i = 0;
- public boolean hasNext () {
- if (++i >= offset+size)
- try {
- if (!reader.next(key,value))
- return false;
- offset = ((MR_long)((Tuple)(value.data())).first()).get();
- size = ((MR_long)((Tuple)(value.data())).second()).get();
- i = offset;
- } catch (IOException e) {
- throw new Error("Cannot collect values from a generator");
- };
- return true;
- }
- public MRData next () {
- return new MR_long(i);
- }
- });
+ Configuration conf = Plan.conf;
+ FileSystem fs = path.getFileSystem(conf);
+ final SequenceFile.Reader reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
+ final MRContainer key = new MRContainer();
+ final MRContainer value = new MRContainer();
+ return new Bag(new BagIterator () {
+ long offset = 0;
+ long size = 0;
+ long i = 0;
+ public boolean hasNext () {
+ if (++i >= offset+size)
+ try {
+ if (!reader.next(key,value))
+ return false;
+ offset = ((MR_long)((Tuple)(value.data())).first()).get();
+ size = ((MR_long)((Tuple)(value.data())).second()).get();
+ i = offset;
+ } catch (IOException e) {
+ throw new Error("Cannot collect values from a generator");
+ };
+ return true;
+ }
+ public MRData next () {
+ return new MR_long(i);
+ }
+ });
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/MRQLFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/MRQLFileInputFormat.java b/src/main/java/spark/MRQLFileInputFormat.java
index ca26e43..73b0910 100644
--- a/src/main/java/spark/MRQLFileInputFormat.java
+++ b/src/main/java/spark/MRQLFileInputFormat.java
@@ -30,39 +30,39 @@ abstract public class MRQLFileInputFormat extends FileInputFormat<MRContainer,MR
/** record reader for spark */
abstract public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException;
+ getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException;
/** materialize the input file into a memory Bag */
public Bag materialize ( final Path file ) throws Exception {
- final JobConf job = new JobConf(Plan.conf,MRQLFileInputFormat.class);
- setInputPaths(job,file);
- final InputSplit[] splits = getSplits(job,1);
- final Reporter reporter = null;
- final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job,reporter);
- return new Bag(new BagIterator () {
- RecordReader<MRContainer,MRContainer> reader = rd;
- MRContainer key = reader.createKey();
- MRContainer value = reader.createKey();
- int i = 0;
- public boolean hasNext () {
- try {
- if (reader.next(key,value))
- return true;
- do {
- if (++i >= splits.length)
- return false;
- reader.close();
- reader = getRecordReader(splits[i],job,reporter);
- } while (!reader.next(key,value));
- return true;
- } catch (IOException e) {
- throw new Error("Cannot collect values from an intermediate result");
- }
- }
- public MRData next () {
- return value.data();
- }
- });
+ final JobConf job = new JobConf(Plan.conf,MRQLFileInputFormat.class);
+ setInputPaths(job,file);
+ final InputSplit[] splits = getSplits(job,1);
+ final Reporter reporter = null;
+ final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job,reporter);
+ return new Bag(new BagIterator () {
+ RecordReader<MRContainer,MRContainer> reader = rd;
+ MRContainer key = reader.createKey();
+ MRContainer value = reader.createKey();
+ int i = 0;
+ public boolean hasNext () {
+ try {
+ if (reader.next(key,value))
+ return true;
+ do {
+ if (++i >= splits.length)
+ return false;
+ reader.close();
+ reader = getRecordReader(splits[i],job,reporter);
+ } while (!reader.next(key,value));
+ return true;
+ } catch (IOException e) {
+ throw new Error("Cannot collect values from an intermediate result");
+ }
+ }
+ public MRData next () {
+ return value.data();
+ }
+ });
}
/** materialize the entire dataset into a Bag
@@ -71,13 +71,13 @@ abstract public class MRQLFileInputFormat extends FileInputFormat<MRContainer,MR
* @return the Bag that contains the collected values
*/
public final static Bag collect ( final DataSet x, boolean strip ) throws Exception {
- Bag res = new Bag();
- for ( DataSource s: x.source )
- if (s instanceof RDDDataSource)
- res = res.union(Evaluator.bag(((RDDDataSource)s).rdd));
- else if (s.to_be_merged)
- res = res.union(Plan.merge(s));
- else res = res.union(s.inputFormat.newInstance().materialize(new Path(s.path)));
- return res;
+ Bag res = new Bag();
+ for ( DataSource s: x.source )
+ if (s instanceof RDDDataSource)
+ res = res.union(Evaluator.bag(((RDDDataSource)s).rdd));
+ else if (s.to_be_merged)
+ res = res.union(Plan.merge(s));
+ else res = res.union(s.inputFormat.newInstance().materialize(new Path(s.path)));
+ return res;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/MR_rdd.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/MR_rdd.java b/src/main/java/spark/MR_rdd.java
index e2739df..4b3eff5 100644
--- a/src/main/java/spark/MR_rdd.java
+++ b/src/main/java/spark/MR_rdd.java
@@ -37,18 +37,18 @@ final public class MR_rdd extends MRData {
public JavaRDD<MRData> rdd () { return rdd; }
final public void write ( DataOutput out ) throws IOException {
- throw new Error("RDDs are not serializable");
+ throw new Error("RDDs are not serializable");
}
public void readFields ( DataInput in ) throws IOException {
- throw new Error("RDDs are not serializable");
+ throw new Error("RDDs are not serializable");
}
public int compareTo ( MRData x ) {
- throw new Error("RDDs cannot be compared");
+ throw new Error("RDDs cannot be compared");
}
public boolean equals ( Object x ) {
- throw new Error("RDDs cannot be compared");
+ throw new Error("RDDs cannot be compared");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/ParsedInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/ParsedInputFormat.java b/src/main/java/spark/ParsedInputFormat.java
index 016c3e1..8ca7db6 100644
--- a/src/main/java/spark/ParsedInputFormat.java
+++ b/src/main/java/spark/ParsedInputFormat.java
@@ -29,69 +29,69 @@ import org.apache.hadoop.conf.Configuration;
/** A FileInputFormat for text files (CVS, XML, JSON, ...) */
final public class ParsedInputFormat extends MRQLFileInputFormat {
public static class ParsedRecordReader implements RecordReader<MRContainer,MRContainer> {
- final FSDataInputStream fsin;
- final long start;
- final long end;
- Iterator<MRData> result;
- Parser parser;
+ final FSDataInputStream fsin;
+ final long start;
+ final long end;
+ Iterator<MRData> result;
+ Parser parser;
- public ParsedRecordReader ( FileSplit split,
- Configuration conf,
- Class<? extends Parser> parser_class,
- Trees args ) throws IOException {
- start = split.getStart();
- end = start + split.getLength();
- Path file = split.getPath();
- FileSystem fs = file.getFileSystem(conf);
- fsin = fs.open(split.getPath());
- try {
- parser = parser_class.newInstance();
- } catch (Exception ex) {
- throw new Error("Unrecognized parser:"+parser_class);
- };
- parser.initialize(args);
- parser.open(fsin,start,end);
- result = null;
- }
+ public ParsedRecordReader ( FileSplit split,
+ Configuration conf,
+ Class<? extends Parser> parser_class,
+ Trees args ) throws IOException {
+ start = split.getStart();
+ end = start + split.getLength();
+ Path file = split.getPath();
+ FileSystem fs = file.getFileSystem(conf);
+ fsin = fs.open(split.getPath());
+ try {
+ parser = parser_class.newInstance();
+ } catch (Exception ex) {
+ throw new Error("Unrecognized parser:"+parser_class);
+ };
+ parser.initialize(args);
+ parser.open(fsin,start,end);
+ result = null;
+ }
public MRContainer createKey () {
- return new MRContainer();
- }
+ return new MRContainer();
+ }
- public MRContainer createValue () {
- return new MRContainer();
- }
+ public MRContainer createValue () {
+ return new MRContainer();
+ }
- public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
- while (result == null || !result.hasNext()) {
- String s = parser.slice();
- if (s == null)
- return false;
- result = parser.parse(s).iterator();
- };
- value.set((MRData)result.next());
- key.set(new MR_long(fsin.getPos()));
- return true;
- }
+ public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+ while (result == null || !result.hasNext()) {
+ String s = parser.slice();
+ if (s == null)
+ return false;
+ result = parser.parse(s).iterator();
+ };
+ value.set((MRData)result.next());
+ key.set(new MR_long(fsin.getPos()));
+ return true;
+ }
- public synchronized long getPos () throws IOException { return fsin.getPos(); }
+ public synchronized long getPos () throws IOException { return fsin.getPos(); }
- public synchronized void close () throws IOException { fsin.close(); }
+ public synchronized void close () throws IOException { fsin.close(); }
- public float getProgress () throws IOException {
- if (end == start)
- return 0.0f;
- else return Math.min(1.0f, (getPos() - start) / (float)(end - start));
- }
+ public float getProgress () throws IOException {
+ if (end == start)
+ return 0.0f;
+ else return Math.min(1.0f, (getPos() - start) / (float)(end - start));
+ }
}
public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split,
- JobConf job,
- Reporter reporter ) throws IOException {
- Evaluator.load_source_dir(); // load the parsed source parameters from a file
- String path = ((FileSplit)split).getPath().toString();
- ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
- return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
+ getRecordReader ( InputSplit split,
+ JobConf job,
+ Reporter reporter ) throws IOException {
+ Evaluator.load_source_dir(); // load the parsed source parameters from a file
+ String path = ((FileSplit)split).getPath().toString();
+ ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
+ return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/spark/RDDDataSource.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/RDDDataSource.java b/src/main/java/spark/RDDDataSource.java
index 2e7ef63..609c753 100644
--- a/src/main/java/spark/RDDDataSource.java
+++ b/src/main/java/spark/RDDDataSource.java
@@ -25,12 +25,12 @@ final public class RDDDataSource extends DataSource {
JavaRDD<MRData> rdd;
RDDDataSource ( JavaRDD<MRData> rdd ) {
- super();
- this.rdd = rdd;
+ super();
+ this.rdd = rdd;
}
@Override
public long size ( Configuration conf ) {
- return rdd.count();
+ return rdd.count();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/core_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/core_1.mrql b/tests/queries/core_1.mrql
index 50794d7..dd221d3 100644
--- a/tests/queries/core_1.mrql
+++ b/tests/queries/core_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
[1,2,3,4][2];
[1..1000][10:20];
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/distinct_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/distinct_1.mrql b/tests/queries/distinct_1.mrql
index 4118729..46a19c9 100644
--- a/tests/queries/distinct_1.mrql
+++ b/tests/queries/distinct_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
C = source(line,"tests/data/customer.tbl","|",type(<CUSTKEY:int,NAME:string>));
O = source(line,"tests/data/orders.tbl","|",type(<ORDERKEY:string,CUSTKEY:int,ORDERSTATUS:any,TOTALPRICE:float>));
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/factorization_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/factorization_1.mrql b/tests/queries/factorization_1.mrql
index 13b86d2..e46abd3 100644
--- a/tests/queries/factorization_1.mrql
+++ b/tests/queries/factorization_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
Mmatrix = source(line,"tests/data/Xmatrix.txt",",",type( (double,long,long) ));
Hmatrix = source(line,"tests/data/Ymatrix.txt",",",type( (double,long,long) ));
Wmatrix = source(line,"tests/data/Ymatrix.txt",",",type( (double,long,long) ));
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/group_by_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/group_by_1.mrql b/tests/queries/group_by_1.mrql
index 8210e02..119c260 100644
--- a/tests/queries/group_by_1.mrql
+++ b/tests/queries/group_by_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
C = source(line,"tests/data/customer.tbl","|",type(<CUSTKEY:int,NAME:string>));
O = source(line,"tests/data/orders.tbl","|",type(<ORDERKEY:string,CUSTKEY:int,ORDERSTATUS:any,TOTALPRICE:float>));
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/group_by_having_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/group_by_having_1.mrql b/tests/queries/group_by_having_1.mrql
index 07ba883..31f6fa4 100644
--- a/tests/queries/group_by_having_1.mrql
+++ b/tests/queries/group_by_having_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
C = source(line,"tests/data/customer.tbl","|",type(<CUSTKEY:int,NAME:string>));
O = source(line,"tests/data/orders.tbl","|",type(<ORDERKEY:string,CUSTKEY:int,ORDERSTATUS:any,TOTALPRICE:float>));
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/tests/queries/group_by_order_by_1.mrql
----------------------------------------------------------------------
diff --git a/tests/queries/group_by_order_by_1.mrql b/tests/queries/group_by_order_by_1.mrql
index 0652c84..e72b463 100644
--- a/tests/queries/group_by_order_by_1.mrql
+++ b/tests/queries/group_by_order_by_1.mrql
@@ -1,3 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
C = source(line,"tests/data/customer.tbl","|",type(<CUSTKEY:int,NAME:string>));
O = source(line,"tests/data/orders.tbl","|",type(<ORDERKEY:string,CUSTKEY:int,ORDERSTATUS:any,TOTALPRICE:float>));