You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2013/09/06 22:57:30 UTC
[12/18] MRQL-16: correct source files, ASF licenses, and POMs for release
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/Interpreter.gen
----------------------------------------------------------------------
diff --git a/src/main/java/core/Interpreter.gen b/src/main/java/core/Interpreter.gen
index 1799c08..5cc50ce 100644
--- a/src/main/java/core/Interpreter.gen
+++ b/src/main/java/core/Interpreter.gen
@@ -31,122 +31,122 @@ public class Interpreter extends TypeInference {
/** retrieve variable binding */
public final static MRData variable_lookup ( final String v, final Environment environment ) {
- for ( Environment env = environment; env != null; env = env.next ) {
- if (v.equals(env.name))
- return env.value;
- };
- return null;
+ for ( Environment env = environment; env != null; env = env.next ) {
+ if (v.equals(env.name))
+ return env.value;
+ };
+ return null;
}
/** insert a new global variable binding */
public final static void new_global_binding ( final String var, final MRData value ) {
- if (value instanceof Bag)
- ((Bag)value).materialize();
- global_env = new Environment(var,value,global_env);
+ if (value instanceof Bag)
+ ((Bag)value).materialize();
+ global_env = new Environment(var,value,global_env);
}
/** remove a global variable binding */
public static void remove_global_binding ( String v ) {
- if (global_env == null)
- return;
- for ( Environment env = global_env; env.next != null; env = env.next )
- if (v.equals(env.next.name))
- env.next = env.next.next;
- if (global_env.name == v)
- global_env = global_env.next;
+ if (global_env == null)
+ return;
+ for ( Environment env = global_env; env.next != null; env = env.next )
+ if (v.equals(env.next.name))
+ env.next = env.next.next;
+ if (global_env.name == v)
+ global_env = global_env.next;
}
/** retrieve a global variable binding */
public static MRData lookup_global_binding ( String v ) {
- for ( Environment env = global_env; env != null; env = env.next )
- if (v.equals(env.name))
- return env.value;
- return null;
+ for ( Environment env = global_env; env != null; env = env.next )
+ if (v.equals(env.name))
+ return env.value;
+ return null;
}
public static void set_global_bindings ( Environment env ) {
- global_env = env;
+ global_env = env;
}
final static int coerce_method = ClassImporter.find_method_number("coerce",#[any,int]);
/** untyped reify: not type-correct but will not crash the run-time system */
private final static Tree reify ( final MRData x ) {
- if (x instanceof Bag) {
- Bag b = (Bag)x;
- Trees as = #[];
- for ( MRData e: b)
- as = as.append(reify(e));
- return #<list(...as)>;
- } else if (x instanceof Tuple) {
- Tuple t = (Tuple)x;
- Trees as = #[];
- for ( short i = 0; i < t.size(); i++ )
- as = as.append(reify(t.get(i)));
- return #<tuple(...as)>;
- } else if (x instanceof MR_string)
- return new StringLeaf(((MR_string)x).get());
- else if (x instanceof MR_short)
- return #<callM(coerce,`coerce_method,`(((MR_short)x).get()),`(MRContainer.SHORT))>;
- else if (x instanceof MR_int)
- return #<`(((MR_int)x).get())>;
- else if (x instanceof MR_long)
- return #<callM(coerce,`coerce_method,`((int)((MR_long)x).get()),`(MRContainer.LONG))>;
- else if (x instanceof MR_float)
- return #<`(((MR_float)x).get())>;
- else if (x instanceof MR_double)
- return #<callM(coerce,`coerce_method,`((float)(((MR_double)x).get())),`(MRContainer.DOUBLE))>;
- throw new Error("wrong MRData: "+x);
+ if (x instanceof Bag) {
+ Bag b = (Bag)x;
+ Trees as = #[];
+ for ( MRData e: b)
+ as = as.append(reify(e));
+ return #<list(...as)>;
+ } else if (x instanceof Tuple) {
+ Tuple t = (Tuple)x;
+ Trees as = #[];
+ for ( short i = 0; i < t.size(); i++ )
+ as = as.append(reify(t.get(i)));
+ return #<tuple(...as)>;
+ } else if (x instanceof MR_string)
+ return new StringLeaf(((MR_string)x).get());
+ else if (x instanceof MR_short)
+ return #<callM(coerce,`coerce_method,`(((MR_short)x).get()),`(MRContainer.SHORT))>;
+ else if (x instanceof MR_int)
+ return #<`(((MR_int)x).get())>;
+ else if (x instanceof MR_long)
+ return #<callM(coerce,`coerce_method,`((int)((MR_long)x).get()),`(MRContainer.LONG))>;
+ else if (x instanceof MR_float)
+ return #<`(((MR_float)x).get())>;
+ else if (x instanceof MR_double)
+ return #<callM(coerce,`coerce_method,`((float)(((MR_double)x).get())),`(MRContainer.DOUBLE))>;
+ throw new Error("wrong MRData: "+x);
}
/** evaluate an MRQL function in memory */
private final static Function evalf ( final String v,
- final Tree body,
- final Environment env ) {
- return new Function() {
- final public MRData eval ( final MRData x ) {
- return evalE(body,new Environment(v,x,env));
- }
- };
+ final Tree body,
+ final Environment env ) {
+ return new Function() {
+ final public MRData eval ( final MRData x ) {
+ return evalE(body,new Environment(v,x,env));
+ }
+ };
}
/** evaluate an MRQL function in memory */
public final static Function evalF ( Tree fnc, Environment env ) {
- match fnc {
- case compiled(`ln,`lm,...vars):
- try {
- return Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString());
- } catch (Exception ex) {
- System.err.println("*** Unable to retrieve the compiled lambda: "+fnc);
- return ((Lambda) evalE(lm)).lambda();
- }
- case lambda(`v,`b):
- return evalf(v.toString(),b,env);
- case function(tuple(...params),`tp,`body):
- String[] as = new String[params.length()];
- int i = 0;
- for ( Tree param: params )
- match param {
- case `bind(`v,_):
- as[i++] = v.toString();
- };
- return evalT(as,body,env);
- };
- throw new Error("Ill-formed lambda: "+fnc);
+ match fnc {
+ case compiled(`ln,`lm,...vars):
+ try {
+ return Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString());
+ } catch (Exception ex) {
+ System.err.println("*** Unable to retrieve the compiled lambda: "+fnc);
+ return ((Lambda) evalE(lm)).lambda();
+ }
+ case lambda(`v,`b):
+ return evalf(v.toString(),b,env);
+ case function(tuple(...params),`tp,`body):
+ String[] as = new String[params.length()];
+ int i = 0;
+ for ( Tree param: params )
+ match param {
+ case `bind(`v,_):
+ as[i++] = v.toString();
+ };
+ return evalT(as,body,env);
+ };
+ throw new Error("Ill-formed lambda: "+fnc);
}
/** evaluate an MRQL function in memory */
private final static Function evalT ( final String[] params,
- final Tree body,
- final Environment env ) {
- return new Function() {
- final public MRData eval ( final MRData x ) {
- Environment new_env = env;
- for ( int i = 0; i < params.length; i++ )
- new_env = new Environment(params[i],((Tuple)x).get(i),new_env);
- return evalE(body,new_env);
- }
- };
+ final Tree body,
+ final Environment env ) {
+ return new Function() {
+ final public MRData eval ( final MRData x ) {
+ Environment new_env = env;
+ for ( int i = 0; i < params.length; i++ )
+ new_env = new Environment(params[i],((Tuple)x).get(i),new_env);
+ return evalE(body,new_env);
+ }
+ };
}
final static String true_name = #<true>.toString();
@@ -159,573 +159,573 @@ public class Interpreter extends TypeInference {
static int tab_count = -3;
public static String tabs ( int n ) {
- StringBuffer b = new StringBuffer();
- for ( int i = 0; i < n; i++)
- b.append(' ');
- return b.toString();
+ StringBuffer b = new StringBuffer();
+ for ( int i = 0; i < n; i++)
+ b.append(' ');
+ return b.toString();
}
/** evaluate an MRQL expression in memory and print tracing info */
final static MRData evalE ( final Tree e, final Environment env ) {
- if (Config.trace_exp_execution) {
- tab_count += 3;
- System.out.println(tabs(tab_count)+print_query(e));
- };
- MRData res = evalEE(e,env);
- if (Config.trace_exp_execution) {
- System.out.println(tabs(tab_count)+"-> "+res);
- tab_count -= 3;
- };
- return res;
+ if (Config.trace_exp_execution) {
+ tab_count += 3;
+ System.out.println(tabs(tab_count)+print_query(e));
+ };
+ MRData res = evalEE(e,env);
+ if (Config.trace_exp_execution) {
+ System.out.println(tabs(tab_count)+"-> "+res);
+ tab_count -= 3;
+ };
+ return res;
}
/** evaluate an MRQL expression in memory */
private final static MRData evalEE ( final Tree e, final Environment env ) {
- try {
- if (e.is_variable()) {
- String v = e.toString();
- if (v == true_name)
- return true_value;
- else if (v == false_name)
- return false_value;
- else if (v == null_name)
- return null_value;
- MRData x = variable_lookup(v,env);
- if (x != null)
- return x;
- x = lookup_global_binding(v);
- if (x == null)
- throw new Error("Variable "+v+" is not bound");
- return x;
- } else if (e.is_long())
- return new MR_int((int)e.longValue());
- else if (e.is_double())
- return new MR_float((float)e.doubleValue());
- else if (e.is_string())
- return new MR_string(e.stringValue());
+ try {
+ if (e.is_variable()) {
+ String v = e.toString();
+ if (v == true_name)
+ return true_value;
+ else if (v == false_name)
+ return false_value;
+ else if (v == null_name)
+ return null_value;
+ MRData x = variable_lookup(v,env);
+ if (x != null)
+ return x;
+ x = lookup_global_binding(v);
+ if (x == null)
+ throw new Error("Variable "+v+" is not bound");
+ return x;
+ } else if (e.is_long())
+ return new MR_int((int)e.longValue());
+ else if (e.is_double())
+ return new MR_float((float)e.doubleValue());
+ else if (e.is_string())
+ return new MR_string(e.stringValue());
match e {
- case callM(and,_,`x,`y): // lazy
- return (((MR_bool)evalE(x,env)).get()) ? evalE(y,env) : false_value;
- case callM(or,_,`x,`y):
- return (((MR_bool)evalE(x,env)).get()) ? true_value : evalE(y,env);
- case callM(`f,`n,...args): // internal function call
- MRData[] as = new MRData[args.length()];
- for ( int i = 0; i < args.length(); i++ )
- as[i] = evalE(args.nth(i),env);
- return ClassImporter.call((int)n.longValue(),as);
- case compiled(`ln,_,...vars):
- return new Lambda(Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString()));
- case lambda(`v,`body):
- return new Lambda(evalf(v.toString(),body,env));
- case nth(`x,`n):
- return ((Tuple)evalE(x,env)).get((int)n.longValue());
- case setNth(`x,`n,`v,`ret):
- return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
- case materialize(`u):
- return MapReduceAlgebra.materialize(evalE(u,env));
- case let(`v,`u,`body):
- MRData x = evalE(u,env);
- if (x instanceof Bag)
- ((Bag)x).materialize();
- return evalE(body,new Environment(v.toString(),x,env));
- case cmap(`f,`s):
- return MapReduceAlgebra.cmap(evalF(f,env),(Bag)evalE(s,env));
- case filter(`p,`m,`s):
- return MapReduceAlgebra.filter(evalF(p,env),evalF(m,env),(Bag)evalE(s,env));
- case map(`m,`s):
- return MapReduceAlgebra.map(evalF(m,env),(Bag)evalE(s,env));
- case repeat(lambda(`v,`b),`s,`n):
- final String nm = v.toString();
- final Tree body = b;
- if (Config.hadoop_mode) {
- Function loop_fnc = new Function () {
- public MRData eval ( MRData s ) {
- new_global_binding(nm,s);
- MRData val = new MR_dataset(Evaluator.eval(body,new Environment(nm,s,env),nm));
- return val;
- }; };
- return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
- } else {
- Function loop_fnc = new Function () {
- public MRData eval ( MRData s ) {
- new_global_binding(nm,s);
- return evalM(body,new Environment(nm,s,env));
- }; };
- return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
- }
- case repeat(`lm,`s,`n):
- return MapReduceAlgebra.repeat(evalF(lm,env),(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
- case range(`min,`max):
- return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
- ((MR_long)evalE(max,env)).get());
- case call(`f,...args):
- Tuple t = new Tuple(args.length());
- int i = 0;
- for ( Tree a: args )
- t.set(i++,evalE(a,env));
- return evalF(f,env).eval(t);
- case tuple(`x,`y):
- return new Tuple(evalE(x,env),evalE(y,env));
- case tuple(`x,`y,`z):
- return new Tuple(evalE(x,env),evalE(y,env),evalE(z,env));
- case tuple(...el):
- Tuple t = new Tuple(el.length());
- int i = 0;
- for ( Tree a: el )
- t.set(i++,evalE(a,env));
- return t;
- case tagged_union(`n,`u):
- return new Union((byte)n.longValue(),evalE(u,env));
- case union_value(`x):
- return ((Union)evalE(x,env)).value();
- case union_tag(`x):
- return new MR_int(((Union)evalE(x,env)).tag());
- // used for shortcutting sync in bsp supersteps
- case BAG():
- return SystemFunctions.bsp_empty_bag;
- case TRUE():
- return SystemFunctions.bsp_true_value;
- case FALSE():
- return SystemFunctions.bsp_false_value;
- case `T(...el):
- if (!is_collection(T))
- fail;
- if (el.is_empty())
- return new Bag();
- Bag b = new Bag(el.length());
- for ( Tree a: el )
- b.add(evalE(a,env));
- return b;
- case if(`c,`x,`y):
- if (((MR_bool)evalE(c,env)).get())
- return evalE(x,env);
- else return evalE(y,env);
+ case callM(and,_,`x,`y): // lazy
+ return (((MR_bool)evalE(x,env)).get()) ? evalE(y,env) : false_value;
+ case callM(or,_,`x,`y):
+ return (((MR_bool)evalE(x,env)).get()) ? true_value : evalE(y,env);
+ case callM(`f,`n,...args): // internal function call
+ MRData[] as = new MRData[args.length()];
+ for ( int i = 0; i < args.length(); i++ )
+ as[i] = evalE(args.nth(i),env);
+ return ClassImporter.call((int)n.longValue(),as);
+ case compiled(`ln,_,...vars):
+ return new Lambda(Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString()));
+ case lambda(`v,`body):
+ return new Lambda(evalf(v.toString(),body,env));
+ case nth(`x,`n):
+ return ((Tuple)evalE(x,env)).get((int)n.longValue());
+ case setNth(`x,`n,`v,`ret):
+ return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
+ case materialize(`u):
+ return MapReduceAlgebra.materialize(evalE(u,env));
+ case let(`v,`u,`body):
+ MRData x = evalE(u,env);
+ if (x instanceof Bag)
+ ((Bag)x).materialize();
+ return evalE(body,new Environment(v.toString(),x,env));
+ case cmap(`f,`s):
+ return MapReduceAlgebra.cmap(evalF(f,env),(Bag)evalE(s,env));
+ case filter(`p,`m,`s):
+ return MapReduceAlgebra.filter(evalF(p,env),evalF(m,env),(Bag)evalE(s,env));
+ case map(`m,`s):
+ return MapReduceAlgebra.map(evalF(m,env),(Bag)evalE(s,env));
+ case repeat(lambda(`v,`b),`s,`n):
+ final String nm = v.toString();
+ final Tree body = b;
+ if (Config.hadoop_mode) {
+ Function loop_fnc = new Function () {
+ public MRData eval ( MRData s ) {
+ new_global_binding(nm,s);
+ MRData val = new MR_dataset(Evaluator.eval(body,new Environment(nm,s,env),nm));
+ return val;
+ }; };
+ return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+ } else {
+ Function loop_fnc = new Function () {
+ public MRData eval ( MRData s ) {
+ new_global_binding(nm,s);
+ return evalM(body,new Environment(nm,s,env));
+ }; };
+ return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+ }
+ case repeat(`lm,`s,`n):
+ return MapReduceAlgebra.repeat(evalF(lm,env),(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+ case range(`min,`max):
+ return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
+ ((MR_long)evalE(max,env)).get());
+ case call(`f,...args):
+ Tuple t = new Tuple(args.length());
+ int i = 0;
+ for ( Tree a: args )
+ t.set(i++,evalE(a,env));
+ return evalF(f,env).eval(t);
+ case tuple(`x,`y):
+ return new Tuple(evalE(x,env),evalE(y,env));
+ case tuple(`x,`y,`z):
+ return new Tuple(evalE(x,env),evalE(y,env),evalE(z,env));
+ case tuple(...el):
+ Tuple t = new Tuple(el.length());
+ int i = 0;
+ for ( Tree a: el )
+ t.set(i++,evalE(a,env));
+ return t;
+ case tagged_union(`n,`u):
+ return new Union((byte)n.longValue(),evalE(u,env));
+ case union_value(`x):
+ return ((Union)evalE(x,env)).value();
+ case union_tag(`x):
+ return new MR_int(((Union)evalE(x,env)).tag());
+ // used for shortcutting sync in bsp supersteps
+ case BAG():
+ return SystemFunctions.bsp_empty_bag;
+ case TRUE():
+ return SystemFunctions.bsp_true_value;
+ case FALSE():
+ return SystemFunctions.bsp_false_value;
+ case `T(...el):
+ if (!is_collection(T))
+ fail;
+ if (el.is_empty())
+ return new Bag();
+ Bag b = new Bag(el.length());
+ for ( Tree a: el )
+ b.add(evalE(a,env));
+ return b;
+ case if(`c,`x,`y):
+ if (((MR_bool)evalE(c,env)).get())
+ return evalE(x,env);
+ else return evalE(y,env);
case Collect(`s):
- try {
- if (Config.hadoop_mode)
- return Plan.collect(Evaluator.eval(s,env,"-"));
- Bag b = evalS(s,env);
- b.materialize();
- return b;
- } catch (Exception ex) { throw new Error(ex); }
- case dataset_size(`x):
- return new MR_long(Plan.size(Evaluator.eval(x,env,"-")) / (1024*1024));
- case synchronize(`peer,`b):
- return Evaluator.synchronize(((MR_string)evalE(peer,env)),(MR_bool)evalE(b,env));
- case distribute(`peer,`s):
- return Evaluator.distribute(((MR_string)evalE(peer,env)),(Bag)evalE(s,env));
- case mapReduce(`m,`r,`s,_):
- return MapReduceAlgebra.mapReduce(evalF(m,env),
- evalF(r,env),
- (Bag)evalE(s,env));
- case mapReduce2(`mx,`my,`r,`x,`y,_):
- return MapReduceAlgebra.mapReduce2(
- evalF(mx,env),
- evalF(my,env),
- evalF(r,env),
- (Bag)evalE(x,env),
- (Bag)evalE(y,env));
- case mapJoin(`kx,`ky,`r,`x,`y):
- return MapReduceAlgebra.mapJoin(
- evalF(kx,env),
- evalF(ky,env),
- evalF(r,env),
- (Bag)evalE(x,env),
- (Bag)evalE(y,env));
- case crossProduct(`mx,`my,`r,`x,`y):
- return MapReduceAlgebra.crossProduct(
- evalF(mx,env),
- evalF(my,env),
- evalF(r,env),
- (Bag)evalE(x,env),
- (Bag)evalE(y,env));
- case groupBy(`s):
- return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
- case orderBy(`s):
- return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
- case index(`x,`n):
- MRData xv = evalE(x,env);
- MRData nv = evalE(n,env);
- if (xv instanceof MR_dataset)
- xv = Plan.collect(((MR_dataset)xv).dataset());
- Bag b = (Bag)xv;
- int k = (int)((MR_int)nv).get();
- return b.get(k);
- case range(`x,`i,`j):
- MRData xv = evalE(x,env);
- MRData ni = evalE(i,env);
- MRData nj = evalE(j,env);
- if (xv instanceof MR_dataset)
- xv = Plan.collect(((MR_dataset)xv).dataset());
- Bag b = (Bag)xv;
- int ki = (int)((MR_int)ni).get();
- int kj = (int)((MR_int)nj).get();
- Iterator<MRData> it = b.iterator();
- Bag s = new Bag();
- for ( int n = 0; it.hasNext() && n < ki; n++ )
- it.next();
- for ( int n = ki; it.hasNext() && n <= kj; n++ )
- s.add(it.next());
- return s;
- case map_index(`x,`key):
- MRData xv = evalE(x,env);
- MRData nk = evalE(key,env);
- if (xv instanceof MR_dataset)
- xv = Plan.collect(((MR_dataset)xv).dataset());
- return ((Bag)xv).map_find(nk);
- case aggregate(`acc,`zero,`s):
- return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
- (Bag)evalE(s,env));
- case Aggregate(`acc,`zero,`s):
- if (Config.hadoop_mode)
- return Evaluator.aggregate(closure(acc,env),zero,s,env);
- else return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),evalM(s,env));
- case BSP(tuple(...ns),`superstep,`state,`o,...as):
- if (Config.hadoop_mode)
- return Evaluator.bsp(e,env);
- Bag[] ds = new Bag[as.length()];
- for ( int i = 0; i < ds.length; i++ )
- ds[i] = evalM(as.nth(i),env);
- int[] nn = new int[ns.length()];
- for ( int i = 0; i < ns.length(); i++ )
- nn[i] = (int)((LongLeaf)ns.nth(i)).value();
- return MapReduceAlgebra.BSP(nn,
- evalF(superstep,env),
- evalE(state,env),
- o.equals(#<true>),
- ds);
- case BSP(`n,`superstep,`state,`o,...as):
- if (Config.hadoop_mode)
- return Evaluator.bsp(e,env);
- Bag[] ds = new Bag[as.length()];
- for ( int i = 0; i < ds.length; i++ )
- ds[i] = evalM(as.nth(i),env);
- return MapReduceAlgebra.BSP(new int[]{(int)((LongLeaf)n).value()},
- evalF(superstep,env),
- evalE(state,env),
- o.equals(#<true>),
- ds);
- case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
- if (Config.hadoop_mode)
- return Evaluator.loop(e,env);
- int limit = ((MR_int)evalE(num,env)).get();
- Bag[] s = new Bag[vs.length()];
- for ( int i = 0; i < vs.length(); i++ )
- s[i] = evalM(ss.nth(i),env);
- for ( int n = 0; n < limit; n++ ) {
- Environment nenv = env;
- for ( int i = 0; i < vs.length(); i ++ ) {
- s[i].materialize();
- nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
- };
- for ( int i = 0; i < vs.length(); i ++ )
- s[i] = (Bag)evalM(bs.nth(i),nenv);
- };
- return new Tuple(s);
- case function(tuple(...params),`tp,`body):
- String[] as = new String[params.length()];
- int i = 0;
- for ( Tree param: params )
- match param {
- case `bind(`v,_):
- as[i++] = v.toString();
- };
- return new Lambda(evalT(as,body,env));
- case typed(`x,_):
- return evalE(x,env);
- case apply(`f,`arg):
- if (!f.is_variable())
- return evalF(f,env).eval(evalE(arg,env));
- MRData fnc = lookup_global_binding(f.toString());
- if (fnc == null) {
- String s = Plan.conf.get("mrql.global."+f);
- if (s != null)
- try {
- Tree ft = Tree.parse(s);
- TopLevel.store(f.toString(),ft);
- fnc = evalE(ft,env);
- new_global_binding(f.toString(),fnc);
- } catch (Exception ex) {
- throw new Error(ex);
- }
- };
- MRData t = evalE(arg,env);
- if (!(t instanceof Tuple))
- throw new Error("Expected a tuple in function application: "+t);
- return ((Lambda)fnc).lambda().eval(t);
- case trace(`x):
- MRData z = evalE(x,env);
- System.err.println("*** "+x+": "+z);
- return z;
- case BinarySource(...,`file,`tp):
- if (Config.hadoop_mode)
- if (collection_type(tp))
- return Plan.collect(Plan.binarySource(file.stringValue()));
- else return Plan.collect(Plan.binarySource(file.stringValue())).get(0);
- else return MapReduceAlgebra.read_binary(file.stringValue());
- case _:
- try {
- if (Config.hadoop_mode)
- return new MR_dataset(Evaluator.eval(e,env,"-"));
- else return evalS(e,env);
- } catch (Exception ex) { throw new Error(ex); }
+ try {
+ if (Config.hadoop_mode)
+ return Plan.collect(Evaluator.eval(s,env,"-"));
+ Bag b = evalS(s,env);
+ b.materialize();
+ return b;
+ } catch (Exception ex) { throw new Error(ex); }
+ case dataset_size(`x):
+ return new MR_long(Plan.size(Evaluator.eval(x,env,"-")) / (1024*1024));
+ case synchronize(`peer,`b):
+ return Evaluator.synchronize(((MR_string)evalE(peer,env)),(MR_bool)evalE(b,env));
+ case distribute(`peer,`s):
+ return Evaluator.distribute(((MR_string)evalE(peer,env)),(Bag)evalE(s,env));
+ case mapReduce(`m,`r,`s,_):
+ return MapReduceAlgebra.mapReduce(evalF(m,env),
+ evalF(r,env),
+ (Bag)evalE(s,env));
+ case mapReduce2(`mx,`my,`r,`x,`y,_):
+ return MapReduceAlgebra.mapReduce2(
+ evalF(mx,env),
+ evalF(my,env),
+ evalF(r,env),
+ (Bag)evalE(x,env),
+ (Bag)evalE(y,env));
+ case mapJoin(`kx,`ky,`r,`x,`y):
+ return MapReduceAlgebra.mapJoin(
+ evalF(kx,env),
+ evalF(ky,env),
+ evalF(r,env),
+ (Bag)evalE(x,env),
+ (Bag)evalE(y,env));
+ case crossProduct(`mx,`my,`r,`x,`y):
+ return MapReduceAlgebra.crossProduct(
+ evalF(mx,env),
+ evalF(my,env),
+ evalF(r,env),
+ (Bag)evalE(x,env),
+ (Bag)evalE(y,env));
+ case groupBy(`s):
+ return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
+ case orderBy(`s):
+ return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
+ case index(`x,`n):
+ MRData xv = evalE(x,env);
+ MRData nv = evalE(n,env);
+ if (xv instanceof MR_dataset)
+ xv = Plan.collect(((MR_dataset)xv).dataset());
+ Bag b = (Bag)xv;
+ int k = (int)((MR_int)nv).get();
+ return b.get(k);
+ case range(`x,`i,`j):
+ MRData xv = evalE(x,env);
+ MRData ni = evalE(i,env);
+ MRData nj = evalE(j,env);
+ if (xv instanceof MR_dataset)
+ xv = Plan.collect(((MR_dataset)xv).dataset());
+ Bag b = (Bag)xv;
+ int ki = (int)((MR_int)ni).get();
+ int kj = (int)((MR_int)nj).get();
+ Iterator<MRData> it = b.iterator();
+ Bag s = new Bag();
+ for ( int n = 0; it.hasNext() && n < ki; n++ )
+ it.next();
+ for ( int n = ki; it.hasNext() && n <= kj; n++ )
+ s.add(it.next());
+ return s;
+ case map_index(`x,`key):
+ MRData xv = evalE(x,env);
+ MRData nk = evalE(key,env);
+ if (xv instanceof MR_dataset)
+ xv = Plan.collect(((MR_dataset)xv).dataset());
+ return ((Bag)xv).map_find(nk);
+ case aggregate(`acc,`zero,`s):
+ return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ (Bag)evalE(s,env));
+ case Aggregate(`acc,`zero,`s):
+ if (Config.hadoop_mode)
+ return Evaluator.aggregate(closure(acc,env),zero,s,env);
+ else return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),evalM(s,env));
+ case BSP(tuple(...ns),`superstep,`state,`o,...as):
+ if (Config.hadoop_mode)
+ return Evaluator.bsp(e,env);
+ Bag[] ds = new Bag[as.length()];
+ for ( int i = 0; i < ds.length; i++ )
+ ds[i] = evalM(as.nth(i),env);
+ int[] nn = new int[ns.length()];
+ for ( int i = 0; i < ns.length(); i++ )
+ nn[i] = (int)((LongLeaf)ns.nth(i)).value();
+ return MapReduceAlgebra.BSP(nn,
+ evalF(superstep,env),
+ evalE(state,env),
+ o.equals(#<true>),
+ ds);
+ case BSP(`n,`superstep,`state,`o,...as):
+ if (Config.hadoop_mode)
+ return Evaluator.bsp(e,env);
+ Bag[] ds = new Bag[as.length()];
+ for ( int i = 0; i < ds.length; i++ )
+ ds[i] = evalM(as.nth(i),env);
+ return MapReduceAlgebra.BSP(new int[]{(int)((LongLeaf)n).value()},
+ evalF(superstep,env),
+ evalE(state,env),
+ o.equals(#<true>),
+ ds);
+ case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
+ if (Config.hadoop_mode)
+ return Evaluator.loop(e,env);
+ int limit = ((MR_int)evalE(num,env)).get();
+ Bag[] s = new Bag[vs.length()];
+ for ( int i = 0; i < vs.length(); i++ )
+ s[i] = evalM(ss.nth(i),env);
+ for ( int n = 0; n < limit; n++ ) {
+ Environment nenv = env;
+ for ( int i = 0; i < vs.length(); i ++ ) {
+ s[i].materialize();
+ nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
+ };
+ for ( int i = 0; i < vs.length(); i ++ )
+ s[i] = (Bag)evalM(bs.nth(i),nenv);
+ };
+ return new Tuple(s);
+ case function(tuple(...params),`tp,`body):
+ String[] as = new String[params.length()];
+ int i = 0;
+ for ( Tree param: params )
+ match param {
+ case `bind(`v,_):
+ as[i++] = v.toString();
+ };
+ return new Lambda(evalT(as,body,env));
+ case typed(`x,_):
+ return evalE(x,env);
+ case apply(`f,`arg):
+ if (!f.is_variable())
+ return evalF(f,env).eval(evalE(arg,env));
+ MRData fnc = lookup_global_binding(f.toString());
+ if (fnc == null) {
+ String s = Plan.conf.get("mrql.global."+f);
+ if (s != null)
+ try {
+ Tree ft = Tree.parse(s);
+ TopLevel.store(f.toString(),ft);
+ fnc = evalE(ft,env);
+ new_global_binding(f.toString(),fnc);
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ };
+ MRData t = evalE(arg,env);
+ if (!(t instanceof Tuple))
+ throw new Error("Expected a tuple in function application: "+t);
+ return ((Lambda)fnc).lambda().eval(t);
+ case trace(`x):
+ MRData z = evalE(x,env);
+ System.err.println("*** "+x+": "+z);
+ return z;
+ case BinarySource(...,`file,`tp):
+ if (Config.hadoop_mode)
+ if (collection_type(tp))
+ return Plan.collect(Plan.binarySource(file.stringValue()));
+ else return Plan.collect(Plan.binarySource(file.stringValue())).get(0);
+ else return MapReduceAlgebra.read_binary(file.stringValue());
+ case _:
+ try {
+ if (Config.hadoop_mode)
+ return new MR_dataset(Evaluator.eval(e,env,"-"));
+ else return evalS(e,env);
+ } catch (Exception ex) { throw new Error(ex); }
};
- throw new Error("Cannot evaluate the expression: "+e);
- } catch (Error msg) {
- if (!Config.trace)
- throw new Error(msg.getMessage());
- System.err.println(msg.getMessage());
- msg.printStackTrace();
- throw new Error("Evaluation error in: "+print_query(e));
- } catch (Exception ex) {
- if (Config.trace) {
- System.err.println(ex.getMessage());
- ex.printStackTrace();
- }
- throw new Error("Evaluation error in: "+print_query(e));
- }
+ throw new Error("Cannot evaluate the expression: "+e);
+ } catch (Error msg) {
+ if (!Config.trace)
+ throw new Error(msg.getMessage());
+ System.err.println(msg.getMessage());
+ msg.printStackTrace();
+ throw new Error("Evaluation error in: "+print_query(e));
+ } catch (Exception ex) {
+ if (Config.trace) {
+ System.err.println(ex.getMessage());
+ ex.printStackTrace();
+ }
+ throw new Error("Evaluation error in: "+print_query(e));
+ }
}
/** evaluate an MRQL expression in memory */
final static MRData evalE ( final Tree e ) {
- return evalE(e,null);
+ return evalE(e,null);
}
/** evaluate MRQL physical operators in memory (returns a Bag) */
final static Bag evalS ( final Tree e, final Environment env ) {
- return evalM(e,env);
+ return evalM(e,env);
}
/** evaluate MRQL physical operators in memory (returns a Bag) */
final static Bag evalM ( final Tree e, final Environment env ) {
- if (Config.trace_execution) {
- tab_count += 3;
- System.out.println(tabs(tab_count)+print_query(e));
- };
- Bag res = evalMM(e,env);
- if (Config.trace_execution) {
- System.out.println(tabs(tab_count)+"-> "+res);
- tab_count -= 3;
- };
- return res;
+ if (Config.trace_execution) {
+ tab_count += 3;
+ System.out.println(tabs(tab_count)+print_query(e));
+ };
+ Bag res = evalMM(e,env);
+ if (Config.trace_execution) {
+ System.out.println(tabs(tab_count)+"-> "+res);
+ tab_count -= 3;
+ };
+ return res;
}
/** evaluate MRQL physical operators in memory (returns a Bag) */
final static Bag evalMM ( final Tree e, final Environment env ) {
- try {
- match e {
- case cMap(`f,`s):
- return MapReduceAlgebra.cmap(evalF(f,env),evalM(s,env));
- case AggregateMap(`f,`acc,`zero,`s):
- return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
- evalM(#<cMap(`f,`s)>,env)));
- case MapReduce(`m,`r,`s,_):
- return MapReduceAlgebra.mapReduce(
- evalF(m,env),
- evalF(r,env),
- evalM(s,env));
- case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
- return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
- evalM(#<MapReduce(`m,`r,`s,false)>,env)));
- case MapCombineReduce(`m,`c,`r,`s,_):
- return MapReduceAlgebra.mapReduce(
- evalF(m,env),
- evalF(r,env),
- evalM(s,env));
- case MapReduce2(`mx,`my,`r,`x,`y,_):
- return MapReduceAlgebra.mapReduce2(
- evalF(mx,env),
- evalF(my,env),
- evalF(r,env),
- evalM(x,env),
- evalM(y,env));
- case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,_):
- return MapReduceAlgebra.mapReduce2(
- evalF(mx,env),
- evalF(my,env),
- evalF(r,env),
- evalM(x,env),
- evalM(y,env));
- case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
- return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
- evalM(#< MapReduce2(`mx,`my,`r,`x,`y,false)>,env)));
- case MapJoin(`kx,`ky,`r,`x,`y):
- return MapReduceAlgebra.mapJoin(
- evalF(kx,env),
- evalF(ky,env),
- evalF(r,env),
- evalM(x,env),
- evalM(y,env));
- case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
- return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
- evalM(#<MapJoin(`kx,`ky,`r,`x,`y)>,env)));
- case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
- return MapReduceAlgebra.groupByJoin(
- evalF(kx,env),
- evalF(ky,env),
- evalF(gx,env),
- evalF(gy,env),
- evalF(m,env),
- evalF(c,env),
- evalF(r,env),
- evalM(x,env),
- evalM(y,env));
- case CrossProduct(`mx,`my,`r,`x,`y):
- return MapReduceAlgebra.crossProduct(
- evalF(mx,env),
- evalF(my,env),
- evalF(r,env),
- evalM(x,env),
- evalM(y,env));
- case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
- return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
- evalM(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)));
- case BinarySource(`file,_):
- return (Bag)MapReduceAlgebra.read_binary(file.stringValue());
- case BSPSource(`n,BinarySource(`file,_)):
- return (Bag)MapReduceAlgebra.read_binary((int)((LongLeaf)n).value(),
- file.stringValue());
- case BSPSource(`n,ParsedSource(`parser,`file,...args)):
- if (!(n instanceof LongLeaf))
- fail;
- Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
- if (p == null)
- throw new Error("Unknown parser: "+parser);
- return MapReduceAlgebra.parsedSource((int)(((LongLeaf)n).value()),p,
- ((MR_string)evalE(file,env)).get(),args);
- case ParsedSource(`parser,`file,...args):
- Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
- if (p == null)
- throw new Error("Unknown parser: "+parser);
- return MapReduceAlgebra.parsedSource(p,((MR_string)evalE(file,env)).get(),args);
- case Merge(`x,`y):
- return evalM(x,env).union(evalM(y,env));
- case Repeat(lambda(`v,`b),`s,`n):
- final String vs = v.toString();
- final Tree body = b;
- Function loop = new Function() {
- final public MRData eval ( final MRData x ) {
- return evalM(body,new Environment(vs,x,env));
- }
- };
- return MapReduceAlgebra.repeat(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
- case Closure(lambda(`v,`b),`s,`n):
- final String vs = v.toString();
- final Tree body = b;
- Function loop = new Function() {
- final public MRData eval ( final MRData x ) {
- return evalM(body,new Environment(vs,x,env));
- }
- };
- return MapReduceAlgebra.closure(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
- case Generator(`min,`max,`size):
- return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
- ((MR_long)evalE(max,env)).get());
- case BSPSource(`n,Generator(`min,`max,`size)):
- return MapReduceAlgebra.generator((int)((LongLeaf)n).value(),
- ((MR_long)evalE(min,env)).get(),
- ((MR_long)evalE(max,env)).get());
- case Dump(`s):
- Bag bs = (Bag)evalE(s,env);
- final Iterator<MRData> iter = bs.iterator();
- return new Bag(new BagIterator() {
- public boolean hasNext () {
- return iter.hasNext();
- }
- public MRData next () {
- return new Tuple(new MR_int(0),iter.next());
- }
- });
- case let(`v,`u,`body):
- return evalM(body,new Environment(v.toString(),evalE(u,env),env));
- case apply(`f,`arg):
- if (!f.is_variable())
- return (Bag)evalF(f,env).eval(evalE(arg));
- MRData fnc = lookup_global_binding(f.toString());
- if (fnc == null)
- throw new Error("Unknown function: "+f);
- MRData t = evalE(arg,env);
- if (!(t instanceof Tuple))
- throw new Error("Expected a tuple in function application: "+t);
- return (Bag)((Lambda)fnc).lambda().eval(t);
- case BSPSource(`n,`s):
- final MR_int i = new MR_int((int)((LongLeaf)n).value());
- Bag bs = (Bag)evalE(s,env);
- final Iterator<MRData> iter = bs.iterator();
- return new Bag(new BagIterator() {
- public boolean hasNext () {
- return iter.hasNext();
- }
- public MRData next () {
- return new Tuple(i,iter.next());
- }
- });
- case BSP(...):
- MRData res = evalE(e,env);
- if (res instanceof Bag)
- return (Bag)res;
- else return new Bag(res);
- case `v:
- if (!v.is_variable())
- fail;
- MRData x = variable_lookup(v.toString(),env);
- if (x != null)
- return (Bag)x;
- x = lookup_global_binding(v.toString());
- if (x != null)
- return (Bag)x;
- throw new Error("Variable "+v+" is not bound");
- };
- throw new Error("Cannot evaluate the plan: "+e);
- } catch (Error msg) {
- if (!Config.trace)
- throw new Error(msg.getMessage());
- System.err.println(msg.getMessage());
- msg.printStackTrace();
- throw new Error("Evaluation error in: "+print_query(e));
- } catch (Exception ex) {
- if (Config.trace)
- ex.printStackTrace();
- throw new Error("Evaluation error in: "+print_query(e));
- }
+ try {
+ match e {
+ case cMap(`f,`s):
+ return MapReduceAlgebra.cmap(evalF(f,env),evalM(s,env));
+ case AggregateMap(`f,`acc,`zero,`s):
+ return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ evalM(#<cMap(`f,`s)>,env)));
+ case MapReduce(`m,`r,`s,_):
+ return MapReduceAlgebra.mapReduce(
+ evalF(m,env),
+ evalF(r,env),
+ evalM(s,env));
+ case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
+ return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ evalM(#<MapReduce(`m,`r,`s,false)>,env)));
+ case MapCombineReduce(`m,`c,`r,`s,_):
+ return MapReduceAlgebra.mapReduce(
+ evalF(m,env),
+ evalF(r,env),
+ evalM(s,env));
+ case MapReduce2(`mx,`my,`r,`x,`y,_):
+ return MapReduceAlgebra.mapReduce2(
+ evalF(mx,env),
+ evalF(my,env),
+ evalF(r,env),
+ evalM(x,env),
+ evalM(y,env));
+ case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,_):
+ return MapReduceAlgebra.mapReduce2(
+ evalF(mx,env),
+ evalF(my,env),
+ evalF(r,env),
+ evalM(x,env),
+ evalM(y,env));
+ case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
+ return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ evalM(#< MapReduce2(`mx,`my,`r,`x,`y,false)>,env)));
+ case MapJoin(`kx,`ky,`r,`x,`y):
+ return MapReduceAlgebra.mapJoin(
+ evalF(kx,env),
+ evalF(ky,env),
+ evalF(r,env),
+ evalM(x,env),
+ evalM(y,env));
+ case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
+ return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ evalM(#<MapJoin(`kx,`ky,`r,`x,`y)>,env)));
+ case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
+ return MapReduceAlgebra.groupByJoin(
+ evalF(kx,env),
+ evalF(ky,env),
+ evalF(gx,env),
+ evalF(gy,env),
+ evalF(m,env),
+ evalF(c,env),
+ evalF(r,env),
+ evalM(x,env),
+ evalM(y,env));
+ case CrossProduct(`mx,`my,`r,`x,`y):
+ return MapReduceAlgebra.crossProduct(
+ evalF(mx,env),
+ evalF(my,env),
+ evalF(r,env),
+ evalM(x,env),
+ evalM(y,env));
+ case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
+ return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ evalM(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)));
+ case BinarySource(`file,_):
+ return (Bag)MapReduceAlgebra.read_binary(file.stringValue());
+ case BSPSource(`n,BinarySource(`file,_)):
+ return (Bag)MapReduceAlgebra.read_binary((int)((LongLeaf)n).value(),
+ file.stringValue());
+ case BSPSource(`n,ParsedSource(`parser,`file,...args)):
+ if (!(n instanceof LongLeaf))
+ fail;
+ Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ return MapReduceAlgebra.parsedSource((int)(((LongLeaf)n).value()),p,
+ ((MR_string)evalE(file,env)).get(),args);
+ case ParsedSource(`parser,`file,...args):
+ Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ return MapReduceAlgebra.parsedSource(p,((MR_string)evalE(file,env)).get(),args);
+ case Merge(`x,`y):
+ return evalM(x,env).union(evalM(y,env));
+ case Repeat(lambda(`v,`b),`s,`n):
+ final String vs = v.toString();
+ final Tree body = b;
+ Function loop = new Function() {
+ final public MRData eval ( final MRData x ) {
+ return evalM(body,new Environment(vs,x,env));
+ }
+ };
+ return MapReduceAlgebra.repeat(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
+ case Closure(lambda(`v,`b),`s,`n):
+ final String vs = v.toString();
+ final Tree body = b;
+ Function loop = new Function() {
+ final public MRData eval ( final MRData x ) {
+ return evalM(body,new Environment(vs,x,env));
+ }
+ };
+ return MapReduceAlgebra.closure(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
+ case Generator(`min,`max,`size):
+ return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
+ ((MR_long)evalE(max,env)).get());
+ case BSPSource(`n,Generator(`min,`max,`size)):
+ return MapReduceAlgebra.generator((int)((LongLeaf)n).value(),
+ ((MR_long)evalE(min,env)).get(),
+ ((MR_long)evalE(max,env)).get());
+ case Dump(`s):
+ Bag bs = (Bag)evalE(s,env);
+ final Iterator<MRData> iter = bs.iterator();
+ return new Bag(new BagIterator() {
+ public boolean hasNext () {
+ return iter.hasNext();
+ }
+ public MRData next () {
+ return new Tuple(new MR_int(0),iter.next());
+ }
+ });
+ case let(`v,`u,`body):
+ return evalM(body,new Environment(v.toString(),evalE(u,env),env));
+ case apply(`f,`arg):
+ if (!f.is_variable())
+ return (Bag)evalF(f,env).eval(evalE(arg));
+ MRData fnc = lookup_global_binding(f.toString());
+ if (fnc == null)
+ throw new Error("Unknown function: "+f);
+ MRData t = evalE(arg,env);
+ if (!(t instanceof Tuple))
+ throw new Error("Expected a tuple in function application: "+t);
+ return (Bag)((Lambda)fnc).lambda().eval(t);
+ case BSPSource(`n,`s):
+ final MR_int i = new MR_int((int)((LongLeaf)n).value());
+ Bag bs = (Bag)evalE(s,env);
+ final Iterator<MRData> iter = bs.iterator();
+ return new Bag(new BagIterator() {
+ public boolean hasNext () {
+ return iter.hasNext();
+ }
+ public MRData next () {
+ return new Tuple(i,iter.next());
+ }
+ });
+ case BSP(...):
+ MRData res = evalE(e,env);
+ if (res instanceof Bag)
+ return (Bag)res;
+ else return new Bag(res);
+ case `v:
+ if (!v.is_variable())
+ fail;
+ MRData x = variable_lookup(v.toString(),env);
+ if (x != null)
+ return (Bag)x;
+ x = lookup_global_binding(v.toString());
+ if (x != null)
+ return (Bag)x;
+ throw new Error("Variable "+v+" is not bound");
+ };
+ throw new Error("Cannot evaluate the plan: "+e);
+ } catch (Error msg) {
+ if (!Config.trace)
+ throw new Error(msg.getMessage());
+ System.err.println(msg.getMessage());
+ msg.printStackTrace();
+ throw new Error("Evaluation error in: "+print_query(e));
+ } catch (Exception ex) {
+ if (Config.trace)
+ ex.printStackTrace();
+ throw new Error("Evaluation error in: "+print_query(e));
+ }
}
/** replace all non-free variables with their reified values */
private final static Tree closure ( Tree e, Environment env, Trees local_vars ) {
- match e {
- case lambda(`x,`b):
- return #<lambda(`x,`(closure(b,env,local_vars.cons(x))))>;
- case apply(`f,...as):
- Trees bs = #[];
- for (Tree a: as)
- bs = bs.append(closure(a,env,local_vars));
- return #<apply(`f,...bs)>;
- case `f(...as):
- Trees bs = #[];
- for (Tree a: as)
- bs = bs.append(closure(a,env,local_vars));
- return #<`f(...bs)>;
- case null: return null;
- case `v:
- if (!v.is_variable())
- fail;
- if (local_vars.member(v))
- fail;
- MRData x = variable_lookup(v.toString(),env);
- if (x != null)
- if (!(x instanceof MR_dataset))
- return reify(x);
- x = lookup_global_binding(v.toString());
- if (x != null)
- if (!(x instanceof MR_dataset))
- return reify(x);
- };
- return e;
+ match e {
+ case lambda(`x,`b):
+ return #<lambda(`x,`(closure(b,env,local_vars.cons(x))))>;
+ case apply(`f,...as):
+ Trees bs = #[];
+ for (Tree a: as)
+ bs = bs.append(closure(a,env,local_vars));
+ return #<apply(`f,...bs)>;
+ case `f(...as):
+ Trees bs = #[];
+ for (Tree a: as)
+ bs = bs.append(closure(a,env,local_vars));
+ return #<`f(...bs)>;
+ case null: return null;
+ case `v:
+ if (!v.is_variable())
+ fail;
+ if (local_vars.member(v))
+ fail;
+ MRData x = variable_lookup(v.toString(),env);
+ if (x != null)
+ if (!(x instanceof MR_dataset))
+ return reify(x);
+ x = lookup_global_binding(v.toString());
+ if (x != null)
+ if (!(x instanceof MR_dataset))
+ return reify(x);
+ };
+ return e;
}
/** replace all non-free variables with their reified values */
final static Tree closure ( Tree e, Environment env ) {
- return closure(e,env,#[]);
+ return closure(e,env,#[]);
}
static Tree query_type;
@@ -734,83 +734,83 @@ public class Interpreter extends TypeInference {
/** translate an MRQL expression e into a physical plan */
final static Tree translate_expression ( Tree e ) {
- try {
- if (Config.trace)
- System.out.println("Query at line "+Main.parser.line_pos()+": "+print_query(e));
- Tree qt = TypeInference.type_inference(e);
- if (!Config.quiet_execution)
- System.out.println("Query type: "+print_type(qt));
- query_type = qt;
- Tree ne = Normalization.remove_groupby(e);
- if (Config.trace)
- System.out.println("After removing group-by:\n"+ne.pretty(0));
- ne = Simplification.rename(ne);
- if (Config.trace)
- System.out.println("After renaming variables:\n"+ne.pretty(0));
- ne = Simplification.rename(Normalization.normalize_all(ne));
- if (Config.trace)
- System.out.println("Normalized query:\n"+ne.pretty(0));
- type_inference(ne);
- ne = QueryPlan.best_plan(ne);
- if (Config.trace)
- System.out.println("Best plan:\n"+ne.pretty(0));
- ne = Simplification.rename(Translator.translate_select(ne));
- if (Config.trace)
- System.out.println("After removing select-queries:\n"+ne.pretty(0));
- type_inference(ne);
- ne = Simplification.simplify_all(ne);
- if (Config.trace)
- System.out.println("Algebra expression:\n"+ne.pretty(0));
- Tree pt = type_inference(ne);
- if (Config.trace)
- System.out.println("Algebraic type: "+print_type(pt));
- ne = AlgebraicOptimization.translate_all(ne);
- if (Config.trace)
- System.out.println("Translated expression:\n"+ne.pretty(0));
- Tree et = TypeInference.type_inference(ne);
- is_dataset = PlanGeneration.is_dataset_expr(ne);
- if (Config.trace)
- System.out.println("Physical plan type: "+print_type(et));
- repeat_variables = #[];
- ne = Simplification.simplify_all(ne);
- Tree plan = PlanGeneration.makePlan(ne);
- if (Config.bsp_mode) {
- BSPTranslator.reset();
- if (Config.trace)
- System.out.println("Physical plan:\n"+plan.pretty(0));
- plan = Materialization.materialize_terms(BSPTranslator.constructBSPplan(plan));
- if (Config.trace)
- System.out.println("BSP plan:\n"+plan.pretty(0));
- else {
- String splan = print_plan(plan,0,false);
- if (!splan.equals("") && !Config.quiet_execution)
- System.out.println("BSP plan:\n"+splan);
- }
- } else {
- if (Config.hadoop_mode)
- plan = PlanGeneration.physical_plan(plan);
- plan = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(plan));
- if (Config.trace)
- System.out.println("Physical plan:\n"+plan.pretty(0));
- else {
- String splan = print_plan(plan,0,false);
- if (!splan.equals("") && !Config.quiet_execution)
- System.out.println("Physical plan:\n"+splan);
- }
- };
- if (Config.compile_functional_arguments)
- plan = Compiler.compile(plan);
- return plan;
- } catch (Error x) {
- if (Config.testing)
- throw new Error(x);
- if (!Config.trace && x.toString().endsWith("Type Error"))
- return null;
- if (x.getMessage() != null) // system error
- System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
- if (Config.trace)
- x.printStackTrace(System.err);
- return null;
- }
+ try {
+ if (Config.trace)
+ System.out.println("Query at line "+Main.parser.line_pos()+": "+print_query(e));
+ Tree qt = TypeInference.type_inference(e);
+ if (!Config.quiet_execution)
+ System.out.println("Query type: "+print_type(qt));
+ query_type = qt;
+ Tree ne = Normalization.remove_groupby(e);
+ if (Config.trace)
+ System.out.println("After removing group-by:\n"+ne.pretty(0));
+ ne = Simplification.rename(ne);
+ if (Config.trace)
+ System.out.println("After renaming variables:\n"+ne.pretty(0));
+ ne = Simplification.rename(Normalization.normalize_all(ne));
+ if (Config.trace)
+ System.out.println("Normalized query:\n"+ne.pretty(0));
+ type_inference(ne);
+ ne = QueryPlan.best_plan(ne);
+ if (Config.trace)
+ System.out.println("Best plan:\n"+ne.pretty(0));
+ ne = Simplification.rename(Translator.translate_select(ne));
+ if (Config.trace)
+ System.out.println("After removing select-queries:\n"+ne.pretty(0));
+ type_inference(ne);
+ ne = Simplification.simplify_all(ne);
+ if (Config.trace)
+ System.out.println("Algebra expression:\n"+ne.pretty(0));
+ Tree pt = type_inference(ne);
+ if (Config.trace)
+ System.out.println("Algebraic type: "+print_type(pt));
+ ne = AlgebraicOptimization.translate_all(ne);
+ if (Config.trace)
+ System.out.println("Translated expression:\n"+ne.pretty(0));
+ Tree et = TypeInference.type_inference(ne);
+ is_dataset = PlanGeneration.is_dataset_expr(ne);
+ if (Config.trace)
+ System.out.println("Physical plan type: "+print_type(et));
+ repeat_variables = #[];
+ ne = Simplification.simplify_all(ne);
+ Tree plan = PlanGeneration.makePlan(ne);
+ if (Config.bsp_mode) {
+ BSPTranslator.reset();
+ if (Config.trace)
+ System.out.println("Physical plan:\n"+plan.pretty(0));
+ plan = Materialization.materialize_terms(BSPTranslator.constructBSPplan(plan));
+ if (Config.trace)
+ System.out.println("BSP plan:\n"+plan.pretty(0));
+ else {
+ String splan = print_plan(plan,0,false);
+ if (!splan.equals("") && !Config.quiet_execution)
+ System.out.println("BSP plan:\n"+splan);
+ }
+ } else {
+ if (Config.hadoop_mode)
+ plan = PlanGeneration.physical_plan(plan);
+ plan = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(plan));
+ if (Config.trace)
+ System.out.println("Physical plan:\n"+plan.pretty(0));
+ else {
+ String splan = print_plan(plan,0,false);
+ if (!splan.equals("") && !Config.quiet_execution)
+ System.out.println("Physical plan:\n"+splan);
+ }
+ };
+ if (Config.compile_functional_arguments)
+ plan = Compiler.compile(plan);
+ return plan;
+ } catch (Error x) {
+ if (Config.testing)
+ throw new Error(x);
+ if (!Config.trace && x.toString().endsWith("Type Error"))
+ return null;
+ if (x.getMessage() != null) // system error
+ System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
+ if (Config.trace)
+ x.printStackTrace(System.err);
+ return null;
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/Inv.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/Inv.java b/src/main/java/core/Inv.java
index 78253c1..fade0db 100644
--- a/src/main/java/core/Inv.java
+++ b/src/main/java/core/Inv.java
@@ -32,31 +32,31 @@ final public class Inv extends MRData {
public MRData value () { return value; }
final public void write ( DataOutput out ) throws IOException {
- out.writeByte(MRContainer.INV);
- value.write(out);
+ out.writeByte(MRContainer.INV);
+ value.write(out);
}
final public static Inv read ( DataInput in ) throws IOException {
- return new Inv(MRContainer.read(in));
+ return new Inv(MRContainer.read(in));
}
public void readFields ( DataInput in ) throws IOException {
- value.readFields(in);
+ value.readFields(in);
}
public int compareTo ( MRData x ) {
- assert(x instanceof Inv);
- return -value.compareTo(((Inv)x).value);
+ assert(x instanceof Inv);
+ return -value.compareTo(((Inv)x).value);
}
final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
- int n = MRContainer.compare(x,xs,xl,y,ys,yl,size);
- size[0] += 1;
- return -n;
+ int n = MRContainer.compare(x,xs,xl,y,ys,yl,size);
+ size[0] += 1;
+ return -n;
}
public boolean equals ( Object x ) {
- return value.equals(((Inv)x).value);
+ return value.equals(((Inv)x).value);
}
public int hashCode () { return value.hashCode(); }
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/JsonParser.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/JsonParser.java b/src/main/java/core/JsonParser.java
index c87bcc9..b5376dd 100644
--- a/src/main/java/core/JsonParser.java
+++ b/src/main/java/core/JsonParser.java
@@ -31,60 +31,60 @@ public class JsonParser implements Parser {
JsonSplitter splitter;
public void initialize ( Trees args ) {
- try {
- if (args.length() > 0) {
- if (!(args.nth(0) instanceof Node)
- || !(((Node)args.nth(0)).name().equals("list")
- || ((Node)args.nth(0)).name().equals("bag")))
- throw new Error("Expected a bag of synchronization tagnames in JSON source: "+args.nth(0));
- Trees ts = ((Node)args.nth(0)).children();
- if (ts.length() == 0)
- throw new Error("Expected at least one synchronization tagname in JSON source: "+ts);
- tags = new String[ts.length()];
- for ( int i = 0; i < tags.length; i++ )
- if (ts.nth(i) instanceof StringLeaf)
- tags[i] = ((StringLeaf)(ts.nth(i))).value();
- else throw new Error("Expected a synchronization tagname in JSON source: "+ts.nth(i));
- }
- } catch (Exception e) {
- throw new Error(e);
- }
+ try {
+ if (args.length() > 0) {
+ if (!(args.nth(0) instanceof Node)
+ || !(((Node)args.nth(0)).name().equals("list")
+ || ((Node)args.nth(0)).name().equals("bag")))
+ throw new Error("Expected a bag of synchronization tagnames in JSON source: "+args.nth(0));
+ Trees ts = ((Node)args.nth(0)).children();
+ if (ts.length() == 0)
+ throw new Error("Expected at least one synchronization tagname in JSON source: "+ts);
+ tags = new String[ts.length()];
+ for ( int i = 0; i < tags.length; i++ )
+ if (ts.nth(i) instanceof StringLeaf)
+ tags[i] = ((StringLeaf)(ts.nth(i))).value();
+ else throw new Error("Expected a synchronization tagname in JSON source: "+ts.nth(i));
+ }
+ } catch (Exception e) {
+ throw new Error(e);
+ }
}
public void open ( String file ) {
- try {
- splitter = new JsonSplitter(tags,file,new DataOutputBuffer());
- } catch (Exception e) {
- throw new Error(e);
- }
+ try {
+ splitter = new JsonSplitter(tags,file,new DataOutputBuffer());
+ } catch (Exception e) {
+ throw new Error(e);
+ }
}
public void open ( FSDataInputStream fsin, long start, long end ) {
- try {
- splitter = new JsonSplitter(tags,fsin,start,end,new DataOutputBuffer());
- } catch (Exception e) {
- throw new Error(e);
- }
+ try {
+ splitter = new JsonSplitter(tags,fsin,start,end,new DataOutputBuffer());
+ } catch (Exception e) {
+ throw new Error(e);
+ }
}
public Tree type () { return new VariableLeaf("JSON"); }
public String slice () {
- if (splitter.hasNext()) {
- DataOutputBuffer b = splitter.next();
- return new String(b.getData(),0,b.getLength());
- } else return null;
+ if (splitter.hasNext()) {
+ DataOutputBuffer b = splitter.next();
+ return new String(b.getData(),0,b.getLength());
+ } else return null;
}
public Bag parse ( String s ) {
- try {
- JSONLex scanner = new JSONLex(new StringReader(s));
- JSONParser parser = new JSONParser(scanner);
- parser.parse();
- return new Bag(parser.top_level);
- } catch (Exception e) {
- System.err.println(e);
- return new Bag();
- }
+ try {
+ JSONLex scanner = new JSONLex(new StringReader(s));
+ JSONParser parser = new JSONParser(scanner);
+ parser.parse();
+ return new Bag(parser.top_level);
+ } catch (Exception e) {
+ System.err.println(e);
+ return new Bag();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/JsonSplitter.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/JsonSplitter.java b/src/main/java/core/JsonSplitter.java
index 5e158f0..0e76390 100644
--- a/src/main/java/core/JsonSplitter.java
+++ b/src/main/java/core/JsonSplitter.java
@@ -37,100 +37,100 @@ final public class JsonSplitter implements Iterator<DataOutputBuffer> {
final DataOutputBuffer buffer;
JsonSplitter ( String[] tags, FSDataInputStream fsin, long start, long end,
- DataOutputBuffer buffer ) {
- in_memory = false;
- this.tags = tags;
- this.fsin = fsin;
- this.start = start;
- this.end = end;
- this.buffer = buffer;
- scanner = new JSONLex(fsin);
- try {
- fsin.seek(start);
- } catch ( IOException e ) {
- System.err.println("*** Cannot parse the data split: "+fsin);
- }
+ DataOutputBuffer buffer ) {
+ in_memory = false;
+ this.tags = tags;
+ this.fsin = fsin;
+ this.start = start;
+ this.end = end;
+ this.buffer = buffer;
+ scanner = new JSONLex(fsin);
+ try {
+ fsin.seek(start);
+ } catch ( IOException e ) {
+ System.err.println("*** Cannot parse the data split: "+fsin);
+ }
}
JsonSplitter ( String[] tags, String file, DataOutputBuffer buffer ) {
- in_memory = true;
- try {
- in = new FileInputStream(file);
- } catch ( Exception e ) {
- throw new Error("Cannot open the file: "+file);
- };
- this.tags = tags;
- this.buffer = buffer;
- scanner = new JSONLex(in);
+ in_memory = true;
+ try {
+ in = new FileInputStream(file);
+ } catch ( Exception e ) {
+ throw new Error("Cannot open the file: "+file);
+ };
+ this.tags = tags;
+ this.buffer = buffer;
+ scanner = new JSONLex(in);
}
public boolean hasNext () {
- try {
- if (in_memory || fsin.getPos() < end)
- if (skip())
- return store();
- return false;
- } catch (Exception e) {
- System.err.println(e);
- return false;
- }
+ try {
+ if (in_memory || fsin.getPos() < end)
+ if (skip())
+ return store();
+ return false;
+ } catch (Exception e) {
+ System.err.println(e);
+ return false;
+ }
}
public DataOutputBuffer next () {
- return buffer;
+ return buffer;
}
public void remove () { }
boolean is_start_tag ( String tagname ) {
- if (tags == null)
- return true;
- for (String tag: tags)
- if (tag.contentEquals(tagname))
- return true;
- return false;
+ if (tags == null)
+ return true;
+ for (String tag: tags)
+ if (tag.contentEquals(tagname))
+ return true;
+ return false;
}
/** skip until the beginning of a split element */
boolean skip () throws IOException {
- while (true) {
- Symbol s = scanner.next_token();
- if (s.sym == jsym.EOF || (!in_memory && fsin.getPos() >= end))
- return false;
- if (s.sym == jsym.STRING && is_start_tag((String)s.value)) {
- String tag = (String)s.value;
- if (scanner.next_token().sym == jsym.COLON) {
- buffer.reset();
- buffer.write('{');
- buffer.write('\"');
- for ( int i = 0; i < tag.length(); i++ )
- buffer.write(tag.charAt(i));
- buffer.write('\"');
- buffer.write(':');
- return true;
- }
- }
- }
+ while (true) {
+ Symbol s = scanner.next_token();
+ if (s.sym == jsym.EOF || (!in_memory && fsin.getPos() >= end))
+ return false;
+ if (s.sym == jsym.STRING && is_start_tag((String)s.value)) {
+ String tag = (String)s.value;
+ if (scanner.next_token().sym == jsym.COLON) {
+ buffer.reset();
+ buffer.write('{');
+ buffer.write('\"');
+ for ( int i = 0; i < tag.length(); i++ )
+ buffer.write(tag.charAt(i));
+ buffer.write('\"');
+ buffer.write(':');
+ return true;
+ }
+ }
+ }
}
/** store one split element into the buffer; may cross split boundaries */
boolean store () throws IOException {
- int nest = 0;
- while (true) {
- Symbol s = scanner.next_token();
- if (s.sym == jsym.EOF)
- return false;
- if (s.sym == jsym.O_BEGIN || s.sym == jsym.A_BEGIN)
- nest++;
- else if (s.sym == jsym.O_END || s.sym == jsym.A_END)
- nest--;
- String text = scanner.text();
- for ( int i = 0; i < text.length(); i++ )
- buffer.write(text.charAt(i));
- if (nest == 0) {
- buffer.write('}');
- return true;
- }
- }
+ int nest = 0;
+ while (true) {
+ Symbol s = scanner.next_token();
+ if (s.sym == jsym.EOF)
+ return false;
+ if (s.sym == jsym.O_BEGIN || s.sym == jsym.A_BEGIN)
+ nest++;
+ else if (s.sym == jsym.O_END || s.sym == jsym.A_END)
+ nest--;
+ String text = scanner.text();
+ for ( int i = 0; i < text.length(); i++ )
+ buffer.write(text.charAt(i));
+ if (nest == 0) {
+ buffer.write('}');
+ return true;
+ }
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/Lambda.java
----------------------------------------------------------------------
diff --git a/src/main/java/core/Lambda.java b/src/main/java/core/Lambda.java
index 6ec8472..d0cfd7a 100644
--- a/src/main/java/core/Lambda.java
+++ b/src/main/java/core/Lambda.java
@@ -35,18 +35,18 @@ final public class Lambda extends MRData {
public Function lambda () { return lambda; }
final public void write ( DataOutput out ) throws IOException {
- throw new Error("Functions are not serializable");
+ throw new Error("Functions are not serializable");
}
public void readFields ( DataInput in ) throws IOException {
- throw new Error("Functions are not serializable");
+ throw new Error("Functions are not serializable");
}
public int compareTo ( MRData x ) {
- throw new Error("Functions cannot be compared");
+ throw new Error("Functions cannot be compared");
}
public boolean equals ( Object x ) {
- throw new Error("Functions cannot be compared");
+ throw new Error("Functions cannot be compared");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/3de9e485/src/main/java/core/LineParser.gen
----------------------------------------------------------------------
diff --git a/src/main/java/core/LineParser.gen b/src/main/java/core/LineParser.gen
index 5160002..f4af146 100644
--- a/src/main/java/core/LineParser.gen
+++ b/src/main/java/core/LineParser.gen
@@ -42,188 +42,188 @@ final public class LineParser implements Parser {
int type_length;
static byte[] relational_record ( Tree tp ) {
- match tp {
- case record(...al):
- Trees attrs = #[];
- byte[] types = new byte[al.length()];
- for ( int i = 0; i < types.length; i++ )
- match al.nth(i) {
- case bind(`v,any):
- types[i] = -1;
- if (attrs.member(v))
- TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
- attrs = attrs.append(v);
- case bind(`v,`t):
- if (!t.is_variable())
- fail;
- types[i] = MRContainer.type_code(t.toString());
- if (attrs.member(v))
- TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
- attrs = attrs.append(v);
- if (!MRContainer.basic_type(types[i]))
- TypeInference.error("Expected a basic type for a relational attribute: "+t);
- case `t: TypeInference.error("Expected a basic type for a relational attribute: "
- +TypeInference.print_type(t));
- };
- return types;
- case tuple(...al):
- byte[] types = new byte[al.length()];
- for ( int i = 0; i < types.length; i++ )
- match al.nth(i) {
- case any:
- types[i] = -1;
- case `t:
- if (!t.is_variable())
- fail;
- types[i] = MRContainer.type_code(t.toString());
- if (!MRContainer.basic_type(types[i]))
- TypeInference.error("Expected a basic type for a relational attribute: "+t);
- case `t: TypeInference.error("Expected a basic type for a relational attribute: "
- +TypeInference.print_type(t));
- };
- return types;
- };
- TypeInference.error("Expected a relational record or a tuple type: "
- +TypeInference.print_type(tp));
- return null;
+ match tp {
+ case record(...al):
+ Trees attrs = #[];
+ byte[] types = new byte[al.length()];
+ for ( int i = 0; i < types.length; i++ )
+ match al.nth(i) {
+ case bind(`v,any):
+ types[i] = -1;
+ if (attrs.member(v))
+ TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
+ attrs = attrs.append(v);
+ case bind(`v,`t):
+ if (!t.is_variable())
+ fail;
+ types[i] = MRContainer.type_code(t.toString());
+ if (attrs.member(v))
+ TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
+ attrs = attrs.append(v);
+ if (!MRContainer.basic_type(types[i]))
+ TypeInference.error("Expected a basic type for a relational attribute: "+t);
+ case `t: TypeInference.error("Expected a basic type for a relational attribute: "
+ +TypeInference.print_type(t));
+ };
+ return types;
+ case tuple(...al):
+ byte[] types = new byte[al.length()];
+ for ( int i = 0; i < types.length; i++ )
+ match al.nth(i) {
+ case any:
+ types[i] = -1;
+ case `t:
+ if (!t.is_variable())
+ fail;
+ types[i] = MRContainer.type_code(t.toString());
+ if (!MRContainer.basic_type(types[i]))
+ TypeInference.error("Expected a basic type for a relational attribute: "+t);
+ case `t: TypeInference.error("Expected a basic type for a relational attribute: "
+ +TypeInference.print_type(t));
+ };
+ return types;
+ };
+ TypeInference.error("Expected a relational record or a tuple type: "
+ +TypeInference.print_type(tp));
+ return null;
}
static Tree relational_record_type ( Tree tp ) {
- match tp {
- case record(...al):
- Trees ts = #[];
- for ( Tree a: al )
- match a {
- case bind(_,any): ;
- case `t: ts = ts.append(t);
- };
- return #<record(...ts)>;
- case tuple(...al):
- Trees ts = #[];
- for ( Tree a: al )
- if (!a.equals(#<any>))
- ts = ts.append(a);
- return #<tuple(...ts)>;
- };
- TypeInference.error("Expected a relational record type: "
- +TypeInference.print_type(tp));
- return null;
+ match tp {
+ case record(...al):
+ Trees ts = #[];
+ for ( Tree a: al )
+ match a {
+ case bind(_,any): ;
+ case `t: ts = ts.append(t);
+ };
+ return #<record(...ts)>;
+ case tuple(...al):
+ Trees ts = #[];
+ for ( Tree a: al )
+ if (!a.equals(#<any>))
+ ts = ts.append(a);
+ return #<tuple(...ts)>;
+ };
+ TypeInference.error("Expected a relational record type: "
+ +TypeInference.print_type(tp));
+ return null;
}
public Tree type () {
- return relational_record_type(type);
+ return relational_record_type(type);
}
public void initialize ( Trees args ) {
- if (Config.hadoop_mode && Plan.conf == null)
- Plan.conf = Evaluator.new_configuration();
- if (args.length() != 2)
- throw new Error("The line parser must have two arguments: "+args);
- if (!(args.nth(0) instanceof StringLeaf))
- throw new Error("Expected a delimiter: "+args.nth(0));
- delimiter = ((StringLeaf)args.nth(0)).value();
- if (delimiter.length() == 0)
- throw new Error("Expected a delimiter with at least one character: "+delimiter);
- type = ((Node)args.nth(1)).children().nth(0);
- types = relational_record(type);
- type_length = 0;
- for ( int i = 0; i < types.length; i++ )
- if (types[i] >= 0)
- type_length++;
- if (type_length < 1)
- TypeInference.error("A relational record type must have at least one component: "
- +TypeInference.print_type(type));
+ if (Config.hadoop_mode && Plan.conf == null)
+ Plan.conf = Evaluator.new_configuration();
+ if (args.length() != 2)
+ throw new Error("The line parser must have two arguments: "+args);
+ if (!(args.nth(0) instanceof StringLeaf))
+ throw new Error("Expected a delimiter: "+args.nth(0));
+ delimiter = ((StringLeaf)args.nth(0)).value();
+ if (delimiter.length() == 0)
+ throw new Error("Expected a delimiter with at least one character: "+delimiter);
+ type = ((Node)args.nth(1)).children().nth(0);
+ types = relational_record(type);
+ type_length = 0;
+ for ( int i = 0; i < types.length; i++ )
+ if (types[i] >= 0)
+ type_length++;
+ if (type_length < 1)
+ TypeInference.error("A relational record type must have at least one component: "
+ +TypeInference.print_type(type));
}
public void open ( String file ) {
- in_memory = true;
- try {
- buffered_in = new BufferedReader(new InputStreamReader(new FileInputStream(file)),
- 10000);
- } catch ( Exception e ) {
- throw new Error("Cannot open the file: "+file);
- }
+ in_memory = true;
+ try {
+ buffered_in = new BufferedReader(new InputStreamReader(new FileInputStream(file)),
+ 10000);
+ } catch ( Exception e ) {
+ throw new Error("Cannot open the file: "+file);
+ }
}
public void open ( FSDataInputStream fsin, long fstart, long fend ) {
- in_memory = false;
- this.fsin = fsin;
- start = fstart;
- end = fend;
- line = new Text();
- try {
- if (start != 0) { // for all but the first data split, skip the first record
- --start;
- fsin.seek(start);
- in = new LineReader(fsin,Plan.conf);
- start += in.readLine(new Text(),0,(int) Math.min(Integer.MAX_VALUE,end-start));
- } else in = new LineReader(fsin,Plan.conf);
- pos = start;
- } catch ( IOException e ) {
- System.err.println("*** Cannot parse the data split: "+fsin);
- this.start = end;
- }
+ in_memory = false;
+ this.fsin = fsin;
+ start = fstart;
+ end = fend;
+ line = new Text();
+ try {
+ if (start != 0) { // for all but the first data split, skip the first record
+ --start;
+ fsin.seek(start);
+ in = new LineReader(fsin,Plan.conf);
+ start += in.readLine(new Text(),0,(int) Math.min(Integer.MAX_VALUE,end-start));
+ } else in = new LineReader(fsin,Plan.conf);
+ pos = start;
+ } catch ( IOException e ) {
+ System.err.println("*** Cannot parse the data split: "+fsin);
+ this.start = end;
+ }
}
public String slice () {
- try {
- if (in_memory)
- return buffered_in.readLine();
- while (pos < end) {
- int newSize = in.readLine(line,maxLineLength,
- Math.max((int)Math.min(Integer.MAX_VALUE,end-pos),
- maxLineLength));
- if (newSize == 0)
- return null;
- pos += newSize;
- if (newSize < maxLineLength)
- return line.toString();
- };
- return null;
- } catch ( Exception e ) {
- System.err.println("*** Cannot slice the text: "+e);
- return "";
- }
+ try {
+ if (in_memory)
+ return buffered_in.readLine();
+ while (pos < end) {
+ int newSize = in.readLine(line,maxLineLength,
+ Math.max((int)Math.min(Integer.MAX_VALUE,end-pos),
+ maxLineLength));
+ if (newSize == 0)
+ return null;
+ pos += newSize;
+ if (newSize < maxLineLength)
+ return line.toString();
+ };
+ return null;
+ } catch ( Exception e ) {
+ System.err.println("*** Cannot slice the text: "+e);
+ return "";
+ }
}
private static MRData parse_value ( String text, byte type ) {
- switch (type) {
- case MRContainer.BYTE: return new MR_byte(Byte.parseByte(text));
- case MRContainer.SHORT: return new MR_short(Short.parseShort(text));
- case MRContainer.INT: return new MR_int(Integer.parseInt(text));
- case MRContainer.LONG: return new MR_long(Long.parseLong(text));
- case MRContainer.FLOAT: return new MR_float(Float.parseFloat(text));
- case MRContainer.DOUBLE: return new MR_double(Double.parseDouble(text));
- case MRContainer.CHAR: return new MR_char(text.charAt(0));
- case MRContainer.STRING: return new MR_string(text);
- };
- System.err.println("*** Cannot parse the type "+MRContainer.type_names[type]+" in '"+text+"'");
- return null;
+ switch (type) {
+ case MRContainer.BYTE: return new MR_byte(Byte.parseByte(text));
+ case MRContainer.SHORT: return new MR_short(Short.parseShort(text));
+ case MRContainer.INT: return new MR_int(Integer.parseInt(text));
+ case MRContainer.LONG: return new MR_long(Long.parseLong(text));
+ case MRContainer.FLOAT: return new MR_float(Float.parseFloat(text));
+ case MRContainer.DOUBLE: return new MR_double(Double.parseDouble(text));
+ case MRContainer.CHAR: return new MR_char(text.charAt(0));
+ case MRContainer.STRING: return new MR_string(text);
+ };
+ System.err.println("*** Cannot parse the type "+MRContainer.type_names[type]+" in '"+text+"'");
+ return null;
}
public Bag parse ( String line ) {
try {
- if (line == null)
- return new Bag();
+ if (line == null)
+ return new Bag();
Tuple t = new Tuple(type_length);
- int loc = 0;
- int j = 0;
+ int loc = 0;
+ int j = 0;
for ( int i = 0; i < types.length; i++ ) {
- int k = line.indexOf(delimiter,loc);
- if (types[i] >= 0) {
- String s = (k > 0) ? line.substring(loc,k) : line.substring(loc);
- MRData v = parse_value(s,types[i]);
- if (v == null)
- return new Bag();
- t.set(j++,v);
- };
- loc = k+delimiter.length();
- if (k < 0 && i+1 < types.length) {
- System.err.println("*** Incomplete parsed text line: "+line);
- return new Bag();
- }
- };
- return new Bag(t);
+ int k = line.indexOf(delimiter,loc);
+ if (types[i] >= 0) {
+ String s = (k > 0) ? line.substring(loc,k) : line.substring(loc);
+ MRData v = parse_value(s,types[i]);
+ if (v == null)
+ return new Bag();
+ t.set(j++,v);
+ };
+ loc = k+delimiter.length();
+ if (k < 0 && i+1 < types.length) {
+ System.err.println("*** Incomplete parsed text line: "+line);
+ return new Bag();
+ }
+ };
+ return new Bag(t);
} catch ( Exception e ) {
System.err.println("*** Cannot parse the text line: "+line);
return new Bag();