You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2014/03/13 15:24:48 UTC
[23/26] MRQL-32: Refactoring directory structure for Eclipse
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Environment.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Environment.java b/core/src/main/java/org/apache/mrql/Environment.java
new file mode 100644
index 0000000..da8dd84
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Environment.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.*;
+import org.apache.mrql.gen.Tree;
+
+
+/**
+ * One link of the run-time environment used by in-memory evaluation:
+ * environments form a singly-linked chain of (variable name, MRData value)
+ * bindings, searched from this node towards the end of the chain.
+ */
+final public class Environment implements Serializable {
+    /** the name of the bound variable */
+    public String name;
+    /** the value bound to the variable */
+    public MRData value;
+    /** the remaining (outer) bindings; null terminates the chain */
+    public Environment next;
+
+    Environment ( String n, MRData v, Environment next ) {
+        this.name = n;
+        this.value = v;
+        this.next = next;
+    }
+
+    /** default serialization of all three fields */
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        out.defaultWriteObject();
+    }
+
+    /** default deserialization, then the name is passed through Tree.add
+     *  (presumably to re-canonicalize it so identity comparisons on variable
+     *  names keep working after transfer -- NOTE(review): confirm) */
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        this.name = Tree.add(this.name);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Evaluator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Evaluator.java b/core/src/main/java/org/apache/mrql/Evaluator.java
new file mode 100644
index 0000000..28d356f
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Evaluator.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java_cup.runtime.*;
+import org.apache.mrql.gen.*;
+import java.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+
+
+/** Evaluates physical plans using one of the evaluation engines */
+abstract public class Evaluator extends Interpreter {
+
+    /** the current MRQL evaluator */
+    public static Evaluator evaluator;
+
+    /** initialize the evaluator */
+    abstract public void init ( Configuration conf );
+
+    /** shutdown the evaluator */
+    abstract public void shutdown ( Configuration conf );
+
+    /** initialize the query evaluation */
+    abstract public void initialize_query ();
+
+    /** create a new evaluation configuration */
+    abstract public Configuration new_configuration ();
+
+    /** synchronize peers in BSP mode.
+     *  This default implementation always throws an Error
+     *  (presumably BSP-capable evaluators override it). */
+    public MR_bool synchronize ( MR_string peerName, MR_bool mr_exit ) {
+        throw new Error("You can only synchronize BSP tasks");
+    }
+
+    /** distribute a bag among peers in BSP mode.
+     *  This default implementation always throws an Error. */
+    public Bag distribute ( MR_string peerName, Bag s ) {
+        throw new Error("You can only distribute bags among BSP tasks");
+    }
+
+    /** run a BSP task.
+     *  This default implementation always throws an Error. */
+    public MRData bsp ( Tree plan, Environment env ) throws Exception {
+        throw new Error("You can only run a BSP task in BSP mode");
+    }
+
+    /** return the FileInputFormat for parsed files (CSV, XML, JSON, etc) */
+    abstract public Class<? extends MRQLFileInputFormat> parsedInputFormat ();
+
+    /** return the FileInputFormat for binary files */
+    abstract public Class<? extends MRQLFileInputFormat> binaryInputFormat ();
+
+    /** return the FileInputFormat for data generator files */
+    abstract public Class<? extends MRQLFileInputFormat> generatorInputFormat ();
+
+    /** The Aggregate physical operator
+     * @param acc_fnc the accumulator function from (T,T) to T
+     * @param zero the zero element of type T
+     * @param plan the plan that constructs the dataset that contains the bag of values {T}
+     * @param env contains bindings for variables to values (MRData)
+     * @return the aggregation result of type T
+     */
+    abstract public MRData aggregate ( Tree acc_fnc,
+                                       Tree zero,
+                                       Tree plan,
+                                       Environment env ) throws Exception;
+
+    /** Evaluate a loop a fixed number of times
+     * @param e the Loop physical plan
+     * @param env contains bindings for variables to values (MRData)
+     * @return the loop result as a Tuple
+     */
+    abstract public Tuple loop ( Tree e, Environment env ) throws Exception;
+
+    /** Evaluate a MRQL physical plan and print tracing info
+     * @param e the physical plan
+     * @param env contains bindings for variables to values (MRData)
+     * @param counter an engine-specific label for this evaluation (callers pass
+     *                e.g. "-"); NOTE(review): confirm its exact meaning per engine
+     * @return a DataSet (stored in HDFS)
+     */
+    abstract public DataSet eval ( final Tree e,
+                                   final Environment env,
+                                   final String counter );
+
+    // reusable key/value containers for dump; they are shared across calls,
+    // so concurrent calls to dump would interfere with each other
+    final static MR_long counter_key = new MR_long(0);
+    final static MRContainer counter_container = new MRContainer(counter_key);
+    final static MRContainer value_container = new MRContainer(new MR_int(0));
+
+    /** dump MRQL data into a sequence file.
+     *  The type is written to a companion file with suffix ".type"; each element
+     *  of a bag (or the single non-bag value) is appended with a running index as key.
+     *  Both output streams are closed even if writing fails.
+     */
+    public void dump ( String file, Tree type, MRData data ) throws Exception {
+        Path path = new Path(file);
+        FileSystem fs = path.getFileSystem(Plan.conf);
+        PrintStream ftp = new PrintStream(fs.create(path.suffix(".type")));
+        try {
+            ftp.print("2@"+type.toString()+"\n");
+        } finally {
+            ftp.close();
+        }
+        SequenceFile.Writer writer
+            = new SequenceFile.Writer(fs,Plan.conf,path,
+                                      MRContainer.class,MRContainer.class);
+        try {
+            if (data instanceof MR_dataset)
+                data = Plan.collect(((MR_dataset)data).dataset());
+            if (data instanceof Bag) {
+                Bag s = (Bag)data;
+                long i = 0;
+                for ( MRData e: s ) {
+                    counter_key.set(i++);
+                    value_container.set(e);
+                    writer.append(counter_container,value_container);
+                }
+            } else {
+                counter_key.set(0);
+                value_container.set(data);
+                writer.append(counter_container,value_container);
+            }
+        } finally {
+            writer.close();
+        }
+    }
+
+    /** dump MRQL data into a text CSV file.
+     *  A collection is written one element per line (tuple components separated
+     *  by commas); any other value is written on a single line.
+     *  The global Config.max_bag_size_print is restored and the output stream is
+     *  closed even if opening or printing fails.
+     */
+    public void dump_text ( String file, Tree type, MRData data ) throws Exception {
+        final int ps = Config.max_bag_size_print;
+        // presumably -1 disables truncation of printed bags; restored in the finally clause
+        Config.max_bag_size_print = -1;
+        try {
+            final PrintStream out = (Config.hadoop_mode)
+                                    ? Plan.print_stream(file)
+                                    : new PrintStream(file);
+            try {
+                if (data instanceof MR_dataset)
+                    data = Plan.collect(((MR_dataset)data).dataset());
+                if (Translator.collection_type(type)) {
+                    Tree tp = ((Node)type).children().head();
+                    if (tp instanceof Node && ((Node)tp).name().equals("tuple")) {
+                        Trees ts = ((Node)tp).children();
+                        for ( MRData x: (Bag)data ) {
+                            Tuple t = (Tuple)x;
+                            out.print(print(t.get((short)0),ts.nth(0)));
+                            for ( short i = 1; i < t.size(); i++ )
+                                out.print(","+print(t.get(i),ts.nth(i)));
+                            out.println();
+                        }
+                    } else for ( MRData x: (Bag)data )
+                        out.println(print(x,tp));
+                } else out.println(print(data,query_type));
+            } finally {
+                out.close();
+            }
+        } finally {
+            Config.max_bag_size_print = ps;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Function.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Function.java b/core/src/main/java/org/apache/mrql/Function.java
new file mode 100644
index 0000000..1f1d4e1
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Function.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.*;
+
+
+/**
+ * A lambda abstraction over MRData: a serializable, anonymous function that
+ * maps one MRData value to another. Concrete subclasses supply the lambda
+ * body by implementing {@link #eval}.
+ */
+public abstract class Function implements Serializable {
+    /**
+     * Apply this function to a single argument.
+     * @param arg the argument value
+     * @return the value computed by the lambda body
+     */
+    public abstract MRData eval ( final MRData arg );
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/GeneratorDataSource.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/GeneratorDataSource.java b/core/src/main/java/org/apache/mrql/GeneratorDataSource.java
new file mode 100644
index 0000000..380f50d
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/GeneratorDataSource.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+
+
+/** A DataSource used for processing the range min..max */
+final public class GeneratorDataSource extends DataSource {
+    /** a generator source with an explicit source number */
+    GeneratorDataSource ( int source_num, String path, Configuration conf ) {
+        super(source_num,path,Evaluator.evaluator.generatorInputFormat(),conf);
+    }
+
+    /** a generator source with source number -1 */
+    GeneratorDataSource ( String path, Configuration conf ) {
+        super(-1,path,Evaluator.evaluator.generatorInputFormat(),conf);
+    }
+
+    /** estimated size in bytes of the data generated from path:
+     *  each generator file produces range_split_size long integers (8 bytes each);
+     *  a directory counts every entry returned by listStatus as one such file.
+     * @throws IOException if the path cannot be accessed
+     */
+    public static long size ( Path path, Configuration conf ) throws IOException {
+        // widen before multiplying so large split sizes cannot overflow an int
+        final long bytes_per_file = ((long)Config.range_split_size)*8;
+        FileSystem fs = path.getFileSystem(conf);
+        if (!fs.getFileStatus(path).isDir())
+            return bytes_per_file;
+        return bytes_per_file*fs.listStatus(path).length;
+    }
+
+    public String toString () {
+        return "Generator"+separator+source_num+separator+path;
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Interpreter.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Interpreter.gen b/core/src/main/java/org/apache/mrql/Interpreter.gen
new file mode 100644
index 0000000..97c76ce
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Interpreter.gen
@@ -0,0 +1,833 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.io.PrintStream;
+
+
+/** The MRQL interpreter */
+public class Interpreter extends TypeInference {
+
+ /** the lambda plan lambda(x,bag(x)): maps each element x to the singleton bag of x */
+ public final static Tree identity_mapper = #<lambda(x,bag(x))>;
+
+ /** the global variable bindings: a chain of Environment nodes, most recent binding first (null when empty) */
+ protected static Environment global_env = null;
+
+ /** retrieve variable binding */
+ public final static MRData variable_lookup ( final String v, final Environment environment ) {
+ for ( Environment env = environment; env != null; env = env.next ) {
+ if (v.equals(env.name))
+ return env.value;
+ };
+ return null;
+ }
+
+ /** insert a new global variable binding */
+ public final static void new_global_binding ( final String var, final MRData value ) {
+ if (value instanceof Bag)
+ ((Bag)value).materialize();
+ global_env = new Environment(var,value,global_env);
+ }
+
+ /** remove every binding of the variable v from the global environment
+  * (a no-op when v is unbound); names are compared with equals */
+ public static void remove_global_binding ( String v ) {
+     // drop matching bindings at the head of the chain
+     while (global_env != null && v.equals(global_env.name))
+         global_env = global_env.next;
+     if (global_env == null)
+         return;
+     // unlink matching bindings further down; advance only when nothing was
+     // removed, so consecutive matches and a matching last node are handled
+     // (advancing unconditionally would step onto null after removing the tail)
+     Environment env = global_env;
+     while (env.next != null) {
+         if (v.equals(env.next.name))
+             env.next = env.next.next;
+         else env = env.next;
+     }
+ }
+
+ /** retrieve the value of the most recent global binding of v, or null when v is not globally bound */
+ public static MRData lookup_global_binding ( String v ) {
+     // the global environment is an ordinary chain, so the local lookup applies unchanged
+     return variable_lookup(v,global_env);
+ }
+
+ /** replace the whole global environment chain with env (null clears all global bindings) */
+ public static void set_global_bindings ( Environment env ) {
+     Interpreter.global_env = env;
+ }
+
+ /** the method number ClassImporter assigns to coerce(any,int); reify uses it to build coerce calls */
+ final static int coerce_method = ClassImporter.find_method_number("coerce",#[any,int]);
+
+ /** untyped reify: convert a run-time value back into an expression Tree.
+  * Not type-correct, but will not crash the run-time system.
+  * Bags become list(...), tuples become tuple(...), strings become string leaves,
+  * ints and floats become literals, and short/long/double values are wrapped in a
+  * coerce call tagged with the target MRContainer type. Throws an Error for any other value.
+  */
+ private final static Tree reify ( final MRData x ) {
+ if (x instanceof Bag) {
+ Bag b = (Bag)x;
+ Trees as = #[];
+ for ( MRData e: b)
+ as = as.append(reify(e));
+ return #<list(...as)>;
+ } else if (x instanceof Tuple) {
+ Tuple t = (Tuple)x;
+ Trees as = #[];
+ for ( short i = 0; i < t.size(); i++ )
+ as = as.append(reify(t.get(i)));
+ return #<tuple(...as)>;
+ } else if (x instanceof MR_string)
+ return new StringLeaf(((MR_string)x).get());
+ else if (x instanceof MR_short)
+ return #<callM(coerce,`coerce_method,`(((MR_short)x).get()),`(MRContainer.SHORT))>;
+ else if (x instanceof MR_int)
+ return #<`(((MR_int)x).get())>;
+ else if (x instanceof MR_long)
+ // NOTE(review): the (int) cast truncates long values outside the int range -- confirm intended
+ return #<callM(coerce,`coerce_method,`((int)((MR_long)x).get()),`(MRContainer.LONG))>;
+ else if (x instanceof MR_float)
+ return #<`(((MR_float)x).get())>;
+ else if (x instanceof MR_double)
+ // NOTE(review): the (float) cast loses double precision -- confirm intended
+ return #<callM(coerce,`coerce_method,`((float)(((MR_double)x).get())),`(MRContainer.DOUBLE))>;
+ throw new Error("wrong MRData: "+x);
+ }
+
+ /** evaluate an MRQL function in memory: wrap a one-parameter lambda body as a
+  * Function that evaluates body in env extended with v bound to the argument */
+ private final static Function evalf ( final String v,
+                                       final Tree body,
+                                       final Environment env ) {
+     final Function closure = new Function() {
+         final public MRData eval ( final MRData x ) {
+             final Environment extended = new Environment(v,x,env);
+             return evalE(body,extended);
+         }
+     };
+     return closure;
+ }
+
+ /** evaluate an MRQL function in memory.
+  * Accepted forms: compiled(name,lambda,...), lambda(v,body), and
+  * function(tuple(params),type,body); any other tree raises an Error("Ill-formed lambda: ...").
+  */
+ public final static Function evalF ( Tree fnc, Environment env ) {
+ match fnc {
+ case compiled(`ln,`lm,...vars):
+ // prefer the precompiled class; if it cannot be loaded, interpret the lambda lm instead
+ try {
+ return Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString());
+ } catch (Exception ex) {
+ System.err.println("*** Unable to retrieve the compiled lambda: "+fnc);
+ // NOTE(review): lm is evaluated without env (null local environment) -- confirm lm is closed
+ return ((Lambda) evalE(lm)).lambda();
+ }
+ case lambda(`v,`b):
+ return evalf(v.toString(),b,env);
+ case function(tuple(...params),`tp,`body):
+ // collect the parameter names; the function receives its arguments as one Tuple
+ String[] as = new String[params.length()];
+ int i = 0;
+ for ( Tree param: params )
+ match param {
+ case `bind(`v,_):
+ as[i++] = v.toString();
+ };
+ return evalT(as,body,env);
+ };
+ throw new Error("Ill-formed lambda: "+fnc);
+ }
+
+ /** evaluate an MRQL function in memory */
+ private final static Function evalT ( final String[] params,
+ final Tree body,
+ final Environment env ) {
+ return new Function() {
+ final public MRData eval ( final MRData x ) {
+ Environment new_env = env;
+ for ( int i = 0; i < params.length; i++ )
+ new_env = new Environment(params[i],((Tuple)x).get(i),new_env);
+ return evalE(body,new_env);
+ }
+ };
+ }
+
+ // the names of the constant variables true/false/null; evalEE compares
+ // variable names against these by reference (==)
+ final static String true_name = #<true>.toString();
+ final static String false_name = #<false>.toString();
+ final static String null_name = #<null>.toString();
+ // run-time values of those constants (null is represented by an empty Tuple)
+ final static MRData null_value = new Tuple(0);
+ final static MRData true_value = new MR_bool(true);
+ final static MRData false_value = new MR_bool(false);
+
+ // current indentation of trace output; evalE/evalM add 3 per nesting level,
+ // so starting at -3 prints the outermost traced expression with no indentation
+ static int tab_count = -3;
+
+ /** return a string of n blanks (empty when n <= 0); used to indent trace output */
+ public static String tabs ( int n ) {
+     StringBuilder sb = new StringBuilder();
+     for ( int k = n; k > 0; k-- )
+         sb.append(' ');
+     return sb.toString();
+ }
+
+ /** evaluate an MRQL expression in memory and print tracing info.
+  * When Config.trace_exp_execution is set, the expression is printed before
+  * evaluation and its result after, indented by nesting depth. The indentation
+  * counter is restored even when evaluation throws.
+  */
+ final static MRData evalE ( final Tree e, final Environment env ) {
+     // read the flag once so the increment and decrement of tab_count always pair up
+     final boolean trace = Config.trace_exp_execution;
+     if (trace)
+         tab_count += 3;
+     try {
+         if (trace)
+             System.out.println(tabs(tab_count)+print_query(e));
+         MRData res = evalEE(e,env);
+         if (trace)
+             System.out.println(tabs(tab_count)+"-> "+res);
+         return res;
+     } finally {
+         if (trace)
+             tab_count -= 3;
+     }
+ }
+
+ /** evaluate an MRQL expression in memory.
+  * Variables are looked up first in env and then in the global environment;
+  * the names true/false/null denote constants. Trees not handled by a specific
+  * case are evaluated as collection plans: by the current Evaluator when
+  * Config.hadoop_mode is set (wrapped in an MR_dataset), otherwise with evalS.
+  * Failures are rethrown as an Error: a caught Error keeps only its message when
+  * Config.trace is off; otherwise (and for any other Exception) the thrown Error
+  * reads "Evaluation error in: " followed by the printed query.
+  */
+ private final static MRData evalEE ( final Tree e, final Environment env ) {
+ try {
+ if (e.is_variable()) {
+ String v = e.toString();
+ // reference comparison with true_name/false_name/null_name;
+ // NOTE(review): relies on e.toString() returning the same String instance -- confirm
+ if (v == true_name)
+ return true_value;
+ else if (v == false_name)
+ return false_value;
+ else if (v == null_name)
+ return null_value;
+ MRData x = variable_lookup(v,env);
+ if (x != null)
+ return x;
+ x = lookup_global_binding(v);
+ if (x == null)
+ throw new Error("Variable "+v+" is not bound");
+ return x;
+ } else if (e.is_long())
+ // integer literals are narrowed to MR_int and double literals to MR_float
+ return new MR_int((int)e.longValue());
+ else if (e.is_double())
+ return new MR_float((float)e.doubleValue());
+ else if (e.is_string())
+ return new MR_string(e.stringValue());
+ match e {
+ // and/or evaluate their second operand only when needed
+ case callM(and,_,`x,`y): // lazy
+ return (((MR_bool)evalE(x,env)).get()) ? evalE(y,env) : false_value;
+ case callM(or,_,`x,`y):
+ return (((MR_bool)evalE(x,env)).get()) ? true_value : evalE(y,env);
+ case callM(`f,`n,...args): // internal function call
+ MRData[] as = new MRData[args.length()];
+ for ( int i = 0; i < args.length(); i++ )
+ as[i] = evalE(args.nth(i),env);
+ return ClassImporter.call((int)n.longValue(),as);
+ case compiled(`ln,_,...vars):
+ return new Lambda(Compiler.compiled(Thread.currentThread().getContextClassLoader(),ln.toString()));
+ case lambda(`v,`body):
+ return new Lambda(evalf(v.toString(),body,env));
+ case nth(`x,`n):
+ return ((Tuple)evalE(x,env)).get((int)n.longValue());
+ case setNth(`x,`n,`v,`ret):
+ return ((Tuple)evalE(x,env)).set((int)n.longValue(),evalE(v,env),evalE(ret,env));
+ case materialize(`u):
+ return MapReduceAlgebra.materialize(evalE(u,env));
+ case let(`v,`u,`body):
+ MRData x = evalE(u,env);
+ if (x instanceof Bag)
+ ((Bag)x).materialize();
+ return evalE(body,new Environment(v.toString(),x,env));
+ case cmap(`f,`s):
+ return MapReduceAlgebra.cmap(evalF(f,env),(Bag)evalE(s,env));
+ case filter(`p,`m,`s):
+ return MapReduceAlgebra.filter(evalF(p,env),evalF(m,env),(Bag)evalE(s,env));
+ case map(`m,`s):
+ return MapReduceAlgebra.map(evalF(m,env),(Bag)evalE(s,env));
+ // each iteration also rebinds the loop variable globally; in Hadoop mode the
+ // body is evaluated by the engine and wrapped as an MR_dataset
+ case repeat(lambda(`v,`b),`s,`n):
+ final String nm = v.toString();
+ final Tree body = b;
+ if (Config.hadoop_mode) {
+ Function loop_fnc = new Function () {
+ public MRData eval ( MRData s ) {
+ new_global_binding(nm,s);
+ MRData val = new MR_dataset(Evaluator.evaluator.eval(body,new Environment(nm,s,env),nm));
+ return val;
+ }; };
+ return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+ } else {
+ Function loop_fnc = new Function () {
+ public MRData eval ( MRData s ) {
+ new_global_binding(nm,s);
+ return evalM(body,new Environment(nm,s,env));
+ }; };
+ return MapReduceAlgebra.repeat(loop_fnc,(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+ }
+ case repeat(`lm,`s,`n):
+ return MapReduceAlgebra.repeat(evalF(lm,env),(Bag)evalE(s,env),((MR_int)evalE(n,env)).get());
+ case range(`min,`max):
+ return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
+ ((MR_long)evalE(max,env)).get());
+ case call(`f,...args):
+ Tuple t = new Tuple(args.length());
+ int i = 0;
+ for ( Tree a: args )
+ t.set(i++,evalE(a,env));
+ return evalF(f,env).eval(t);
+ case tuple(`x,`y):
+ return new Tuple(evalE(x,env),evalE(y,env));
+ case tuple(`x,`y,`z):
+ return new Tuple(evalE(x,env),evalE(y,env),evalE(z,env));
+ case tuple(...el):
+ Tuple t = new Tuple(el.length());
+ int i = 0;
+ for ( Tree a: el )
+ t.set(i++,evalE(a,env));
+ return t;
+ case tagged_union(`n,`u):
+ return new Union((byte)n.longValue(),evalE(u,env));
+ case union_value(`x):
+ return ((Union)evalE(x,env)).value();
+ case union_tag(`x):
+ return new MR_int(((Union)evalE(x,env)).tag());
+ // used for shortcutting sync in bsp supersteps
+ case BAG():
+ return SystemFunctions.bsp_empty_bag;
+ case TRUE():
+ return SystemFunctions.bsp_true_value;
+ case FALSE():
+ return SystemFunctions.bsp_false_value;
+ // collection constructors such as bag(...): fail moves on to the next case otherwise
+ case `T(...el):
+ if (!is_collection(T))
+ fail;
+ if (el.is_empty())
+ return new Bag();
+ Bag b = new Bag(el.length());
+ for ( Tree a: el )
+ b.add(evalE(a,env));
+ return b;
+ case if(`c,`x,`y):
+ if (((MR_bool)evalE(c,env)).get())
+ return evalE(x,env);
+ else return evalE(y,env);
+ case Collect(`s):
+ try {
+ if (Config.hadoop_mode)
+ return Plan.collect(Evaluator.evaluator.eval(s,env,"-"));
+ Bag b = evalS(s,env);
+ b.materialize();
+ return b;
+ } catch (Exception ex) { throw new Error(ex); }
+ case dataset_size(`x):
+ return new MR_long(Plan.size(Evaluator.evaluator.eval(x,env,"-")) / (1024*1024));
+ case synchronize(`peer,`b):
+ return Evaluator.evaluator.synchronize(((MR_string)evalE(peer,env)),(MR_bool)evalE(b,env));
+ case distribute(`peer,`s):
+ return Evaluator.evaluator.distribute(((MR_string)evalE(peer,env)),(Bag)evalE(s,env));
+ case mapReduce(`m,`r,`s,_):
+ return MapReduceAlgebra.mapReduce(evalF(m,env),
+ evalF(r,env),
+ (Bag)evalE(s,env));
+ case mapReduce2(`mx,`my,`r,`x,`y,_):
+ return MapReduceAlgebra.mapReduce2(
+ evalF(mx,env),
+ evalF(my,env),
+ evalF(r,env),
+ (Bag)evalE(x,env),
+ (Bag)evalE(y,env));
+ case mapJoin(`kx,`ky,`r,`x,`y):
+ return MapReduceAlgebra.mapJoin(
+ evalF(kx,env),
+ evalF(ky,env),
+ evalF(r,env),
+ (Bag)evalE(x,env),
+ (Bag)evalE(y,env));
+ case crossProduct(`mx,`my,`r,`x,`y):
+ return MapReduceAlgebra.crossProduct(
+ evalF(mx,env),
+ evalF(my,env),
+ evalF(r,env),
+ (Bag)evalE(x,env),
+ (Bag)evalE(y,env));
+ case groupBy(`s):
+ return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
+ // NOTE(review): orderBy is evaluated with MapReduceAlgebra.groupBy -- confirm this is intended
+ case orderBy(`s):
+ return MapReduceAlgebra.groupBy((Bag)evalE(s,env));
+ // 0-based list indexing; for an MR_dataset only the first k+1 elements are fetched
+ case index(`x,`n):
+ MRData xv = evalE(x,env);
+ MRData nv = evalE(n,env);
+ final int k = (int)((MR_int)nv).get();
+ if (k < 0)
+ throw new Error("Negative list index: "+k);
+ if (xv instanceof MR_dataset) {
+ List<MRData> res = ((MR_dataset)xv).dataset().take(k+1);
+ if (k >= res.size())
+ throw new Error("List index out of bounds: "+k);
+ return res.get(k);
+ };
+ Bag b = (Bag)xv;
+ return b.get(k);
+ // sublist of the elements at positions i..j (inclusive); stops early if the input is shorter
+ case range(`x,`i,`j):
+ MRData xv = evalE(x,env);
+ MRData ni = evalE(i,env);
+ MRData nj = evalE(j,env);
+ int ki = (int)((MR_int)ni).get();
+ int kj = (int)((MR_int)nj).get();
+ if (ki < 0 || kj < ki)
+ throw new Error("Wrong list range: ["+ki+","+kj+"]");
+ Iterator<MRData> it = (xv instanceof MR_dataset)
+ ? ((MR_dataset)xv).dataset().take(kj+1).iterator()
+ : ((Bag)xv).iterator();
+ Bag s = new Bag();
+ for ( int n = 0; it.hasNext() && n < ki; n++ )
+ it.next();
+ for ( int n = ki; it.hasNext() && n <= kj; n++ )
+ s.add(it.next());
+ return s;
+ // map lookup by key; for an MR_dataset the reduce keeps the value of a (key,value)
+ // element whose key equals nk, and an empty Tuple result means the key was not found
+ case map_index(`x,`key):
+ MRData xv = evalE(x,env);
+ final MRData nk = evalE(key,env);
+ if (xv instanceof MR_dataset) {
+ xv = ((MR_dataset)xv).dataset().reduce(new Tuple(),new Function() {
+ public MRData eval ( MRData value ) {
+ Tuple p = (Tuple)value;
+ Tuple y = (Tuple)p.second();
+ return (y.first().equals(nk)) ? y.second() : p.first();
+ }
+ });
+ if (xv instanceof Tuple && ((Tuple)xv).size() == 0)
+ throw new Error("Map key not found: "+nk);
+ return xv;
+ };
+ return ((Bag)xv).map_find(nk);
+ case aggregate(`acc,`zero,`s):
+ return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ (Bag)evalE(s,env));
+ case Aggregate(`acc,`zero,`s):
+ if (Config.hadoop_mode)
+ return Evaluator.evaluator.aggregate(closure(acc,env),zero,s,env);
+ else return MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),evalM(s,env));
+ case mergeGroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
+ return MapReduceAlgebra.mergeGroupByJoin(evalF(kx,env),evalF(ky,env),evalF(gx,env),evalF(gy,env),
+ evalF(m,env),evalF(c,env),evalF(r,env),
+ (Bag)evalE(x,env),(Bag)evalE(y,env));
+ // BSP plans: delegated to the engine in Hadoop mode, otherwise run in memory
+ case BSP(tuple(...ns),`superstep,`state,`o,...as):
+ if (Config.hadoop_mode)
+ return Evaluator.evaluator.bsp(e,env);
+ Bag[] ds = new Bag[as.length()];
+ for ( int i = 0; i < ds.length; i++ )
+ ds[i] = evalM(as.nth(i),env);
+ int[] nn = new int[ns.length()];
+ for ( int i = 0; i < ns.length(); i++ )
+ nn[i] = (int)((LongLeaf)ns.nth(i)).value();
+ return MapReduceAlgebra.BSP(nn,
+ evalF(superstep,env),
+ evalE(state,env),
+ o.equals(#<true>),
+ ds);
+ case BSP(`n,`superstep,`state,`o,...as):
+ if (Config.hadoop_mode)
+ return Evaluator.evaluator.bsp(e,env);
+ Bag[] ds = new Bag[as.length()];
+ for ( int i = 0; i < ds.length; i++ )
+ ds[i] = evalM(as.nth(i),env);
+ return MapReduceAlgebra.BSP(new int[]{(int)((LongLeaf)n).value()},
+ evalF(superstep,env),
+ evalE(state,env),
+ o.equals(#<true>),
+ ds);
+ // run the loop body num times, rebinding each loop variable vs[i] to its current bag;
+ // the result is a Tuple of the final bags
+ case Loop(lambda(tuple(...vs),tuple(...bs)),tuple(...ss),`num):
+ if (Config.hadoop_mode)
+ return Evaluator.evaluator.loop(e,env);
+ int limit = ((MR_int)evalE(num,env)).get();
+ Bag[] s = new Bag[vs.length()];
+ for ( int i = 0; i < vs.length(); i++ )
+ s[i] = evalM(ss.nth(i),env);
+ for ( int n = 0; n < limit; n++ ) {
+ Environment nenv = env;
+ for ( int i = 0; i < vs.length(); i ++ ) {
+ s[i].materialize();
+ nenv = new Environment(vs.nth(i).toString(),s[i],nenv);
+ };
+ for ( int i = 0; i < vs.length(); i ++ )
+ s[i] = (Bag)evalM(bs.nth(i),nenv);
+ };
+ return new Tuple(s);
+ case function(tuple(...params),`tp,`body):
+ String[] as = new String[params.length()];
+ int i = 0;
+ for ( Tree param: params )
+ match param {
+ case `bind(`v,_):
+ as[i++] = v.toString();
+ };
+ return new Lambda(evalT(as,body,env));
+ case typed(`x,_):
+ return evalE(x,env);
+ // application of a named function: looked up globally, else loaded from the
+ // "mrql.global.<name>" configuration entry and cached as a global binding
+ case apply(`f,`arg):
+ if (!f.is_variable())
+ return evalF(f,env).eval(evalE(arg,env));
+ MRData fnc = lookup_global_binding(f.toString());
+ if (fnc == null) {
+ String s = Plan.conf.get("mrql.global."+f);
+ if (s != null)
+ try {
+ Tree ft = Tree.parse(s);
+ TopLevel.store(f.toString(),ft);
+ fnc = evalE(ft,env);
+ new_global_binding(f.toString(),fnc);
+ } catch (Exception ex) {
+ throw new Error(ex);
+ }
+ };
+ // NOTE(review): if f is still unbound here, fnc is null and the cast below throws
+ // a NullPointerException (reported only as a generic evaluation error)
+ MRData t = evalE(arg,env);
+ if (!(t instanceof Tuple))
+ throw new Error("Expected a tuple in function application: "+t);
+ return ((Lambda)fnc).lambda().eval(t);
+ case trace(`x):
+ MRData z = evalE(x,env);
+ System.err.println("*** "+x+": "+z);
+ return z;
+ // fallback: evaluate e as a physical plan that produces a collection
+ case _:
+ try {
+ if (Config.hadoop_mode)
+ return new MR_dataset(Evaluator.evaluator.eval(e,env,"-"));
+ else return evalS(e,env);
+ } catch (Exception ex) { throw new Error(ex); }
+ };
+ throw new Error("Cannot evaluate the expression: "+e);
+ } catch (Error msg) {
+ // NOTE(review): the rethrown Error drops the original cause and stack trace
+ if (!Config.trace)
+ throw new Error(msg.getMessage());
+ System.err.println(msg.getMessage());
+ msg.printStackTrace();
+ throw new Error("Evaluation error in: "+print_query(e));
+ } catch (Exception ex) {
+ if (Config.trace) {
+ System.err.println(ex.getMessage());
+ ex.printStackTrace();
+ }
+ throw new Error("Evaluation error in: "+print_query(e));
+ }
+ }
+
+ /** evaluate an MRQL expression in memory with no local bindings
+  * (only global bindings are visible to variable lookup) */
+ final static MRData evalE ( final Tree e ) {
+     final Environment no_locals = null;
+     return evalE(e,no_locals);
+ }
+
+ /** evaluate MRQL physical operators in memory (returns a Bag);
+  * currently a direct alias of evalM */
+ final static Bag evalS ( final Tree e, final Environment env ) {
+     final Bag result = evalM(e,env);
+     return result;
+ }
+
+ /** evaluate MRQL physical operators in memory (returns a Bag).
+  * When Config.trace_execution is set, the plan is printed before evaluation and
+  * its result after, indented by nesting depth. The indentation counter is
+  * restored even when evaluation throws.
+  */
+ final static Bag evalM ( final Tree e, final Environment env ) {
+     // read the flag once so the increment and decrement of tab_count always pair up
+     final boolean trace = Config.trace_execution;
+     if (trace)
+         tab_count += 3;
+     try {
+         if (trace)
+             System.out.println(tabs(tab_count)+print_query(e));
+         Bag res = evalMM(e,env);
+         if (trace)
+             System.out.println(tabs(tab_count)+"-> "+res);
+         return res;
+     } finally {
+         if (trace)
+             tab_count -= 3;
+     }
+ }
+
+ /** evaluate MRQL physical operators in memory (returns a Bag) */
+ final static Bag evalMM ( final Tree e, final Environment env ) {
+ try {
+ match e {
+ case cMap(`f,`s):
+ return MapReduceAlgebra.cmap(evalF(f,env),evalM(s,env));
+ case AggregateMap(`f,`acc,`zero,`s):
+ return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ evalM(#<cMap(`f,`s)>,env)));
+ case MapReduce(`m,`r,`s,_):
+ return MapReduceAlgebra.mapReduce(
+ evalF(m,env),
+ evalF(r,env),
+ evalM(s,env));
+ case MapAggregateReduce(`m,`r,`acc,`zero,`s,_):
+ return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ evalM(#<MapReduce(`m,`r,`s,false)>,env)));
+ case MapCombineReduce(`m,`c,`r,`s,_):
+ return MapReduceAlgebra.mapReduce(
+ evalF(m,env),
+ evalF(r,env),
+ evalM(s,env));
+ case MapReduce2(`mx,`my,`r,`x,`y,_):
+ return MapReduceAlgebra.mapReduce2(
+ evalF(mx,env),
+ evalF(my,env),
+ evalF(r,env),
+ evalM(x,env),
+ evalM(y,env));
+ case MapCombineReduce2(`mx,`my,`c,`r,`x,`y,_):
+ return MapReduceAlgebra.mapReduce2(
+ evalF(mx,env),
+ evalF(my,env),
+ evalF(r,env),
+ evalM(x,env),
+ evalM(y,env));
+ case MapAggregateReduce2(`mx,`my,`r,`acc,`zero,`x,`y,_):
+ return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ evalM(#< MapReduce2(`mx,`my,`r,`x,`y,false)>,env)));
+ case MapJoin(`kx,`ky,`r,`x,`y):
+ return MapReduceAlgebra.mapJoin(
+ evalF(kx,env),
+ evalF(ky,env),
+ evalF(r,env),
+ evalM(x,env),
+ evalM(y,env));
+ case MapAggregateJoin(`kx,`ky,`r,`acc,`zero,`x,`y):
+ return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ evalM(#<MapJoin(`kx,`ky,`r,`x,`y)>,env)));
+ case GroupByJoin(`kx,`ky,`gx,`gy,`m,`c,`r,`x,`y,`o):
+ return MapReduceAlgebra.groupByJoin(
+ evalF(kx,env),
+ evalF(ky,env),
+ evalF(gx,env),
+ evalF(gy,env),
+ evalF(m,env),
+ evalF(c,env),
+ evalF(r,env),
+ evalM(x,env),
+ evalM(y,env));
+ case CrossProduct(`mx,`my,`r,`x,`y):
+ return MapReduceAlgebra.crossProduct(
+ evalF(mx,env),
+ evalF(my,env),
+ evalF(r,env),
+ evalM(x,env),
+ evalM(y,env));
+ case CrossAggregateProduct(`mx,`my,`r,`acc,`zero,`x,`y):
+ return new Bag(MapReduceAlgebra.aggregate(evalF(acc,env),evalE(zero,env),
+ evalM(#<CrossProduct(`mx,`my,`r,`x,`y)>,env)));
+ case BinarySource(`file,_):
+ return (Bag)MapReduceAlgebra.read_binary(file.stringValue());
+ case BSPSource(`n,BinarySource(`file,_)):
+ return (Bag)MapReduceAlgebra.read_binary((int)((LongLeaf)n).value(),
+ file.stringValue());
+ case BSPSource(`n,ParsedSource(`parser,`file,...args)):
+ if (!(n instanceof LongLeaf))
+ fail;
+ Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ return MapReduceAlgebra.parsedSource((int)(((LongLeaf)n).value()),p,
+ ((MR_string)evalE(file,env)).get(),args);
+ case ParsedSource(`parser,`file,...args):
+ Parser p = DataSource.parserDirectory.get(parser.toString()).newInstance();
+ if (p == null)
+ throw new Error("Unknown parser: "+parser);
+ return MapReduceAlgebra.parsedSource(p,((MR_string)evalE(file,env)).get(),args);
+ case Merge(`x,`y):
+ return evalM(x,env).union(evalM(y,env));
+ case Repeat(lambda(`v,`b),`s,`n):
+ final String vs = v.toString();
+ final Tree body = b;
+ Function loop = new Function() {
+ final public MRData eval ( final MRData x ) {
+ return evalM(body,new Environment(vs,x,env));
+ }
+ };
+ return MapReduceAlgebra.repeat(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
+ case Closure(lambda(`v,`b),`s,`n):
+ final String vs = v.toString();
+ final Tree body = b;
+ Function loop = new Function() {
+ final public MRData eval ( final MRData x ) {
+ return evalM(body,new Environment(vs,x,env));
+ }
+ };
+ return MapReduceAlgebra.closure(loop,(Bag)evalM(s,env),((MR_int)evalE(n,env)).get());
+ case Generator(`min,`max,`size):
+ return MapReduceAlgebra.generator(((MR_long)evalE(min,env)).get(),
+ ((MR_long)evalE(max,env)).get());
+ case BSPSource(`n,Generator(`min,`max,`size)):
+ return MapReduceAlgebra.generator((int)((LongLeaf)n).value(),
+ ((MR_long)evalE(min,env)).get(),
+ ((MR_long)evalE(max,env)).get());
+ case Dump(`s):
+ Bag bs = (Bag)evalE(s,env);
+ final Iterator<MRData> iter = bs.iterator();
+ return new Bag(new BagIterator() {
+ public boolean hasNext () {
+ return iter.hasNext();
+ }
+ public MRData next () {
+ return new Tuple(new MR_int(0),iter.next());
+ }
+ });
+ case let(`v,`u,`body):
+ return evalM(body,new Environment(v.toString(),evalE(u,env),env));
+ case apply(`f,`arg):
+ if (!f.is_variable())
+ return (Bag)evalF(f,env).eval(evalE(arg));
+ MRData fnc = lookup_global_binding(f.toString());
+ if (fnc == null)
+ throw new Error("Unknown function: "+f);
+ MRData t = evalE(arg,env);
+ if (!(t instanceof Tuple))
+ throw new Error("Expected a tuple in function application: "+t);
+ return (Bag)((Lambda)fnc).lambda().eval(t);
+ case BSPSource(`n,`s):
+ final MR_int i = new MR_int((int)((LongLeaf)n).value());
+ Bag bs = (Bag)evalE(s,env);
+ final Iterator<MRData> iter = bs.iterator();
+ return new Bag(new BagIterator() {
+ public boolean hasNext () {
+ return iter.hasNext();
+ }
+ public MRData next () {
+ return new Tuple(i,iter.next());
+ }
+ });
+ case BSP(...):
+ MRData res = evalE(e,env);
+ if (res instanceof Bag)
+ return (Bag)res;
+ else return new Bag(res);
+ case `v:
+ if (!v.is_variable())
+ fail;
+ MRData x = variable_lookup(v.toString(),env);
+ if (x != null)
+ return (Bag)x;
+ x = lookup_global_binding(v.toString());
+ if (x != null)
+ return (Bag)x;
+ throw new Error("Variable "+v+" is not bound");
+ };
+ throw new Error("Cannot evaluate the plan: "+e);
+ } catch (Error msg) {
+ if (!Config.trace)
+ throw new Error(msg.getMessage());
+ System.err.println(msg.getMessage());
+ msg.printStackTrace();
+ throw new Error("Evaluation error in: "+print_query(e));
+ } catch (Exception ex) {
+ if (Config.trace)
+ ex.printStackTrace();
+ throw new Error("Evaluation error in: "+print_query(e));
+ }
+ }
+
+ /** Replace every variable of e that is not bound by an enclosing lambda of e (nor listed
+  *  in local_vars) with the reified value of its binding. A binding in env takes precedence
+  *  over a global binding with the same name; variables bound to datasets (MR_dataset) and
+  *  unbound variables are left unchanged.
+  *  @param e the expression to close (may be null)
+  *  @param env the run-time environment used to resolve variables
+  *  @param local_vars the lambda-bound variables in scope at e
+  *  @return the closed expression (null if e is null)
+  */
+ private final static Tree closure ( Tree e, Environment env, Trees local_vars ) {
+     match e {
+     case lambda(`x,`b):
+         // x is bound inside the lambda body, so it must not be replaced there
+         return #<lambda(`x,`(closure(b,env,local_vars.cons(x))))>;
+     case apply(`f,...as):
+         // the applied function name is kept as is; only the arguments are closed
+         Trees bs = #[];
+         for (Tree a: as)
+             bs = bs.append(closure(a,env,local_vars));
+         return #<apply(`f,...bs)>;
+     case `f(...as):
+         Trees bs = #[];
+         for (Tree a: as)
+             bs = bs.append(closure(a,env,local_vars));
+         return #<`f(...bs)>;
+     case null: return null;
+     case `v:
+         if (!v.is_variable())
+             fail;
+         if (local_vars.member(v))
+             fail;
+         // a local binding shadows a global binding with the same name,
+         // even when the local binding is a dataset (which is not reified)
+         MRData x = variable_lookup(v.toString(),env);
+         if (x == null)
+             x = lookup_global_binding(v.toString());
+         if (x != null && !(x instanceof MR_dataset))
+             return reify(x);
+     };
+     return e;
+ }
+
+ /** Close e under env: every variable of e that is not lambda-bound inside e is
+  *  replaced with the reified value of its (non-dataset) binding.
+  *  @param e the expression to close
+  *  @param env the run-time environment used to resolve variables
+  *  @return the closed expression
+  */
+ final static Tree closure ( Tree e, Environment env ) {
+     Trees no_bound_vars = #[];   // nothing is lambda-bound at the top of e
+     return closure(e,env,no_bound_vars);
+ }
+
+ // the type of the query, assigned by translate_expression right after the query's type inference
+ static Tree query_type;
+ // NOTE(review): not assigned in this part of the file; presumably the plan of the current query -- confirm
+ static Tree query_plan;
+ // whether the translated query is a dataset expression (set by translate_expression via PlanGeneration.is_dataset_expr)
+ static boolean is_dataset;
+
+ /** Translate an MRQL expression into a physical plan.
+  *  The query is type-checked, stripped of group-bys, renamed, normalized, optimized
+  *  (best plan), translated from select-queries into algebra, simplified, algebraically
+  *  optimized and turned into a plan; in BSP mode the plan is then translated into a
+  *  BSP plan, otherwise (after physical planning in Hadoop mode) common subterms are factored.
+  *  Every intermediate form is printed when Config.trace is set.
+  *  @param e the MRQL expression
+  *  @return the plan, or null if the translation raised an Error
+  *          (when Config.testing is set, the Error is rethrown wrapped in a new Error)
+  */
+ final static Tree translate_expression ( Tree e ) {
+     try {
+         if (Config.trace)
+             System.out.println("Query at line "+Main.parser.line_pos()+": "+print_query(e));
+         Tree query_tp = TypeInference.type_inference(e);
+         if (!Config.quiet_execution)
+             System.out.println("Query type: "+print_type(query_tp));
+         query_type = query_tp;
+         Tree expr = Normalization.remove_groupby(e);
+         trace_stage("After removing group-by:\n",expr);
+         expr = Simplification.rename(expr);
+         trace_stage("After renaming variables:\n",expr);
+         expr = Simplification.rename(Normalization.normalize_all(expr));
+         trace_stage("Normalized query:\n",expr);
+         type_inference(expr);
+         expr = QueryPlan.best_plan(expr);
+         trace_stage("Best plan:\n",expr);
+         expr = Simplification.rename(Translator.translate_select(expr));
+         trace_stage("After removing select-queries:\n",expr);
+         type_inference(expr);
+         expr = Simplification.simplify_all(expr);
+         trace_stage("Algebra expression:\n",expr);
+         Tree algebra_tp = type_inference(expr);
+         if (Config.trace)
+             System.out.println("Algebraic type: "+print_type(algebra_tp));
+         expr = AlgebraicOptimization.translate_all(expr);
+         trace_stage("Translated expression:\n",expr);
+         Tree physical_tp = TypeInference.type_inference(expr);
+         is_dataset = PlanGeneration.is_dataset_expr(expr);
+         if (Config.trace)
+             System.out.println("Physical plan type: "+print_type(physical_tp));
+         repeat_variables = #[];
+         expr = Simplification.simplify_all(expr);
+         Tree plan = PlanGeneration.makePlan(expr);
+         String header;   // the title under which the final plan is printed
+         if (Config.bsp_mode) {
+             BSPTranslator.reset();
+             trace_stage("Physical plan:\n",plan);
+             plan = Materialization.materialize_terms(BSPTranslator.constructBSPplan(plan));
+             header = "BSP plan:\n";
+         } else {
+             if (Config.hadoop_mode)
+                 plan = PlanGeneration.physical_plan(plan);
+             plan = Materialization.materialize_terms(AlgebraicOptimization.common_factoring(plan));
+             header = "Physical plan:\n";
+         };
+         if (Config.trace)
+             System.out.println(header+plan.pretty(0));
+         else {
+             String splan = print_plan(plan,0,false);
+             if (!splan.equals("") && !Config.quiet_execution)
+                 System.out.println(header+splan);
+         };
+         if (Config.compile_functional_arguments)
+             plan = Compiler.compile(plan);
+         return plan;
+     } catch (Error x) {
+         if (Config.testing)
+             throw new Error(x);
+         // type errors yield null without a system-error report (unless tracing)
+         if (!Config.trace && x.toString().endsWith("Type Error"))
+             return null;
+         if (x.getMessage() != null) // system error
+             System.err.println("*** MRQL System Error at line "+Main.parser.line_pos()+": "+x);
+         if (Config.trace)
+             x.printStackTrace(System.err);
+         return null;
+     }
+ }
+
+ /** print the title followed by the pretty-printed tree, but only when tracing */
+ private static void trace_stage ( String title, Tree t ) {
+     if (Config.trace)
+         System.out.println(title+t.pretty(0));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Inv.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Inv.java b/core/src/main/java/org/apache/mrql/Inv.java
new file mode 100644
index 0000000..3faa7fe
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Inv.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+
+
+/** An MRData wrapper whose ordering is the reverse of the ordering of the wrapped value */
+final public class Inv extends MRData {
+    /** the wrapped value */
+    private MRData value;
+
+    Inv ( MRData v ) { value = v; }
+
+    public void materializeAll () { value.materializeAll(); };
+
+    /** @return the wrapped value */
+    public MRData value () { return value; }
+
+    /** serialize as the INV type code followed by the serialized wrapped value */
+    final public void write ( DataOutput out ) throws IOException {
+        out.writeByte(MRContainer.INV);
+        value.write(out);
+    }
+
+    /** deserialize the wrapped value; presumably the INV type code has already been consumed by the caller */
+    final public static Inv read ( DataInput in ) throws IOException {
+        return new Inv(MRContainer.read(in));
+    }
+
+    public void readFields ( DataInput in ) throws IOException {
+        value.readFields(in);
+    }
+
+    /** Reverse the sign of a comparison result.
+     *  Unary minus is not used because -Integer.MIN_VALUE overflows back to
+     *  Integer.MIN_VALUE, which would leave the order un-inverted. */
+    private static int reverse_order ( int c ) {
+        return (c < 0) ? 1 : ((c > 0) ? -1 : 0);
+    }
+
+    /** compare in the reverse order of the wrapped values; x must be an Inv */
+    public int compareTo ( MRData x ) {
+        assert(x instanceof Inv);
+        return reverse_order(value.compareTo(((Inv)x).value));
+    }
+
+    /** Compare two serialized values in reverse order. After the inner comparison,
+     *  size[0] is incremented by one (presumably to account for the INV type code byte). */
+    final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
+        int n = MRContainer.compare(x,xs,xl,y,ys,yl,size);
+        size[0] += 1;
+        return reverse_order(n);
+    }
+
+    public boolean equals ( Object x ) {
+        return x instanceof Inv && value.equals(((Inv)x).value);
+    }
+
+    public int hashCode () { return value.hashCode(); }
+
+    public String toString () { return "inv("+value.toString()+")"; }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/JSON.cup
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JSON.cup b/core/src/main/java/org/apache/mrql/JSON.cup
new file mode 100644
index 0000000..c045437
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/JSON.cup
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java_cup.runtime.*;
+
+// punctuation and keyword tokens produced by the JSON lexer
+terminal TRUE, FALSE, NULL, COLON, COMMA, O_BEGIN, O_END, A_BEGIN, A_END;
+
+// tokens with a value: the body of a string literal, an integer, a floating-point number
+terminal String STRING;
+terminal Long INTEGER;
+terminal Double DOUBLE;
+
+non terminal MRData top, json, pair, value;
+non terminal Bag members, elements;
+
+precedence left O_BEGIN, O_END, A_BEGIN, A_END;
+precedence left COMMA;
+precedence left COLON;
+
+start with top;
+
+// Every JSON value is encoded as a Union whose tag identifies its kind:
+//   0 = object (a bag of (name,value) tuples), 1 = array (a bag of values),
+//   2 = string, 3 = integer (long), 4 = double, 5 = boolean, 6 = null (an empty tuple).
+// The non-JSON form {{n}} denotes the variable number n (an MR_variable).
+top ::= json:e {: RESULT = e; :}
+ ;
+json ::= O_BEGIN O_END {: RESULT = new Union((byte)0,new Bag()); :}
+ | O_BEGIN members:m O_END {: RESULT = new Union((byte)0,m); :}
+ | A_BEGIN A_END {: RESULT = new Union((byte)1,new Bag()); :}
+ | A_BEGIN elements:m A_END {: RESULT = new Union((byte)1,m); :}
+ | O_BEGIN O_BEGIN INTEGER:n O_END O_END {: RESULT = new MR_variable((int)n.longValue()); :}
+ ;
+// the properties of an object, in source order
+members ::= pair:p {: RESULT = new Bag(p); :}
+ | members:m COMMA pair:p {: RESULT = m.add_element(p); :}
+ ;
+// a property: a tuple of its name (MR_string) and its value
+pair ::= STRING:s COLON value:v {: RESULT = (new Tuple(2)).set(0,new MR_string(s)).set(1,v); :}
+ ;
+// the elements of an array, in source order
+elements ::= value:v {: RESULT = new Bag(v); :}
+ | elements:es COMMA value:v {: RESULT = es.add_element(v); :}
+ ;
+value ::= json:e {: RESULT = e; :}
+ | STRING:s {: RESULT = new Union((byte)2,new MR_string(s)); :}
+ | INTEGER:n {: RESULT = new Union((byte)3,new MR_long(n.longValue())); :}
+ | DOUBLE:n {: RESULT = new Union((byte)4,new MR_double(n.doubleValue())); :}
+ | TRUE {: RESULT = new Union((byte)5,new MR_bool(true)); :}
+ | FALSE {: RESULT = new Union((byte)5,new MR_bool(false)); :}
+ | NULL {: RESULT = new Union((byte)6,new Tuple(0)); :}
+ ;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/JSON.lex
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JSON.lex b/core/src/main/java/org/apache/mrql/JSON.lex
new file mode 100644
index 0000000..c75d1bb
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/JSON.lex
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java_cup.runtime.Symbol;
+
+
+%%
+%class JSONLex
+%public
+%line
+%char
+%cup
+%eofval{
+  return symbol(jsym.EOF);
+%eofval}
+%{
+  /** the text of the last matched token, exactly as it appears in the input */
+  public String text () { return yytext(); }
+
+  /** the line number (starting from 1) of the last matched token */
+  public int line_pos () { return yyline+1; }
+
+  /** the number of characters that precede the last matched token in the input */
+  public int char_pos () { return yychar; }
+
+  public static Symbol symbol ( int s ) {
+    return new Symbol(s);
+  }
+
+  public static Symbol symbol ( int s, Object o ) {
+    return new Symbol(s,o);
+  }
+
+  /** Decode the JSON escape sequences in the body of a string literal.
+   *  A backslash followed by b, f, n, r or t denotes the corresponding control character;
+   *  a backslash followed by the letter u and four hexadecimal digits denotes that UTF-16
+   *  code unit; a backslash followed by any other character denotes that character
+   *  (this covers the escaped quote, backslash and slash). */
+  static String unescape ( String s ) {
+    if (s.indexOf('\\') < 0)
+      return s;
+    StringBuilder sb = new StringBuilder(s.length());
+    for ( int i = 0; i < s.length(); i++ ) {
+      char c = s.charAt(i);
+      if (c != '\\' || i+1 == s.length()) {
+        sb.append(c);
+        continue;
+      };
+      char d = s.charAt(++i);
+      switch (d) {
+      case 'b': sb.append('\b'); break;
+      case 'f': sb.append('\f'); break;
+      case 'n': sb.append('\n'); break;
+      case 'r': sb.append('\r'); break;
+      case 't': sb.append('\t'); break;
+      case 'u': {
+        int code = 0;
+        boolean hex = i+4 < s.length();
+        for ( int j = 1; hex && j <= 4; j++ ) {
+          int digit = Character.digit(s.charAt(i+j),16);
+          hex = digit >= 0;
+          code = code*16+digit;
+        };
+        if (hex) {
+          sb.append((char)code);
+          i += 4;
+        } else sb.append(d);
+        break;
+      }
+      default: sb.append(d);
+      }
+    };
+    return sb.toString();
+  }
+
+%}
+
+INT = [+-]?[0-9]+
+DOUBLE = [+-]?[0-9]+([\.][0-9]+)?([eE][+-]?[0-9]+)?
+
+%%
+
+\"([^\"\\]|\\.)*\" { return symbol(jsym.STRING,unescape(yytext().substring(1,yytext().length()-1))); }
+{INT} { return symbol(jsym.INTEGER,Long.valueOf(yytext())); }
+{DOUBLE} { return symbol(jsym.DOUBLE,Double.valueOf(yytext())); }
+true { return symbol(jsym.TRUE); }
+false { return symbol(jsym.FALSE); }
+null { return symbol(jsym.NULL); }
+\{ { return symbol(jsym.O_BEGIN); }
+\} { return symbol(jsym.O_END); }
+\[ { return symbol(jsym.A_BEGIN); }
+\] { return symbol(jsym.A_END); }
+\, { return symbol(jsym.COMMA); }
+\: { return symbol(jsym.COLON); }
+[ \t\f] { }
+[\r\n] { }
+. { throw new Error("Illegal character: "+yytext()); }
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/JsonFormatParser.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JsonFormatParser.java b/core/src/main/java/org/apache/mrql/JsonFormatParser.java
new file mode 100644
index 0000000..77aa891
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/JsonFormatParser.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.io.StringReader;
+import java.nio.ByteBuffer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.io.DataOutputBuffer;
+import java_cup.runtime.Symbol;
+
+
+/** The JSON parser */
+public class JsonFormatParser implements Parser {
+ String[] tags; // split tags
+ JsonSplitter splitter;
+
+ public void initialize ( Trees args ) {
+ try {
+ if (args.length() > 0) {
+ if (!(args.nth(0) instanceof Node)
+ || !(((Node)args.nth(0)).name().equals("list")
+ || ((Node)args.nth(0)).name().equals("bag")))
+ throw new Error("Must provide a bag of synchronization property names to split the JSON source: "+args.nth(0));
+ Trees ts = ((Node)args.nth(0)).children();
+ if (ts.length() == 0)
+ throw new Error("Expected at least one synchronization tagname in JSON source: "+ts);
+ tags = new String[ts.length()];
+ for ( int i = 0; i < tags.length; i++ )
+ if (ts.nth(i) instanceof StringLeaf)
+ tags[i] = ((StringLeaf)(ts.nth(i))).value();
+ else throw new Error("Expected a synchronization tagname in JSON source: "+ts.nth(i));
+ }
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
+
+ public void open ( String file ) {
+ try {
+ splitter = new JsonSplitter(tags,file,new DataOutputBuffer());
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
+
+ public void open ( FSDataInputStream fsin, long start, long end ) {
+ try {
+ splitter = new JsonSplitter(tags,fsin,start,end,new DataOutputBuffer());
+ } catch (Exception e) {
+ throw new Error(e);
+ }
+ }
+
+ public Tree type () { return new VariableLeaf("JSON"); }
+
+ public String slice () {
+ if (splitter.hasNext()) {
+ DataOutputBuffer b = splitter.next();
+ return new String(b.getData(),0,b.getLength());
+ } else return null;
+ }
+
+ public Bag parse ( String s ) {
+ try {
+ JSONLex scanner = new JSONLex(new StringReader(s));
+ JSONParser parser = new JSONParser(scanner);
+ Symbol sym = parser.parse();
+ return new Bag((MRData)sym.value);
+ } catch (Exception e) {
+ System.err.println(e);
+ return new Bag();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/JsonSplitter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/JsonSplitter.java b/core/src/main/java/org/apache/mrql/JsonSplitter.java
new file mode 100644
index 0000000..7017e72
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/JsonSplitter.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java_cup.runtime.Symbol;
+import java.util.Iterator;
+import java.io.*;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.io.DataOutputBuffer;
+
+
+/** Extract the JSON objects tagged by tags from a data split of the input stream (fsin) */
+final public class JsonSplitter implements Iterator<DataOutputBuffer> {
+ boolean in_memory;
+ FSDataInputStream fsin; // for HDFS processing
+ InputStream in; // for in-memory processing
+ JSONLex scanner;
+ String[] tags;
+ long start;
+ long end;
+ final DataOutputBuffer buffer;
+
+ JsonSplitter ( String[] tags, FSDataInputStream fsin, long start, long end,
+ DataOutputBuffer buffer ) {
+ in_memory = false;
+ this.tags = tags;
+ this.fsin = fsin;
+ this.end = end;
+ this.buffer = buffer;
+ try {
+ fsin.seek(start);
+ this.start = (start == 0) ? start : sync(start);
+ fsin.seek(this.start);
+ scanner = new JSONLex(fsin);
+ } catch ( IOException e ) {
+ System.err.println("*** Cannot parse the data split: "+fsin);
+ }
+ }
+
+ JsonSplitter ( String[] tags, String file, DataOutputBuffer buffer ) {
+ in_memory = true;
+ try {
+ in = new FileInputStream(file);
+ } catch ( Exception e ) {
+ throw new Error("Cannot open the file: "+file);
+ };
+ this.tags = tags;
+ this.buffer = buffer;
+ scanner = new JSONLex(in);
+ }
+
+ private long sync ( long start ) {
+ try {
+ long first_quote = -1;
+ for ( long offset = 0; ; offset++ ) {
+ char c = (char)fsin.read();
+ if (c == '\"') {
+ if (first_quote >= 0)
+ if ((char)fsin.read() == ':')
+ return start+first_quote;
+ first_quote = offset;
+ }
+ }
+ } catch (IOException ex) {
+ return (long)0;
+ }
+ }
+
+ public boolean hasNext () {
+ try {
+ if (in_memory || start+scanner.char_pos() < end)
+ if (skip())
+ return store();
+ return false;
+ } catch (Exception e) {
+ System.err.println(e);
+ return false;
+ }
+ }
+
+ public DataOutputBuffer next () {
+ return buffer;
+ }
+
+ public void remove () { }
+
+ boolean is_start_tag ( String tagname ) {
+ if (tags == null)
+ return true;
+ for (String tag: tags)
+ if (tag.contentEquals(tagname))
+ return true;
+ return false;
+ }
+
+ /** skip until the beginning of a split element */
+ boolean skip () throws IOException {
+ while (true) {
+ Symbol s = scanner.next_token();
+ if (s.sym == jsym.EOF || (!in_memory && start+scanner.char_pos() >= end))
+ return false;
+ if (s.sym == jsym.STRING && is_start_tag((String)s.value)) {
+ String tag = (String)s.value;
+ if (scanner.next_token().sym == jsym.COLON) {
+ buffer.reset();
+ buffer.write('{');
+ buffer.write('\"');
+ for ( int i = 0; i < tag.length(); i++ )
+ buffer.write(tag.charAt(i));
+ buffer.write('\"');
+ buffer.write(':');
+ return true;
+ }
+ }
+ }
+ }
+
+ /** store one split element into the buffer; may cross split boundaries */
+ boolean store () throws IOException {
+ int nest = 0;
+ while (true) {
+ Symbol s = scanner.next_token();
+ if (s.sym == jsym.EOF)
+ return false;
+ if (s.sym == jsym.O_BEGIN || s.sym == jsym.A_BEGIN)
+ nest++;
+ else if (s.sym == jsym.O_END || s.sym == jsym.A_END)
+ nest--;
+ String text = scanner.text();
+ for ( int i = 0; i < text.length(); i++ )
+ buffer.write(text.charAt(i));
+ if (nest == 0) {
+ buffer.write('}');
+ return true;
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/Lambda.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/Lambda.java b/core/src/main/java/org/apache/mrql/Lambda.java
new file mode 100644
index 0000000..d0cfd7a
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/Lambda.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.fs.*;
+
+
+/** An anonymous function wrapped as MRData.
+ *  A function can be neither serialized nor compared: those operations throw an Error. */
+final public class Lambda extends MRData {
+    private Function lambda;
+
+    public Lambda ( Function f ) {
+        this.lambda = f;
+    }
+
+    /** no-op */
+    public void materializeAll () {}
+
+    /** @return the wrapped function */
+    public Function lambda () {
+        return lambda;
+    }
+
+    /** always throws an Error */
+    final public void write ( DataOutput out ) throws IOException {
+        throw new Error("Functions are not serializable");
+    }
+
+    /** always throws an Error */
+    public void readFields ( DataInput in ) throws IOException {
+        throw new Error("Functions are not serializable");
+    }
+
+    /** always throws an Error */
+    public int compareTo ( MRData x ) {
+        throw new Error("Functions cannot be compared");
+    }
+
+    /** always throws an Error */
+    public boolean equals ( Object x ) {
+        throw new Error("Functions cannot be compared");
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/LineParser.gen
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/LineParser.gen b/core/src/main/java/org/apache/mrql/LineParser.gen
new file mode 100644
index 0000000..fb1e068
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/LineParser.gen
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.util.LineReader;
+
+
+/** A parser for line-oriented, character delimited text files (such as CSV) */
+final public class LineParser implements Parser {
+    // lines that consume maxLineLength bytes or more (including the line terminator) are skipped
+    final static int maxLineLength = 1000;
+    boolean in_memory;
+    FSDataInputStream fsin;      // for HDFS processing
+    LineReader in;
+    BufferedReader buffered_in;  // for in-memory processing (null once fully read)
+    Text line;
+    long start;
+    long end;
+    long pos;
+    String delimiter;
+    Tree type;
+    byte[] types;   // a vector of basic type ids (see MRContainer in MapReduceData); -1 marks a skipped column
+    int type_length;   // number of non-skipped columns
+
+    /** Map a relational record or tuple type to the vector of the basic type ids of its
+     *  components; a component of type any is mapped to -1 (its column is skipped).
+     *  Reports an error for non-basic component types and duplicate attribute names. */
+    static byte[] relational_record ( Tree tp ) {
+        match tp {
+        case record(...al):
+            Trees attrs = #[];
+            byte[] types = new byte[al.length()];
+            for ( int i = 0; i < types.length; i++ )
+                match al.nth(i) {
+                case bind(`v,any):
+                    types[i] = -1;
+                    if (attrs.member(v))
+                        TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
+                    attrs = attrs.append(v);
+                case bind(`v,`t):
+                    if (!t.is_variable())
+                        fail;
+                    types[i] = MRContainer.type_code(t.toString());
+                    if (attrs.member(v))
+                        TypeInference.type_error(tp,"Duplicate record attribute name: "+v);
+                    attrs = attrs.append(v);
+                    if (!MRContainer.basic_type(types[i]))
+                        TypeInference.error("Expected a basic type for a relational attribute: "+t);
+                case `t: TypeInference.error("Expected a basic type for a relational attribute: "
+                                             +TypeInference.print_type(t));
+                };
+            return types;
+        case tuple(...al):
+            byte[] types = new byte[al.length()];
+            for ( int i = 0; i < types.length; i++ )
+                match al.nth(i) {
+                case any:
+                    types[i] = -1;
+                case `t:
+                    if (!t.is_variable())
+                        fail;
+                    types[i] = MRContainer.type_code(t.toString());
+                    if (!MRContainer.basic_type(types[i]))
+                        TypeInference.error("Expected a basic type for a relational attribute: "+t);
+                case `t: TypeInference.error("Expected a basic type for a relational attribute: "
+                                             +TypeInference.print_type(t));
+                };
+            return types;
+        };
+        TypeInference.error("Expected a relational record or a tuple type: "
+                            +TypeInference.print_type(tp));
+        return null;
+    }
+
+    /** the type of the parsed records: tp without its components of type any */
+    static Tree relational_record_type ( Tree tp ) {
+        match tp {
+        case record(...al):
+            Trees ts = #[];
+            for ( Tree a: al )
+                match a {
+                case bind(_,any): ;
+                case `t: ts = ts.append(t);
+                };
+            return #<record(...ts)>;
+        case tuple(...al):
+            Trees ts = #[];
+            for ( Tree a: al )
+                if (!a.equals(#<any>))
+                    ts = ts.append(a);
+            return #<tuple(...ts)>;
+        };
+        TypeInference.error("Expected a relational record type: "
+                            +TypeInference.print_type(tp));
+        return null;
+    }
+
+    public Tree type () {
+        return relational_record_type(type);
+    }
+
+    /** Initialize the parser from its two arguments: the delimiter string and a node whose
+     *  first child is the record type (presumably a type annotation -- TODO confirm). */
+    public void initialize ( Trees args ) {
+        if (Config.hadoop_mode && Plan.conf == null)
+            Plan.conf = Evaluator.evaluator.new_configuration();
+        if (args.length() != 2)
+            throw new Error("The line parser must have two arguments: "+args);
+        if (!(args.nth(0) instanceof StringLeaf))
+            throw new Error("Expected a delimiter: "+args.nth(0));
+        delimiter = ((StringLeaf)args.nth(0)).value();
+        if (delimiter.length() == 0)
+            throw new Error("Expected a delimiter with at least one character: "+delimiter);
+        type = ((Node)args.nth(1)).children().nth(0);
+        types = relational_record(type);
+        type_length = 0;
+        for ( int i = 0; i < types.length; i++ )
+            if (types[i] >= 0)
+                type_length++;
+        if (type_length < 1)
+            TypeInference.error("A relational record type must have at least one component: "
+                                +TypeInference.print_type(type));
+    }
+
+    /** open a local file for in-memory evaluation */
+    public void open ( String file ) {
+        in_memory = true;
+        try {
+            buffered_in = new BufferedReader(new InputStreamReader(new FileInputStream(file)),
+                                             10000);
+        } catch ( Exception e ) {
+            throw new Error("Cannot open the file: "+file,e);
+        }
+    }
+
+    /** open the part [fstart,fend) of an HDFS input stream */
+    public void open ( FSDataInputStream fsin, long fstart, long fend ) {
+        in_memory = false;
+        this.fsin = fsin;
+        start = fstart;
+        end = fend;
+        line = new Text();
+        try {
+            if (start != 0) { // for all but the first data split, skip the first record
+                // back up one byte so that a record starting exactly at fstart is not skipped
+                --start;
+                fsin.seek(start);
+                in = new LineReader(fsin,Plan.conf);
+                start += in.readLine(new Text(),0,(int) Math.min(Integer.MAX_VALUE,end-start));
+            } else in = new LineReader(fsin,Plan.conf);
+            pos = start;
+        } catch ( IOException e ) {
+            System.err.println("*** Cannot parse the data split: "+fsin);
+            this.start = end;
+            // make slice() return null instead of reading from a reader that failed (or is missing)
+            pos = end;
+        }
+    }
+
+    /** @return the next text line, null at the end of the input, or "" if the line cannot be read */
+    public String slice () {
+        try {
+            if (in_memory) {
+                if (buffered_in == null)
+                    return null;
+                String text = buffered_in.readLine();
+                if (text == null) {   // end of file: release the file
+                    BufferedReader finished = buffered_in;
+                    buffered_in = null;
+                    finished.close();
+                };
+                return text;
+            };
+            while (pos < end) {
+                int newSize = in.readLine(line,maxLineLength,
+                                          Math.max((int)Math.min(Integer.MAX_VALUE,end-pos),
+                                                   maxLineLength));
+                if (newSize == 0)
+                    return null;
+                pos += newSize;
+                if (newSize < maxLineLength)
+                    return line.toString();
+            };
+            return null;
+        } catch ( Exception e ) {
+            System.err.println("*** Cannot slice the text: "+e);
+            return "";
+        }
+    }
+
+    /** @return the value of text as the basic type, or null if the type is not supported
+     *          or a boolean is neither true nor false (ignoring case) */
+    private static MRData parse_value ( String text, byte type ) {
+        switch (type) {
+        case MRContainer.BOOLEAN:
+            if (text.equalsIgnoreCase("true"))
+                return new MR_bool(true);
+            if (text.equalsIgnoreCase("false"))
+                return new MR_bool(false);
+            break;
+        case MRContainer.BYTE: return new MR_byte(Byte.parseByte(text));
+        case MRContainer.SHORT: return new MR_short(Short.parseShort(text));
+        case MRContainer.INT: return new MR_int(Integer.parseInt(text));
+        case MRContainer.LONG: return new MR_long(Long.parseLong(text));
+        case MRContainer.FLOAT: return new MR_float(Float.parseFloat(text));
+        case MRContainer.DOUBLE: return new MR_double(Double.parseDouble(text));
+        case MRContainer.CHAR: return new MR_char(text.charAt(0));
+        case MRContainer.STRING: return new MR_string(text);
+        };
+        System.err.println("*** Cannot parse the type "+MRContainer.type_names[type]+" in '"+text+"'");
+        return null;
+    }
+
+    /** Parse a text line into a bag with one tuple of its non-skipped columns.
+     *  @return that bag, or an empty bag if line is null, has too few columns,
+     *          or has a column that cannot be parsed */
+    public Bag parse ( String line ) {
+        try {
+            if (line == null)
+                return new Bag();
+            Tuple t = new Tuple(type_length);
+            int loc = 0;   // start of the current column
+            int j = 0;     // next component of t
+            for ( int i = 0; i < types.length; i++ ) {
+                int k = line.indexOf(delimiter,loc);   // end of the current column; -1 for the last one
+                if (types[i] >= 0) {
+                    // k == 0 is an empty first column, not a missing delimiter
+                    String s = (k >= 0) ? line.substring(loc,k) : line.substring(loc);
+                    MRData v = parse_value(s,types[i]);
+                    if (v == null)
+                        return new Bag();
+                    t.set(j++,v);
+                };
+                loc = k+delimiter.length();
+                if (k < 0 && i+1 < types.length) {
+                    System.err.println("*** Incomplete parsed text line: "+line);
+                    return new Bag();
+                }
+            };
+            return new Bag(t);
+        } catch ( Exception e ) {
+            System.err.println("*** Cannot parse the text line: "+line);
+            return new Bag();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/core/src/main/java/org/apache/mrql/MRContainer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/mrql/MRContainer.java b/core/src/main/java/org/apache/mrql/MRContainer.java
new file mode 100644
index 0000000..d72c452
--- /dev/null
+++ b/core/src/main/java/org/apache/mrql/MRContainer.java
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import java.io.*;
+import org.apache.hadoop.io.WritableComparable;
+
+
+/** A container for MRData that implements read (the deserializer).
+ *  Each serialized MRData value begins with a one-byte type tag (one of the codes
+ *  below); {@link #read(DataInput)} reads that tag and dispatches to the matching reader.
+ */
+final public class MRContainer implements WritableComparable<MRContainer>, Serializable {
+    // transient: Java serialization is handled explicitly by writeObject/readObject below
+    transient MRData data;
+
+    /** type tags that prefix serialized MRData values */
+    public final static byte BOOLEAN = 0, BYTE = 1, SHORT = 2, INT = 3, LONG = 4,
+        FLOAT = 5, DOUBLE = 6, CHAR = 7, STRING = 8, PAIR = 9, TUPLE = 10, BAG = 11,
+        LAZY_BAG = 12, END_OF_LAZY_BAG = 13, UNION = 14, INV = 15, LAMBDA = 16,
+        VARIABLE = 17, TRIPLE = 18, NULL = 19, DATASET = 20, SYNC = 99, MORE_BSP_STEPS = 98;
+
+    /** type codes, positionally aligned with type_names: type_codes[i] is named type_names[i].
+     *  Both arrays must have the same length, otherwise type_code indexes past the end. */
+    public final static byte[] type_codes
+        = { BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE, CHAR, STRING, NULL, PAIR, TRIPLE,
+            TUPLE, BAG, LAZY_BAG, END_OF_LAZY_BAG, UNION, INV, LAMBDA, VARIABLE, SYNC,
+            MORE_BSP_STEPS };
+
+    /** type names, positionally aligned with type_codes (not indexed by type code) */
+    public final static String[] type_names
+        = { "boolean", "byte", "short", "int", "long", "float", "double", "char", "string",
+            "null", "pair", "triple", "tuple", "bag", "lazy_bag", "end_of_lazy_bag", "union",
+            "inv", "lambda", "variable", "sync", "more_bsp_steps" };
+
+    /** Return the type code for a type name.
+     *  @param type_name a name listed in type_names (e.g. "int", "bag")
+     *  @return the matching type code, or -1 if the name is unknown
+     */
+    public static byte type_code ( String type_name ) {
+        for ( byte i = 0; i < type_names.length; i ++ )
+            if (type_names[i].equals(type_name))
+                return type_codes[i];
+        return -1;
+    }
+
+    /** @return true if the code is one of BOOLEAN..STRING (codes 0 to 8) */
+    public static boolean basic_type ( byte type_code ) {
+        return type_code >= 0 && type_code <= 8;
+    }
+
+    MRContainer ( MRData d ) { data = d; }
+
+    MRContainer () { data = null; }
+
+    /** the shared marker value returned by read for an END_OF_LAZY_BAG tag */
+    public final static MRData end_of_lazy_bag = new MR_EOLB();
+
+    MRData data () { return data; }
+
+    public void set ( MRData v ) { data = v; }
+    final public void write ( DataOutput out ) throws IOException { data.write(out); }
+    public void readFields ( DataInput in ) throws IOException { data = read(in); }
+    public int compareTo ( MRContainer x ) { return data.compareTo(x.data); }
+
+    /** Two containers are equal when their wrapped values are equal, which keeps equals
+     *  consistent with compareTo and hashCode (both delegate to the wrapped value).
+     *  Any other argument is compared against the wrapped value directly, as before. */
+    public boolean equals ( Object x ) {
+        if (x instanceof MRContainer)
+            return data.equals(((MRContainer)x).data);
+        return data.equals(x);
+    }
+
+    public int hashCode () { return data.hashCode(); }
+    public String toString () { return data.toString(); }
+
+    /** Deserialize an MRData value: read its one-byte type tag, then delegate to that type's reader.
+     *  @throws Error if the tag is not a recognized type code
+     */
+    final public static MRData read ( DataInput in ) throws IOException {
+        final byte tag = in.readByte();
+        switch (tag) {
+        case TUPLE: return Tuple.read(in);
+        case NULL: return new Tuple(0);
+        case PAIR: return Tuple.read2(in);
+        case TRIPLE: return Tuple.read3(in);
+        case BAG: return Bag.read(in);
+        case LAZY_BAG: return Bag.lazy_read(in);
+        case END_OF_LAZY_BAG: return end_of_lazy_bag;
+        case UNION: return Union.read(in);
+        case INV: return Inv.read(in);
+        case BOOLEAN: return MR_bool.read(in);
+        case BYTE: return MR_byte.read(in);
+        case SHORT: return MR_short.read(in);
+        case INT: return MR_int.read(in);
+        case LONG: return MR_long.read(in);
+        case FLOAT: return MR_float.read(in);
+        case DOUBLE: return MR_double.read(in);
+        case CHAR: return MR_char.read(in);
+        case STRING: return MR_string.read(in);
+        case SYNC: return new MR_sync();
+        case MORE_BSP_STEPS: return new MR_more_bsp_steps();
+        };
+        throw new Error("Unrecognized MRQL type tag: "+tag);
+    }
+
+    /** Compare two serialized MRData values in their byte form, without deserializing.
+     *  Values are ordered first by type tag; values with the same tag are compared by that
+     *  type's comparator, starting just past the tag byte.
+     *  @param size passed through to the type-specific comparator (presumably used to report
+     *              the number of bytes consumed; TODO confirm against Tuple/Bag.compare)
+     *  @throws Error if the tag is not handled here (e.g. LAZY_BAG)
+     */
+    final public static int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl, int[] size ) {
+        if (x[xs] != y[ys])
+            return x[xs] - y[ys];
+        switch (x[xs]) {
+        case TUPLE: return Tuple.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case NULL: return 0;
+        case PAIR: return Tuple.compare2(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case TRIPLE: return Tuple.compare3(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case BAG: return Bag.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case UNION: return Union.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case INV: return Inv.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case BOOLEAN: return MR_bool.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case BYTE: return MR_byte.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case SHORT: return MR_short.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case INT: return MR_int.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case LONG: return MR_long.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case FLOAT: return MR_float.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case DOUBLE: return MR_double.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case CHAR: return MR_char.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case STRING: return MR_string.compare(x,xs+1,xl-1,y,ys+1,yl-1,size);
+        case SYNC: return 0;
+        case MORE_BSP_STEPS: return 0;
+        };
+        throw new Error("Unrecognized MRQL type tag: "+x[xs]);
+    }
+
+    // Java serialization reuses the MRData binary format (ObjectOutputStream is a DataOutput)
+    private void writeObject ( ObjectOutputStream out ) throws IOException {
+        data.write(out);
+    }
+
+    private void readObject ( ObjectInputStream in ) throws IOException, ClassNotFoundException {
+        data = read(in);
+    }
+
+    private void readObjectNoData () throws ObjectStreamException {}
+
+    /** Marker value for the end of a lazy bag: serializes to the END_OF_LAZY_BAG tag only,
+     *  and all instances compare and hash as equal. */
+    final static class MR_EOLB extends MRData {
+        MR_EOLB () {}
+
+        public void materializeAll () {};
+
+        final public void write ( DataOutput out ) throws IOException {
+            out.writeByte(MRContainer.END_OF_LAZY_BAG);
+        }
+
+        public void readFields ( DataInput in ) throws IOException {}
+
+        public int compareTo ( MRData x ) { return 0; }
+
+        public boolean equals ( Object x ) { return x instanceof MR_EOLB; }
+
+        public int hashCode () { return 0; }
+
+        public String toString () {
+            return "end_of_lazy_bag";
+        }
+    }
+}