You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/22 21:04:26 UTC
svn commit: r659206 - in /incubator/pig/branches/types: src/org/apache/pig/
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/parser/ test/org/apache/pig/test/
Author: gates
Date: Thu May 22 12:04:25 2008
New Revision: 659206
URL: http://svn.apache.org/viewvc?rev=659206&view=rev
Log:
PIG-159 Santhosh's work to chain together logical plans.
Modified:
incubator/pig/branches/types/src/org/apache/pig/PigServer.java
incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java
incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java
Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu May 22 12:04:25 2008
@@ -84,7 +84,7 @@
}
- Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>();
+ Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>();
@@ -252,7 +252,7 @@
* result
*/
public Iterator<Tuple> openIterator(String id) throws IOException {
- if (!aliases.containsKey(id))
+ if (!aliases.containsKey(aliasOp.get(id)))
throw new IOException("Invalid alias: " + id);
try {
@@ -471,7 +471,14 @@
}
public Map<String, LogicalPlan> getAliases() {
- return this.aliases;
+ Map<String, LogicalPlan> aliasPlans = new HashMap<String, LogicalPlan>();
+ for(LogicalOperator op: this.aliases.keySet()) {
+ String alias = op.getAlias();
+ if(null != alias) {
+ aliasPlans.put(alias, this.aliases.get(op));
+ }
+ }
+ return aliasPlans;
}
public void shutdown() {
Modified: incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java Thu May 22 12:04:25 2008
@@ -21,6 +21,7 @@
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Iterator;
+import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -28,6 +29,7 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
public class StandAloneParser {
@@ -66,9 +68,10 @@
try{
pig.registerQuery(query);
System.out.print("Current aliases: ");
- for (Iterator<String> it = pig.getAliases().keySet().iterator(); it.hasNext(); ) {
+ Map<String, LogicalPlan> aliasPlan = pig.getAliases();
+ for (Iterator<String> it = aliasPlan.keySet().iterator(); it.hasNext(); ) {
String alias = it.next();
- LogicalPlan lp = pig.getAliases().get(alias);
+ LogicalPlan lp = aliasPlan.get(alias);
System.out.print(alias + "->" + lp.getLeaves().get(0).getSchema());
if (it.hasNext()) System.out.print(", \n");
else System.out.print("\n");
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java Thu May 22 12:04:25 2008
@@ -40,7 +40,7 @@
public LogicalPlan parse(String scope,
String query,
- Map<String, LogicalPlan> aliases,
+ Map<LogicalOperator, LogicalPlan> aliases,
Map<OperatorKey, LogicalOperator> opTable,
Map<String, LogicalOperator> aliasOp,
Map<String, ExpressionOperator> defineAliases)
Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu May 22 12:04:25 2008
@@ -52,7 +52,7 @@
public class QueryParser {
private PigContext pigContext;
- private Map<String, LogicalPlan> aliases;
+ private Map<LogicalOperator, LogicalPlan> aliases;
private Map<OperatorKey, LogicalOperator> opTable;
private String scope;
private NodeIdGenerator nodeIdGen;
@@ -68,7 +68,7 @@
public QueryParser(InputStream in,
PigContext pigContext,
String scope,
- Map<String, LogicalPlan> aliases,
+ Map<LogicalOperator, LogicalPlan> aliases,
Map<OperatorKey, LogicalOperator> opTable,
Map<String, LogicalOperator> aliasOp,
Map<String, ExpressionOperator> defineAliases) {
@@ -270,8 +270,8 @@
void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, LogicalPlan condPlan, int index) throws PlanException{
LogicalOperator splitOutput = new LOSplitOutput(lp, new OperatorKey(scope, getNextId()), index, condPlan);
splitOp.addOutput(splitOutput);
- //splitOp.addOutputAlias(alias, condPlan);
addAlias(alias, splitOutput);
+ addLogicalPlan(splitOutput, lp);
lp.add(splitOutput);
log.debug("Added alias: " + splitOutput.getAlias() + " class: "
@@ -332,6 +332,34 @@
//END
private static Map<String, Byte> nameToTypeMap = DataType.genNameToTypeMap();
+
+ public void addLogicalPlan(LogicalOperator op, LogicalPlan plan) {
+ aliases.put(op, plan);
+ }
+
+ public LogicalPlan getLogicalPlan(LogicalOperator op) {
+ return aliases.get(op);
+ }
+
+ public void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan) throws ParseException {
+ log.trace("Entering attachPlan");
+ lp.add(root);
+ log.debug("Added operator " + root + " to the logical plan " + lp);
+ if(null == rootPlan.getPredecessors(root)) {
+ log.trace("Exiting attachPlan");
+ return;
+ }
+ for(LogicalOperator rootPred: rootPlan.getPredecessors(root)) {
+ attachPlan(lp, rootPred, rootPlan);
+ try {
+ lp.connect(rootPred, root);
+ log.debug("Connected operator " + rootPred + " to " + root + " in the logical plan " + lp);
+ } catch (FrontendException fee) {
+ throw new ParseException(fee.getMessage());
+ }
+ }
+ log.trace("Exiting attachPlan");
+ }
}
@@ -435,8 +463,29 @@
)
{
if(null != root) {
- log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
+ log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
+ addLogicalPlan(root, lp);
+ log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
}
+
+ ArrayList<LogicalOperator> roots = new ArrayList<LogicalOperator>(lp.getRoots().size());
+ for(LogicalOperator op: lp.getRoots()) {
+ roots.add(op);
+ }
+
+ for(LogicalOperator op: roots) {
+ //At this point we have a logical plan for the pig statement
+ //In order to construct the entire logical plan we need to traverse
+ //each root and get the logical plan it belongs to. From each of those
+ //plans we need the predecessors of the root of the current logical plan
+ //and so on. This is a computationally intensive operatton but should
+ //be fine as its restricted to the parser
+
+ LogicalPlan rootPlan = aliases.get(op);
+ if(null != rootPlan) {
+ attachPlan(lp, op, rootPlan);
+ }
+ }
log.trace("Exiting Parse");
return lp;
@@ -564,7 +613,7 @@
{
(
(
- (<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema);log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.info("Load as atomschema()");schema.printAliases();}) ])
+ (<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema);log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.debug("Load as atomschema()");schema.printAliases();}) ])
| ((<GROUP> | <COGROUP>) op = CogroupClause(lp))
| (<FILTER> op = FilterClause(lp))
| (<ORDER> op = OrderClause(lp))
@@ -1485,10 +1534,8 @@
}
{
(
-// lhs = UnaryExpr(over,specs,lp,input)
lhs = CastExpr(over,specs,lp,input)
(
-// ( t = <STAR> | t = "/" | t = "%") rhs = UnaryExpr(over,specs,lp,input)
( t = <STAR> | t = "/" | t = "%") rhs = CastExpr(over,specs,lp,input)
{
assertAtomic(lhs,true);
@@ -1564,7 +1611,6 @@
log.trace("Entering NegativeExpr");
}
{
-// "-" c1=UnaryExpr(over,specs,lp,input)
"-" c1=CastExpr(over,specs,lp,input)
{
ExpressionOperator eOp = new LONegative(lp, new OperatorKey(scope, getNextId()), c1);
@@ -1768,12 +1814,6 @@
[type = Type()] funcName=EvalFunction() "(" args=FuncDeclareArgs(lp) ")"
{
ExpressionOperator userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, type);
- //lp.add(userFunc);
- //log.debug("FuncDeclareSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
- //for(ExpressionOperator exprOp: args) {
- //lp.connect(exprOp, userFunc);
- //log.debug("FuncDeclareSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
- //}
log.trace("Exiting FuncDeclareSpec");
return userFunc;
}
@@ -1815,8 +1855,6 @@
project.setAlias(t.image);
}
item = project;
- //lp.add(project);
- log.debug("FuncDeclareArgsItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
}
)
{log.trace("Exiting FuncDeclareArgsItem");return item;}
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java Thu May 22 12:04:25 2008
@@ -37,6 +37,7 @@
import org.apache.pig.impl.logicalLayer.ExpressionOperator;
import org.apache.pig.impl.logicalLayer.LogToPhyTranslationVisitor;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.impl.logicalLayer.OperatorKey;
@@ -382,13 +383,12 @@
defineAliases);
List<LogicalOperator> roots = lp.getRoots();
+
if(roots.size() > 0) {
- if (logicalOpTable.get(roots.get(0)) instanceof LogicalOperator){
- System.out.println(query);
- System.out.println(logicalOpTable.get(roots.get(0)));
- }
- if ((roots.get(0)).getAlias()!=null){
- aliases.put((roots.get(0)).getAlias(), lp);
+ for(LogicalOperator op: roots) {
+ if (!(op instanceof LOLoad)){
+ throw new Exception("Cannot have a root that is not the load operator LOLoad. Found " + op.getClass().getName());
+ }
}
}
@@ -427,7 +427,7 @@
return null;
}
- Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>();
+ Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>();
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu May 22 12:04:25 2008
@@ -807,7 +807,7 @@
@Test
public void testQuery68() {
buildPlan(" a = load 'input1';");
- buildPlan(" b = foreach a generate {(16, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12l, {()})};");
+ buildPlan(" b = foreach a generate 10, {(16, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12l, {()})};");
}
@Test
@@ -875,7 +875,8 @@
@Test
public void testQuery75() {
- buildPlan("union (load 'a'), (load 'b'), (load 'c');");
+ buildPlan("a = union (load 'a'), (load 'b'), (load 'c');");
+ buildPlan("b = foreach a {generate $0;} parallel 10;");
}
// Helper Functions
@@ -901,12 +902,10 @@
List<LogicalOperator> roots = lp.getRoots();
if(roots.size() > 0) {
- if (logicalOpTable.get(roots.get(0)) instanceof LogicalOperator){
- System.out.println(query);
- System.out.println(logicalOpTable.get(roots.get(0)));
- }
- if ((roots.get(0)).getAlias()!=null){
- aliases.put((roots.get(0)).getAlias(), lp);
+ for(LogicalOperator op: roots) {
+ if (!(op instanceof LOLoad)){
+ throw new Exception("Cannot have a root that is not the load operator LOLoad. Found " + op.getClass().getName());
+ }
}
}
@@ -946,7 +945,7 @@
return null;
}
- Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>();
+ Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>();
Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java Thu May 22 12:04:25 2008
@@ -207,7 +207,7 @@
System.out.println(logicalOpTable.get(roots.get(0)));
}
if ((roots.get(0)).getAlias()!=null){
- aliases.put((roots.get(0)).getAlias(), lp);
+ aliases.put(roots.get(0), lp);
}
}
@@ -232,7 +232,7 @@
TypeCheckingTestUtil.printTypeGraph(plan) ;
}
- Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>();
+ Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>();