You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mrql.apache.org by fe...@apache.org on 2014/03/13 15:24:41 UTC
[16/26] MRQL-32: Refactoring directory structure for Eclipse
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/gen/src/main/java/org/apache/mrql/gen/Node.java
----------------------------------------------------------------------
diff --git a/gen/src/main/java/org/apache/mrql/gen/Node.java b/gen/src/main/java/org/apache/mrql/gen/Node.java
new file mode 100644
index 0000000..8bc55d4
--- /dev/null
+++ b/gen/src/main/java/org/apache/mrql/gen/Node.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql.gen;
+
+import java.io.*;
+
+
+/**
+ * An interior node of a Gen tree: a name applied to a list of children.
+ *
+ * Names are interned through Tree.add so that equals can compare them with ==.
+ */
+final public class Node extends Tree {
+    /* the node name; always interned through Tree.add */
+    public String name;
+    /* the children of this node (Trees.nil when there are none) */
+    public Trees children;
+
+    public Node ( String name, Trees children ) {
+        super();
+        this.name = Tree.add(name);
+        this.children = children;
+    }
+
+    /* a node without children */
+    public Node ( String name ) {
+        // delegate so the name is interned too: equals compares names with ==
+        this(name,Trees.nil);
+    }
+
+    final public String name () { return name; }
+
+    final public Trees children () { return children; }
+
+    /* deep equality: same (interned) name and deeply-equal children */
+    public boolean equals ( Tree e ) {
+        return (e instanceof Node)
+            && name == ((Node) e).name
+            && children.equals(((Node) e).children);
+    }
+
+    protected int size () {
+        return name().length()+children().size();
+    }
+
+    /* a node is printed infix when its name does not start with a letter
+       and it has exactly two children */
+    private boolean is_infix () {
+        return !Character.isLetter(name.charAt(0)) && children().length() == 2;
+    }
+
+    public String toString () {
+        if (!is_infix())
+            return name + children().toString();
+        return "(" + children().head().toString() + name
+               + children().tail().head().toString() + ")";
+    }
+
+    public String pretty ( int position ) {
+        if (!is_infix())
+            return name + children().pretty(position+name.length());
+        // infix nodes are printed on a single line, exactly like toString
+        return toString();
+    }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+    }
+
+    /* re-intern the name after deserialization so that == comparisons keep working */
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        name = Tree.add(name);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/gen/src/main/java/org/apache/mrql/gen/StringLeaf.java
----------------------------------------------------------------------
diff --git a/gen/src/main/java/org/apache/mrql/gen/StringLeaf.java b/gen/src/main/java/org/apache/mrql/gen/StringLeaf.java
new file mode 100644
index 0000000..5aa41c9
--- /dev/null
+++ b/gen/src/main/java/org/apache/mrql/gen/StringLeaf.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql.gen;
+
+import java.io.*;
+
+
+final public class StringLeaf extends Tree {
+ public String value;
+
+ public StringLeaf ( String s ) {
+ super();
+ value = s;
+ }
+
+ public String value () { return value; }
+
+ public boolean equals ( Tree e ) {
+ return (e instanceof StringLeaf)
+ && value.equals(((StringLeaf) e).value);
+ }
+
+ protected int size () { return value.length()+2; }
+
+ public String toString () {
+ return "\"" + value + "\"";
+ }
+
+ public String pretty ( int position ) {
+ return toString();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/gen/src/main/java/org/apache/mrql/gen/SymbolTable.java
----------------------------------------------------------------------
diff --git a/gen/src/main/java/org/apache/mrql/gen/SymbolTable.java b/gen/src/main/java/org/apache/mrql/gen/SymbolTable.java
new file mode 100644
index 0000000..52ea868
--- /dev/null
+++ b/gen/src/main/java/org/apache/mrql/gen/SymbolTable.java
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql.gen;
+
+import java.io.*;
+import java.util.Iterator;
+
+
+/* A symbol in the symbol table */
+class SymbolCell {
+ String name;
+ Tree binding;
+ SymbolCell next;
+ SymbolCell ( String n, Tree v, SymbolCell r ) { name=n; binding=v; next=r; }
+}
+
+
+/*
+ * Iterates over the names of the visible bindings of a SymbolTable, from the most
+ * recently inserted to the oldest. It works on private copies of the table's bucket
+ * array and scope stack, so iterating does not modify the original table.
+ */
+final class SymbolTableIterator implements Iterator<String> {
+    SymbolTable table;
+    SymbolTable original_table;
+
+    SymbolTableIterator ( SymbolTable t ) {
+        table = new SymbolTable();
+        original_table = t;
+        table.symbol_table = t.symbol_table.clone();
+        // copy the whole scope stack: it may have grown beyond the initial
+        // length of the fresh table's stack
+        table.scope_stack = t.scope_stack.clone();
+        table.scope_stack_top = t.scope_stack_top;
+    }
+
+    public boolean hasNext () {
+        while (table.scope_stack_top > 0) {
+            int n = table.scope_stack[table.scope_stack_top-1];
+            if (n >= 0) {   // negative entries are scope markers
+                SymbolCell c = table.symbol_table[n];
+                // a name is reported only if its binding is the one lookup returns
+                if (c.binding.equals(original_table.lookup(c.name)))
+                    return true;
+                // skip a shadowed binding; also unlink its cell so that the next
+                // occurrence of bucket n on the stack lines up with the next cell
+                table.symbol_table[n] = c.next;
+            }
+            table.scope_stack_top--;
+        }
+        return false;
+    }
+
+    public String next () {
+        if (!hasNext())
+            throw new java.util.NoSuchElementException();
+        int n = table.scope_stack[--table.scope_stack_top];
+        String var = table.symbol_table[n].name;
+        table.symbol_table[n] = table.symbol_table[n].next;
+        return var;
+    }
+
+    /* removal is not supported; kept as a no-op for compatibility */
+    public void remove () {}
+}
+
+
+/*
+ * A scoped symbol table: a fixed-size hash table of SymbolCell chains plus a scope
+ * stack. insert prepends a cell to its bucket and pushes the bucket index on the
+ * scope stack; begin_scope pushes the marker -1; end_scope pops back to the last
+ * marker, unlinking one bucket head per popped index. Hence the k-th occurrence
+ * (counting from the top) of a bucket index on the stack corresponds to the k-th
+ * cell of that bucket.
+ */
+public class SymbolTable implements Iterable<String> {
+    final static int symbol_table_size = 997;
+    final static int initial_scope_stack_length = 1000;
+
+    SymbolCell[] symbol_table;
+    int[] scope_stack;
+    int scope_stack_top = 0;
+
+    public SymbolTable () {
+        // new arrays are zero-initialized, so every bucket starts out null
+        symbol_table = new SymbolCell[symbol_table_size];
+        scope_stack = new int[initial_scope_stack_length];
+        scope_stack_top = 0;
+    }
+
+    public Iterator<String> iterator () {
+        return new SymbolTableIterator(this);
+    }
+
+    /* a hashing function for strings: a bucket index in [0, symbol_table_size) */
+    int hash ( String s ) {
+        // take the remainder before abs: Math.abs(Integer.MIN_VALUE) is negative and
+        // would give a negative index; for every other hash code this is the same
+        // bucket as Math.abs(h) % symbol_table_size
+        return Math.abs(s.hashCode() % symbol_table_size);
+    }
+
+    /* push v on the scope stack, doubling the stack when it is full */
+    private void push_scope_entry ( int v ) {
+        if (scope_stack_top >= scope_stack.length) {
+            int[] grown = new int[scope_stack.length*2];
+            System.arraycopy(scope_stack,0,grown,0,scope_stack.length);
+            scope_stack = grown;
+        }
+        scope_stack[scope_stack_top++] = v;
+    }
+
+    /* insert a new item in the symbol table; it shadows any older binding of key */
+    public void insert ( String key, Tree binding ) {
+        int loc = hash(key);
+        symbol_table[loc] = new SymbolCell(key,binding,symbol_table[loc]);
+        push_scope_entry(loc);
+    }
+
+    /* replace the binding of key in the symbol table */
+    public void replace ( String key, Tree binding ) {
+        int loc = hash(key);
+        // NOTE(review): this rebinds every cell named key in the bucket, including
+        // shadowed outer bindings, not only the innermost one -- confirm intended
+        for (SymbolCell s = symbol_table[loc]; s != null; s = s.next)
+            if (s.name.equals(key))
+                s.binding = binding;
+    }
+
+    /* remove the innermost item with a given name from the symbol table;
+       return false if there is none */
+    public boolean remove ( String key ) {
+        int loc = hash(key);
+        SymbolCell prev = null;
+        int index = 0;
+        for (SymbolCell s = symbol_table[loc]; s != null; prev = s, s = s.next, index++)
+            if (s.name.equals(key)) {
+                if (prev == null)
+                    symbol_table[loc] = s.next;
+                else prev.next = s.next;
+                // drop the matching scope-stack entry too; otherwise end_scope and
+                // the iterator would unlink or report the wrong cells of this bucket
+                remove_scope_entry(loc,index);
+                return true;
+            }
+        return false;
+    }
+
+    /* delete the index-th occurrence (counting from the top) of loc from the scope stack */
+    private void remove_scope_entry ( int loc, int index ) {
+        for (int j = scope_stack_top-1; j >= 0; j--)
+            if (scope_stack[j] == loc && index-- == 0) {
+                System.arraycopy(scope_stack,j+1,scope_stack,j,scope_stack_top-j-1);
+                scope_stack_top--;
+                return;
+            }
+    }
+
+    /* lookup for an item in the symbol table: the innermost binding of key, or null */
+    public Tree lookup ( String key ) {
+        int loc = hash(key);
+        for (SymbolCell s = symbol_table[loc]; s != null; s = s.next)
+            if (s.name.equals(key))
+                return s.binding;
+        return null; // if not found
+    }
+
+    /* return true if the innermost binding of key was inserted in the current scope */
+    public boolean is_local ( String key ) {
+        int loc = hash(key);
+        int i = 0;
+        for ( SymbolCell s = symbol_table[loc]; s != null; s = s.next, i++ )
+            if (s.name.equals(key)) {
+                // s is the i-th cell of bucket loc: it is local if the current scope
+                // (the stack above the last marker) holds more than i entries for loc
+                int k = 0;
+                for ( int j = scope_stack_top-1; j >= 0 && scope_stack[j] >= 0; j--)
+                    if (scope_stack[j] == loc)
+                        if (k++ == i)
+                            return true;
+                return false;
+            }
+        return false; // if not found
+    }
+
+    /* start a new environment */
+    public void begin_scope () {
+        push_scope_entry(-1);
+    }
+
+    /* pop the last environment: unlink every cell inserted since the last begin_scope
+       (or every cell, if no scope is open) and drop the scope marker */
+    public void end_scope () {
+        int i = scope_stack_top-1;
+        // test the bound before reading the stack, so an empty stack is safe and a
+        // bottom entry with no marker below it is unlinked as well
+        for (; i >= 0 && scope_stack[i] >= 0; i--) {
+            int loc = scope_stack[i];
+            symbol_table[loc] = symbol_table[loc].next;
+        }
+        // i is now the index of the scope marker, or -1 if there was none
+        scope_stack_top = Math.max(i,0);
+    }
+
+    /* display the content of the symbol table, innermost binding first,
+       with a separator line for every scope marker */
+    public void display () {
+        SymbolCell[] s = new SymbolCell[symbol_table_size];
+        for (int i = 0; i<symbol_table_size; i++)
+            s[i] = symbol_table[i];
+        for (int i = scope_stack_top-1; i>=0; i--)
+            if (scope_stack[i] == -1)
+                System.out.println("----------------");
+            else {
+                SymbolCell c = s[scope_stack[i]];
+                s[scope_stack[i]] = c.next;
+                System.out.println(c.name + ": " + c.binding);
+            }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/gen/src/main/java/org/apache/mrql/gen/Tree.java
----------------------------------------------------------------------
diff --git a/gen/src/main/java/org/apache/mrql/gen/Tree.java b/gen/src/main/java/org/apache/mrql/gen/Tree.java
new file mode 100644
index 0000000..7ddc582
--- /dev/null
+++ b/gen/src/main/java/org/apache/mrql/gen/Tree.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql.gen;
+
+import java_cup.runtime.*;
+import java.io.*;
+import java.util.HashMap;
+
+
+/**
+ * The base class of Gen trees (Node, VariableLeaf, StringLeaf, LongLeaf, DoubleLeaf).
+ * Each Tree records the values of line_number and position_number that were current
+ * when it was constructed.
+ */
+abstract public class Tree implements Serializable {
+
+    /* the current line number in the parsed file */
+    public static int line_number = 0;
+
+    /* the current char position in the parsed file */
+    public static int position_number = 0;
+
+    /* true when Trees are parsed rather than processed */
+    public static boolean parsed = false;
+
+    /* the line number of the Tree in the parsed file */
+    public int line;
+
+    /* the char position of the Tree in the parsed file */
+    public int position;
+
+    Tree () { line = line_number; position = position_number; }
+
+    /* deep equality */
+    public abstract boolean equals ( Tree e );
+
+    final public boolean is_node () { return (this instanceof Node); }
+    final public boolean is_variable () { return (this instanceof VariableLeaf); }
+    final public boolean is_long () { return (this instanceof LongLeaf); }
+    final public boolean is_string () { return (this instanceof StringLeaf); }
+    final public boolean is_double () { return (this instanceof DoubleLeaf); }
+
+    /* the payload of a leaf, or a default ("", 0, 0.0) when this is a different kind of Tree */
+    final public String variableValue () { return (this instanceof VariableLeaf) ? ((VariableLeaf)this).value() : ""; }
+    final public long longValue () { return (this instanceof LongLeaf) ? ((LongLeaf)this).value() : (long)0; }
+    final public String stringValue () { return (this instanceof StringLeaf) ? ((StringLeaf)this).value() : ""; }
+    final public double doubleValue () { return (this instanceof DoubleLeaf) ? ((DoubleLeaf)this).value() : (double)0.0; }
+
+    /* size used for pretty() */
+    protected abstract int size ();
+
+    /* print the Tree into a string */
+    public abstract String toString ();
+
+    /* pretty-print the Tree padded with position space characters */
+    public abstract String pretty ( int position );
+
+    /* turn a raw parsed tree, whose nodes have the form Node("Node",[name,args...]),
+       into Node(name,[args...]); leaves are returned unchanged */
+    private static Tree fix_tree ( Tree e ) {
+        if (!(e instanceof Node))
+            return e;
+        Trees raw = ((Node) e).children();
+        // collect the fixed arguments in reverse with cons (linear time), then reverse,
+        // instead of appending to the end of the list at every step (quadratic time)
+        Trees rev = Trees.nil;
+        for ( Tree a: raw.tail() )
+            rev = rev.cons(fix_tree(a));
+        return new Node(((VariableLeaf) raw.head()).value(),rev.reverse());
+    }
+
+    /* the inverse of toString(): parse s as the body of a #<...> Gen construction */
+    final public static synchronized Tree parse ( String s ) throws Exception {
+        GenParser.scanner = new GenLex(new StringReader("#<"+s+">"));
+        // the parser prints translated code to GenParser.out; discard it
+        GenParser.out = new PrintStream(new ByteArrayOutputStream());
+        new GenParser(GenParser.scanner).parse();
+        return fix_tree(GenParser.parse_tree);
+    }
+
+    /* the intern table used by add */
+    private final static HashMap<String,String> names = new HashMap<String,String>(1000);
+
+    /* intern s: return the canonical String equal to s, registering s if it is new */
+    public static String add ( String s ) {
+        // Node and VariableLeaf compare names with ==, so every name goes through this
+        // table. It is reached from constructors and readObject outside the lock held
+        // by parse, so guard the HashMap. A block lock (rather than a synchronized
+        // modifier) keeps the default serialVersionUID of this class unchanged.
+        synchronized (names) {
+            String ns = names.get(s);
+            if (ns == null) {
+                names.put(s,s);
+                return s;
+            }
+            return ns;
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/gen/src/main/java/org/apache/mrql/gen/Trees.java
----------------------------------------------------------------------
diff --git a/gen/src/main/java/org/apache/mrql/gen/Trees.java b/gen/src/main/java/org/apache/mrql/gen/Trees.java
new file mode 100644
index 0000000..86f65ca
--- /dev/null
+++ b/gen/src/main/java/org/apache/mrql/gen/Trees.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql.gen;
+
+import java.util.Iterator;
+import java.io.*;
+
+
+/* Iterates over the elements of a Trees list, from the head to the last element */
+final class TreeIterator implements Iterator<Tree> {
+    Trees trees;
+
+    TreeIterator ( Trees trees ) { this.trees = trees; }
+
+    /* an empty list has a null tail */
+    public boolean hasNext () { return trees.tail != null; }
+
+    public Tree next () {
+        // previously an exhausted iterator returned null and set trees to null,
+        // making the next hasNext() throw a NullPointerException
+        if (!hasNext())
+            throw new java.util.NoSuchElementException();
+        Tree res = trees.head;
+        trees = trees.tail;
+        return res;
+    }
+
+    /* NOTE(review): this does not remove anything from the list; it skips the
+       element that next() would return */
+    public void remove () { trees = trees.tail; }
+}
+
+
+/**
+ * A singly-linked list of Trees built from cons cells. An empty list is a cell
+ * with a null tail; Trees.nil is the shared empty list.
+ */
+final public class Trees implements Iterable<Tree>, Serializable {
+    /* the line width pretty() tries to fit a list into before breaking lines */
+    private final static int screen_size = 100;
+    public Tree head;
+    public Trees tail;
+
+    public Trees ( Tree head, Trees tail ) {
+        if (tail == null)
+            throw new Error("Gen: an empty list of nodes must be nil, not null");
+        this.head = head;
+        this.tail = tail;
+    }
+
+    /* an empty list */
+    public Trees () {
+        head = null;
+        tail = null;
+    }
+
+    public final static Trees nil = new Trees();
+
+    /* a one-element list */
+    public Trees ( Tree head ) {
+        this.head = head;
+        tail = nil;
+    }
+
+    public Tree head () {
+        if (tail == null)
+            throw new Error("Gen: tried to retrieve the head of an empty list of nodes");
+        return head;
+    }
+
+    public Trees tail () {
+        if (tail == null)
+            throw new Error("Gen: tried to retrieve the tail of an empty list of nodes");
+        return tail;
+    }
+
+    public boolean is_empty () {
+        return (tail == null);
+    }
+
+    /* number of nodes */
+    public int length () {
+        int n = 0;
+        for (Trees r = this; !r.is_empty(); r = r.tail)
+            n += 1;
+        return n;
+    }
+
+    /* put a Tree e at the beginning of the nodes (this list is shared, not copied) */
+    public Trees cons ( Tree e ) {
+        return new Trees(e,this);
+    }
+
+    /* put a Tree e at the end of the nodes; returns a new list, this one is unchanged */
+    public Trees append ( Tree e ) {
+        return append(new Trees(e));
+    }
+
+    /* append two lists of nodes: the cells of this list are copied, s is shared */
+    public Trees append ( Trees s ) {
+        if (is_empty())
+            return s;
+        if (s.is_empty())
+            return this;
+        Trees first = new Trees(head,s);
+        Trees last = first;
+        for (Trees r = tail; !r.is_empty(); r = r.tail) {
+            last.tail = new Trees(r.head,s);
+            last = last.tail;
+        }
+        return first;
+    }
+
+    /* reverse the order of nodes */
+    public Trees reverse () {
+        Trees res = nil;
+        for (Trees r = this; !r.is_empty(); r = r.tail)
+            res = res.cons(r.head);
+        return res;
+    }
+
+    /* is e one of the nodes? (deep equality) */
+    public boolean member ( Tree e ) {
+        for (Trees r = this; !r.is_empty(); r = r.tail)
+            if (r.head.equals(e))
+                return true;
+        return false;
+    }
+
+    /* return the nth node (0-based); throws an Error if there is no such node */
+    public Tree nth ( int n ) {
+        Trees r = this;
+        for (int i = 0; !r.is_empty() && i < n; i++)
+            r = r.tail;
+        // a negative n used to return the head silently
+        if (n < 0 || r.is_empty())
+            throw new Error("Gen: tried to retrieve a nonexistent nth element from a list of nodes");
+        return r.head;
+    }
+
+    /* deep equality */
+    public boolean equals ( Trees s ) {
+        Trees n = this;
+        Trees m = s;
+        for(; n.tail != null && m.tail != null; n = n.tail, m = m.tail )
+            if (!n.head.equals(m.head))
+                return false;
+        return (m.tail == null) && (n.tail == null);
+    }
+
+    /* printed width of the list: its elements plus parentheses and commas */
+    protected int size () {
+        int n = 1;
+        for (Trees r = this; !r.is_empty(); r = r.tail)
+            n += r.head.size()+1;
+        return n;
+    }
+
+    public Iterator<Tree> iterator () { return new TreeIterator(this); }
+
+    /* print the nodes as (e1,e2,...) */
+    public String toString () {
+        if (is_empty())
+            return "()";
+        StringBuilder s = new StringBuilder("(").append(head);
+        for (Trees r = tail; !r.is_empty(); r = r.tail)
+            s.append(',').append(r.head);
+        return s.append(')').toString();
+    }
+
+    /* pretty-print the nodes: on one line if they fit in screen_size columns,
+       otherwise one element per line, indented by position+1 spaces */
+    public String pretty ( int position ) {
+        if (is_empty() || (position+size() <= screen_size))
+            return toString();
+        StringBuilder s = new StringBuilder("(").append(head.pretty(position+1));
+        for (Trees r = tail; !r.is_empty(); r = r.tail) {
+            s.append(",\n");
+            for (int i = 0; i < position+1; i++)
+                s.append(' ');
+            s.append(r.head.pretty(position+1));
+        }
+        return s.append(')').toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/gen/src/main/java/org/apache/mrql/gen/VariableLeaf.java
----------------------------------------------------------------------
diff --git a/gen/src/main/java/org/apache/mrql/gen/VariableLeaf.java b/gen/src/main/java/org/apache/mrql/gen/VariableLeaf.java
new file mode 100644
index 0000000..c4c0ed9
--- /dev/null
+++ b/gen/src/main/java/org/apache/mrql/gen/VariableLeaf.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql.gen;
+
+import java.io.*;
+
+
+/** A variable (identifier) leaf of a Gen tree; its name is interned through Tree.add. */
+final public class VariableLeaf extends Tree {
+    /* the interned variable name */
+    public String value;
+
+    public VariableLeaf ( String s ) {
+        super();
+        this.value = Tree.add(s);
+    }
+
+    final public String value () { return this.value; }
+
+    /* names are interned, so comparing references is enough */
+    public boolean equals ( Tree e ) {
+        if (e instanceof VariableLeaf)
+            return this.value == ((VariableLeaf) e).value;
+        return false;
+    }
+
+    protected int size () { return this.value.length(); }
+
+    public String toString () { return this.value; }
+
+    /* a variable is always printed on one line */
+    public String pretty ( int position ) { return toString(); }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        out.defaultWriteObject();
+    }
+
+    /* re-intern the name after deserialization so that == comparisons keep working */
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+        this.value = Tree.add(this.value);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/gen/src/main/java/org/apache/mrql/gen/gen.cup
----------------------------------------------------------------------
diff --git a/gen/src/main/java/org/apache/mrql/gen/gen.cup b/gen/src/main/java/org/apache/mrql/gen/gen.cup
new file mode 100644
index 0000000..91432ba
--- /dev/null
+++ b/gen/src/main/java/org/apache/mrql/gen/gen.cup
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql.gen;
+
+import java_cup.runtime.*;
+import java.util.Stack;
+
+parser code {:
+
+    /* the terminals that print() can describe; token_names holds their printed names */
+    static int[] tokens = {
+        GenSym.error, GenSym.CHAR, GenSym.ID, GenSym.CSTRING, GenSym.LONG, GenSym.DOUBLE,
+        GenSym.META, GenSym.MATCH, GenSym.CASE, GenSym.IS, GenSym.COLON, GenSym.COMMA,
+        GenSym.GT, GenSym.LP, GenSym.RP, GenSym.LB, GenSym.RB, GenSym.LSB, GenSym.RSB,
+        GenSym.BQ, GenSym.BQP, GenSym.DOTSP, GenSym.ANY, GenSym.OPER, GenSym.FAIL
+    };
+
+    static String[] token_names = {
+        "error", "character", "identifier", "string", "integer", "float",
+        "#<", "match", "case", "is", ":", ",", ">", "(", ")", "[", "]", "{", "}",
+        "`", "`(", "...(", "_", "operator", "fail"
+    };
+
+    /* describe the token s for error messages: its name and a space, followed by its
+       value when that is a Long, Double or String; "?" when s cannot be described */
+    public static String print ( Symbol s ) {
+        for (int k = 0; k < tokens.length; k++) {
+            if (tokens[k] != s.sym)
+                continue;
+            String res = token_names[k] + " ";
+            Object v = s.value;
+            if (v == null)
+                return res;
+            if (v instanceof Long || v instanceof Double || v instanceof String)
+                return res + v;
+        }
+        return "?";
+    }
+
+    /* where the translated Java code is printed */
+    public static java.io.PrintStream out;
+
+    /* the scanner feeding the parser */
+    public static GenLex scanner;
+
+    /* the tree of the most recent #< ... > construction */
+    public static Tree parse_tree;
+
+    /* newlines re-emitted after a translated construction, then reset to 0
+       (presumably to keep the generated code's line numbers aligned) */
+    public static int newlines = 0;
+
+    /* counter that makes the case labels LCASE_n unique */
+    public static int lcase = 0;
+
+    /* the labels of the enclosing cases; fail breaks out of the innermost one */
+    public static Stack<String> labels = new Stack<String>();
+
+    public void syntax_error ( Symbol token ) {
+        throw new Error("*** Syntax Error: " + print(token) + " (line: " + scanner.line_pos()
+                        + ", position: " + scanner.char_pos() + ")");
+    }
+
+:};
+
+/* Terminals (tokens returned by the scanner). */
+terminal META, METAL, MATCH, CASE, FAIL, IS, DOTS, COLON, COMMA, GT, LP, RP, LB, RB, LSB, RSB, BQ, BQP, DOTSP, ANY;
+
+/* Terminals that carry a semantic value. */
+terminal String CHAR;
+terminal String ID;
+terminal Long LONG;
+terminal Double DOUBLE;
+terminal String CSTRING;
+terminal String OPER;
+
+/* Non terminals: pattern/construction trees (Tree, Trees) and translated Java code (String). */
+non terminal Tree exp, name, variable, rest;
+non terminal Trees expl;
+non terminal String case, cases, schar, jcode, scode;
+non terminal pcode, prog;
+
+/* OPER is left-associative, so a OPER b OPER c groups as (a OPER b) OPER c. */
+precedence left OPER;
+precedence nonassoc META;
+precedence nonassoc MATCH;
+precedence nonassoc CASE;
+precedence nonassoc CSTRING;
+precedence nonassoc CHAR;
+
+start with prog;
+
+/* The whole input: each top-level scode is translated and printed to parser.out. */
+prog ::= pcode
+ ;
+pcode ::= scode:s {: parser.out.print(s); :}
+ | pcode scode:s {: parser.out.print(s); :}
+ ;
+/* A sequence of scode fragments, concatenated into one string. */
+jcode ::= scode:s {: RESULT = s; :}
+ | jcode:c scode:s {: RESULT = c+s; :}
+ ;
+/* One fragment of Java code: a character or string, a braced block, a #< ... >
+   construction (translated by Meta.reify; the parsed tree is kept in parse_tree),
+   a METAL list construction (the reified F(...) node minus its first 13 and last
+   characters, presumably leaving only the reified list), a match statement
+   (Meta.level tracks the match nesting depth), or fail (break out of the innermost case). */
+scode ::= schar:s {: RESULT = s; :}
+ | LSB RSB {: RESULT = "{}"; :}
+ | LSB jcode:c RSB {: RESULT = "{"+c+"}"; :}
+ | METAL RB {: RESULT = "Trees.nil"; :}
+ | META exp:e GT {: GenParser.parse_tree = e;
+ RESULT = Meta.reify(e);
+ for (int i=0; i < parser.newlines; i++)
+ RESULT += "\n";
+ parser.newlines = 0; :}
+ | METAL expl:r RB {: RESULT = Meta.reify(new Node("Node",
+ new Trees(new VariableLeaf("F"),r)));
+ RESULT = RESULT.substring(13,RESULT.length()-1);
+ for (int i=0; i < parser.newlines; i++)
+ RESULT += "\n";
+ parser.newlines = 0; :}
+ | MATCH {: Meta.level++;
+ GenLex.context.new_context();
+ GenLex.context.in_match_body();
+ :}
+ jcode:c LSB cases:el RSB {: RESULT = "{ boolean FOUND_" + Meta.level + " = false; Tree E_"
+ + Meta.level + " = " + c + "; " + el + " }";
+ GenLex.context.close_context(GenParser.scanner);
+ for (int i=0; i < parser.newlines; i++)
+ RESULT += "\n";
+ parser.newlines = 0;
+ Meta.level--; :}
+ | FAIL {: RESULT = "{ FOUND_" + Meta.level + "=false; break " + GenParser.labels.peek() + "; }"; :}
+ ;
+/* A character is copied as is; a string literal is passed through Crypt.encrypt. */
+schar ::= CHAR:c {: RESULT = c; :}
+ | CSTRING:s {: RESULT = Crypt.encrypt(s); :}
+ ;
+/* The code before the first case, then the cases; every later case is guarded by
+   if (!FOUND_level), so it runs only when no earlier case matched. */
+cases ::= jcode:j case:c {: RESULT = j+c; :}
+ | cases:cl case:c {: RESULT = cl + " if (!FOUND_" + Meta.level + ") " + c; :}
+ ;
+/* A case: a block labeled LCASE_n that, when the pattern test on E_level succeeds,
+   runs the pattern bindings, sets FOUND_level to true and runs the case body. */
+case ::= CASE {: GenParser.labels.push("LCASE_"+GenParser.lcase++); :}
+ exp:e COLON jcode:j {: Condition m = Meta.pattern(e,"E_" + Meta.level);
+ RESULT = "{ " + GenParser.labels.pop() + ": ";
+ RESULT += (m.pred.equals("true")) ? "" : "if (" + m.pred + ") ";
+ for (int i=0; i < parser.newlines; i++)
+ RESULT += "\n";
+ parser.newlines = 0;
+ RESULT += "{ " + m.stmt + " FOUND_" + Meta.level
+ + "=true; " + j + " }";
+ for (int i = 0; i <= m.unmatched_brackets; i++)
+ RESULT += "}";
+ RESULT += ";"; :}
+ ;
+/* An identifier. */
+name ::= ID:s {: RESULT = new VariableLeaf(s); :}
+ ;
+/* A pattern variable: a name, an escaped name (BQ), embedded Java code (BQP ... RP),
+   or the wildcard ANY, represented by the variable _any_. */
+variable ::= name:s {: RESULT = s; :}
+ | BQ name:s {: RESULT = new Node("Escape",new Trees(s)); :}
+ | BQP jcode:c RP {: RESULT = new Node("Code",new Trees(new StringLeaf(c))); :}
+ | ANY {: RESULT = new VariableLeaf("_any_"); :}
+ ;
+/* A pattern or construction; applications and binary operators build raw
+   Node("Node",[head,args...]) trees. */
+exp ::= LONG:n {: RESULT = new LongLeaf(n.longValue()); :}
+ | DOUBLE:n {: RESULT = new DoubleLeaf(n.doubleValue()); :}
+ | CSTRING:s {: RESULT = new StringLeaf(s); :}
+ | variable:e {: RESULT = e; :}
+ | variable:e LP RP {: RESULT = new Node("Node",new Trees(e)); :}
+ | variable:e LP expl:el RP {: RESULT = new Node("Node",new Trees(e,el)); :}
+ | BQ name:s LB exp:e RB {: RESULT = new Node("Higher",new Trees(s,new Trees(e))); :}
+ | name:s IS exp:e {: RESULT = new Node("IS",new Trees(s,new Trees(e))); :}
+ | LP exp:e RP {: RESULT = e; :}
+ | exp:e1 OPER:o exp:e2 {: RESULT = new Node("Node",new Trees(new VariableLeaf(o),
+ new Trees(e1,new Trees(e2)))); :}
+ ;
+/* A non-empty, comma-separated list of expressions and rest patterns. */
+expl ::= exp:e {: RESULT = new Trees(e); :}
+ | rest:e {: RESULT = new Trees(e); :}
+ | expl:el COMMA rest:e {: RESULT = el.append(e); :}
+ | expl:el COMMA exp:e {: RESULT = el.append(e); :}
+ ;
+/* A Dots pattern: DOTS followed by a name, DOTSP code RP, or a bare DOTS. */
+rest ::= DOTS name:n {: RESULT = new Node("Dots",new Trees(n)); :}
+ | DOTSP jcode:c RP {: RESULT = new Node("Dots",new Trees(new StringLeaf(c))); :}
+ | DOTS {: RESULT = new Node("Dots"); :}
+ ;
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/gen/src/main/java/org/apache/mrql/gen/gen.lex
----------------------------------------------------------------------
diff --git a/gen/src/main/java/org/apache/mrql/gen/gen.lex b/gen/src/main/java/org/apache/mrql/gen/gen.lex
new file mode 100644
index 0000000..06b073d
--- /dev/null
+++ b/gen/src/main/java/org/apache/mrql/gen/gen.lex
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql.gen;
+
+import java_cup.runtime.Symbol;
+
+/** Parenthesis/match-body bookkeeping for GenLex. Every nested Gen construction,
+ *  pattern, or embedded code fragment gets its own context: a stack slot, indexed by
+ *  current, that counts its open parentheses and records whether it is a match body. */
+final class Context {
+    public int[] parentheses;      // number of open parentheses in each context
+    public boolean[] match_begin;  // whether each context is inside a match body
+    public int current;            // index of the innermost (active) context
+
+    public Context () {
+        current = 0;
+        parentheses = new int[1000];
+        match_begin = new boolean[1000];
+        parentheses[0] = 0;
+        match_begin[0] = false;
+    }
+
+    /** true if the innermost context has no open parentheses */
+    public boolean no_parentheses () {
+        return parentheses[current] == 0;
+    }
+
+    /** Push a new innermost context. The stack grows on demand instead of failing
+     *  with an ArrayIndexOutOfBoundsException beyond its initial capacity. */
+    public void new_context () {
+        current++;
+        if (current >= parentheses.length) {
+            parentheses = java.util.Arrays.copyOf(parentheses,2*parentheses.length);
+            match_begin = java.util.Arrays.copyOf(match_begin,2*match_begin.length);
+        }
+        parentheses[current] = 0;
+        match_begin[current] = false;
+    }
+
+    /** Pop the innermost context; reports (through lex.error) a context closed with
+     *  open parentheses, or a pop past the outermost context. */
+    public void close_context ( GenLex lex ) {
+        if (parentheses[current--] > 0)
+            lex.error("Unbalanced Parentheses in Gen construction/pattern");
+        if (current < 0)
+            lex.error("Wrong match statement");
+    }
+
+    /** count an opening parenthesis in the innermost context */
+    public void open_parenthesis () {
+        parentheses[current]++;
+    }
+
+    /** Count a closing parenthesis; true when it closes the last open parenthesis of a
+     *  nested context that is not a match body, i.e. the context itself should be closed. */
+    public boolean close_parenthesis () {
+        return (--parentheses[current] == 0) && current > 0 && !match_begin[current];
+    }
+
+    /** mark the innermost context as a match body */
+    public void in_match_body () {
+        match_begin[current] = true;
+    }
+
+    /** true if the innermost context is a match body */
+    public boolean match_body () { return match_begin[current]; }
+}
+%%
+%state gen
+%class GenLex
+%public
+%line
+%char
+%cup
+
+%eofval{
+ return new java_cup.runtime.Symbol(GenSym.EOF);
+%eofval}
+
+%{
+  /** Abort lexing: throws an Error tagged with the current line and column. */
+  public void error ( String msg ) {
+    String where = " (line: " + line_pos() + ", position: " + char_pos() + ")";
+    throw new Error("*** Lexical Error: " + msg + where);
+  }
+
+  /** parenthesis/match-body bookkeeping; static, hence shared by every GenLex instance */
+  public static Context context = new Context();
+
+  /** yychar recorded by the newline and comment rules; char_pos() is measured from it */
+  static int prev_char_pos = -1;
+
+  /** line number of the current token (yyline + 1) */
+  public int line_pos () {
+    return yyline + 1;
+  }
+
+  /** column of the current token, relative to prev_char_pos */
+  public int char_pos () {
+    return yychar - prev_char_pos;
+  }
+
+  /** copy the current token position into Tree.line_number / Tree.position_number */
+  private void mark_position () {
+    Tree.line_number = line_pos();
+    Tree.position_number = char_pos();
+  }
+
+  /** build a token with no value, after recording its position */
+  public Symbol symbol ( int s ) {
+    mark_position();
+    return new Symbol(s);
+  }
+
+  /** build a token carrying the value o, after recording its position */
+  public Symbol symbol ( int s, Object o ) {
+    mark_position();
+    return new Symbol(s,o);
+  }
+%}
+
+DIGIT = [0-9]
+ID = [a-zA-Z_][a-zA-Z0-9_]*
+// operator characters; '-' is escaped because an unescaped "*-+" inside a character
+// class is the range '*'..'+', which silently leaves '-' out of the class
+OPER = [!@#$%\^\&*+\-=|\\~]+
+NEWLINE = [\n\r]
+DIGITS = {DIGIT}+
+INT = ({DIGIT}|[1-9]{DIGITS}|-{DIGIT}|-[1-9]{DIGITS})
+FRAC = [.]{DIGITS}
+EXP = [eE][+-]?{DIGITS}
+DOUBLE = ({INT}{FRAC}|{INT}{EXP}|{INT}{FRAC}{EXP})
+
+%%
+// ---- rules of the gen state: inside a Gen construction or pattern ----
+// numeric literals (the longest match makes 1.5 a DOUBLE rather than INT 1)
+<gen> {INT} { return symbol(GenSym.LONG,new Long(yytext())); }
+<gen> {DOUBLE} { return symbol(GenSym.DOUBLE,new Double(yytext())); }
+// ':' ends a case pattern: back to Java text, popping the pattern's context
+<gen> ":" { yybegin(YYINITIAL);
+ context.close_context(this);
+ return symbol(GenSym.COLON); }
+// "_" is listed before {ID}, so a lone underscore is the wildcard ANY
+<gen> "_" { return symbol(GenSym.ANY); }
+<gen> "," { return symbol(GenSym.COMMA); }
+// `( and ...( start embedded Java code: push a context with one open parenthesis
+// and lex Java text until the YYINITIAL ")" rule closes that context
+<gen> "`(" { context.new_context();
+ context.open_parenthesis();
+ yybegin(YYINITIAL);
+ return symbol(GenSym.BQP);
+ }
+<gen> "...(" { context.new_context();
+ context.open_parenthesis();
+ yybegin(YYINITIAL);
+ return symbol(GenSym.DOTSP);
+ }
+<gen> "`" { return symbol(GenSym.BQ); }
+<gen> "..." { return symbol(GenSym.DOTS); }
+// parentheses and brackets are counted in the current context
+<gen> "(" { context.open_parenthesis();
+ return symbol(GenSym.LP);
+ }
+<gen> ")" { context.close_parenthesis();
+ return symbol(GenSym.RP);
+ }
+<gen> "[" { context.open_parenthesis();
+ return symbol(GenSym.LB);
+ }
+// a ']' that leaves no open parentheses ends a #[ ... ] construction
+<gen> "]" { context.close_parenthesis();
+ if (context.no_parentheses())
+ { yybegin(YYINITIAL);
+ context.close_context(this);
+ };
+ return symbol(GenSym.RB);
+ }
+// '>' ends a #< ... > construction
+<gen> ">" { yybegin(YYINITIAL);
+ context.close_parenthesis();
+ context.close_context(this);
+ return symbol(GenSym.GT);
+ }
+// "is" is listed before {ID}, so the exact word is the IS keyword
+<gen> "is" { return symbol(GenSym.IS); }
+<gen> {ID} { return symbol(GenSym.ID,yytext()); }
+<gen> {OPER} { return symbol(GenSym.OPER,yytext()); }
+// comments are dropped, but their line breaks are counted in GenParser.newlines
+// (presumably re-emitted by the parser to keep generated line numbers aligned)
+<gen> \/\*[^*/]*\*\/ { for (char c: yytext().toCharArray())
+ if (c=='\n' || c=='\r')
+ GenParser.newlines++;
+ prev_char_pos = yychar; }
+// NOTE(review): resetting prev_char_pos to 0 makes char_pos() absolute until the
+// next newline; looks unintended compared with the NEWLINE rule - confirm
+<gen> "//"[^\n\r]* { prev_char_pos = 0; }
+<gen> [ \t\f] {}
+<gen> {NEWLINE} { GenParser.newlines++; prev_char_pos = yychar; }
+<gen> . { error("Illegal character in Gen construction/pattern: "+yytext()); }
+// ---- rules of the YYINITIAL state: plain Java text, mostly passed through as CHAR ----
+<YYINITIAL> "match" { return symbol(GenSym.MATCH); }
+// "case" starts a pattern (gen state) only inside a match body; otherwise it is Java text
+<YYINITIAL> "case" { if (!context.match_body())
+ return symbol(GenSym.CHAR,yytext());
+ context.new_context();
+ yybegin(gen);
+ return symbol(GenSym.CASE);
+ }
+<YYINITIAL> "fail" { return symbol(GenSym.FAIL); }
+<YYINITIAL> "`" { error("Backquote outside a Gen construction/pattern"); }
+// #< and #[ open a Gen construction: push a context with one open parenthesis
+<YYINITIAL> "#<" { context.new_context();
+ context.open_parenthesis();
+ yybegin(gen);
+ return symbol(GenSym.META);
+ }
+<YYINITIAL> "#[" { context.new_context();
+ context.open_parenthesis();
+ yybegin(gen);
+ return symbol(GenSym.METAL);
+ }
+// braces are always counted; inside a match body they become LSB/RSB tokens
+<YYINITIAL> "{" { context.open_parenthesis();
+ if (context.match_body())
+ return symbol(GenSym.LSB);
+ else return symbol(GenSym.CHAR,yytext());
+ }
+<YYINITIAL> "("|"[" { context.open_parenthesis();
+ return symbol(GenSym.CHAR,yytext());
+ }
+<YYINITIAL> "}" { context.close_parenthesis();
+ if (context.match_body())
+ return symbol(GenSym.RSB);
+ else return symbol(GenSym.CHAR,yytext());
+ }
+// a ')' or ']' that closes an embedded-code context returns to the gen state
+<YYINITIAL> ")" { if (context.close_parenthesis())
+ { context.close_context(this);
+ yybegin(gen);
+ return symbol(GenSym.RP);
+ } else return symbol(GenSym.CHAR,yytext());
+ }
+<YYINITIAL> "]" { if (context.close_parenthesis())
+ { context.close_context(this);
+ yybegin(gen);
+ return symbol(GenSym.RB);
+ } else return symbol(GenSym.CHAR,yytext());
+ }
+// string literal without a state prefix, so not restricted to YYINITIAL; returns the
+// text between the quotes. NOTE(review): escaped quotes (\") are not supported
+\"[^\"]*\" { return symbol(GenSym.CSTRING,yytext().substring(1,yytext().length()-1)); }
+<YYINITIAL> {ID} { return symbol(GenSym.CHAR,yytext()); }
+<YYINITIAL> {OPER} { return symbol(GenSym.CHAR,yytext()); }
+<YYINITIAL> \/\*[^*/]*\*\/ { prev_char_pos = yychar; return symbol(GenSym.CHAR,yytext()); }
+<YYINITIAL> "//"[^\n\r]* { prev_char_pos = 0; return symbol(GenSym.CHAR,yytext()); }
+<YYINITIAL> [ \t\f] { return symbol(GenSym.CHAR,yytext()); }
+<YYINITIAL> {NEWLINE} { prev_char_pos = yychar; return symbol(GenSym.CHAR,yytext()); }
+<YYINITIAL> . { return symbol(GenSym.CHAR,yytext()); }
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/mapreduce/pom.xml
----------------------------------------------------------------------
diff --git a/mapreduce/pom.xml b/mapreduce/pom.xml
new file mode 100644
index 0000000..aa8834e
--- /dev/null
+++ b/mapreduce/pom.xml
@@ -0,0 +1,191 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <groupId>org.apache.mrql</groupId>
+ <artifactId>mrql-mr</artifactId>
+ <packaging>jar</packaging>
+ <name>Apache MRQL MapReduce mode</name>
+ <description>Apache MRQL evaluation in MapReduce mode on Apache Hadoop</description>
+ <url>http://mrql.incubator.apache.org/</url>
+ <inceptionYear>2013</inceptionYear>
+
+ <parent>
+ <groupId>org.apache.mrql</groupId>
+ <artifactId>mrql-parent</artifactId>
+ <version>0.9.1-incubating-SNAPSHOT</version>
+ </parent>
+
+ <dependencies>
+ <!-- MRQL modules of the same version: mrql-gen (its org.apache.mrql.gen.Main translates the *.gen sources) and mrql-core -->
+ <dependency>
+ <groupId>org.apache.mrql</groupId>
+ <artifactId>mrql-gen</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mrql</groupId>
+ <artifactId>mrql-core</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <!-- needed for testing in local mode -->
+ <dependency>
+ <groupId>commons-io</groupId>
+ <artifactId>commons-io</artifactId>
+ <version>2.4</version>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <!-- activated by defining the MultipleInputs property: also compiles ../src/main/java/multipleinputs.
+ The source list repeats generated-sources because this execution is declared without an id, like the one in the main build.
+ NOTE(review): confirm the ../src path is still valid after the directory refactoring -->
+ <profile>
+ <id>MultipleInputs</id>
+ <activation>
+ <property>
+ <name>MultipleInputs</name>
+ </property>
+ </activation>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals><goal>add-source</goal></goals>
+ <configuration>
+ <sources>
+ <source>../src/main/java/multipleinputs</source>
+ <source>${project.build.directory}/generated-sources</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <!-- register target/generated-sources (written by the antrun "gen" execution below) as a source root -->
+ <plugin>
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <phase>generate-sources</phase>
+ <goals><goal>add-source</goal></goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-sources</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- at package time, shade org.apache.mrql artifacts into the jar, except mrql-gen and mrql-core -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>2.1</version>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <artifactSet>
+ <includes>
+ <include>org.apache.mrql:*</include>
+ </includes>
+ <excludes>
+ <exclude>org.apache.mrql:mrql-gen</exclude>
+ <exclude>org.apache.mrql:mrql-core</exclude>
+ </excludes>
+ </artifactSet>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <!-- write mrql-mr-VERSION.jar into the parent project's lib directory -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <finalName>mrql-mr-${project.version}</finalName>
+ <outputDirectory>${project.parent.basedir}/lib</outputDirectory>
+ <useDefaultManifestFile>true</useDefaultManifestFile>
+ </configuration>
+ </plugin>
+ <plugin>
+ <artifactId>maven-antrun-plugin</artifactId>
+ <version>1.7</version>
+ <executions>
+ <!-- gen: translate every src/main/java/org/apache/mrql/*.gen file into Java under generated-sources,
+ running org.apache.mrql.gen.Main from ../lib/mrql-gen-VERSION.jar -->
+ <execution>
+ <id>gen</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target>
+ <mkdir dir="${project.build.directory}/generated-sources/org/apache/mrql" />
+ <property name="compile_classpath" refid="maven.compile.classpath" />
+ <fileset id="mr.gen.path" dir="src/main/java/org/apache/mrql" includes="*.gen" />
+ <pathconvert pathsep=" " property="mr.gen.files" refid="mr.gen.path" />
+ <java classname="org.apache.mrql.gen.Main" classpath="../lib/mrql-gen-${project.version}.jar:${compile_classpath}">
+ <arg line="${mr.gen.files}" />
+ <arg line="-o" />
+ <arg file="${project.build.directory}/generated-sources/org/apache/mrql" />
+ </java>
+ </target>
+ </configuration>
+ </execution>
+ <!-- validate: in the test phase, run ../tests/queries in memory (results in mr-memory) and in
+ Hadoop local mode (results in hadoop); the target is declared with if="tests" -->
+ <execution>
+ <id>validate</id>
+ <phase>test</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <target name="validate_hadoop" if="tests" description="Validate all test queries on Apache Hadoop">
+ <property name="runtime_classpath" refid="maven.runtime.classpath" />
+ <echo message="Evaluating test queries in memory (Map-Reduce mode):" />
+ <java classname="org.apache.mrql.Test" classpath="${runtime_classpath}" dir=".." fork="yes">
+ <arg file="../tests/queries" />
+ <arg file="../tests/results/mr-memory" />
+ <arg file="../tests/error_log.txt" />
+ </java>
+ <echo message="Evaluating test queries in Apache Hadoop local mode:" />
+ <!-- NOTE(review): error="/dev/null" discards stderr and is not portable to Windows -->
+ <java classname="org.apache.mrql.Test" classpath="../lib/mrql-mr-${project.version}.jar:${runtime_classpath}" dir=".." fork="yes" error="/dev/null">
+ <arg line="-local" />
+ <arg file="../tests/queries" />
+ <arg file="../tests/results/hadoop" />
+ <arg file="../tests/error_log.txt" />
+ </java>
+ </target>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/mapreduce/src/main/java/org/apache/mrql/CrossProductOperation.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/mrql/CrossProductOperation.java b/mapreduce/src/main/java/org/apache/mrql/CrossProductOperation.java
new file mode 100644
index 0000000..a9216cc
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/mrql/CrossProductOperation.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.io.*;
+import java.net.URI;
+import java.util.List;
+import java.util.Vector;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.filecache.DistributedCache;
+
+
+/** The CrossProduct physical operation (similar to block-nested loop) */
+final public class CrossProductOperation extends MapReducePlan {
+
+    /** The mapper for the CrossProduct operation. It buffers up to Config.map_cache_size
+     *  mapped elements of the left (outer) input and joins every full buffer with the whole
+     *  right (inner) input, which is read from the distributed cache. */
+    private final static class crossProductMapper extends Mapper<MRContainer,MRContainer,MRContainer,MRContainer> {
+        private static String counter;       // a Hadoop user-defined counter used in the repeat operation
+        private static Function reduce_fnc;  // the reduce function
+        private static Function map_fnc;     // the mapper function
+        private static DataSet cached_dataset;
+        private final static List<MRData> outer
+            = new Vector<MRData>(Config.map_cache_size);  // fix-size cache for the outer
+        private static int index;            // number of elements currently buffered in outer
+        private static MRContainer last_key;
+        private static URI[] uris;
+        private static Path[] local_paths;
+        private static Function acc_fnc;     // aggregator
+        private static MRData result;        // aggregation result
+        private static Tuple pair = new Tuple(2);      // (outer,inner) argument of reduce_fnc
+        private static Tuple acc_pair = new Tuple(2);  // (result,value) argument of acc_fnc; kept
+                                                       // separate so aggregation never clobbers pair
+        private static MRContainer container = new MRContainer(new MR_int(0));
+
+        /** Emit one reducer result: fold it into the aggregation result, or write it out
+         *  (in a repeat loop the value is (result,condition) and a true condition bumps the counter). */
+        private void write ( MRContainer key, MRData value, Context context )
+               throws IOException, InterruptedException {
+            if (result != null) {  // aggregation
+                acc_pair.set(0,result);
+                acc_pair.set(1,value);
+                result = acc_fnc.eval(acc_pair);
+            } else if (counter.equals("-")) {
+                container.set(value);
+                context.write(key,container);
+            } else {  // increment the repetition counter if the repeat condition is true
+                Tuple t = (Tuple)value;
+                if (((MR_bool)t.second()).get())
+                    context.getCounter("mrql",counter).increment(1);
+                container.set(t.first());
+                context.write(key,container);
+            }
+        }
+
+        /** Join every buffered outer element with every element of the cached inner input */
+        private void flush_outer ( MRContainer key, Context context ) throws Exception {
+            for ( MRData y: cached_data(context.getConfiguration()) ) {
+                pair.set(1,y);
+                for ( MRData z: outer ) {
+                    pair.set(0,z);
+                    for ( MRData v: (Bag)reduce_fnc.eval(pair) )
+                        write(key,v,context);
+                }
+            }
+        }
+
+        @Override
+        public void map ( MRContainer key, MRContainer value, Context context )
+                    throws IOException, InterruptedException {
+            try {
+                last_key = key;
+                for ( MRData x: (Bag)map_fnc.eval(value.data()) ) {
+                    // buffer x first, so that no outer element is lost when the buffer fills up
+                    outer.add(x);
+                    if (++index >= Config.map_cache_size) {
+                        flush_outer(key,context);
+                        index = 0;
+                        outer.clear();
+                    }
+                }
+            } catch (Exception e) {
+                throw new Error("Cannot perform the crossProduct: "+e,e);
+            }
+        }
+
+        /** Return the inner input as a Bag that lazily reads the cached SequenceFiles (skipping jars) */
+        protected Bag cached_data ( final Configuration conf ) {
+            try {
+                Bag res = new Bag();
+                final FileSystem fs = FileSystem.getLocal(conf);
+                // hadoop 0.20.2 distributed cache doesn't work in stand-alone mode,
+                // so in local mode the original cache URIs are read directly
+                final boolean local_mode = "local".equals(conf.get("mapred.job.tracker"));
+                final int count = (local_mode)
+                                  ? ((uris == null) ? 0 : uris.length)
+                                  : ((local_paths == null) ? 0 : local_paths.length);
+                for ( int i = 0; i < count; i++ ) {
+                    final Path path = (local_mode) ? new Path(uris[i].toString()) : local_paths[i];
+                    if (path.getName().endsWith(".jar"))
+                        continue;
+                    res = res.union(new Bag(new BagIterator () {
+                            final SequenceFile.Reader reader = new SequenceFile.Reader(fs,path,conf);
+                            final MRContainer key = new MRContainer(new MR_int(0));
+                            final MRContainer value = new MRContainer(new MR_int(0));
+                            public boolean hasNext () {
+                                try {
+                                    boolean done = reader.next(key,value);
+                                    if (!done)
+                                        reader.close();
+                                    return done;
+                                } catch (IOException e) {
+                                    throw new Error("Cannot collect values from distributed cache",e);
+                                }
+                            }
+                            public MRData next () {
+                                return value.data();
+                            }
+                        }));
+                }
+                return res;
+            } catch (Exception e) {
+                throw new Error("Cannot setup the cross product: "+e,e);
+            }
+        }
+
+        @Override
+        protected void setup ( Context context ) throws IOException,InterruptedException {
+            super.setup(context);
+            try {
+                Configuration conf = context.getConfiguration();
+                Config.read(conf);
+                if (Plan.conf == null)
+                    Plan.conf = conf;
+                Tree code = Tree.parse(conf.get("mrql.reducer"));
+                reduce_fnc = functional_argument(conf,code);
+                code = Tree.parse(conf.get("mrql.mapper"));
+                map_fnc = functional_argument(conf,code);
+                // crossProduct stores an empty "mrql.zero" when there is no aggregation
+                String zero = conf.get("mrql.zero");
+                if (zero != null && !zero.equals("")) {
+                    code = Tree.parse(zero);
+                    result = Interpreter.evalE(code);
+                    code = Tree.parse(conf.get("mrql.accumulator"));
+                    acc_fnc = functional_argument(conf,code);
+                } else result = null;
+                counter = conf.get("mrql.counter");
+                uris = DistributedCache.getCacheFiles(conf);
+                local_paths = DistributedCache.getLocalCacheFiles(conf);
+                index = 0;
+                outer.clear();  // the buffer is static: never start with leftovers
+            } catch (Exception e) {
+                throw new Error("Cannot setup the crossProduct: "+e,e);
+            }
+        }
+
+        @Override
+        protected void cleanup ( Context context ) throws IOException,InterruptedException {
+            if (index > 0)  // join the last, partially filled buffer
+                try {
+                    flush_outer(last_key,context);
+                } catch (Exception e) {
+                    throw new Error("Cannot cleanup the crossProduct: "+e,e);
+                }
+            index = 0;
+            outer.clear();
+            if (result != null)  // emit the result of aggregation
+                context.write(new MRContainer(new MR_int(0)),new MRContainer(result));
+            super.cleanup(context);
+        }
+    }
+
+    /** The CrossProduct physical operator (similar to block-nested loop)
+     * @param mx left mapper
+     * @param my right mapper
+     * @param reduce_fnc reducer
+     * @param acc_fnc optional accumulator function
+     * @param zero optional the zero value for the accumulator (null: no aggregation)
+     * @param X the left source
+     * @param Y the right source (stored in distributed cache)
+     * @param stop_counter optional counter used in repeat operation ("-" for none)
+     * @return a new data source that contains the result
+     */
+    public final static DataSet crossProduct ( Tree mx,              // left mapper
+                                               Tree my,              // right mapper
+                                               Tree reduce_fnc,      // reducer
+                                               Tree acc_fnc,         // optional accumulator function
+                                               Tree zero,            // optional the zero value for the accumulator
+                                               DataSet X,            // the left source
+                                               DataSet Y,            // the right source (stored in distributed cache)
+                                               String stop_counter ) // optional counter used in repeat operation
+                                 throws Exception {
+        DataSet ds = MapOperation.cMap(my,null,null,Y,"-");
+        String newpath = new_path(conf);
+        conf.set("mrql.reducer",reduce_fnc.toString());
+        conf.set("mrql.mapper",mx.toString());
+        if (zero != null) {
+            conf.set("mrql.accumulator",acc_fnc.toString());
+            conf.set("mrql.zero",zero.toString());
+            conf.set("mapred.min.split.size","268435456");
+        } else conf.set("mrql.zero","");  // empty zero means "no aggregation" to the mapper setup
+        conf.set("mrql.counter",stop_counter);
+        Job job = new Job(conf,newpath);
+        distribute_compiled_arguments(job.getConfiguration());
+        job.setJarByClass(MapReducePlan.class);
+        job.setOutputKeyClass(MRContainer.class);
+        job.setOutputValueClass(MRContainer.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        // skip files whose names start with '_' (e.g. Hadoop's _SUCCESS marker)
+        PathFilter pf = new PathFilter () { public boolean accept ( Path path ) {
+                                                return !path.getName().startsWith("_");
+                                            } };
+        // ship every file of the mapped right input through the distributed cache
+        for (DataSource p: ds.source) {
+            Path path = new Path(p.path);
+            for ( FileStatus s: path.getFileSystem(conf).listStatus(path,pf) )
+                DistributedCache.addCacheFile(s.getPath().toUri(),job.getConfiguration());
+        }
+        for (DataSource p: X.source)
+            MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,crossProductMapper.class);
+        FileOutputFormat.setOutputPath(job,new Path(newpath));
+        job.setNumReduceTasks(0);
+        job.waitForCompletion(true);
+        long c = (stop_counter.equals("-")) ? 0
+                 : job.getCounters().findCounter("mrql",stop_counter).getValue();
+        return new DataSet(new BinaryDataSource(newpath,conf),c,outputRecords(job));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-mrql/blob/1adaa71c/mapreduce/src/main/java/org/apache/mrql/GroupByJoinPlan.java
----------------------------------------------------------------------
diff --git a/mapreduce/src/main/java/org/apache/mrql/GroupByJoinPlan.java b/mapreduce/src/main/java/org/apache/mrql/GroupByJoinPlan.java
new file mode 100644
index 0000000..4f93e1e
--- /dev/null
+++ b/mapreduce/src/main/java/org/apache/mrql/GroupByJoinPlan.java
@@ -0,0 +1,464 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.mrql;
+
+import org.apache.mrql.gen.*;
+import java.io.*;
+import java.util.*;
+import org.apache.hadoop.fs.*;
+import org.apache.hadoop.io.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+
+/**
+ * A map-reduce job that captures a join with group-by. Similar to matrix multiplication.<br/>
+ * It captures queries of the form:
+ * <pre>
+ * select r(kx,ky,c(z))
+ * from x in X, y in Y, z = mp(x,y)
+ * where jx(x) = jy(y)
+ * group by (kx,ky): (gx(x),gy(y));
+ * </pre>
+ * where: mp: map function, r: reduce function, c: combine function,
+ * jx: left join key function, jy: right join key function,
+ * gx: left group-by function, gy: right group-by function.
+ * <br/>
+ * Example: matrix multiplication:
+ * <pre>
+ * select ( sum(z), i, j )
+ * from (x,i,k) in X, (y,k,j) in Y, z = x*y
+ * group by (i,j);
+ * </pre>
+ * It uses m*n partitions, so that n/m=|X|/|Y| and a hash table of size |X|/n*|Y|/m can fit in memory M.
+ * That is, n = |X|/sqrt(M), m = |Y|/sqrt(M).
+ * Each partition generates |X|/n*|Y|/m data. It replicates X n times and Y m times.
+ * Uses a hash-table H of size |X|/n*|Y|/m
+ * MapReduce pseudo-code:
+ * <pre>
+ * mapX ( x )
+ * for i = 0,n-1
+ * emit ( ((hash(gx(x)) % m)+m*i, jx(x), 1), (1,x) )
+ *
+ * mapY ( y )
+ * for i = 0,m-1
+ * emit ( ((hash(gy(y)) % n)*m+i, jy(y), 2), (2,y) )
+ * </pre>
+ * mapper output key: (partition,joinkey,tag), value: (tag,data) <br/>
+ * Partitioner: over partition <br/>
+ * GroupingComparator: over partition and joinkey <br/>
+ * SortComparator: major partition, minor joinkey, sub-minor tag <br/>
+ * <pre>
+ * reduce ( (p,_,_), s )
+ * if p != current_partition
+ * flush()
+ * current_partition = p
+ * read x from s first and store it to xs
+ * for each y from the rest of s
+ * for each x in xs
+ * H[(gx(x),gy(y))] = c( H[(gx(x),gy(y))], mp((x,y)) )
+ * </pre>
+ * where flush() is: for each ((kx,ky),v) in H: emit r((kx,ky),v)
+ */
+final public class GroupByJoinPlan extends Plan {
+
+ /** mapper output key: (partition,joinkey,tag) */
+ private final static class GroupByJoinKey implements Writable {
+ public int partition; // one of n*m
+ public byte tag; // 1 or 2
+ public MRData key;
+
+ GroupByJoinKey () {}
+ GroupByJoinKey ( int p, byte t, MRData k ) {
+ partition = p;
+ tag = t;
+ key = k;
+ }
+
+ public void write ( DataOutput out ) throws IOException {
+ out.writeByte(tag);
+ WritableUtils.writeVInt(out,partition);
+ key.write(out);
+ }
+
+ public void readFields ( DataInput in ) throws IOException {
+ tag = in.readByte();
+ partition = WritableUtils.readVInt(in);
+ key = MRContainer.read(in);
+ }
+
+ public String toString () { return "["+partition+","+tag+","+key+"]"; }
+ }
+
+ /** partition based on key.partition only */
+ private final static class GroupByJoinPartitioner extends Partitioner<GroupByJoinKey,MRContainer> {
+ final public int getPartition ( GroupByJoinKey key, MRContainer value, int numPartitions ) {
+ return key.partition % numPartitions;
+ }
+ }
+
+    /** sorting with major order key.partition, minor key.key, minor key.tag */
+    private final static class GroupByJoinSortComparator implements RawComparator<GroupByJoinKey> {
+        int[] container_size;
+
+        public GroupByJoinSortComparator () {
+            container_size = new int[1];
+        }
+
+        /** Compare serialized keys laid out as (tag byte, partition VInt, key) */
+        final public int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl ) {
+            try {
+                final int px = WritableComparator.readVInt(x,xs+1);
+                final int py = WritableComparator.readVInt(y,ys+1);
+                if (px != py)
+                    return px - py;
+                // equal partitions have equally long VInt encodings, so one header size fits both
+                final int header = 1+WritableUtils.decodeVIntSize(x[xs+1]);
+                final int byKey = MRContainer.compare(x,xs+header,xl-header,y,ys+header,yl-header,container_size);
+                return (byKey != 0) ? byKey : x[xs] - y[ys];
+            } catch (IOException e) {
+                throw new Error(e);
+            }
+        }
+
+        /** Same order on deserialized keys: partition, then key, then tag */
+        final public int compare ( GroupByJoinKey x, GroupByJoinKey y ) {
+            if (x.partition != y.partition)
+                return x.partition - y.partition;
+            final int byKey = x.key.compareTo(y.key);
+            return (byKey != 0) ? byKey : x.tag - y.tag;
+        }
+    }
+
+    /** grouping by key.partition and key.key (the tag is ignored) */
+    private final static class GroupByJoinGroupingComparator implements RawComparator<GroupByJoinKey> {
+        int[] container_size;
+
+        public GroupByJoinGroupingComparator() {
+            container_size = new int[1];
+        }
+
+        /** Compare serialized keys (tag byte, partition VInt, key) by partition, then by key */
+        final public int compare ( byte[] x, int xs, int xl, byte[] y, int ys, int yl ) {
+            try {
+                final int px = WritableComparator.readVInt(x,xs+1);
+                final int py = WritableComparator.readVInt(y,ys+1);
+                if (px != py)
+                    return px - py;
+                final int header = 1+WritableUtils.decodeVIntSize(x[xs+1]);  // tag byte + partition VInt
+                return MRContainer.compare(x,xs+header,xl-header,y,ys+header,yl-header,container_size);
+            } catch (IOException e) {
+                throw new Error(e);
+            }
+        }
+
+        /** Same grouping on deserialized keys: partition, then key */
+        final public int compare ( GroupByJoinKey x, GroupByJoinKey y ) {
+            if (x.partition != y.partition)
+                return x.partition - y.partition;
+            return x.key.compareTo(y.key);
+        }
+    }
+
+ /** the left GroupByJoin mapper */
+ private final static class MapperLeft extends Mapper<MRContainer,MRContainer,GroupByJoinKey,MRContainer> {
+ private static int n, m;
+ private static Function left_join_key_fnc;
+ private static Function left_groupby_fnc;
+ private static GroupByJoinKey ckey = new GroupByJoinKey(0,(byte)1,new MR_int(0));
+ private static Tuple tvalue = (new Tuple(2)).set(0,new MR_byte(1));
+ private static MRContainer cvalue = new MRContainer(tvalue);
+
+ @Override
+ public void map ( MRContainer key, MRContainer value, Context context )
+ throws IOException, InterruptedException {
+ MRData data = value.data();
+ for ( int i = 0; i < n; i++ ) {
+ ckey.partition = (left_groupby_fnc.eval(data).hashCode() % m)+m*i;
+ ckey.key = left_join_key_fnc.eval(data);
+ tvalue.set(1,data);
+ context.write(ckey,cvalue);
+ }
+ }
+
+ @Override
+ protected void setup ( Context context ) throws IOException,InterruptedException {
+ super.setup(context);
+ try {
+ Configuration conf = context.getConfiguration();
+ Config.read(conf);
+ if (Plan.conf == null)
+ Plan.conf = conf;
+ Tree code = Tree.parse(conf.get("mrql.join.key.left"));
+ left_join_key_fnc = functional_argument(conf,code);
+ code = Tree.parse(conf.get("mrql.groupby.left"));
+ left_groupby_fnc = functional_argument(conf,code);
+ m = conf.getInt("mrql.m",1);
+ n = conf.getInt("mrql.n",1);
+ } catch (Exception e) {
+ throw new Error("Cannot retrieve the left mapper plan");
+ }
+ }
+ }
+
+ /** the right GroupByJoin mapper */
+ private final static class MapperRight extends Mapper<MRContainer,MRContainer,GroupByJoinKey,MRContainer> {
+ private static int n, m;
+ private static Function right_join_key_fnc;
+ private static Function right_groupby_fnc;
+ private static GroupByJoinKey ckey = new GroupByJoinKey(0,(byte)2,new MR_int(0));
+ private static Tuple tvalue = (new Tuple(2)).set(0,new MR_byte(2));
+ private static MRContainer cvalue = new MRContainer(tvalue);
+
+ @Override
+ public void map ( MRContainer key, MRContainer value, Context context )
+ throws IOException, InterruptedException {
+ MRData data = value.data();
+ for ( int i = 0; i < m; i++ ) {
+ ckey.partition = (right_groupby_fnc.eval(data).hashCode() % n)*m+i;
+ ckey.key = right_join_key_fnc.eval(data);
+ tvalue.set(1,data);
+ context.write(ckey,cvalue);
+ }
+ }
+
+ @Override
+ protected void setup ( Context context ) throws IOException,InterruptedException {
+ super.setup(context);
+ try {
+ Configuration conf = context.getConfiguration();
+ Config.read(conf);
+ if (Plan.conf == null)
+ Plan.conf = conf;
+ Tree code = Tree.parse(conf.get("mrql.join.key.right"));
+ right_join_key_fnc = functional_argument(conf,code);
+ code = Tree.parse(conf.get("mrql.groupby.right"));
+ right_groupby_fnc = functional_argument(conf,code);
+ m = conf.getInt("mrql.m",1);
+ n = conf.getInt("mrql.n",1);
+ } catch (Exception e) {
+ throw new Error("Cannot retrieve the right mapper plan");
+ }
+ }
+ }
+
+    /** the GroupByJoin reducer.
+     *  For each (partition,join key) group it joins the cached left values with the right values,
+     *  then maps and combines each joined pair into an in-reducer hash table keyed by
+     *  (left group-by key, right group-by key). The table is reduced and written out whenever
+     *  the partition changes and at cleanup. */
+    private static class JoinReducer extends Reducer<GroupByJoinKey,MRContainer,MRContainer,MRContainer> {
+        private static String counter;       // a Hadoop user-defined counter used in the repeat operation ("-" for none)
+        private static int n, m;             // n*m partitioners
+        private static Function left_groupby_fnc;   // left group-by function
+        private static Function right_groupby_fnc;  // right group-by function
+        private static Function map_fnc;     // the map function
+        private static Function combine_fnc; // the combine function
+        private static Function reduce_fnc;  // the reduce function
+        private static Bag left = new Bag(); // a cached bag of input fragments from left input
+        private static int current_partition = -1;  // partition of the most recently reduced group
+        private static Hashtable<MRData,MRData> hashTable;  // in-reducer combiner
+        private static Tuple pair = new Tuple(2);
+        private static MRContainer ckey = new MRContainer(new MR_int(0));
+        private static MRContainer cvalue = new MRContainer(new MR_int(0));
+        private static MRContainer container = new MRContainer(new MR_int(0));
+        private static Tuple tkey = new Tuple(2);
+        private static Bag tbag = new Bag(2);
+
+        /** write one reduced value. When a repeat counter is in use, the value is a pair
+         *  (result,condition): the counter is incremented if the condition is true and only
+         *  the result is written. */
+        private static void write ( MRContainer key, MRData value, Context context )
+                    throws IOException, InterruptedException {
+            if (counter.equals("-")) {
+                container.set(value);
+                context.write(key,container);
+            } else { // increment the repetition counter if the repeat condition is true
+                Tuple t = (Tuple)value;
+                if (((MR_bool)t.second()).get())
+                    context.getCounter("mrql",counter).increment(1);
+                container.set(t.first());
+                context.write(key,container);
+            }
+        }
+
+        /** apply the map function to a joined value and combine every resulting element
+         *  into the hash-table entry of the given group-by key */
+        private void store ( MRData key, MRData value ) throws IOException {
+            pair.set(0,key);
+            for ( MRData e: (Bag)map_fnc.eval(value) ) {
+                // re-read the entry for every element: a previous element of this bag may have updated it
+                MRData old = hashTable.get(key);
+                if (old == null)
+                    hashTable.put(key,e);
+                else {
+                    tbag.clear();
+                    tbag.add_element(e).add_element(old);
+                    pair.set(1,tbag);
+                    for ( MRData z: (Bag)combine_fnc.eval(pair) )
+                        hashTable.put(key,z);  // normally, done once
+                }
+            }
+        }
+
+        /** reduce every hash-table entry, write the results, and empty the table
+         *  (a no-op when the table is empty) */
+        protected static void flush_table ( Context context ) throws IOException, InterruptedException {
+            Enumeration<MRData> en = hashTable.keys();
+            while (en.hasMoreElements()) {
+                MRData key = en.nextElement();
+                MRData value = hashTable.get(key);
+                ckey.set(key);
+                pair.set(0,key);
+                tbag.clear();
+                tbag.add_element(value);
+                pair.set(1,tbag);
+                for ( MRData e: (Bag)reduce_fnc.eval(pair) )
+                    write(ckey,e,context);
+            }
+            hashTable.clear();
+        }
+
+        /** join the left and right values of one (partition,join key) group */
+        @Override
+        public void reduce ( GroupByJoinKey key, Iterable<MRContainer> values, Context context )
+                    throws IOException, InterruptedException {
+            if (key.partition != current_partition) {
+                // at the end of a partition, flush the hash table (on the very first call the
+                // table is empty, so the flush does nothing)
+                // NOTE(review): relies on the sort comparator keeping each partition's groups contiguous
+                flush_table(context);
+                current_partition = key.partition;
+            }
+            left.clear();
+            Tuple p = null;
+            final Iterator<MRContainer> i = values.iterator();
+            // left tuples arrive before right tuples (tagged 2); cache the left values into the left bag
+            while (i.hasNext()) {
+                p = (Tuple)i.next().data();
+                if (((MR_byte)p.first()).get() == 2)
+                    break;
+                left.add(p.second());
+                p = null;
+            }
+            if (p == null)
+                return;  // the group has no right values: nothing to join
+            // p is the first right tuple; every remaining value of the group is also from right
+            join_with_left(p.second());
+            while (i.hasNext())
+                join_with_left(((Tuple)i.next().data()).second());
+        }
+
+        /** cross product of one right value with all cached left values; new Tuples are
+         *  required because the group-by key is stored in the hash table */
+        private void join_with_left ( MRData y ) throws IOException {
+            MRData gy = right_groupby_fnc.eval(y);
+            for ( MRData x: left )
+                store(new Tuple(left_groupby_fnc.eval(x),gy),new Tuple(x,y));
+        }
+
+        /** load the group-by, map, combine and reduce functions and the repeat counter
+         *  from the job configuration, and allocate the in-reducer hash table */
+        @Override
+        protected void setup ( Context context ) throws IOException,InterruptedException {
+            super.setup(context);
+            try {
+                Configuration conf = context.getConfiguration();
+                Config.read(conf);
+                if (Plan.conf == null)
+                    Plan.conf = conf;
+                Tree code = Tree.parse(conf.get("mrql.groupby.left"));
+                left_groupby_fnc = functional_argument(conf,code);
+                code = Tree.parse(conf.get("mrql.groupby.right"));
+                right_groupby_fnc = functional_argument(conf,code);
+                m = conf.getInt("mrql.m",1);
+                n = conf.getInt("mrql.n",1);
+                code = Tree.parse(conf.get("mrql.mapper"));
+                map_fnc = functional_argument(conf,code);
+                code = Tree.parse(conf.get("mrql.combiner"));
+                combine_fnc = functional_argument(conf,code);
+                code = Tree.parse(conf.get("mrql.reducer"));
+                reduce_fnc = functional_argument(conf,code);
+                counter = conf.get("mrql.counter");
+                hashTable = new Hashtable<MRData,MRData>(Config.map_cache_size);
+                current_partition = -1;  // the static field may survive from a previous task in the same JVM
+            } catch (Exception e) {
+                // chain the underlying failure so the task log shows why the plan could not be loaded
+                throw new Error("Cannot retrieve the reducer plan",e);
+            }
+        }
+
+        /** flush the groups of the last partition and release the hash table */
+        @Override
+        protected void cleanup ( Context context ) throws IOException,InterruptedException {
+            if (hashTable != null)
+                flush_table(context);
+            hashTable = null; // garbage-collect it
+            super.cleanup(context);
+        }
+    }
+
+    /** the GroupByJoin operation: a single MapReduce job that joins X and Y on their join keys
+     *  over an n*m reducer grid and groups/aggregates the join results in the reducers
+     * @param left_join_key_fnc left join key function
+     * @param right_join_key_fnc right join key function
+     * @param left_groupby_fnc left group-by function
+     * @param right_groupby_fnc right group-by function
+     * @param map_fnc map function
+     * @param combine_fnc combine function
+     * @param reduce_fnc reduce function
+     * @param X left data set
+     * @param Y right data set
+     * @param num_reducers number of reducers (the Hadoop default is kept when not positive)
+     * @param n left dimension of the reducer grid
+     * @param m right dimension of the reducer grid
+     * @param stop_counter optional counter used in repeat operation ("-" for none)
+     * @return a DataSet that contains the result
+     * @throws IOException if the MapReduce job does not complete successfully
+     */
+    public final static DataSet groupByJoin
+                 ( Tree left_join_key_fnc,       // left join key function
+                   Tree right_join_key_fnc,      // right join key function
+                   Tree left_groupby_fnc,        // left group-by function
+                   Tree right_groupby_fnc,       // right group-by function
+                   Tree map_fnc,                 // map function
+                   Tree combine_fnc,             // combine function
+                   Tree reduce_fnc,              // reduce function
+                   DataSet X,                    // left data set
+                   DataSet Y,                    // right data set
+                   int num_reducers,             // number of reducers
+                   int n, int m,                 // dimensions of the reducer grid
+                   String stop_counter )         // optional counter used in repeat operation
+              throws Exception {
+        String newpath = new_path(conf);
+        // pass the functional arguments and the grid dimensions to the mappers and the reducer
+        conf.set("mrql.join.key.left",left_join_key_fnc.toString());
+        conf.set("mrql.join.key.right",right_join_key_fnc.toString());
+        conf.set("mrql.groupby.left",left_groupby_fnc.toString());
+        conf.set("mrql.groupby.right",right_groupby_fnc.toString());
+        conf.setInt("mrql.m",m);
+        conf.setInt("mrql.n",n);
+        conf.set("mrql.mapper",map_fnc.toString());
+        conf.set("mrql.combiner",combine_fnc.toString());
+        conf.set("mrql.reducer",reduce_fnc.toString());
+        conf.set("mrql.counter",stop_counter);
+        Job job = new Job(conf,newpath);
+        distribute_compiled_arguments(job.getConfiguration());
+        job.setMapOutputKeyClass(GroupByJoinKey.class);
+        job.setJarByClass(GroupByJoinPlan.class);
+        job.setOutputKeyClass(MRContainer.class);
+        job.setOutputValueClass(MRContainer.class);
+        job.setPartitionerClass(GroupByJoinPartitioner.class);
+        job.setSortComparatorClass(GroupByJoinSortComparator.class);
+        job.setGroupingComparatorClass(GroupByJoinGroupingComparator.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        FileOutputFormat.setOutputPath(job,new Path(newpath));
+        // the left and the right inputs are read by different mappers
+        for (DataSource p: X.source)
+            MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MapperLeft.class);
+        for (DataSource p: Y.source)
+            MultipleInputs.addInputPath(job,new Path(p.path),(Class<? extends MapReduceMRQLFileInputFormat>)p.inputFormat,MapperRight.class);
+        job.setReducerClass(JoinReducer.class);
+        if (num_reducers > 0)
+            job.setNumReduceTasks(num_reducers);
+        // do not hand the (possibly missing or partial) output of a failed job to the caller
+        if (!job.waitForCompletion(true))
+            throw new IOException("GroupByJoin job failed; output path: "+newpath);
+        long c = (stop_counter.equals("-")) ? 0
+                 : job.getCounters().findCounter("mrql",stop_counter).getValue();
+        DataSource s = new BinaryDataSource(newpath,conf);
+        s.to_be_merged = false;
+        return new DataSet(s,c,MapReducePlan.outputRecords(job));
+    }
+}