You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by th...@apache.org on 2011/08/11 02:45:50 UTC
svn commit: r1156419 - in /pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/newplan/logical/optimizer/
src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/
Author: thejas
Date: Thu Aug 11 00:45:50 2011
New Revision: 1156419
URL: http://svn.apache.org/viewvc?rev=1156419&view=rev
Log:
PIG-2176: add logical plan assumption checker
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java
pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
pig/trunk/test/org/apache/pig/test/Util.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Aug 11 00:45:50 2011
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-2176: add logical plan assumption checker (thejas)
+
PIG-1631: Support to 2 level nested foreach (aniket486 via daijy)
PIG-2191: Reduce amount of log spam generated by UDFs (dvryaboy)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Thu Aug 11 00:45:50 2011
@@ -239,7 +239,7 @@ public class HExecutionEngine {
UidResetter uidResetter = new UidResetter( plan );
uidResetter.visit();
- SchemaResetter schemaResetter = new SchemaResetter( plan );
+ SchemaResetter schemaResetter = new SchemaResetter( plan, true /*skip duplicate uid check*/ );
schemaResetter.visit();
HashSet<String> optimizerRules = null;
Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java Thu Aug 11 00:45:50 2011
@@ -18,9 +18,13 @@
package org.apache.pig.newplan.logical.optimizer;
import java.util.Collection;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.plan.PlanValidationException;
import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.newplan.DependencyOrderWalker;
import org.apache.pig.newplan.OperatorPlan;
@@ -46,17 +50,29 @@ import org.apache.pig.newplan.logical.re
import org.apache.pig.newplan.logical.relational.LOStream;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
public class SchemaResetter extends LogicalRelationalNodesVisitor {
+ // Duplicate uids are removed only after the optimizer rule
+ // DuplicateForEachColumnRewrite has run, so skip the duplicate uid check in calls before that.
+ boolean skipDuplicateUidCheck = true;
+
public SchemaResetter(OperatorPlan plan) throws FrontendException {
+ this(plan, false);
+ }
+
+ public SchemaResetter(OperatorPlan plan, boolean skipDuplicateUidCheck)
+ throws FrontendException {
super(plan, new DependencyOrderWalker(plan));
+ this.skipDuplicateUidCheck = skipDuplicateUidCheck;
}
@Override
public void visit(LOLoad load) throws FrontendException {
load.resetSchema();
- load.getSchema();
+ validate(load.getSchema());
}
@Override
@@ -64,13 +80,13 @@ public class SchemaResetter extends Logi
filter.resetSchema();
FieldSchemaResetter fsResetter = new FieldSchemaResetter(filter.getFilterPlan());
fsResetter.visit();
- filter.getSchema();
+ validate(filter.getSchema());
}
@Override
public void visit(LOStore store) throws FrontendException {
store.resetSchema();
- store.getSchema();
+ validate(store.getSchema());
}
@Override
@@ -81,7 +97,7 @@ public class SchemaResetter extends Logi
FieldSchemaResetter fsResetter = new FieldSchemaResetter(joinPlan);
fsResetter.visit();
}
- join.getSchema();
+ validate(join.getSchema());
}
@Override
@@ -92,7 +108,7 @@ public class SchemaResetter extends Logi
pushWalker(newWalker);
currentWalker.walk(this);
popWalker();
- foreach.getSchema();
+ validate(foreach.getSchema());
}
@Override
@@ -103,7 +119,7 @@ public class SchemaResetter extends Logi
FieldSchemaResetter fsResetter = new FieldSchemaResetter(genPlan);
fsResetter.visit();
}
- gen.getSchema();
+ validate(gen.getSchema());
}
@Override
@@ -121,13 +137,13 @@ public class SchemaResetter extends Logi
FieldSchemaResetter fsResetter = new FieldSchemaResetter(expPlan);
fsResetter.visit();
}
- loCogroup.getSchema();
+ validate(loCogroup.getSchema());
}
@Override
public void visit(LOSplit loSplit) throws FrontendException {
loSplit.resetSchema();
- loSplit.getSchema();
+ validate(loSplit.getSchema());
}
@Override
@@ -135,13 +151,13 @@ public class SchemaResetter extends Logi
loSplitOutput.resetSchema();
FieldSchemaResetter fsResetter = new FieldSchemaResetter(loSplitOutput.getFilterPlan());
fsResetter.visit();
- loSplitOutput.getSchema();
+ validate(loSplitOutput.getSchema());
}
@Override
public void visit(LOUnion loUnion) throws FrontendException {
loUnion.resetSchema();
- loUnion.getSchema();
+ validate(loUnion.getSchema());
}
@Override
@@ -152,13 +168,13 @@ public class SchemaResetter extends Logi
FieldSchemaResetter fsResetter = new FieldSchemaResetter(sortPlan);
fsResetter.visit();
}
- loSort.getSchema();
+ validate(loSort.getSchema());
}
@Override
public void visit(LODistinct loDistinct) throws FrontendException {
loDistinct.resetSchema();
- loDistinct.getSchema();
+ validate(loDistinct.getSchema());
}
@Override
@@ -169,20 +185,64 @@ public class SchemaResetter extends Logi
loLimit.getLimitPlan());
fsResetter.visit();
}
- loLimit.getSchema();
+ validate(loLimit.getSchema());
}
@Override
public void visit(LOCross loCross) throws FrontendException {
loCross.resetSchema();
- loCross.getSchema();
+ validate(loCross.getSchema());
}
@Override
public void visit(LOStream loStream) throws FrontendException {
loStream.resetSchema();
- loStream.getSchema();
+ validate(loStream.getSchema());
}
+
+
+ /**
+ * Check if schema is valid (ready to be part of a final logical plan)
+ * @param schema
+ * @throws PlanValidationException if any field in schema has a negative uid (e.g. -1)
+ * or (skipDuplicateUidCheck is false and there are duplicate uids in schema)
+ */
+ public void validate(LogicalSchema schema)
+ throws PlanValidationException{
+
+ if(schema == null)
+ return;
+
+ Set<Long> uidsSeen = new HashSet<Long>();
+ for(LogicalFieldSchema fs : schema.getFields()){
+
+ if(!skipDuplicateUidCheck){
+ //check duplicate uid
+ if(!uidsSeen.add(fs.uid)){
+ // uid already seen
+ String msg = "Logical plan invalid state: duplicate uid in " +
+ "schema : " + schema;
+ throw new PlanValidationException(
+ msg,
+ 2270,
+ PigException.BUG
+ );
+ }
+ }
+
+ if(fs.uid < 0){
+ String msg = "Logical plan invalid state: invalid uid " + fs.uid +
+ " in schema : " + schema;
+ throw new PlanValidationException(
+ msg,
+ 2271,
+ PigException.BUG
+ );
+
+ }
+ }
+ }
+
}
class FieldSchemaResetter extends AllSameExpressionVisitor {
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java Thu Aug 11 00:45:50 2011
@@ -167,7 +167,9 @@ public class DuplicateForEachColumnRewri
uidResetter.visit();
// Manually regenerate schema since we skip listener
- SchemaResetter schemaResetter = new SchemaResetter(currentPlan);
+ // skip the duplicate uid check in the schema, as duplicate uids would be fixed in
+ // only a portion of the plan
+ SchemaResetter schemaResetter = new SchemaResetter(currentPlan, true);
schemaResetter.visit();
}
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java Thu Aug 11 00:45:50 2011
@@ -139,7 +139,7 @@ public class ImplicitSplitInserter exten
uidResetter.visit();
// Manually regenerate schema since we skip listener
- SchemaResetter schemaResetter = new SchemaResetter(currentPlan);
+ SchemaResetter schemaResetter = new SchemaResetter(currentPlan, true);
schemaResetter.visit();
}
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1156419&r1=1156418&r2=1156419&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu Aug 11 00:45:50 2011
@@ -712,7 +712,8 @@ public class Util {
UidResetter uidResetter = new UidResetter( lp );
uidResetter.visit();
- SchemaResetter schemaResetter = new SchemaResetter( lp );
+ SchemaResetter schemaResetter =
+ new SchemaResetter( lp, true /*disable duplicate uid check*/ );
schemaResetter.visit();
LoadStoreFuncDupSignatureValidator loadStoreFuncDupSignatureValidator = new LoadStoreFuncDupSignatureValidator(lp);