You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/10/19 21:53:37 UTC

svn commit: r1024382 - in /pig/branches/branch-0.8: ./ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/relational/ test/org/apache/pig/test/

Author: daijy
Date: Tue Oct 19 19:53:37 2010
New Revision: 1024382

URL: http://svn.apache.org/viewvc?rev=1024382&view=rev
Log:
PIG-1683: New logical plan: Nested foreach plan fails if one inner alias is referred to more than once

Added:
    pig/branches/branch-0.8/src/org/apache/pig/newplan/ReverseDependencyOrderWalkerWOSeenChk.java
Modified:
    pig/branches/branch-0.8/CHANGES.txt
    pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1024382&r1=1024381&r2=1024382&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Tue Oct 19 19:53:37 2010
@@ -200,6 +200,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1683: New logical plan: Nested foreach plan fails if one inner alias is referred to more than once (daijy)
+
 PIG-1542: log level not propogated to MR task loggers (nrai via daijy)
 
 PIG-1673: query with consecutive union-onschema statement errors out (thejas)

Added: pig/branches/branch-0.8/src/org/apache/pig/newplan/ReverseDependencyOrderWalkerWOSeenChk.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/newplan/ReverseDependencyOrderWalkerWOSeenChk.java?rev=1024382&view=auto
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/newplan/ReverseDependencyOrderWalkerWOSeenChk.java (added)
+++ pig/branches/branch-0.8/src/org/apache/pig/newplan/ReverseDependencyOrderWalkerWOSeenChk.java Tue Oct 19 19:53:37 2010
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * Walks a plan in reverse dependency order (a node is visited only after the
+ * nodes that depend on it) without tracking already-seen nodes, so a node
+ * reachable along several paths is visited once per path.
+ */
+public class ReverseDependencyOrderWalkerWOSeenChk extends PlanWalker {
+    /** Creates a walker over {@code plan}. */
+    public ReverseDependencyOrderWalkerWOSeenChk(OperatorPlan plan) {
+        super(plan);
+    }
+    /** Returns a walker for the given sub-plan. */
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new ReverseDependencyOrderWalker(plan); // NOTE(review): not the WOSeenChk variant -- confirm intended
+    }
+
+    /**
+     * Collects the operators reachable from the plan's sources, then visits them.
+     * @param visitor Visitor that each collected operator accepts.
+     * @throws FrontendException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws FrontendException {
+        // This is highly inefficient, but our graphs are small so it should be okay.
+        // Starting from each source of the plan, doAllSuccessors recurses into a
+        // node's successors first and appends the node itself only after all of
+        // them have been appended, so every node lands in the list after the nodes
+        // that depend on it.  No record of already-appended nodes is kept: a node
+        // that is a successor of several nodes is appended, and later visited,
+        // once for each of those paths.
+
+        List<Operator> fifo = new ArrayList<Operator>();
+        List<Operator> roots = plan.getSources();
+        if (roots == null) return; // no sources: nothing to walk
+        for (Operator op : roots) {
+            doAllSuccessors(op, fifo);
+        }
+
+        for (Operator op: fifo) {
+            op.accept(visitor);
+        }
+    }
+    /** Recursively appends node's successors (incl. soft links) to fifo, then node itself. */
+    protected void doAllSuccessors(Operator node,
+                                   Collection<Operator> fifo) throws FrontendException {
+        Collection<Operator> succs = Utils.mergeCollection(plan.getSuccessors(node), plan.getSoftLinkSuccessors(node));
+        if (succs != null && succs.size() > 0) {
+            // Do all our successors before ourself
+            for (Operator op : succs) {
+                doAllSuccessors(op, fifo);
+            }
+        }
+        // Now do ourself
+        fifo.add(node);
+    }
+}

Modified: pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1024382&r1=1024381&r2=1024382&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Tue Oct 19 19:53:37 2010
@@ -20,7 +20,6 @@ package org.apache.pig.newplan.logical.r
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -64,10 +63,6 @@ import org.apache.pig.impl.builtin.GFCro
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -79,13 +74,12 @@ import org.apache.pig.newplan.Dependency
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.PlanWalker;
-import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalkerWOSeenChk;
 import org.apache.pig.newplan.SubtreeDependencyOrderWalker;
 import org.apache.pig.newplan.logical.Util;
 import org.apache.pig.newplan.logical.expression.ExpToPhyTranslationVisitor;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
-import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
 import org.apache.pig.impl.util.Utils;
 
 public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
@@ -201,7 +195,7 @@ public class LogToPhyTranslationVisitor 
 
 //        PlanWalker childWalker = currentWalker
 //                .spawnChildWalker(filter.getFilterPlan());
-        PlanWalker childWalker = new ReverseDependencyOrderWalker(filter.getFilterPlan());
+        PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(filter.getFilterPlan());
         pushWalker(childWalker);
         //currentWalker.walk(this);
         currentWalker.walk(
@@ -245,7 +239,7 @@ public class LogToPhyTranslationVisitor 
         currentPlans.push(currentPlan);
         for (LogicalExpressionPlan plan : logPlans) {
             currentPlan = new PhysicalPlan();
-            PlanWalker childWalker = new ReverseDependencyOrderWalker(plan);
+            PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(plan);
             pushWalker(childWalker);
             childWalker.walk(new ExpToPhyTranslationVisitor( currentWalker.getPlan(), 
                     childWalker, sort, currentPlan, logToPhyMap));
@@ -514,7 +508,7 @@ public class LogToPhyTranslationVisitor 
         for (int i=0; i<exps.size(); i++) {
             currentPlan = new PhysicalPlan();
             // translate the expression plan
-            PlanWalker childWalker = new ReverseDependencyOrderWalker(exps.get(i));
+            PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(exps.get(i));
             pushWalker(childWalker);
             childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i),
                     childWalker, gen, currentPlan, logToPhyMap));            
@@ -615,7 +609,7 @@ public class LogToPhyTranslationVisitor 
             
             // We spawn a new Dependency Walker and use it 
             // PlanWalker childWalker = currentWalker.spawnChildWalker(lp);
-            PlanWalker childWalker = new ReverseDependencyOrderWalker(lp);
+            PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(lp);
             
             // Save the old walker and use childWalker as current Walker
             pushWalker(childWalker);
@@ -1316,7 +1310,7 @@ public class LogToPhyTranslationVisitor 
 
 //        PlanWalker childWalker = currentWalker
 //                .spawnChildWalker(filter.getFilterPlan());
-        PlanWalker childWalker = new ReverseDependencyOrderWalker(loSplitOutput.getFilterPlan());
+        PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(loSplitOutput.getFilterPlan());
         pushWalker(childWalker);
         //currentWalker.walk(this);
         currentWalker.walk(

Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1024382&r1=1024381&r2=1024382&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java Tue Oct 19 19:53:37 2010
@@ -776,4 +776,31 @@ public class TestEvalPipeline2 extends T
         
         assertFalse(iter.hasNext());
     }
+
+    // See PIG-1683: a nested foreach whose inner alias (d0) is referenced twice in generate.
+    @Test
+    public void testDuplicateReferenceInnerPlan() throws Exception{
+        String[] input1 = {
+                "1\t1\t1",
+        };
+        
+        String[] input2 = {
+                "1\t1",
+                "2\t2"
+        };
+        // Join a and b on their first column; only the key 1 appears in both inputs.
+        Util.createInputFile(cluster, "table_testDuplicateReferenceInnerPlan1", input1);
+        Util.createInputFile(cluster, "table_testDuplicateReferenceInnerPlan2", input2);
+        pigServer.registerQuery("a = load 'table_testDuplicateReferenceInnerPlan1' as (a0, a1, a2);");
+        pigServer.registerQuery("b = load 'table_testDuplicateReferenceInnerPlan2' as (b0, b1);");
+        pigServer.registerQuery("c = join a by a0, b by b0;");
+        pigServer.registerQuery("d = foreach c {d0 = a::a1;d1 = a::a2;generate ((d0 is not null)? d0 : d1);};");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("d");
+        // a1 is non-null in the single joined row, so the bincond yields d0, i.e. 1.
+        Tuple t = iter.next();
+        assertTrue(t.toString().equals("(1)"));
+        
+        assertFalse(iter.hasNext());
+    }
 }