You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/08/11 02:45:50 UTC

svn commit: r1156419 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/newplan/logical/optimizer/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/

Author: thejas
Date: Thu Aug 11 00:45:50 2011
New Revision: 1156419

URL: http://svn.apache.org/viewvc?rev=1156419&view=rev
Log:
PIG-2176: add logical plan assumption checker

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
    pig/trunk/test/org/apache/pig/test/Util.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 11 00:45:50 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
 
 IMPROVEMENTS
 
+PIG-2176: add logical plan assumption checker  (thejas)
+
 PIG-1631: Support to 2 level nested foreach (aniket486 via daijy)
 
 PIG-2191: Reduce amount of log spam generated by UDFs (dvryaboy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Aug 11 00:45:50 2011
@@ -239,7 +239,7 @@ public class HExecutionEngine {
         UidResetter uidResetter = new UidResetter( plan );
         uidResetter.visit();
         
-        SchemaResetter schemaResetter = new SchemaResetter( plan );
+        SchemaResetter schemaResetter = new SchemaResetter( plan, true /*skip duplicate uid check*/ );
         schemaResetter.visit();
         
         HashSet<String> optimizerRules = null;

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java Thu Aug 11 00:45:50 2011
@@ -18,9 +18,13 @@
 package org.apache.pig.newplan.logical.optimizer;
 
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.pig.PigException;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.PlanValidationException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.newplan.DependencyOrderWalker;
 import org.apache.pig.newplan.OperatorPlan;
@@ -46,17 +50,29 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOStream;
 import org.apache.pig.newplan.logical.relational.LOUnion;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
 
 public class SchemaResetter extends LogicalRelationalNodesVisitor {
 
+    // Duplicate uids are removed only after the optimizer rule
+    // DuplicateForEachColumnRewrite has run, so callers running before it pass true.
+    boolean skipDuplicateUidCheck = true; // NOTE(review): initializer is dead; every constructor assigns this field
+    
     public SchemaResetter(OperatorPlan plan) throws FrontendException {
+        this(plan, false); // default: duplicate uid check enabled
+    }
+
+    public SchemaResetter(OperatorPlan plan, boolean skipDuplicateUidCheck) 
+            throws FrontendException {
         super(plan, new DependencyOrderWalker(plan));
+        this.skipDuplicateUidCheck = skipDuplicateUidCheck; // consulted by validate()
     }
 
     @Override
     public void visit(LOLoad load) throws FrontendException {
         load.resetSchema();
-        load.getSchema();
+        validate(load.getSchema()); // regenerate the schema, then check its uids
     }
 
     @Override
@@ -64,13 +80,13 @@ public class SchemaResetter extends Logi
         filter.resetSchema();
         FieldSchemaResetter fsResetter = new FieldSchemaResetter(filter.getFilterPlan());
         fsResetter.visit();
-        filter.getSchema();
+        validate(filter.getSchema());
     }
     
     @Override
     public void visit(LOStore store) throws FrontendException {
         store.resetSchema();
-        store.getSchema();
+        validate(store.getSchema()); // regenerate the schema, then check its uids
     }
     
     @Override
@@ -81,7 +97,7 @@ public class SchemaResetter extends Logi
             FieldSchemaResetter fsResetter = new FieldSchemaResetter(joinPlan);
             fsResetter.visit();
         }
-        join.getSchema();
+        validate(join.getSchema());
     }
     
     @Override
@@ -92,7 +108,7 @@ public class SchemaResetter extends Logi
         pushWalker(newWalker);
         currentWalker.walk(this);
         popWalker();
-        foreach.getSchema();
+        validate(foreach.getSchema());
     }
     
     @Override
@@ -103,7 +119,7 @@ public class SchemaResetter extends Logi
             FieldSchemaResetter fsResetter = new FieldSchemaResetter(genPlan);
             fsResetter.visit();
         }
-        gen.getSchema();
+        validate(gen.getSchema());
     }
     
     @Override
@@ -121,13 +137,13 @@ public class SchemaResetter extends Logi
             FieldSchemaResetter fsResetter = new FieldSchemaResetter(expPlan);
             fsResetter.visit();
         }
-        loCogroup.getSchema();
+        validate(loCogroup.getSchema());
     }
     
     @Override
     public void visit(LOSplit loSplit) throws FrontendException {
         loSplit.resetSchema();
-        loSplit.getSchema();
+        validate(loSplit.getSchema()); // regenerate the schema, then check its uids
     }
     
     @Override
@@ -135,13 +151,13 @@ public class SchemaResetter extends Logi
         loSplitOutput.resetSchema();
         FieldSchemaResetter fsResetter = new FieldSchemaResetter(loSplitOutput.getFilterPlan());
         fsResetter.visit();
-        loSplitOutput.getSchema();
+        validate(loSplitOutput.getSchema());
     }
     
     @Override
     public void visit(LOUnion loUnion) throws FrontendException {
         loUnion.resetSchema();
-        loUnion.getSchema();
+        validate(loUnion.getSchema()); // regenerate the schema, then check its uids
     }
     
     @Override
@@ -152,13 +168,13 @@ public class SchemaResetter extends Logi
             FieldSchemaResetter fsResetter = new FieldSchemaResetter(sortPlan);
             fsResetter.visit();
         }
-        loSort.getSchema();
+        validate(loSort.getSchema());
     }
     
     @Override
     public void visit(LODistinct loDistinct) throws FrontendException {
         loDistinct.resetSchema();
-        loDistinct.getSchema();
+        validate(loDistinct.getSchema()); // regenerate the schema, then check its uids
     }
     
     @Override
@@ -169,20 +185,64 @@ public class SchemaResetter extends Logi
                     loLimit.getLimitPlan());
             fsResetter.visit();
         }
-        loLimit.getSchema();
+        validate(loLimit.getSchema());
     }
     
     @Override
     public void visit(LOCross loCross) throws FrontendException {
         loCross.resetSchema();
-        loCross.getSchema();
+        validate(loCross.getSchema()); // regenerate the schema, then check its uids
     }
     
     @Override
     public void visit(LOStream loStream) throws FrontendException {
         loStream.resetSchema();
-        loStream.getSchema();
+        validate(loStream.getSchema()); // regenerate the schema, then check its uids
     }
+
+
+    /**
+     * Check if schema is valid (ready to be part of a final logical plan).
+     * @param schema the schema to check; null is accepted and not checked
+     * @throws PlanValidationException if any field in schema has a negative uid,
+     * or (skipDuplicateUidCheck is false and there are duplicate uids in schema)
+     */
+    public void validate(LogicalSchema schema)
+            throws PlanValidationException{
+        
+        if(schema == null)
+            return;
+        
+        Set<Long> uidsSeen = new HashSet<Long>();
+        for(LogicalFieldSchema fs : schema.getFields()){
+            
+            if(!skipDuplicateUidCheck){
+                // check duplicate uid: Set.add returns false if already present
+                if(!uidsSeen.add(fs.uid)){
+                    // uid already seen
+                    String msg = "Logical plan invalid state: duplicate uid in " +
+                            "schema : " + schema;
+                    throw new PlanValidationException(
+                            msg,
+                            2270,
+                            PigException.BUG
+                            );
+                }
+            }
+            
+            if(fs.uid < 0){
+                String msg = "Logical plan invalid state: invalid uid " + fs.uid + 
+                        " in schema : " + schema;
+                throw new PlanValidationException(
+                        msg,
+                        2271,
+                        PigException.BUG
+                        );
+                
+            }
+        }
+    }
+
 }
 
 class FieldSchemaResetter extends AllSameExpressionVisitor {

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java Thu Aug 11 00:45:50 2011
@@ -167,7 +167,9 @@ public class DuplicateForEachColumnRewri
             uidResetter.visit();
             
             // Manually regenerate schema since we skip listener
-            SchemaResetter schemaResetter = new SchemaResetter(currentPlan);
+            // skip duplicate uid check in schema, because duplicate uids
+            // are fixed in only a portion of the plan here
+            SchemaResetter schemaResetter = new SchemaResetter(currentPlan, true);
             schemaResetter.visit();
         }
     }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java Thu Aug 11 00:45:50 2011
@@ -139,7 +139,7 @@ public class ImplicitSplitInserter exten
         uidResetter.visit();
 
         // Manually regenerate schema since we skip listener
-        SchemaResetter schemaResetter = new SchemaResetter(currentPlan);
+        SchemaResetter schemaResetter = new SchemaResetter(currentPlan, true);
         schemaResetter.visit();
       }
       

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu Aug 11 00:45:50 2011
@@ -712,7 +712,8 @@ public class Util {
         UidResetter uidResetter = new UidResetter( lp );
         uidResetter.visit();
         
-        SchemaResetter schemaResetter = new SchemaResetter( lp );
+        SchemaResetter schemaResetter = 
+                new SchemaResetter( lp, true /*disable duplicate uid check*/ );
         schemaResetter.visit();
         
         LoadStoreFuncDupSignatureValidator loadStoreFuncDupSignatureValidator = new LoadStoreFuncDupSignatureValidator(lp);