You are viewing a plain-text version of this content. The link to the canonical version is not preserved in this copy.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/05/22 21:04:26 UTC

svn commit: r659206 - in /incubator/pig/branches/types: src/org/apache/pig/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/logicalLayer/parser/ test/org/apache/pig/test/

Author: gates
Date: Thu May 22 12:04:25 2008
New Revision: 659206

URL: http://svn.apache.org/viewvc?rev=659206&view=rev
Log:
PIG-159 Santhosh's work to chain together logical plans.


Modified:
    incubator/pig/branches/types/src/org/apache/pig/PigServer.java
    incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
    incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java

Modified: incubator/pig/branches/types/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/PigServer.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/PigServer.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/PigServer.java Thu May 22 12:04:25 2008
@@ -84,7 +84,7 @@
     }
 
 
-    Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>();
+    Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
     Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>();
     Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
     Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>();
@@ -252,7 +252,7 @@
      * result
      */
     public Iterator<Tuple> openIterator(String id) throws IOException {
-        if (!aliases.containsKey(id))
+        if (!aliases.containsKey(aliasOp.get(id)))
             throw new IOException("Invalid alias: " + id);
 
         try {
@@ -471,7 +471,14 @@
     }
   
     public Map<String, LogicalPlan> getAliases() {
-        return this.aliases;
+        Map<String, LogicalPlan> aliasPlans = new HashMap<String, LogicalPlan>();
+        for(LogicalOperator op: this.aliases.keySet()) {
+            String alias = op.getAlias();
+            if(null != alias) {
+                aliasPlans.put(alias, this.aliases.get(op));
+            }
+        }
+        return aliasPlans;
     }
     
     public void shutdown() {

Modified: incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/StandAloneParser.java Thu May 22 12:04:25 2008
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -28,6 +29,7 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
 
 public class StandAloneParser {
     
@@ -66,9 +68,10 @@
         try{        
             pig.registerQuery(query);
             System.out.print("Current aliases: ");
-            for (Iterator<String> it = pig.getAliases().keySet().iterator(); it.hasNext(); ) {
+            Map<String, LogicalPlan> aliasPlan = pig.getAliases();
+            for (Iterator<String> it = aliasPlan.keySet().iterator(); it.hasNext(); ) {
                 String alias = it.next();
-                LogicalPlan lp = pig.getAliases().get(alias);
+                LogicalPlan lp = aliasPlan.get(alias);
                 System.out.print(alias + "->" + lp.getLeaves().get(0).getSchema());
                 if (it.hasNext()) System.out.print(", \n");
                 else System.out.print("\n");

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlanBuilder.java Thu May 22 12:04:25 2008
@@ -40,7 +40,7 @@
 
     public LogicalPlan parse(String scope, 
                              String query, 
-                             Map<String, LogicalPlan> aliases,
+                             Map<LogicalOperator, LogicalPlan> aliases,
                              Map<OperatorKey, LogicalOperator> opTable,
                              Map<String, LogicalOperator> aliasOp,
                              Map<String, ExpressionOperator> defineAliases)

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu May 22 12:04:25 2008
@@ -52,7 +52,7 @@
 
 public class QueryParser {
 	private PigContext pigContext;
-	private Map<String, LogicalPlan> aliases;
+	private Map<LogicalOperator, LogicalPlan> aliases;
 	private Map<OperatorKey, LogicalOperator> opTable;
 	private String scope;
 	private NodeIdGenerator nodeIdGen;
@@ -68,7 +68,7 @@
 	public QueryParser(InputStream in,
 					   PigContext pigContext, 
 					   String scope, 
-					   Map<String, LogicalPlan> aliases,
+					   Map<LogicalOperator, LogicalPlan> aliases,
 					   Map<OperatorKey, LogicalOperator> opTable,
 					   Map<String, LogicalOperator> aliasOp,
 					   Map<String, ExpressionOperator> defineAliases) {
@@ -270,8 +270,8 @@
 	 void addSplitOutput(LogicalPlan lp, LOSplit splitOp, String alias, LogicalPlan condPlan, int index) throws PlanException{
 		LogicalOperator splitOutput = new LOSplitOutput(lp, new OperatorKey(scope, getNextId()), index, condPlan);
 		splitOp.addOutput(splitOutput);
-		//splitOp.addOutputAlias(alias, condPlan);
 		addAlias(alias, splitOutput);
+        addLogicalPlan(splitOutput, lp);
 		
 		lp.add(splitOutput);
 		log.debug("Added alias: " + splitOutput.getAlias() + " class: " 
@@ -332,6 +332,34 @@
 	 //END
 	
 	private static Map<String, Byte> nameToTypeMap = DataType.genNameToTypeMap();
+
+    public void addLogicalPlan(LogicalOperator op, LogicalPlan plan) {
+        aliases.put(op, plan);
+    }
+
+    public LogicalPlan getLogicalPlan(LogicalOperator op) {
+        return aliases.get(op);
+    }
+
+    public void attachPlan(LogicalPlan lp, LogicalOperator root, LogicalPlan rootPlan) throws ParseException {
+        log.trace("Entering attachPlan");
+        lp.add(root);
+        log.debug("Added operator " + root + " to the logical plan " + lp);
+        if(null == rootPlan.getPredecessors(root)) {
+            log.trace("Exiting attachPlan");
+            return;
+        }
+        for(LogicalOperator rootPred: rootPlan.getPredecessors(root)) {
+            attachPlan(lp, rootPred, rootPlan);
+            try {
+                lp.connect(rootPred, root);
+                log.debug("Connected operator " + rootPred + " to " + root + " in the logical plan " + lp);
+            } catch (FrontendException fee) {
+                throw new ParseException(fee.getMessage());
+            }
+        }
+        log.trace("Exiting attachPlan");
+    }
 }
 
 	
@@ -435,8 +463,29 @@
 	)
 	{ 
 		if(null != root) {
-				log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
+            log.debug("Adding " + root.getAlias() + " " + root + " to the lookup table " + aliases);
+            addLogicalPlan(root, lp);
+			log.debug("Root: " + root.getClass().getName() + " schema: " + root.getSchema());
 		}
+
+        ArrayList<LogicalOperator> roots = new ArrayList<LogicalOperator>(lp.getRoots().size());
+        for(LogicalOperator op: lp.getRoots()) {
+            roots.add(op);
+        }
+        
+        for(LogicalOperator op: roots) {
+            //At this point we have a logical plan for the pig statement
+            //In order to construct the entire logical plan we need to traverse
+            //each root and get the logical plan it belongs to. From each of those
+            //plans we need the predecessors of the root of the current logical plan
+            //and so on. This is a computationally intensive operation but should
+            //be fine as it's restricted to the parser
+
+            LogicalPlan rootPlan = aliases.get(op);
+            if(null != rootPlan) {
+                attachPlan(lp, op, rootPlan);
+            }
+        }
 		
 		log.trace("Exiting Parse");
 		return lp; 
@@ -564,7 +613,7 @@
 {
 	(
 	(
-	(<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema);log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.info("Load as atomschema()");schema.printAliases();}) ])
+	(<LOAD> op = LoadClause(lp) [<AS> (LOOKAHEAD(2) "(" schema = TupleSchema() ")" {op.setSchema(schema);log.debug("Load as schema()");schema.printAliases();} | fs = AtomSchema() {schema = new Schema(fs); op.setSchema(schema); log.debug("Load as atomschema()");schema.printAliases();}) ])
 |	((<GROUP> | <COGROUP>) op = CogroupClause(lp))
 |	(<FILTER> op = FilterClause(lp))
 |   (<ORDER> op = OrderClause(lp))
@@ -1485,10 +1534,8 @@
 }
 {
 		(
-//		lhs = UnaryExpr(over,specs,lp,input)
 		lhs = CastExpr(over,specs,lp,input)
 		(
-//		( t = <STAR> | t = "/" | t = "%") rhs = UnaryExpr(over,specs,lp,input) 			
 		( t = <STAR> | t = "/" | t = "%") rhs = CastExpr(over,specs,lp,input) 			
 		{
 			assertAtomic(lhs,true);
@@ -1564,7 +1611,6 @@
 	log.trace("Entering NegativeExpr");
 }
 {
-//	"-" c1=UnaryExpr(over,specs,lp,input)
 	"-" c1=CastExpr(over,specs,lp,input)
 	{
 		ExpressionOperator eOp = new LONegative(lp, new OperatorKey(scope, getNextId()), c1);
@@ -1768,12 +1814,6 @@
 	[type = Type()] funcName=EvalFunction() "(" args=FuncDeclareArgs(lp) ")" 
 	{
 		ExpressionOperator userFunc = new LOUserFunc(lp, new OperatorKey(scope, getNextId()), funcName, args, type);
-		//lp.add(userFunc);
-		//log.debug("FuncDeclareSpec: Added operator " + userFunc.getClass().getName() + " " + userFunc + " to logical plan " + lp);
-		//for(ExpressionOperator exprOp: args) {
-			//lp.connect(exprOp, userFunc);
-			//log.debug("FuncDeclareSpec: Connected operator " + exprOp.getClass().getName() + " " + exprOp+ " to " + userFunc + " logical plan " + lp);
-		//}
 		log.trace("Exiting FuncDeclareSpec");
 		return userFunc;
 	}
@@ -1815,8 +1855,6 @@
 			project.setAlias(t.image); 
 		}
 		item = project;
-		//lp.add(project);
-		log.debug("FuncDeclareArgsItem: Added operator " + project.getClass().getName() + " " + project + " to logical plan " + lp);
 	}
 	)
 	{log.trace("Exiting FuncDeclareArgsItem");return item;}

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogToPhyCompiler.java Thu May 22 12:04:25 2008
@@ -37,6 +37,7 @@
 import org.apache.pig.impl.logicalLayer.ExpressionOperator;
 import org.apache.pig.impl.logicalLayer.LogToPhyTranslationVisitor;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
@@ -382,13 +383,12 @@
                                            defineAliases);
             List<LogicalOperator> roots = lp.getRoots();
             
+            
             if(roots.size() > 0) {
-                if (logicalOpTable.get(roots.get(0)) instanceof LogicalOperator){
-                    System.out.println(query);
-                    System.out.println(logicalOpTable.get(roots.get(0)));
-                }
-                if ((roots.get(0)).getAlias()!=null){
-                    aliases.put((roots.get(0)).getAlias(), lp);
+                for(LogicalOperator op: roots) {
+                    if (!(op instanceof LOLoad)){
+                        throw new Exception("Cannot have a root that is not the load operator LOLoad. Found " + op.getClass().getName());
+                    }
                 }
             }
             
@@ -427,7 +427,7 @@
         return null;
     }
     
-    Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>();
+    Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
     Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
     Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
     Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>();

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestLogicalPlanBuilder.java Thu May 22 12:04:25 2008
@@ -807,7 +807,7 @@
     @Test
     public void testQuery68() {
         buildPlan(" a = load 'input1';");
-        buildPlan(" b = foreach a generate {(16, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12l, {()})};");
+        buildPlan(" b = foreach a generate 10, {(16, 4.0e-2, 'hello'), (0.5f, 'another tuple', 12l, {()})};");
     }
 
     @Test
@@ -875,7 +875,8 @@
 
     @Test
     public void testQuery75() {
-        buildPlan("union (load 'a'), (load 'b'), (load 'c');");
+        buildPlan("a = union (load 'a'), (load 'b'), (load 'c');");
+        buildPlan("b = foreach a {generate $0;} parallel 10;");
     }
     
     // Helper Functions
@@ -901,12 +902,10 @@
             List<LogicalOperator> roots = lp.getRoots();
             
             if(roots.size() > 0) {
-                if (logicalOpTable.get(roots.get(0)) instanceof LogicalOperator){
-                    System.out.println(query);
-                    System.out.println(logicalOpTable.get(roots.get(0)));
-                }
-                if ((roots.get(0)).getAlias()!=null){
-                    aliases.put((roots.get(0)).getAlias(), lp);
+                for(LogicalOperator op: roots) {
+                    if (!(op instanceof LOLoad)){
+                        throw new Exception("Cannot have a root that is not the load operator LOLoad. Found " + op.getClass().getName());
+                    }
                 }
             }
             
@@ -946,7 +945,7 @@
         return null;
     }
     
-    Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>();
+    Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
     Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
     Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
     Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>();

Modified: incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java?rev=659206&r1=659205&r2=659206&view=diff
==============================================================================
--- incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java (original)
+++ incubator/pig/branches/types/test/org/apache/pig/test/TestTypeChecking.java Thu May 22 12:04:25 2008
@@ -207,7 +207,7 @@
                     System.out.println(logicalOpTable.get(roots.get(0)));
                 }
                 if ((roots.get(0)).getAlias()!=null){
-                    aliases.put((roots.get(0)).getAlias(), lp);
+                    aliases.put(roots.get(0), lp);
                 }
             }
 
@@ -232,7 +232,7 @@
         TypeCheckingTestUtil.printTypeGraph(plan) ;
     }
 
-    Map<String, LogicalPlan> aliases = new HashMap<String, LogicalPlan>();
+    Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
     Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
     Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
     Map<String, ExpressionOperator> defineAliases = new HashMap<String, ExpressionOperator>();