Posted to commits@mrql.apache.org by fe...@apache.org on 2013/10/01 03:29:29 UTC
[2/5] MRQL-19: Fix license issues to prepare a new release candidate
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/Evaluator.gen
----------------------------------------------------------------------
diff --git a/src/main/java/spark/Evaluator.gen b/src/main/java/spark/Evaluator.gen
deleted file mode 100644
index 30905b1..0000000
--- a/src/main/java/spark/Evaluator.gen
+++ /dev/null
@@ -1,688 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java_cup.runtime.*;
-import org.apache.mrql.gen.*;
-import java.util.*;
-import java.io.*;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.FileInputFormat;
-import scala.Tuple2;
-import spark.TaskContext;
-import spark.Partition;
-import spark.Accumulator;
-import spark.broadcast.Broadcast;
-import spark.api.java.JavaRDD;
-import spark.api.java.JavaPairRDD;
-import spark.api.java.JavaSparkContext;
-import spark.api.java.function.Function2;
-import spark.api.java.function.PairFunction;
-import spark.api.java.function.FlatMapFunction;
-import spark.api.java.function.PairFlatMapFunction;
-import spark.api.java.function.Function;
-import spark.api.java.function.VoidFunction;
-
-
-/** Evaluates physical plans in Spark mode */
-final public class Evaluator extends Interpreter implements Serializable {
- static JavaSparkContext spark_context;
- final static String mrql_jar = Evaluator.class.getProtectionDomain().getCodeSource().getLocation().toString();
- static String data_source_directory;
-
- /** initialize the Spark evaluator */
- final static void init ( Configuration conf ) throws IOException {
- Config.spark_mode = true;
- if (Config.hadoop_mode && Config.local_mode)
- spark_context = new JavaSparkContext("local["+Config.nodes+"]",
- "MRQL Local Spark Evaluator");
- else if (Config.hadoop_mode) {
- HashMap<String,String> env = new HashMap<String,String>();
- data_source_directory = System.getenv("FS_DEFAULT_NAME")+"/"+Plan.new_path(conf);
- env.put("mrql.data.source.directory",data_source_directory);
- spark_context = new JavaSparkContext(System.getenv("SPARK_MASTER"),
- "MRQL Spark Evaluator",
- System.getenv("SPARK_HOME"),
- new String[]{mrql_jar},
- env);
- Plan.conf = spark_context.hadoopConfiguration();
- FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
- }
- }
-
- /** shutdown the Spark evaluator */
- final static void shutdown ( Configuration conf ) {
- spark_context.stop();
- spark_context = null;
- System.clearProperty("spark.driver.port");
- }
-
- final static void initialize_query () {
- spark_context.clearJars();
- spark_context.addJar(mrql_jar);
- Plan.distribute_compiled_arguments(Plan.conf);
- if (Config.compile_functional_arguments)
- spark_context.addJar(Plan.conf.get("mrql.jar.path"));
- }
-
- final static Configuration new_configuration () {
- return new Configuration();
- }
-
- public static MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
- throw new Error("You can only synchronize BSP tasks");
- }
-
- public static Bag distribute ( MR_string peerName, Bag s ) {
- throw new Error("You can only distribute bags among BSP tasks");
- }
-
- final static Bag collect ( final DataSet x ) throws Exception {
- return Plan.collect(x);
- }
-
- final static MRData bsp ( Tree plan, Environment env ) throws Exception {
- throw new Error("You cannot run a BSP task in Spark mode");
- }
-
- /** used by the master to send parsing details (e.g., record types) to workers */
- public static void dump_source_dir () throws IOException {
- if (Config.local_mode)
- return;
- DataSource.dataSourceDirectory.distribute(Plan.conf);
- Path path = new Path(data_source_directory);
- FileSystem fs = path.getFileSystem(Plan.conf);
- PrintStream ps = new PrintStream(fs.create(path,true));
- ps.println(Plan.conf.get("mrql.data.source.directory"));
- ps.close();
- }
-
- /** executed by a worker when reading parsed input (see ParsedInputFormat) */
- public static void load_source_dir () throws IOException {
- if (Config.local_mode)
- return;
- if (Plan.conf == null)
- Plan.conf = new_configuration();
- // the name of the file that contains the source directory details is passed
- // to workers through the HashMap environment in the JavaSparkContext
- Path path = new Path(System.getenv("mrql.data.source.directory"));
- FileSystem fs = path.getFileSystem(Plan.conf);
- BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
- Plan.conf.set("mrql.data.source.directory",ftp.readLine());
- DataSource.dataSourceDirectory.read(Plan.conf);
- ftp.close();
- }
-
- private static Function2<MRData,MRData,MRData> accumulator ( Tree acc_fnc, Environment env ) {
- final org.apache.mrql.Function f = evalF(acc_fnc,env);
- return new Function2<MRData,MRData,MRData>() {
- public MRData call ( MRData x, MRData y ) {
- return f.eval(new Tuple(x,y));
- }
- };
- }
-
- /** The Aggregate physical operator
- * @param acc_fnc the accumulator function from (T,T) to T
- * @param zero the zero element of type T
- * @param plan the plan that constructs the dataset that contains the bag of values {T}
- * @param env contains bindings from variables to values (MRData)
- * @return the aggregation result of type T
- */
- final static MRData aggregate ( Tree acc_fnc,
- Tree zero,
- Tree plan,
- Environment env ) throws Exception {
- Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env);
- MRData z = evalE(zero,env);
- match plan {
- case AggregateMap(`m,`acc,_,`s):
- return evalD(#<cMap(`m,`s)>,env)
- .aggregate(z,accumulator(acc,env),f2);
- case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
- if (acc.equals(#<null>))
- fail;
- return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
- .aggregate(z,accumulator(acc,env),f2);
- case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
- if (acc.equals(#<null>))
- fail;
- return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
- .aggregate(z,accumulator(acc,env),f2);
- case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
- if (acc.equals(#<null>))
- fail;
- return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
- .aggregate(z,accumulator(acc,env),f2);
- case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
- if (acc.equals(#<null>))
- fail;
- return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
- .aggregate(z,accumulator(acc,env),f2);
- };
- throw new Error("Unrecognized aggregation: "+plan);
- }
-
- /** Evaluate a loop a fixed # of times */
- final static Tuple loop ( Tree e, Environment env ) throws Exception {
- match e {
- case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
- int limit = ((MR_int)evalE(num,env)).get();
- MR_rdd[] s = new MR_rdd[vs.length()];
- for ( int i = 0; i < vs.length(); i++ )
- s[i] = new MR_rdd(eval(ss.nth(i),env));
- for ( int n = 0; n < limit; n++ ) {
- Environment nenv = env;
- for ( int i = 0; i < vs.length(); i ++ )
- nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
- for ( int i = 0; i < vs.length(); i ++ )
- s[i] = new MR_rdd(eval(bs.nth(i),nenv));
- };
- return new Tuple(s);
- };
- throw new Error("Wrong Loop format");
- }
-
- private static Bag bag ( final List<MRData> s ) {
- final Iterator<MRData> i = s.iterator();
- return new Bag(new BagIterator() {
- public MRData next () {
- return i.next();
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- });
- }
-
- final static TaskContext context = new TaskContext(0,0,0,null);
-
- /** Convert a Spark RDD into a lazy bag
- * @param rdd the Spark RDD
- * @return a lazy bag that contains all RDD elements
- */
- public static Bag bag ( final JavaRDD<MRData> rdd ) {
- final JavaRDD<MRData> rd = rdd.cache();
- rd.count(); // force the evaluation of all pending Spark operations
- final List<Partition> ps = rd.splits();
- return new Bag(new BagIterator() {
- Iterator<MRData> i = null;
- int c = 0;
- public MRData next () {
- return i.next();
- }
- public boolean hasNext () {
- do {
- if (i != null && i.hasNext())
- return true;
- if (c >= ps.size())
- return false;
- i = rdd.iterator(ps.get(c++),context);
- } while (true);
- }
- });
- }
-
- private final static Function<MRData,MRData> get_first
- = new Function<MRData,MRData>() {
- public MRData call ( MRData value ) {
- return ((Tuple)value).first();
- };
- };
-
- /** Evaluate an MRQL physical plan using Spark
- * @param e the physical plan
- * @param env contains bindings from variables to values (MRData)
- * @param counter the name of the counter used in loops
- * @return a DataSet
- */
- final static DataSet eval ( final Tree e,
- final Environment env,
- final String counter ) {
- JavaRDD<MRData> rd = eval(e,env);
- int count = 0;
- if (!counter.equals("-")) {
- final Accumulator<Integer> c = spark_context.intAccumulator(0);
- rd = rd.cache();
- rd.foreach(new VoidFunction<MRData>() {
- public void call ( MRData value ) {
- if (((MR_bool)((Tuple)value).second()).get())
- c.add(1); // count the true results (the results that need another loop step)
- }
- });
- count = c.value();
- rd = rd.map(get_first);
- };
- return new DataSet(new RDDDataSource(rd),count,0);
- }
-
- /** Evaluate an MRQL physical plan using Spark and print tracing info
- * @param e the physical plan
- * @param env contains bindings from variables to values (MRData)
- * @return a Spark RDD
- */
- final static JavaRDD<MRData> eval ( final Tree e, final Environment env ) {
- if (Config.trace_execution) {
- tab_count += 3;
- System.out.println(tabs(tab_count)+print_query(e));
- };
- final JavaRDD<MRData> res = evalD(e,env);
- if (Config.trace_execution)
- try {
- System.out.println(tabs(tab_count)+"-> "+res.collect());
- tab_count -= 3;
- } catch (Exception ex) {
- throw new Error("Cannot collect the operator output: "+e);
- };
- return res;
- }
-
- /* convert an MRQL lambda to a Spark Function */
- private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
- final org.apache.mrql.Function f = evalF(fnc,env);
- return new FlatMapFunction<MRData,MRData>() {
- public Iterable<MRData> call ( MRData value ) {
- return (Bag)f.eval(value);
- }
- };
- }
-
- /* group-by s and then reduce by fnc; if o is true, sort after group-by */
- private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) {
- match o {
- case true: // the result must be sorted
- return s.groupBy(get_first)
- .sortByKey()
- .map(new Function<Tuple2<MRData,List<MRData>>,MRData>() {
- public MRData call ( Tuple2<MRData,List<MRData>> value ) {
- return new Tuple(value._1,bag(value._2));
- }
- })
- .flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
- public MRData call ( MRData value ) {
- return ((Tuple)value).second();
- }
- });
- };
- return s.groupBy(get_first)
- .map(new Function<Tuple2<MRData,List<MRData>>,MRData> () {
- public MRData call ( Tuple2<MRData,List<MRData>> value ) {
- final Iterator<MRData> i = value._2.iterator();
- return new Tuple(value._1,new Bag(new BagIterator() {
- public MRData next () {
- return ((Tuple)i.next()).second();
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- }));
- }
- })
- .flatMap(cmap_fnc(fnc,env));
- }
-
- private static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
- final Environment env = Interpreter.global_env;
- return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
- // need to pass the global bindings (the in-memory repeat vars) to workers
- boolean first = true;
- public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
- if (first) {
- // need to pass the global bindings (the in-memory repeat vars) to workers
- Interpreter.set_global_bindings(env);
- first = false;
- };
- return value._2.data();
- }
- });
- }
-
- private static Iterable<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
- return new Iterable<Tuple2<MRData,MRData>>() {
- public Iterator<Tuple2<MRData,MRData>> iterator() {
- return new Iterator<Tuple2<MRData,MRData>> () {
- public Tuple2<MRData,MRData> next () {
- Tuple data = (Tuple)i.next();
- return new Tuple2<MRData,MRData>(data.first(),data.second());
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- public void remove () {}
- };
- }
- };
- }
-
- private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
- return new FlatMapFunction<Iterator<MRData>,MRData>() {
- public Iterable<MRData> call ( final Iterator<MRData> i ) {
- return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
- public MRData eval ( MRData x ) {
- final MRData key = ((Tuple)x).first();
- final Iterator<MRData> it = ((Bag)f.eval(x)).iterator();
- return new Bag(new BagIterator() {
- public MRData next () {
- return new Tuple(key,it.next());
- }
- public boolean hasNext () {
- return it.hasNext();
- }
- });
- }},
- MapReduceAlgebra.groupBy(new Bag(new BagIterator() {
- public MRData next () {
- return i.next();
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- })));
- }
- };
- }
-
- private static Hashtable<MRData,Bag> make_built_table ( List<Tuple2<MRData,MRData>> values ) {
- Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
- for ( Tuple2<MRData,MRData> t: values ) {
- Bag entries = built_table.get(t._1);
- built_table.put(t._1,
- (entries == null)
- ? (new Bag(t._2))
- : entries.add_element(t._2));
- };
- return built_table;
- }
-
- /** Evaluate MRQL physical operators using Spark
- * @param e the physical plan
- * @param env contains bindings from variables to values (MRData)
- * @return a Spark RDD
- */
- final static JavaRDD<MRData> evalD ( final Tree e, final Environment env ) {
- try {
- match e {
- case MapAggregateReduce(`m,`r,null,_,`s,`o):
- return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
- case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
- return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
- case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
- return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
- case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
- return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
- case cMap(`f,`s):
- return eval(s,env).flatMap(cmap_fnc(f,env));
- case MapReduce(`m,`r,`s,`o):
- return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
- case MapCombineReduce(`m,`c,`r,`s,`o):
- return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
- .mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
- case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
- return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
- case CrossProduct(`mx,`my,`r,`x,`y):
- final org.apache.mrql.Function fr = evalF(r,env);
- return eval(x,env)
- .flatMap(cmap_fnc(mx,env))
- .cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
- .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
- public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
- return (Bag)fr.eval(new Tuple(value._1,value._2));
- }
- });
- case MapReduce2(`mx,`my,`r,`x,`y,`o):
- final org.apache.mrql.Function fx = evalF(mx,env);
- final org.apache.mrql.Function fy = evalF(my,env);
- final org.apache.mrql.Function fr = evalF(r,env);
- JavaPairRDD<MRData,MRData> xs
- = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
- return joinIterator(((Bag)fx.eval(value)).iterator());
- }
- });
- JavaPairRDD<MRData,MRData> ys
- = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
- return joinIterator(((Bag)fy.eval(value)).iterator());
- }
- });
- return xs.cogroup(ys)
- .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
- public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
- return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
- }
- });
- case GroupByJoin(`kx,`ky,`gx,`gy,`mp,`c,`r,`x,`y,`o):
- final int n = 10;
- final int m = 10;
- final org.apache.mrql.Function fkx = evalF(kx,env);
- final org.apache.mrql.Function fky = evalF(ky,env);
- final org.apache.mrql.Function fgx = evalF(gx,env);
- final org.apache.mrql.Function fgy = evalF(gy,env);
- final org.apache.mrql.Function fm = evalF(mp,env);
- final org.apache.mrql.Function fc = evalF(c,env);
- final org.apache.mrql.Function fr = evalF(r,env);
- JavaPairRDD<MRData,MRData> xs
- = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
- return new Iterable<Tuple2<MRData,MRData>>() {
- public Iterator<Tuple2<MRData,MRData>> iterator() {
- return new Iterator<Tuple2<MRData,MRData>>() {
- int i = 0;
- public Tuple2<MRData,MRData> next () {
- MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
- i++;
- return new Tuple2<MRData,MRData>(key,new Tuple(fkx.eval(value),value));
- }
- public boolean hasNext () {
- return i < n;
- }
- public void remove () {}
- };
- }
- };
- }
- });
- JavaPairRDD<MRData,MRData> ys
- = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
- return new Iterable<Tuple2<MRData,MRData>>() {
- public Iterator<Tuple2<MRData,MRData>> iterator() {
- return new Iterator<Tuple2<MRData,MRData>>() {
- int i = 0;
- public Tuple2<MRData,MRData> next () {
- MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+i);
- i++;
- return new Tuple2<MRData,MRData>(key,new Tuple(fky.eval(value),value));
- }
- public boolean hasNext () {
- return i < m;
- }
- public void remove () {}
- };
- }
- };
- }
- });
- return xs.cogroup(ys)
- .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
- public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
- final Iterator<MRData> i = (MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fm,fc,fr,
- bag(value._2._1),bag(value._2._2))).iterator();
- return new Iterable<MRData>() {
- public Iterator<MRData> iterator() {
- return new Iterator<MRData>() {
- public MRData next () {
- return i.next();
- }
- public boolean hasNext () {
- return i.hasNext();
- }
- public void remove () {}
- };
- }
- };
- }
- });
- case MapJoin(`mx,`my,`r,`x,`y):
- final org.apache.mrql.Function fx = evalF(mx,env);
- final org.apache.mrql.Function fy = evalF(my,env);
- final org.apache.mrql.Function fr = evalF(r,env);
- final Broadcast<List<Tuple2<MRData,MRData>>> ys
- = spark_context.broadcast(eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
- public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
- return joinIterator(((Bag)fy.eval(value)).iterator());
- }
- }).collect());
- return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
- final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
- public Iterable<MRData> call ( MRData value ) {
- final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
- return new Iterable<MRData>() {
- public Iterator<MRData> iterator() {
- return new Iterator<MRData>() {
- Tuple p;
- Iterator<MRData> ix = null;
- public MRData next () {
- return ix.next();
- }
- public boolean hasNext () {
- if (ix != null && ix.hasNext())
- return true;
- while (i.hasNext()) {
- p = (Tuple)i.next();
- MRData pd = built_table.get(p.first());
- if (pd == null)
- continue;
- Bag bb = ((Bag)fr.eval(new Tuple(p.second(),pd)));
- ix = bb.iterator();
- if (ix.hasNext())
- return true;
- };
- return false;
- }
- public void remove () {}
- };
- }
- };
- }
- });
- case BinarySource(`file,_):
- String path = ((MR_string)evalE(file,env)).get();
- new BinaryDataSource(path,Plan.conf);
- return containerData(spark_context.sequenceFile(file.stringValue(),
- MRContainer.class,MRContainer.class,
- Config.nodes));
- case ParsedSource(`parser,`file,...args):
- String path = ((MR_string)evalE(file,env)).get();
- Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
- if (p == null)
- throw new Error("Unknown parser: "+parser);
- new ParsedDataSource(path,p,args,Plan.conf);
- dump_source_dir();
- return containerData(spark_context.hadoopFile(path,ParsedInputFormat.class,
- MRContainer.class,MRContainer.class,
- Config.nodes));
- case Merge(`x,`y):
- return eval(x,env).union(eval(y,env));
- case Repeat(lambda(`v,`b),`s,`n):
- int max_num = ((MR_int)evalE(n,env)).get();
- JavaRDD<MRData> rd;
- JavaRDD<MRData> res = eval(s,env).cache();
- int i = 0;
- boolean cont = true;
- do {
- rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
- res = rd.map(get_first).cache();
- Integer true_results
- = rd.aggregate(new Integer(0),
- new Function2<Integer,MRData,Integer>() {
- public Integer call ( Integer x, MRData y ) {
- return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x;
- }
- },
- new Function2<Integer,Integer,Integer>() {
- public Integer call ( Integer x, Integer y ) { return x+y; }
- });
- i++;
- cont = true_results > 0 && i <= max_num;
- System.err.println("*** Repeat #"+i+": "+true_results+" true results");
- } while (cont);
- return res;
- case Closure(lambda(`v,`b),`s,`m):
- int max_num = ((MR_int)evalE(m,env)).get();
- JavaRDD<MRData> res = eval(s,env).cache();
- long n = 0;
- long old = 0;
- int i = 0;
- boolean cont = true;
- do {
- res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
- old = n;
- n = res.count();
- i++;
- System.err.println("*** Closure #"+i+": "+(n-old)+" new records");
- } while (old < n && i <= max_num);
- return res;
- case Generator(`min,`max,`size):
- DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
- ((MR_long)evalE(max,env)).get(),
- ((MR_long)evalE(size,env)).get());
- JavaRDD<MRData> rd = null;
- for ( DataSource d: ds.source )
- if (rd == null)
- rd = containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
- MRContainer.class,MRContainer.class,1));
- else rd = rd.union(containerData(spark_context.hadoopFile(d.path,GeneratorInputFormat.class,
- MRContainer.class,MRContainer.class,1)));
- return rd;
- case let(`v,`u,`body):
- return eval(body,new Environment(v.toString(),evalE(u,env),env));
- case Let(`v,`u,`body):
- return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env)),env));
- case If(`c,`x,`y):
- if (((MR_bool)evalE(c,env)).get())
- return eval(x,env);
- else return eval(y,env);
- case `v:
- if (!v.is_variable())
- fail;
- MRData x = variable_lookup(v.toString(),env);
- if (x != null)
- if (x instanceof MR_rdd)
- return ((MR_rdd)x).rdd();
- x = variable_lookup(v.toString(),global_env);
- if (x != null)
- if (x instanceof MR_rdd)
- return ((MR_rdd)x).rdd();
- throw new Error("Variable "+v+" is not bound");
- };
- throw new Error("Cannot evaluate the Spark plan: "+e);
- } catch (Error msg) {
- if (!Config.trace)
- throw new Error(msg.getMessage());
- System.err.println(msg.getMessage());
- throw new Error("Evaluation error in: "+print_query(e));
- } catch (Exception ex) {
- System.err.println(ex.getMessage());
- ex.printStackTrace();
- throw new Error("Evaluation error in: "+print_query(e));
- }
- }
-}
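The bag(JavaRDD) conversion above walks the RDD's partitions lazily, holding only the current partition's iterator. Below is a minimal stand-alone sketch of that partition-walking pattern in plain Java; the List<List<Integer>> stands in for the RDD's partitions, and all names are illustrative rather than MRQL or Spark API.

    import java.util.*;

    // Sketch: pull elements one partition at a time; only the current
    // partition's iterator is held, nothing else is buffered.
    public class LazyPartitionWalk {
        static Iterator<Integer> lazyIterator ( final List<List<Integer>> partitions ) {
            return new Iterator<Integer>() {
                Iterator<Integer> i = null;   // iterator over the current partition
                int c = 0;                    // index of the next partition to open
                public boolean hasNext () {
                    do {
                        if (i != null && i.hasNext())
                            return true;
                        if (c >= partitions.size())
                            return false;
                        i = partitions.get(c++).iterator();  // open the next partition
                    } while (true);
                }
                public Integer next () { return i.next(); }
                public void remove () {}
            };
        }
        public static void main ( String[] args ) {
            List<List<Integer>> parts = Arrays.asList(Arrays.asList(1,2),
                                                      Arrays.<Integer>asList(),
                                                      Arrays.asList(3));
            for ( Iterator<Integer> it = lazyIterator(parts); it.hasNext(); )
                System.out.println(it.next());   // prints 1, 2, 3
        }
    }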
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/GeneratorInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/GeneratorInputFormat.java b/src/main/java/spark/GeneratorInputFormat.java
deleted file mode 100644
index c9e6066..0000000
--- a/src/main/java/spark/GeneratorInputFormat.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.io.*;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.*;
-
-
-/** The FileInputFormat for data generators: it creates HDFS files, where each file contains
- * an (offset,size) pair that generates the range of values [offset,offset+size) */
-final public class GeneratorInputFormat extends MRQLFileInputFormat {
- public static class GeneratorRecordReader implements RecordReader<MRContainer,MRContainer> {
- final long offset;
- final long size;
- long index;
- SequenceFile.Reader reader;
-
- public GeneratorRecordReader ( FileSplit split,
- Configuration conf ) throws IOException {
- Path path = split.getPath();
- FileSystem fs = path.getFileSystem(conf);
- reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
- MRContainer key = new MRContainer();
- MRContainer value = new MRContainer();
- reader.next(key,value);
- offset = ((MR_long)((Tuple)(value.data())).first()).get();
- size = ((MR_long)((Tuple)(value.data())).second()).get();
- index = -1;
- }
-
- public MRContainer createKey () {
- return new MRContainer(null);
- }
-
- public MRContainer createValue () {
- return new MRContainer(null);
- }
-
- public boolean next ( MRContainer key, MRContainer value ) throws IOException {
- index++;
- value.set(new MR_long(offset+index));
- key.set(new MR_long(index));
- return index < size;
- }
-
- public long getPos () throws IOException { return index; }
-
- public void close () throws IOException { reader.close(); }
-
- public float getProgress () throws IOException {
- return index / (float)size;
- }
- }
-
- public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException {
- return (RecordReader<MRContainer,MRContainer>)
- new GeneratorRecordReader((FileSplit)split,job);
- }
-
- /** Insert all results from the generators stored in path into a Bag.
- * The Bag is lazily constructed.
- * @param path the path directory that contains the generator data (offset,size)
- * @return a Bag that contains all data
- */
- public Bag materialize ( final Path path ) throws IOException {
- Configuration conf = Plan.conf;
- FileSystem fs = path.getFileSystem(conf);
- final SequenceFile.Reader reader = new SequenceFile.Reader(path.getFileSystem(conf),path,conf);
- final MRContainer key = new MRContainer();
- final MRContainer value = new MRContainer();
- return new Bag(new BagIterator () {
- long offset = 0;
- long size = 0;
- long i = 0;
- public boolean hasNext () {
- if (++i >= offset+size)
- try {
- if (!reader.next(key,value))
- return false;
- offset = ((MR_long)((Tuple)(value.data())).first()).get();
- size = ((MR_long)((Tuple)(value.data())).second()).get();
- i = offset;
- } catch (IOException e) {
- throw new Error("Cannot collect values from a generator");
- };
- return true;
- }
- public MRData next () {
- return new MR_long(i);
- }
- });
- }
-}
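GeneratorRecordReader above expands a stored (offset,size) pair into the values offset through offset+size-1, one value per next() call. A tiny stand-alone illustration of that expansion, with no Hadoop types and arbitrarily chosen numbers:

    // Sketch: the same half-open range expansion, collapsed into a loop.
    public class RangeExpansion {
        public static void main ( String[] args ) {
            long offset = 100, size = 5;
            for ( long index = 0; index < size; index++ )
                System.out.println("key="+index+" value="+(offset+index));
            // prints values 100..104, i.e. the half-open range [100,105)
        }
    }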
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/MRQLFileInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/MRQLFileInputFormat.java b/src/main/java/spark/MRQLFileInputFormat.java
deleted file mode 100644
index 73b0910..0000000
--- a/src/main/java/spark/MRQLFileInputFormat.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import java.io.*;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-
-
-/** A superclass for all MRQL FileInputFormats */
-abstract public class MRQLFileInputFormat extends FileInputFormat<MRContainer,MRContainer> {
- public MRQLFileInputFormat () {}
-
- /** record reader for Spark */
- abstract public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split, JobConf job, Reporter reporter ) throws IOException;
-
- /** materialize the input file into a memory Bag */
- public Bag materialize ( final Path file ) throws Exception {
- final JobConf job = new JobConf(Plan.conf,MRQLFileInputFormat.class);
- setInputPaths(job,file);
- final InputSplit[] splits = getSplits(job,1);
- final Reporter reporter = null;
- final RecordReader<MRContainer,MRContainer> rd = getRecordReader(splits[0],job,reporter);
- return new Bag(new BagIterator () {
- RecordReader<MRContainer,MRContainer> reader = rd;
- MRContainer key = reader.createKey();
- MRContainer value = reader.createValue();
- int i = 0;
- public boolean hasNext () {
- try {
- if (reader.next(key,value))
- return true;
- do {
- if (++i >= splits.length)
- return false;
- reader.close();
- reader = getRecordReader(splits[i],job,reporter);
- } while (!reader.next(key,value));
- return true;
- } catch (IOException e) {
- throw new Error("Cannot collect values from an intermediate result");
- }
- }
- public MRData next () {
- return value.data();
- }
- });
- }
-
- /** materialize the entire dataset into a Bag
- * @param x the DataSet in HDFS to collect values from
- * @param strip is not used in MapReduce mode
- * @return the Bag that contains the collected values
- */
- public final static Bag collect ( final DataSet x, boolean strip ) throws Exception {
- Bag res = new Bag();
- for ( DataSource s: x.source )
- if (s instanceof RDDDataSource)
- res = res.union(Evaluator.bag(((RDDDataSource)s).rdd));
- else if (s.to_be_merged)
- res = res.union(Plan.merge(s));
- else res = res.union(s.inputFormat.newInstance().materialize(new Path(s.path)));
- return res;
- }
-}
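collect() above dispatches on the concrete DataSource type and unions the materialized results into one Bag. A short sketch of that dispatch-and-union shape, with two hypothetical source kinds standing in for RDDDataSource and the file-backed sources:

    import java.util.*;

    public class CollectSketch {
        interface Source { List<String> materialize(); }
        // stands in for an in-memory RDD-backed source
        static class MemorySource implements Source {
            List<String> data;
            MemorySource ( List<String> data ) { this.data = data; }
            public List<String> materialize () { return data; }
        }
        // stands in for a source materialized through an input format
        static class FileSource implements Source {
            public List<String> materialize () { return Arrays.asList("from-file"); }
        }
        public static void main ( String[] args ) {
            List<Source> sources = Arrays.asList((Source)new MemorySource(Arrays.asList("x","y")),
                                                 new FileSource());
            List<String> res = new ArrayList<String>();
            for ( Source s: sources )
                res.addAll(s.materialize());   // union the values of all sources
            System.out.println(res);           // [x, y, from-file]
        }
    }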
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/ParsedInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/ParsedInputFormat.java b/src/main/java/spark/ParsedInputFormat.java
deleted file mode 100644
index 8ca7db6..0000000
--- a/src/main/java/spark/ParsedInputFormat.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.mrql;
-
-import org.apache.mrql.gen.Trees;
-import java.io.*;
-import java.util.Iterator;
-import org.apache.hadoop.fs.*;
-import org.apache.hadoop.io.*;
-import org.apache.hadoop.mapred.*;
-import org.apache.hadoop.conf.Configuration;
-
-
-/** A FileInputFormat for text files (CSV, XML, JSON, ...) */
-final public class ParsedInputFormat extends MRQLFileInputFormat {
- public static class ParsedRecordReader implements RecordReader<MRContainer,MRContainer> {
- final FSDataInputStream fsin;
- final long start;
- final long end;
- Iterator<MRData> result;
- Parser parser;
-
- public ParsedRecordReader ( FileSplit split,
- Configuration conf,
- Class<? extends Parser> parser_class,
- Trees args ) throws IOException {
- start = split.getStart();
- end = start + split.getLength();
- Path file = split.getPath();
- FileSystem fs = file.getFileSystem(conf);
- fsin = fs.open(split.getPath());
- try {
- parser = parser_class.newInstance();
- } catch (Exception ex) {
- throw new Error("Unrecognized parser: "+parser_class);
- };
- parser.initialize(args);
- parser.open(fsin,start,end);
- result = null;
- }
-
- public MRContainer createKey () {
- return new MRContainer();
- }
-
- public MRContainer createValue () {
- return new MRContainer();
- }
-
- public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
- while (result == null || !result.hasNext()) {
- String s = parser.slice();
- if (s == null)
- return false;
- result = parser.parse(s).iterator();
- };
- value.set((MRData)result.next());
- key.set(new MR_long(fsin.getPos()));
- return true;
- }
-
- public synchronized long getPos () throws IOException { return fsin.getPos(); }
-
- public synchronized void close () throws IOException { fsin.close(); }
-
- public float getProgress () throws IOException {
- if (end == start)
- return 0.0f;
- else return Math.min(1.0f, (getPos() - start) / (float)(end - start));
- }
- }
-
- public RecordReader<MRContainer,MRContainer>
- getRecordReader ( InputSplit split,
- JobConf job,
- Reporter reporter ) throws IOException {
- Evaluator.load_source_dir(); // load the parsed source parameters from a file
- String path = ((FileSplit)split).getPath().toString();
- ParsedDataSource ds = (ParsedDataSource)DataSource.get(path,Plan.conf);
- return new ParsedRecordReader((FileSplit)split,job,ds.parser,(Trees)ds.args);
- }
-}
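ParsedRecordReader.next() above keeps slicing the raw input and re-parsing until the parser yields at least one record, then drains the records one at a time. A self-contained sketch of that refill loop; SliceParser is a hypothetical stand-in for the MRQL Parser interface, splitting comma-separated slices:

    import java.util.*;

    public class RefillLoop {
        // Hypothetical parser: slice() returns the next raw chunk (null at end
        // of input); parse() may return zero or more records per slice.
        static class SliceParser {
            Iterator<String> slices = Arrays.asList("a,b","","c").iterator();
            String slice () { return slices.hasNext() ? slices.next() : null; }
            List<String> parse ( String s ) {
                return s.isEmpty() ? Collections.<String>emptyList()
                                   : Arrays.asList(s.split(","));
            }
        }
        public static void main ( String[] args ) {
            SliceParser parser = new SliceParser();
            Iterator<String> result = null;
            while (true) {
                // refill: keep slicing until the parser produces a record
                while (result == null || !result.hasNext()) {
                    String s = parser.slice();
                    if (s == null) return;              // end of input
                    result = parser.parse(s).iterator();
                }
                System.out.println(result.next());       // prints a, b, c
            }
        }
    }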
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/SparkBinaryInputFormat.java
----------------------------------------------------------------------
diff --git a/src/main/java/spark/SparkBinaryInputFormat.java b/src/main/java/spark/SparkBinaryInputFormat.java
new file mode 100644
index 0000000..f11c796
--- /dev/null
+++ b/src/main/java/spark/SparkBinaryInputFormat.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.*;
+
+
+/** Input format for Hadoop sequence files */
+final public class SparkBinaryInputFormat extends SparkMRQLFileInputFormat {
+ public static class BinaryInputRecordReader extends SequenceFileRecordReader<MRContainer,MRContainer> {
+ final MRContainer result = new MRContainer();
+
+ public BinaryInputRecordReader ( FileSplit split,
+ JobConf job ) throws IOException {
+ super(job,split);
+ }
+
+ @Override
+ public synchronized boolean next ( MRContainer key, MRContainer value ) throws IOException {
+ boolean b = super.next(key,result);
+ value.set(result.data());
+ return b;
+ }
+ }
+
+ @Override
+ public RecordReader<MRContainer,MRContainer>
+ getRecordReader ( InputSplit split,
+ JobConf job,
+ Reporter reporter ) throws IOException {
+ String path = ((FileSplit)split).getPath().toString();
+ BinaryDataSource ds = (BinaryDataSource)DataSource.get(path,job);
+ return new BinaryInputRecordReader((FileSplit)split,job);
+ }
+}
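BinaryInputRecordReader above delegates the raw read to SequenceFileRecordReader and then copies only the payload out of the container. The same read-into-scratch-then-unwrap move in plain Java, with a hypothetical Box type in place of MRContainer:

    import java.util.*;

    public class UnwrapOnRead {
        static class Box { String payload; }
        static class RawReader {
            private final Iterator<String> i = Arrays.asList("x","y").iterator();
            boolean next ( Box value ) {              // fills the wrapper
                if (!i.hasNext()) return false;
                value.payload = "wrapped:"+i.next();
                return true;
            }
        }
        static class UnwrappingReader extends RawReader {
            private final Box result = new Box();     // scratch wrapper, reused per call
            @Override boolean next ( Box value ) {
                boolean b = super.next(result);       // delegate the raw read
                if (b) value.payload = result.payload.substring("wrapped:".length());
                return b;                             // caller sees only the payload
            }
        }
        public static void main ( String[] args ) {
            Box v = new Box();
            RawReader r = new UnwrappingReader();
            while (r.next(v))
                System.out.println(v.payload);        // prints x, then y
        }
    }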
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/17a0ed95/src/main/java/spark/SparkEvaluator.gen
----------------------------------------------------------------------
diff --git a/src/main/java/spark/SparkEvaluator.gen b/src/main/java/spark/SparkEvaluator.gen
new file mode 100644
index 0000000..54d4da0
--- /dev/null
+++ b/src/main/java/spark/SparkEvaluator.gen
@@ -0,0 +1,713 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java_cup.runtime.*;
+import org.apache.mrql.gen.*;
+import java_cup.runtime.Scanner;
+import java.util.*;
+import java.io.*;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.FileInputFormat;
+import scala.Tuple2;
+import spark.TaskContext;
+import spark.Partition;
+import spark.Accumulator;
+import spark.broadcast.Broadcast;
+import spark.api.java.JavaRDD;
+import spark.api.java.JavaPairRDD;
+import spark.api.java.JavaSparkContext;
+import spark.api.java.function.Function2;
+import spark.api.java.function.PairFunction;
+import spark.api.java.function.FlatMapFunction;
+import spark.api.java.function.PairFlatMapFunction;
+import spark.api.java.function.Function;
+import spark.api.java.function.VoidFunction;
+
+
+/** Evaluates physical plans in Apache Spark mode */
+final public class SparkEvaluator extends Evaluator implements Serializable {
+ static JavaSparkContext spark_context;
+ final static String spark_jar = SparkEvaluator.class.getProtectionDomain().getCodeSource().getLocation().toString();
+ final static String core_jar = Interpreter.class.getProtectionDomain().getCodeSource().getLocation().toString();
+ final static String gen_jar = Tree.class.getProtectionDomain().getCodeSource().getLocation().toString();
+ final static String scanner_jar = Scanner.class.getProtectionDomain().getCodeSource().getLocation().toString();
+ static String data_source_directory;
+ final static Bag empty_bag = new Bag();
+
+ /** initialize the Spark evaluator */
+ final public void init ( Configuration conf ) {
+ Config.spark_mode = true;
+ try {
+ if (Config.hadoop_mode && Config.local_mode)
+ spark_context = new JavaSparkContext("local["+Config.nodes+"]",
+ "Apache MRQL Local Spark Evaluator");
+ else if (Config.hadoop_mode) {
+ HashMap<String,String> env = new HashMap<String,String>();
+ data_source_directory = System.getenv("FS_DEFAULT_NAME")+"/"+Plan.new_path(conf);
+ env.put("mrql.data.source.directory",data_source_directory);
+ spark_context = new JavaSparkContext(System.getenv("SPARK_MASTER"),
+ "Apache MRQL Spark Evaluator",
+ System.getenv("SPARK_HOME"),
+ new String[]{spark_jar,core_jar,gen_jar,scanner_jar},
+ env);
+ Plan.conf = spark_context.hadoopConfiguration();
+ FileSystem.setDefaultUri(Plan.conf,System.getenv("FS_DEFAULT_NAME"));
+ }
+ } catch (IOException ex) {
+ throw new Error(ex);
+ }
+ }
+
+ /** shutdown the Spark evaluator */
+ final public void shutdown ( Configuration conf ) {
+ spark_context.stop();
+ spark_context = null;
+ System.clearProperty("spark.driver.port");
+ }
+
+ final public void initialize_query () {
+ spark_context.clearJars();
+ spark_context.addJar(spark_jar);
+ spark_context.addJar(core_jar);
+ spark_context.addJar(gen_jar);
+ spark_context.addJar(scanner_jar);
+ Plan.distribute_compiled_arguments(Plan.conf);
+ if (Config.compile_functional_arguments)
+ spark_context.addJar(Plan.conf.get("mrql.jar.path"));
+ }
+
+ final public Configuration new_configuration () {
+ return new Configuration();
+ }
+
+ final public MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
+ throw new Error("You can only synchronize BSP tasks");
+ }
+
+ final public Bag distribute ( MR_string peerName, Bag s ) {
+ throw new Error("You can only distribute bags among BSP tasks");
+ }
+
+ final public Bag collect ( final DataSet x ) throws Exception {
+ return Plan.collect(x);
+ }
+
+ final public MRData bsp ( Tree plan, Environment env ) throws Exception {
+ throw new Error("You cannot run a BSP task in Spark mode");
+ }
+
+ /** return the FileInputFormat for parsed files (CSV, XML, JSON, etc) */
+ final public Class<? extends MRQLFileInputFormat> parsedInputFormat () {
+ return SparkParsedInputFormat.class;
+ }
+
+ /** return the FileInputFormat for binary files */
+ final public Class<? extends MRQLFileInputFormat> binaryInputFormat () {
+ return SparkBinaryInputFormat.class;
+ }
+
+ /** return the FileInputFormat for data generator files */
+ final public Class<? extends MRQLFileInputFormat> generatorInputFormat () {
+ return SparkGeneratorInputFormat.class;
+ }
+
+ /** used by the master to send parsing details (e.g., record types) to workers */
+ public static void dump_source_dir () throws IOException {
+ if (Config.local_mode)
+ return;
+ DataSource.dataSourceDirectory.distribute(Plan.conf);
+ Path path = new Path(data_source_directory);
+ FileSystem fs = path.getFileSystem(Plan.conf);
+ PrintStream ps = new PrintStream(fs.create(path,true));
+ ps.println(Plan.conf.get("mrql.data.source.directory"));
+ ps.close();
+ }
+
+ /** executed by a worker when reading parsed input (see SparkParsedInputFormat) */
+ public static void load_source_dir () throws IOException {
+ if (Config.local_mode)
+ return;
+ if (Plan.conf == null)
+ Plan.conf = Evaluator.evaluator.new_configuration();
+ // the name of the file that contains the source directory details is passed
+ // to workers through the HashMap environment in the JavaSparkContext
+ Path path = new Path(System.getenv("mrql.data.source.directory"));
+ FileSystem fs = path.getFileSystem(Plan.conf);
+ BufferedReader ftp = new BufferedReader(new InputStreamReader(fs.open(path)));
+ Plan.conf.set("mrql.data.source.directory",ftp.readLine());
+ DataSource.dataSourceDirectory.read(Plan.conf);
+ ftp.close();
+ }
+
+ private static Function2<MRData,MRData,MRData> accumulator ( Tree acc_fnc, Environment env ) {
+ final org.apache.mrql.Function f = evalF(acc_fnc,env);
+ return new Function2<MRData,MRData,MRData>() {
+ public MRData call ( MRData x, MRData y ) {
+ return f.eval(new Tuple(x,y));
+ }
+ };
+ }
+
+ /** The Aggregate physical operator
+ * @param acc_fnc the accumulator function from (T,T) to T
+ * @param zero the zero element of type T
+ * @param plan the plan that constructs the dataset that contains the bag of values {T}
+ * @param env contains bindings from variables to values (MRData)
+ * @return the aggregation result of type T
+ */
+ final public MRData aggregate ( Tree acc_fnc,
+ Tree zero,
+ Tree plan,
+ Environment env ) throws Exception {
+ Function2<MRData,MRData,MRData> f2 = accumulator(acc_fnc,env);
+ MRData z = evalE(zero,env);
+ match plan {
+ case AggregateMap(`m,`acc,_,`s):
+ return evalD(#<cMap(`m,`s)>,env)
+ .aggregate(z,accumulator(acc,env),f2);
+ case MapAggregateReduce(`m,`r,`acc,_,`s,`o):
+ if (acc.equals(#<null>))
+ fail;
+ return evalD(#<MapReduce(`m,`r,`s,`o)>,env)
+ .aggregate(z,accumulator(acc,env),f2);
+ case CrossAggregateProduct(`mx,`my,`r,`acc,_,`x,`y):
+ if (acc.equals(#<null>))
+ fail;
+ return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)
+ .aggregate(z,accumulator(acc,env),f2);
+ case MapAggregateReduce2(`mx,`my,`r,`acc,_,`x,`y,`o):
+ if (acc.equals(#<null>))
+ fail;
+ return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env)
+ .aggregate(z,accumulator(acc,env),f2);
+ case MapAggregateJoin(`mx,`my,`r,`acc,_,`x,`y):
+ if (acc.equals(#<null>))
+ fail;
+ return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env)
+ .aggregate(z,accumulator(acc,env),f2);
+ };
+ throw new Error("Unrecognized aggregation: "+plan);
+ }
+
+ /** Evaluate a loop a fixed # of times */
+ final public Tuple loop ( Tree e, Environment env ) throws Exception {
+ match e {
+ case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
+ int limit = ((MR_int)evalE(num,env)).get();
+ MR_rdd[] s = new MR_rdd[vs.length()];
+ for ( int i = 0; i < vs.length(); i++ )
+ s[i] = new MR_rdd(eval(ss.nth(i),env));
+ for ( int n = 0; n < limit; n++ ) {
+ Environment nenv = env;
+ for ( int i = 0; i < vs.length(); i ++ )
+ nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
+ for ( int i = 0; i < vs.length(); i ++ )
+ s[i] = new MR_rdd(eval(bs.nth(i),nenv));
+ };
+ return new Tuple(s);
+ };
+ throw new Error("Wrong Loop format");
+ }
+
+ private static Bag bag ( final List<MRData> s ) {
+ final Iterator<MRData> i = s.iterator();
+ return new Bag(new BagIterator() {
+ public MRData next () {
+ return i.next();
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ });
+ }
+
+ final static TaskContext context = new TaskContext(0,0,0,null);
+
+ /** Convert a Spark RDD into a lazy bag
+ * @param rdd the Spark RDD
+ * @return a lazy bag that contains all RDD elements
+ */
+ public static Bag bag ( final JavaRDD<MRData> rdd ) {
+ final JavaRDD<MRData> rd = rdd.cache();
+ rd.count(); // force the evaluation of all pending Spark operations
+ final List<Partition> ps = rd.splits();
+ return new Bag(new BagIterator() {
+ Iterator<MRData> i = null;
+ int c = 0;
+ public MRData next () {
+ return i.next();
+ }
+ public boolean hasNext () {
+ do {
+ if (i != null && i.hasNext())
+ return true;
+ if (c >= ps.size())
+ return false;
+ i = rdd.iterator(ps.get(c++),context);
+ } while (true);
+ }
+ });
+ }
+
+ private final static Function<MRData,MRData> get_first
+ = new Function<MRData,MRData>() {
+ public MRData call ( MRData value ) {
+ return ((Tuple)value).first();
+ };
+ };
+
+ /** Evaluate an MRQL physical plan using Spark
+ * @param e the physical plan
+ * @param env contains bindings from variables to values (MRData)
+ * @param counter the name of the counter used in loops
+ * @return a DataSet
+ */
+ final public DataSet eval ( final Tree e,
+ final Environment env,
+ final String counter ) {
+ JavaRDD<MRData> rd = eval(e,env);
+ int count = 0;
+ if (!counter.equals("-")) {
+ final Accumulator<Integer> c = spark_context.intAccumulator(0);
+ rd = rd.cache();
+ rd.foreach(new VoidFunction<MRData>() {
+ public void call ( MRData value ) {
+ if (((MR_bool)((Tuple)value).second()).get())
+ c.add(1); // count the true results (the results that need another loop step)
+ }
+ });
+ count = c.value();
+ rd = rd.map(get_first);
+ };
+ return new DataSet(new RDDDataSource(rd),count,0);
+ }
+
+ /** Evaluate an MRQL physical plan using Spark and print tracing info
+ * @param e the physical plan
+ * @param env contains bindings from variables to values (MRData)
+ * @return a Spark RDD
+ */
+ final public JavaRDD<MRData> eval ( final Tree e, final Environment env ) {
+ if (Config.trace_execution) {
+ tab_count += 3;
+ System.out.println(tabs(tab_count)+print_query(e));
+ };
+ final JavaRDD<MRData> res = evalD(e,env);
+ if (Config.trace_execution)
+ try {
+ System.out.println(tabs(tab_count)+"-> "+res.collect());
+ tab_count -= 3;
+ } catch (Exception ex) {
+ throw new Error("Cannot collect the operator output: "+e);
+ };
+ return res;
+ }
+
+ /* convert an MRQL lambda to a Spark Function */
+ private static FlatMapFunction<MRData,MRData> cmap_fnc ( Tree fnc, Environment env ) {
+ final org.apache.mrql.Function f = evalF(fnc,env);
+ return new FlatMapFunction<MRData,MRData>() {
+ public Iterable<MRData> call ( MRData value ) {
+ return (Bag)f.eval(value);
+ }
+ };
+ }
+
+ /* group-by s and then reduce by fnc; if o is true, sort after group-by */
+ private static JavaRDD<MRData> groupBy ( JavaRDD<MRData> s, Tree fnc, Environment env, Tree o ) {
+ match o {
+ case true: // the result must be sorted
+ return s.groupBy(get_first)
+ .sortByKey()
+ .map(new Function<Tuple2<MRData,List<MRData>>,MRData>() {
+ public MRData call ( Tuple2<MRData,List<MRData>> value ) {
+ return new Tuple(value._1,bag(value._2));
+ }
+ })
+ .flatMap(cmap_fnc(fnc,env)).map(new Function<MRData,MRData>() {
+ public MRData call ( MRData value ) {
+ return ((Tuple)value).second();
+ }
+ });
+ };
+ return s.groupBy(get_first)
+ .map(new Function<Tuple2<MRData,List<MRData>>,MRData> () {
+ public MRData call ( Tuple2<MRData,List<MRData>> value ) {
+ final Iterator<MRData> i = value._2.iterator();
+ return new Tuple(value._1,new Bag(new BagIterator() {
+ public MRData next () {
+ return ((Tuple)i.next()).second();
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ }));
+ }
+ })
+ .flatMap(cmap_fnc(fnc,env));
+ }
+
+ private static JavaRDD<MRData> containerData ( JavaPairRDD<MRContainer,MRContainer> rd ) {
+ final Environment env = Interpreter.global_env;
+ return rd.map(new Function<Tuple2<MRContainer,MRContainer>,MRData>() {
+ // need to pass the global bindings (the in-memory repeat vars) to workers
+ boolean first = true;
+ public MRData call ( Tuple2<MRContainer,MRContainer> value ) {
+ if (first) {
+ // need to pass the global bindings (the in-memory repeat vars) to workers
+ Interpreter.set_global_bindings(env);
+ first = false;
+ };
+ return value._2.data();
+ }
+ });
+ }
+
+ private static Iterable<Tuple2<MRData,MRData>> joinIterator ( final Iterator<MRData> i ) {
+ return new Iterable<Tuple2<MRData,MRData>>() {
+ public Iterator<Tuple2<MRData,MRData>> iterator() {
+ return new Iterator<Tuple2<MRData,MRData>> () {
+ public Tuple2<MRData,MRData> next () {
+ Tuple data = (Tuple)i.next();
+ return new Tuple2<MRData,MRData>(data.first(),data.second());
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ public void remove () {}
+ };
+ }
+ };
+ }
+
+ private static FlatMapFunction<Iterator<MRData>,MRData> combiner_fnc ( final org.apache.mrql.Function f ) {
+ return new FlatMapFunction<Iterator<MRData>,MRData>() {
+ public Iterable<MRData> call ( final Iterator<MRData> i ) {
+ return MapReduceAlgebra.cmap(new org.apache.mrql.Function() {
+ public MRData eval ( MRData x ) {
+ final MRData key = ((Tuple)x).first();
+ final Iterator<MRData> it = ((Bag)f.eval(x)).iterator();
+ return new Bag(new BagIterator() {
+ public MRData next () {
+ return new Tuple(key,it.next());
+ }
+ public boolean hasNext () {
+ return it.hasNext();
+ }
+ });
+ }},
+ MapReduceAlgebra.groupBy(new Bag(new BagIterator() {
+ public MRData next () {
+ return i.next();
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ })));
+ }
+ };
+ }
+
+ private static Hashtable<MRData,Bag> make_built_table ( List<Tuple2<MRData,MRData>> values ) {
+ Hashtable<MRData,Bag> built_table = new Hashtable<MRData,Bag>(Config.map_cache_size);
+ for ( Tuple2<MRData,MRData> t: values ) {
+ Bag entries = built_table.get(t._1);
+ built_table.put(t._1,
+ (entries == null)
+ ? (new Bag(t._2))
+ : entries.add_element(t._2));
+ };
+ return built_table;
+ }
+
+ /** Evaluate MRQL physical operators using Spark
+ * @param e the physical plan
+ * @param env contains bindings from variables to values (MRData)
+ * @return a Spark RDD
+ */
+ final public JavaRDD<MRData> evalD ( final Tree e, final Environment env ) {
+ try {
+ match e {
+ case MapAggregateReduce(`m,`r,null,_,`s,`o):
+ return evalD(#<MapReduce(`m,`r,`s,`o)>,env);
+ case CrossAggregateProduct(`mx,`my,`r,null,_,`x,`y):
+ return evalD(#<CrossProduct(`mx,`my,`r,`x,`y)>,env);
+ case MapAggregateReduce2(`mx,`my,`r,null,_,`x,`y,`o):
+ return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env);
+ case MapAggregateJoin(`mx,`my,`r,null,_,`x,`y):
+ return evalD(#<MapJoin(`mx,`my,`r,`x,`y)>,env);
+ case cMap(`f,`s):
+ return eval(s,env).flatMap(cmap_fnc(f,env));
+ case MapReduce(`m,`r,`s,`o):
+ return groupBy(eval(s,env).flatMap(cmap_fnc(m,env)),r,env,o);
+ case MapCombineReduce(`m,`c,`r,`s,`o):
+ return groupBy(eval(s,env).flatMap(cmap_fnc(m,env))
+ .mapPartitions(combiner_fnc(evalF(c,env))),r,env,o);
+ case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,`o):
+ return evalD(#<MapReduce2(`mx,`my,`r,`x,`y,`o)>,env).mapPartitions(combiner_fnc(evalF(c,env)));
+ case CrossProduct(`mx,`my,`r,`x,`y):
+ final org.apache.mrql.Function fr = evalF(r,env);
+ return eval(x,env)
+ .flatMap(cmap_fnc(mx,env))
+ .cartesian(eval(y,env).flatMap(cmap_fnc(my,env)))
+ .flatMap(new FlatMapFunction<Tuple2<MRData,MRData>,MRData>() {
+ public Iterable<MRData> call ( Tuple2<MRData,MRData> value ) {
+ return (Bag)fr.eval(new Tuple(value._1,value._2));
+ }
+ });
+ case MapReduce2(`mx,`my,`r,`x,`y,`o):
+ final org.apache.mrql.Function fx = evalF(mx,env);
+ final org.apache.mrql.Function fy = evalF(my,env);
+ final org.apache.mrql.Function fr = evalF(r,env);
+ JavaPairRDD<MRData,MRData> xs
+ = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+ return joinIterator(((Bag)fx.eval(value)).iterator());
+ }
+ });
+ JavaPairRDD<MRData,MRData> ys
+ = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+ return joinIterator(((Bag)fy.eval(value)).iterator());
+ }
+ });
+ return xs.cogroup(ys)
+ .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
+ public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
+ return (Bag)fr.eval(new Tuple(bag(value._2._1),bag(value._2._2)));
+ }
+ });
+ case GroupByJoin(`kx,`ky,`gx,`gy,`mp,`c,`r,`x,`y,`o):
+ final int n = 10;
+ final int m = 10;
+ final org.apache.mrql.Function fkx = evalF(kx,env);
+ final org.apache.mrql.Function fky = evalF(ky,env);
+ final org.apache.mrql.Function fgx = evalF(gx,env);
+ final org.apache.mrql.Function fgy = evalF(gy,env);
+ final org.apache.mrql.Function fm = evalF(mp,env);
+ final org.apache.mrql.Function fc = evalF(c,env);
+ final org.apache.mrql.Function fr = evalF(r,env);
+ JavaPairRDD<MRData,MRData> xs
+ = eval(x,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
+ return new Iterable<Tuple2<MRData,MRData>>() {
+ public Iterator<Tuple2<MRData,MRData>> iterator() {
+ return new Iterator<Tuple2<MRData,MRData>>() {
+ int i = 0;
+ public Tuple2<MRData,MRData> next () {
+ MRData key = new MR_int((fgx.eval(value).hashCode() % m)+m*i);
+ i++;
+ return new Tuple2<MRData,MRData>(key,new Tuple(fkx.eval(value),value));
+ }
+ public boolean hasNext () {
+ return i < n;
+ }
+ public void remove () {}
+ };
+ }
+ };
+ }
+ });
+ JavaPairRDD<MRData,MRData> ys
+ = eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( final MRData value ) {
+ return new Iterable<Tuple2<MRData,MRData>>() {
+ public Iterator<Tuple2<MRData,MRData>> iterator() {
+ return new Iterator<Tuple2<MRData,MRData>>() {
+ int i = 0;
+ public Tuple2<MRData,MRData> next () {
+ MRData key = new MR_int((fgy.eval(value).hashCode() % n)*m+i);
+ i++;
+ return new Tuple2<MRData,MRData>(key,new Tuple(fky.eval(value),value));
+ }
+ public boolean hasNext () {
+ return i < m;
+ }
+ public void remove () {}
+ };
+ }
+ };
+ }
+ });
+ return xs.cogroup(ys)
+ .flatMap(new FlatMapFunction<Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>>,MRData>() {
+ public Iterable<MRData> call ( Tuple2<MRData,Tuple2<List<MRData>,List<MRData>>> value ) {
+ final Iterator<MRData> i = (MapReduceAlgebra.mergeGroupByJoin(fkx,fky,fgx,fgy,fm,fc,fr,
+ bag(value._2._1),bag(value._2._2))).iterator();
+ return new Iterable<MRData>() {
+ public Iterator<MRData> iterator() {
+ return new Iterator<MRData>() {
+ public MRData next () {
+ return i.next();
+ }
+ public boolean hasNext () {
+ return i.hasNext();
+ }
+ public void remove () {}
+ };
+ }
+ };
+ }
+ });
+ case MapJoin(`mx,`my,`r,`x,`y):
+ final org.apache.mrql.Function fx = evalF(mx,env);
+ final org.apache.mrql.Function fy = evalF(my,env);
+ final org.apache.mrql.Function fr = evalF(r,env);
+ final Broadcast<List<Tuple2<MRData,MRData>>> ys
+ = spark_context.broadcast(eval(y,env).flatMap(new PairFlatMapFunction<MRData,MRData,MRData>() {
+ public Iterable<Tuple2<MRData,MRData>> call ( MRData value ) {
+ return joinIterator(((Bag)fy.eval(value)).iterator());
+ }
+ }).collect());
+ return eval(x,env).flatMap(new FlatMapFunction<MRData,MRData>() {
+ final Hashtable<MRData,Bag> built_table = make_built_table(ys.value());
+ public Iterable<MRData> call ( MRData value ) {
+ final Iterator<MRData> i = ((Bag)fx.eval(value)).iterator();
+ return new Iterable<MRData>() {
+ public Iterator<MRData> iterator() {
+ return new Iterator<MRData>() {
+ Tuple p;
+ Iterator<MRData> ix = null;
+ public MRData next () {
+ return ix.next();
+ }
+ public boolean hasNext () {
+ if (ix != null && ix.hasNext())
+ return true;
+ while (i.hasNext()) {
+ p = (Tuple)i.next();
+ MRData pd = built_table.get(p.first());
+ Bag bb = ((Bag)fr.eval(new Tuple(p.second(),(pd == null) ? empty_bag : pd)));
+ ix = bb.iterator();
+ if (ix.hasNext())
+ return true;
+ };
+ return false;
+ }
+ public void remove () {}
+ };
+ }
+ };
+ }
+ });
+ case BinarySource(`file,_):
+ String path = ((MR_string)evalE(file,env)).get();
+ new BinaryDataSource(path,Plan.conf);
+ return containerData(spark_context.sequenceFile(file.stringValue(),
+ MRContainer.class,MRContainer.class,
+ Config.nodes));
+ case ParsedSource(`parser,`file,...args):
+ String path = ((MR_string)evalE(file,env)).get();
+ Class<? extends Parser> p = DataSource.parserDirectory.get(parser.toString());
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ new ParsedDataSource(path,p,args,Plan.conf);
+ dump_source_dir();
+ return containerData(spark_context.hadoopFile(path,SparkParsedInputFormat.class,
+ MRContainer.class,MRContainer.class,
+ Config.nodes));
+ case Merge(`x,`y):
+ return eval(x,env).union(eval(y,env));
+ case Repeat(lambda(`v,`b),`s,`n):
+ int max_num = ((MR_int)evalE(n,env)).get();
+ JavaRDD<MRData> rd;
+ JavaRDD<MRData> res = eval(s,env).cache();
+ int i = 0;
+ boolean cont = true;
+ do {
+ rd = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
+ res = rd.map(get_first).cache();
+ Integer true_results
+ = rd.aggregate(new Integer(0),
+ new Function2<Integer,MRData,Integer>() {
+ public Integer call ( Integer x, MRData y ) {
+ return (((MR_bool)((Tuple)y).second()).get()) ? x+1 : x;
+ }
+ },
+ new Function2<Integer,Integer,Integer>() {
+ public Integer call ( Integer x, Integer y ) { return x+y; }
+ });
+ i++;
+ cont = true_results > 0 && i <= max_num;
+ System.err.println("*** Repeat #"+i+": "+true_results+" true results");
+ } while (cont);
+ return res;
+ case Closure(lambda(`v,`b),`s,`m):
+ int max_num = ((MR_int)evalE(m,env)).get();
+ JavaRDD<MRData> res = eval(s,env).cache();
+ long n = 0;
+ long old = 0;
+ int i = 0;
+ boolean cont = true;
+ do {
+ res = eval(b,new Environment(v.toString(),new MR_rdd(res),env)).cache();
+ old = n;
+ n = res.count();
+ i++;
+ System.err.println("*** Closure #"+i+": "+(n-old)+" new records");
+ } while (old < n && i <= max_num);
+ return res;
+ case Generator(`min,`max,`size):
+ DataSet ds = Plan.generator(((MR_long)evalE(min,env)).get(),
+ ((MR_long)evalE(max,env)).get(),
+ ((MR_long)evalE(size,env)).get());
+ JavaRDD<MRData> rd = null;
+ for ( DataSource d: ds.source )
+ if (rd == null)
+ rd = containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class,
+ MRContainer.class,MRContainer.class,1));
+ else rd = rd.union(containerData(spark_context.hadoopFile(d.path,SparkGeneratorInputFormat.class,
+ MRContainer.class,MRContainer.class,1)));
+ return rd;
+ case let(`v,`u,`body):
+ return eval(body,new Environment(v.toString(),evalE(u,env),env));
+ case Let(`v,`u,`body):
+ return eval(body,new Environment(v.toString(),new MR_rdd(eval(u,env)),env));
+ case If(`c,`x,`y):
+ if (((MR_bool)evalE(c,env)).get())
+ return eval(x,env);
+ else return eval(y,env);
+ case `v:
+ if (!v.is_variable())
+ fail;
+ MRData x = variable_lookup(v.toString(),env);
+ if (x != null)
+ if (x instanceof MR_rdd)
+ return ((MR_rdd)x).rdd();
+ x = variable_lookup(v.toString(),global_env);
+ if (x != null)
+ if (x instanceof MR_rdd)
+ return ((MR_rdd)x).rdd();
+ throw new Error("Variable "+v+" is not bound");
+ };
+ throw new Error("Cannot evaluate the Spark plan: "+e);
+ } catch (Error msg) {
+ if (!Config.trace)
+ throw new Error(msg.getMessage());
+ System.err.println(msg.getMessage());
+ throw new Error("Evaluation error in: "+print_query(e));
+ } catch (Exception ex) {
+ System.err.println(ex.getMessage());
+ ex.printStackTrace();
+ throw new Error("Evaluation error in: "+print_query(e));
+ }
+ }
+}
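Both eval(e,env,counter) and the Repeat case above count how many (value,flag) pairs carry a true flag before projecting the values out for the next iteration. The same count-then-project step over a plain list of pairs, with illustrative types only:

    import java.util.*;

    public class CountThenProject {
        static class Pair {
            final String value; final boolean flag;
            Pair ( String v, boolean f ) { value = v; flag = f; }
        }
        public static void main ( String[] args ) {
            List<Pair> rd = Arrays.asList(new Pair("a",true),
                                          new Pair("b",false),
                                          new Pair("c",true));
            int true_results = 0;
            List<String> next = new ArrayList<String>();
            for ( Pair p: rd ) {
                if (p.flag) true_results++;   // the accumulator's job in eval()
                next.add(p.value);            // the map(get_first) projection
            }
            System.out.println(true_results+" true results, next = "+next);
            // 2 true results, next = [a, c] -> iterate again while true_results > 0
        }
    }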