You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/11/01 23:51:11 UTC

svn commit: r1029876 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/newplan/logical/expression/ src/org/apache/pig/newplan/logical/optimizer/ src/org/apache/pig/newplan/logical/relational/ src/org/apache/pig/n...

Author: daijy
Date: Mon Nov  1 22:51:11 2010
New Revision: 1029876

URL: http://svn.apache.org/viewvc?rev=1029876&view=rev
Log:
PIG-1705: New logical plan: self-join fail for some queries

Added:
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
    pig/trunk/test/org/apache/pig/test/TestPruneColumn.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Nov  1 22:51:11 2010
@@ -217,6 +217,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1705: New logical plan: self-join fail for some queries (daijy)
+
 PIG-1704: Output Compression is not at work if the output path is absolute and there is a trailing / afte the compression suffix (yanz)
 
 PIG-1695: MergeForEach does not carry user defined schema if any one of the merged ForEach has user defined schema (daijy)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Nov  1 22:51:11 2010
@@ -69,6 +69,7 @@ import org.apache.pig.newplan.logical.Lo
 import org.apache.pig.newplan.logical.expression.ConstantExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
+import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher.ProjectionFinder;
 import org.apache.pig.newplan.logical.relational.LOLimit;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LOSplit;

Modified: pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java Mon Nov  1 22:51:11 2010
@@ -110,4 +110,12 @@ public abstract class LogicalExpression 
      */
     abstract public LogicalExpression deepCopy(LogicalExpressionPlan lgExpPlan) throws FrontendException;
 
+    /**
+     * Erase all cached uid, regenerate uid when we regenerating schema.
+     * This process currently only used in ImplicitSplitInsert, which will
+     * insert split and invalidate some uids in plan
+     */
+    public void resetUid() {
+        uidOnlyFieldSchema = null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Mon Nov  1 22:51:11 2010
@@ -55,21 +55,21 @@ public class LogicalPlanOptimizer extend
     protected List<Set<Rule>> buildRuleSets() {
         List<Set<Rule>> ls = new ArrayList<Set<Rule>>();	    
 
-        // Logical expression simplifier
-        Set<Rule> s = new HashSet<Rule>();
-        // add logical expression simplification rule
-        Rule r = new LogicalExpressionSimplifier("FilterLogicExpressionSimplifier");
-        checkAndAddRule(s, r);
-        ls.add(s);
-
         // ImplicitSplitInserter set
         // This set of rules Insert Foreach dedicated for casting after load
-        s = new HashSet<Rule>();
-        r = new ImplicitSplitInserter("ImplicitSplitInserter");
+        Set<Rule> s = new HashSet<Rule>();
+        Rule r = new ImplicitSplitInserter("ImplicitSplitInserter");
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);
-        
+
+        // Logical expression simplifier
+        s = new HashSet<Rule>();
+        // add logical expression simplification rule
+        r = new LogicalExpressionSimplifier("FilterLogicExpressionSimplifier");
+        checkAndAddRule(s, r);
+        ls.add(s);
+
         // TypeCastInserter set
         // This set of rules Insert Foreach dedicated for casting after load
         s = new HashSet<Rule>();

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java Mon Nov  1 22:51:11 2010
@@ -93,7 +93,7 @@ public class ProjectionPatcher implement
         }        
     }
     
-    private static class ProjectionFinder extends AllExpressionVisitor {
+    public static class ProjectionFinder extends AllExpressionVisitor {
 
         public ProjectionFinder(OperatorPlan plan) throws FrontendException {
             super(plan, new DependencyOrderWalker(plan));

Added: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java?rev=1029876&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidResetter.java Mon Nov  1 22:51:11 2010
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.optimizer;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+
+public class UidResetter extends LogicalRelationalNodesVisitor {
+
+    public UidResetter(OperatorPlan plan) throws FrontendException {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    @Override
+    public void visit(LOLoad load) throws FrontendException {
+        load.resetUid();
+    }
+
+    @Override
+    public void visit(LOFilter filter) throws FrontendException {
+        filter.resetUid();
+        ExpressionUidResetter uidResetter = new ExpressionUidResetter(filter.getFilterPlan());
+        uidResetter.visit();
+    }
+    
+    @Override
+    public void visit(LOStore store) throws FrontendException {
+        store.resetUid();
+    }
+    
+    @Override
+    public void visit(LOJoin join) throws FrontendException {
+        join.resetUid();
+        Collection<LogicalExpressionPlan> joinPlans = join.getExpressionPlanValues();
+        for (LogicalExpressionPlan joinPlan : joinPlans) {
+            ExpressionUidResetter fsResetter = new ExpressionUidResetter(joinPlan);
+            fsResetter.visit();
+        }
+    }
+    
+    @Override
+    public void visit(LOForEach foreach) throws FrontendException {
+        foreach.resetUid();
+        OperatorPlan innerPlan = foreach.getInnerPlan();
+        PlanWalker newWalker = currentWalker.spawnChildWalker(innerPlan);
+        pushWalker(newWalker);
+        currentWalker.walk(this);
+        popWalker();
+    }
+    
+    @Override
+    public void visit(LOGenerate gen) throws FrontendException {
+        gen.resetUid();
+        List<LogicalExpressionPlan> genPlans = gen.getOutputPlans();
+        for (LogicalExpressionPlan genPlan : genPlans) {
+            ExpressionUidResetter fsResetter = new ExpressionUidResetter(genPlan);
+            fsResetter.visit();
+        }
+    }
+    
+    @Override
+    public void visit(LOInnerLoad load) throws FrontendException {
+        load.resetUid();
+        load.getProjection().resetUid();
+    }
+
+    @Override
+    public void visit(LOCogroup loCogroup) throws FrontendException {
+        loCogroup.resetUid();
+        MultiMap<Integer, LogicalExpressionPlan> expPlans = loCogroup.getExpressionPlans();
+        for (LogicalExpressionPlan expPlan : expPlans.values()) {
+            ExpressionUidResetter uidResetter = new ExpressionUidResetter(expPlan);
+            uidResetter.visit();
+        }
+    }
+    
+    @Override
+    public void visit(LOSplit loSplit) throws FrontendException {
+        loSplit.resetUid();
+    }
+    
+    @Override
+    public void visit(LOSplitOutput loSplitOutput) throws FrontendException {
+        loSplitOutput.resetUid();
+        ExpressionUidResetter uidResetter = new ExpressionUidResetter(loSplitOutput.getFilterPlan());
+        uidResetter.visit();
+    }
+    
+    @Override
+    public void visit(LOUnion loUnion) throws FrontendException {
+        loUnion.resetUid();
+    }
+    
+    @Override
+    public void visit(LOSort loSort) throws FrontendException {
+        loSort.resetUid();
+        List<LogicalExpressionPlan> sortPlans = loSort.getSortColPlans();
+        for (LogicalExpressionPlan sortPlan : sortPlans) {
+            ExpressionUidResetter uidResetter = new ExpressionUidResetter(sortPlan);
+            uidResetter.visit();
+        }
+    }
+    
+    @Override
+    public void visit(LODistinct loDistinct) throws FrontendException {
+        loDistinct.resetUid();
+    }
+    
+    @Override
+    public void visit(LOLimit loLimit) throws FrontendException {
+        loLimit.resetUid();
+    }
+    
+    @Override
+    public void visit(LOCross loCross) throws FrontendException {
+        loCross.resetUid();
+    }
+    
+    @Override
+    public void visit(LOStream loStream) throws FrontendException {
+        loStream.resetUid();
+    }
+}
+
+class ExpressionUidResetter extends AllSameExpressionVisitor {
+    protected ExpressionUidResetter(OperatorPlan p) throws FrontendException {
+        super(p, new ReverseDependencyOrderWalker(p));
+    }
+
+    @Override
+    protected void execute(LogicalExpression op) throws FrontendException {
+        op.resetUid();
+    }
+}

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Mon Nov  1 22:51:11 2010
@@ -283,4 +283,9 @@ public class LOCogroup extends LogicalRe
         return mIsInner;
     }
 
+    @Override
+    public void resetUid() {
+        groupKeyUidOnlySchema = null;
+        generatedInputUids = new HashMap<Integer,Long>();
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Mon Nov  1 22:51:11 2010
@@ -275,4 +275,9 @@ public class LOGenerate extends LogicalR
     public void setUidOnlySchemas(List<LogicalSchema> uidOnlySchemas) {
         this.uidOnlySchemas = uidOnlySchemas;
     }
+    
+    @Override
+    public void resetUid() {
+        this.uidOnlySchemas = null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Mon Nov  1 22:51:11 2010
@@ -194,4 +194,9 @@ public class LOLoad extends LogicalRelat
     public Configuration getConfiguration() {
         return conf;
     }
+    
+    @Override
+    public void resetUid() {
+        uidOnlySchema = null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java Mon Nov  1 22:51:11 2010
@@ -18,13 +18,22 @@
 
 package org.apache.pig.newplan.logical.relational;
 
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
 
 public class LOSplitOutput extends LogicalRelationalOperator {
     private LogicalExpressionPlan filterPlan;
+    private Map<Long, Long> uidMapping = new HashMap<Long, Long>();
     public LOSplitOutput(LogicalPlan plan) {
         super("LOSplitOutput", plan);       
     }
@@ -50,7 +59,19 @@ public class LOSplitOutput extends Logic
         LogicalRelationalOperator input = null;
         input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
         
-        schema = input.getSchema();
+        if (input.getSchema()!=null) {
+            schema = input.getSchema().deepCopy();
+            for (LogicalFieldSchema fs : schema.getFields()) {
+                if (uidMapping.containsKey(fs.uid)) {
+                    fs.uid = uidMapping.get(fs.uid);
+                }
+                else {
+                    long predUid = fs.uid;
+                    fs.uid = LogicalExpression.getNextUid();
+                    uidMapping.put(predUid, fs.uid);
+                }
+            }
+        }
         return schema;
     }   
     
@@ -71,4 +92,17 @@ public class LOSplitOutput extends Logic
             return false;
         }
     }
+    
+    @Override
+    public void resetUid() {
+        uidMapping = new HashMap<Long, Long>();
+    }
+    
+    public long getInputUids(long uid) {
+        for (Map.Entry<Long, Long> pair : uidMapping.entrySet()) {
+            if (pair.getValue()==uid)
+                return pair.getKey();
+        }
+        return -1;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java Mon Nov  1 22:51:11 2010
@@ -109,4 +109,8 @@ public class LOStream extends LogicalRel
         return castInserted;
     }
 
+    @Override
+    public void resetUid() {
+        uidOnlySchema = null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Mon Nov  1 22:51:11 2010
@@ -120,4 +120,9 @@ public class LOUnion extends LogicalRela
         }
         return result;
     }
+    
+    @Override
+    public void resetUid() {
+        uidMapping = new ArrayList<Pair<Long, Long>>();
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java Mon Nov  1 22:51:11 2010
@@ -83,7 +83,14 @@ abstract public class LogicalRelationalO
     public void resetSchema() {
         schema = null;
     }
- 
+    
+    /**
+     * Erase all cached uid, regenerate uid when we regenerating schema.
+     * This process currently only used in ImplicitSplitInsert, which will
+     * insert split and invalidate some uids in plan
+     */
+    public void resetUid() {
+    }
 
     /**
      * Get the requestedParallelism for this operator.

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java Mon Nov  1 22:51:11 2010
@@ -367,7 +367,11 @@ public class ColumnPruneHelper {
             
             // the input uids contains all the output uids and
             // projections in splitOutput conditions
-            Set<Long> input = new HashSet<Long>(output);
+            Set<Long> input = new HashSet<Long>();
+            
+            for (long uid : output) {
+                input.add(splitOutput.getInputUids(uid));
+            }
             
             LogicalExpressionPlan exp = splitOutput.getFilterPlan();
             collectUids(splitOutput, exp, input);

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java Mon Nov  1 22:51:11 2010
@@ -26,6 +26,7 @@ import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.optimizer.UidResetter;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
@@ -127,6 +128,8 @@ public class ImplicitSplitInserter exten
           currentPlan.connect(splitOp, splitOutput);
           currentPlan.connect(splitOutput, pos.first, suc, pos.second);
         }
+        UidResetter uidResetter = new UidResetter(currentPlan);
+        uidResetter.visit();
       }
       
       @Override

Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1029876&r1=1029875&r2=1029876&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Mon Nov  1 22:51:11 2010
@@ -59,6 +59,7 @@ public class TestPruneColumn extends Tes
     File tmpFile8;
     File tmpFile9;
     File tmpFile10;
+    File tmpFile11;
     File logFile;
 
     private static final String simpleEchoStreamingCommand;
@@ -149,6 +150,12 @@ public class TestPruneColumn extends Tes
         ps.println("1\t[1#1,2#1]\t2");
         ps.close();
 
+        tmpFile11 = File.createTempFile("prune", "txt");
+        ps = new PrintStream(new FileOutputStream(tmpFile11));
+        ps.println("1\t2\t3");
+        ps.println("1\t3\t2");
+        ps.println("2\t5\t2");
+        ps.close();
     }
     
     @After
@@ -1832,6 +1839,34 @@ public class TestPruneColumn extends Tes
 
         assertTrue(emptyLogFileMessage());
     }
+    
+    @Test
+    public void testSplit5() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile11.toString(), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);");
+        pigServer.registerQuery("B = foreach A generate a0, a1;");
+        pigServer.registerQuery("C = join A by a0, B by a0;");
+        pigServer.registerQuery("D = filter C by A::a1>=B::a1;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(1,2,3,1,2)"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(1,3,2,1,2)"));
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(1,3,2,1,3)"));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        assertTrue(t.toString().equals("(2,5,2,2,5)"));
+        
+        assertTrue(emptyLogFileMessage());
+    }
+
 
     // See PIG-1493
     @Test
@@ -1889,6 +1924,4 @@ public class TestPruneColumn extends Tes
         reader1.close();
         reader2.close();
     }
-
-
 }