You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/06/15 02:49:01 UTC

svn commit: r954681 - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/tools/grunt/ src/org/apache/pig/tools/pigscript/parser/ test/org/apache/pig/test/

Author: daijy
Date: Tue Jun 15 00:49:00 2010
New Revision: 954681

URL: http://svn.apache.org/viewvc?rev=954681&view=rev
Log:
PIG-972: Make describe work with nested foreach

Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
    hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Tue Jun 15 00:49:00 2010
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-972: Make describe work with nested foreach (aniket486 via daijy)
+
 PIG-1438: [Performance] MultiQueryOptimizer should also merge DISTINCT jobs
 (rding)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Tue Jun 15 00:49:00 2010
@@ -72,6 +72,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.LOConst;
+import org.apache.pig.impl.logicalLayer.LOForEach;
 import org.apache.pig.impl.logicalLayer.LOLimit;
 import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.impl.logicalLayer.LOSort;
@@ -593,6 +594,27 @@ public class PigServer {
             throw new FrontendException (msg, errCode, PigException.INPUT, false, null, fee);
         }
     }
+    
+    /**
+     * Write the schema for a nestedAlias to System.out. Denoted by alias::nestedAlias.
+     * @param alias Alias whose schema has nestedAlias
+     * @param nestedAlias Alias whose schema will be written out
+     * @return Schema of alias dumped
+     * @throws IOException
+     */
+    public Schema dumpSchemaNested(String alias, String nestedAlias) throws IOException{
+        LogicalPlan lp = getPlanFromAlias(alias, "describe");
+        lp = compileLp(alias, false);
+        LogicalOperator op = lp.getLeaves().get(0);
+        if(op instanceof LOForEach) {
+            return ((LOForEach)op).dumpNestedSchema(nestedAlias);
+        }
+        else {
+            int errCode = 1001;
+            String msg = "Unable to describe schema for " + alias + "::" + nestedAlias; 
+            throw new FrontendException (msg, errCode, PigException.INPUT, false, null);
+        }
+    }
 
     /**
      * Set the name of the job.  This name will get translated to mapred.job.name.

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Tue Jun 15 00:49:00 2010
@@ -17,18 +17,23 @@
  */
 package org.apache.pig.impl.logicalLayer;
 
+import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.HashMap;
 import java.util.Set;
-import java.util.Iterator;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
-import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -37,9 +42,6 @@ import org.apache.pig.impl.plan.Required
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
-import org.apache.pig.data.DataType;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 
 public class LOForEach extends RelationalOperator {
@@ -434,7 +436,71 @@ public class LOForEach extends Relationa
         super.unsetSchema();
         mSchemaPlanMapping = new ArrayList<LogicalPlan>();
     }
-
+    
+    /**
+     * Append node and every operator reachable from it to fifo in reverse-dependency
+     * order: all successors of an operator are appended before the operator itself, so
+     * leaves come first and roots last. Operators already recorded in seen are skipped,
+     * so an operator reachable along several paths is appended only once.
+     */
+    private void doAllSuccessors(LogicalPlan lp,
+                                LogicalOperator node,
+                                Set<LogicalOperator> seen,
+                                Collection<LogicalOperator> fifo) throws VisitorException {
+        if (seen.contains(node)) {
+            return; // already emitted through another path
+        }
+        Collection<LogicalOperator> successors = lp.getSuccessors(node);
+        if (successors != null) {
+            // Emit everything downstream of this node first.
+            for (LogicalOperator succ : successors) {
+                doAllSuccessors(lp, succ, seen, fifo);
+            }
+        }
+        // All successors are in fifo; now emit this node.
+        seen.add(node);
+        fifo.add(node);
+    }
+    
+    public Schema dumpNestedSchema(String nestedAlias) throws IOException {
+        boolean found = false;
+        // To avoid non-deterministic traversal, 
+        // we do a traversal from leaf to root with ReverseDependencyOrderWalker 
+        // this way schema we print is always the latest schema in the order in the script
+        // Also, since we do not allow union, join, cogroup, cross etc as part of inner plan
+        // we have a tree (not a DAG) as part of inner plan and hence traversal is simpler
+       
+        for(LogicalPlan lp : mForEachPlans) {
+            // Following walk is highly inefficient as we create a fifo list of all elements
+            // we need to traverse and then check for the suitable element
+            // but should be fine as our innerplans are expected to be small
+            // Also, although we are sure that innerplan is a tree instead of DAG
+            // We keep the algorithm assuming it is DAG, to avoid bugs later
+            // This is borrowed logic from ReverseDependencyOrderWalker ;)
+            List<LogicalOperator> fifo = new ArrayList<LogicalOperator>();
+            Set<LogicalOperator> seen = new HashSet<LogicalOperator>();
+            for(LogicalOperator op : lp.getRoots()) {
+                doAllSuccessors(lp, op, seen, fifo);
+            }
+            for(LogicalOperator op: fifo) {
+                if(!(op instanceof LOProject) && nestedAlias.equalsIgnoreCase(op.mAlias)) {
+                    found = true;
+                    // Expression operators do not have any schema
+                    if(!(op instanceof ExpressionOperator)) {
+                        Schema nestedSc = op.getSchema();
+                        System.out.println(nestedAlias + ": " + nestedSc.toString());
+                        return nestedSc;
+                    }
+                    else {
+                        int errCode = 1113;
+                        String msg = "Unable to describe schema for nested expression "+ nestedAlias; 
+                        throw new FrontendException (msg, errCode, PigException.INPUT, false, null);
+                    }
+                }
+            }
+            if(!found) {
+                int errCode = 1114;
+                String msg = "Unable to find schema for nested alias "+ nestedAlias; 
+                throw new FrontendException (msg, errCode, PigException.INPUT, false, null);
+            }
+        }
+        return null;
+    }
+    
     /**
      * @see org.apache.pig.impl.plan.Operator#clone()
      * Do not use the clone method directly. Operators are cloned when logical plans

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Tue Jun 15 00:49:00 2010
@@ -227,11 +227,19 @@ public class GruntParser extends PigScri
     
     @Override
     protected void processDescribe(String alias) throws IOException {
+        String nestedAlias = null;
         if(mExplain == null) { // process only if not in "explain" mode
             if(alias==null) {
                 alias = mPigServer.getPigContext().getLastAlias();
             }
-            mPigServer.dumpSchema(alias);
+            if(alias.contains("::")) {
+                nestedAlias = alias.substring(alias.indexOf("::") + 2);
+                alias = alias.substring(0, alias.indexOf("::"));
+                mPigServer.dumpSchemaNested(alias, nestedAlias);
+            }
+            else {
+                mPigServer.dumpSchema(alias);
+            }
         } else {
             log.warn("'describe' statement is ignored while processing 'explain -script' or '-check'");
         }

Modified: hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj (original)
+++ hadoop/pig/trunk/src/org/apache/pig/tools/pigscript/parser/PigScriptParser.jj Tue Jun 15 00:49:00 2010
@@ -355,7 +355,7 @@ TOKEN:
 |	<#NUMBER: <INTEGER> | <FLOAT> | <FLOAT> ( ["e","E"] ([ "-","+"])? <FLOAT> )?>
 }
 
-TOKEN: {<IDENTIFIER: (<LETTER>)+(<DIGIT> | <LETTER> | <SPECIALCHAR>)*>}
+TOKEN: {<IDENTIFIER: (<LETTER>)+(<DIGIT> | <LETTER> | <SPECIALCHAR> | "::")*>}
 TOKEN: {<PATH: (~["(", ")", ";", "\r", " ", "\t", "\n"])+>}
 TOKEN : { <QUOTEDSTRING :  "'"
       (   (~["'","\\","\n","\r"])
@@ -454,8 +454,8 @@ void parse() throws IOException:
 	{processIllustrate(t1.image);}
 	|
 	<DESCRIBE>
-	(
 	t1 = <IDENTIFIER>
+	(
 	{processDescribe(t1.image);}
 	|
 		{processDescribe(null);}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=954681&r1=954680&r2=954681&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Tue Jun 15 00:49:00 2010
@@ -41,6 +41,7 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.test.utils.Identity;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -644,4 +645,31 @@ public class TestEvalPipeline2 extends T
         
         Util.deleteFile(cluster, "table_testCustomPartitionerCross");
     }
+    
+    // See PIG-972: describe alias::nestedAlias for aliases defined in a nested FOREACH block.
+    @Test
+    public void testDescribeNestedAlias() throws Exception{
+        String[] input = {
+                "1\t3",
+                "2\t4",
+                "3\t5"
+        };
+        
+        Util.createInputFile(cluster, "table_testDescribeNestedAlias", input);
+        try {
+            pigServer.registerQuery("A = LOAD 'table_testDescribeNestedAlias' as (a0, a1);");
+            pigServer.registerQuery("P = GROUP A by a1;");
+            // Test RelationalOperator
+            pigServer.registerQuery("B = FOREACH P { D = ORDER A by $0; generate D.$0; };");
+            
+            // Test ExpressionOperator - negative test case
+            pigServer.registerQuery("C = FOREACH A { D = a0/a1; E=a1/a0; generate E as newcol; };");
+            Schema schema = pigServer.dumpSchemaNested("B", "D");
+            assertTrue(schema.toString().equalsIgnoreCase("{a0: bytearray,a1: bytearray}"));
+            try {
+                pigServer.dumpSchemaNested("C", "E");
+                // Without this the negative case would pass even if no exception were thrown.
+                fail("Describing nested expression alias C::E should throw");
+            } catch (FrontendException e) {
+                assertEquals(1113, e.getErrorCode());
+            }
+        } finally {
+            // Clean up the input file like the other tests in this class do.
+            Util.deleteFile(cluster, "table_testDescribeNestedAlias");
+        }
+    }
+    
 }