You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/10/19 21:53:37 UTC
svn commit: r1024382 - in /pig/branches/branch-0.8: ./
src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/relational/
test/org/apache/pig/test/
Author: daijy
Date: Tue Oct 19 19:53:37 2010
New Revision: 1024382
URL: http://svn.apache.org/viewvc?rev=1024382&view=rev
Log:
PIG-1683: New logical plan: Nested foreach plan fails if one inner alias is referred more than once
Added:
pig/branches/branch-0.8/src/org/apache/pig/newplan/ReverseDependencyOrderWalkerWOSeenChk.java
Modified:
pig/branches/branch-0.8/CHANGES.txt
pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1024382&r1=1024381&r2=1024382&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Tue Oct 19 19:53:37 2010
@@ -200,6 +200,8 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1683: New logical plan: Nested foreach plan fails if one inner alias is referred more than once (daijy)
+
PIG-1542: log level not propogated to MR task loggers (nrai via daijy)
PIG-1673: query with consecutive union-onschema statement errors out (thejas)
Added: pig/branches/branch-0.8/src/org/apache/pig/newplan/ReverseDependencyOrderWalkerWOSeenChk.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/newplan/ReverseDependencyOrderWalkerWOSeenChk.java?rev=1024382&view=auto
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/newplan/ReverseDependencyOrderWalkerWOSeenChk.java (added)
+++ pig/branches/branch-0.8/src/org/apache/pig/newplan/ReverseDependencyOrderWalkerWOSeenChk.java Tue Oct 19 19:53:37 2010
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Utils;
+
+/**
+ * Visits each operator only after all of its successors. No seen check is done, so an
+ * operator reachable by several paths is visited once per path. NOTE(review):
+ * spawnChildWalker() returns a ReverseDependencyOrderWalker, not this class - confirm.
+ */
+public class ReverseDependencyOrderWalkerWOSeenChk extends PlanWalker {
+
+ public ReverseDependencyOrderWalkerWOSeenChk(OperatorPlan plan) {
+ super(plan);
+ }
+
+ @Override
+ public PlanWalker spawnChildWalker(OperatorPlan plan) {
+ return new ReverseDependencyOrderWalker(plan);
+ }
+
+ /**
+ * Begin traversing the graph; returns without visiting if the plan's sources are null.
+ * @param visitor Visitor this walker is being used by.
+ * @throws FrontendException if an error is encountered while walking.
+ */
+ @Override
+ public void walk(PlanVisitor visitor) throws FrontendException {
+ // This is highly inefficient, but our graphs are small so it should be okay.
+ // The algorithm starts at each source of the plan, finds its successors and
+ // recurses into each of them. A node is appended to the list only after all
+ // of its successors have been appended, so the list ends up in reverse
+ // dependency order. No record of already-seen nodes is kept: a node that
+ // is reachable through more than one path is appended, and later visited,
+ // once for every path that reaches it.
+
+ List<Operator> fifo = new ArrayList<Operator>();
+ List<Operator> roots = plan.getSources();
+ if (roots == null) return;
+ for (Operator op : roots) {
+ doAllSuccessors(op, fifo);
+ }
+
+ for (Operator op: fifo) {
+ op.accept(visitor);
+ }
+ }
+
+ protected void doAllSuccessors(Operator node,
+ Collection<Operator> fifo) throws FrontendException {
+ Collection<Operator> succs = Utils.mergeCollection(plan.getSuccessors(node), plan.getSoftLinkSuccessors(node));
+ if (succs != null && succs.size() > 0) {
+ // Recurse into every successor (regular and soft-link) before this node
+ for (Operator op : succs) {
+ doAllSuccessors(op, fifo);
+ }
+ }
+ // All successors are queued; now append this node itself
+ fifo.add(node);
+ }
+}
Modified: pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1024382&r1=1024381&r2=1024382&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Tue Oct 19 19:53:37 2010
@@ -20,7 +20,6 @@ package org.apache.pig.newplan.logical.r
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -64,10 +63,6 @@ import org.apache.pig.impl.builtin.GFCro
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
@@ -79,13 +74,12 @@ import org.apache.pig.newplan.Dependency
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.PlanWalker;
-import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalkerWOSeenChk;
import org.apache.pig.newplan.SubtreeDependencyOrderWalker;
import org.apache.pig.newplan.logical.Util;
import org.apache.pig.newplan.logical.expression.ExpToPhyTranslationVisitor;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
-import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
import org.apache.pig.impl.util.Utils;
public class LogToPhyTranslationVisitor extends LogicalRelationalNodesVisitor {
@@ -201,7 +195,7 @@ public class LogToPhyTranslationVisitor
// PlanWalker childWalker = currentWalker
// .spawnChildWalker(filter.getFilterPlan());
- PlanWalker childWalker = new ReverseDependencyOrderWalker(filter.getFilterPlan());
+ PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(filter.getFilterPlan());
pushWalker(childWalker);
//currentWalker.walk(this);
currentWalker.walk(
@@ -245,7 +239,7 @@ public class LogToPhyTranslationVisitor
currentPlans.push(currentPlan);
for (LogicalExpressionPlan plan : logPlans) {
currentPlan = new PhysicalPlan();
- PlanWalker childWalker = new ReverseDependencyOrderWalker(plan);
+ PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(plan);
pushWalker(childWalker);
childWalker.walk(new ExpToPhyTranslationVisitor( currentWalker.getPlan(),
childWalker, sort, currentPlan, logToPhyMap));
@@ -514,7 +508,7 @@ public class LogToPhyTranslationVisitor
for (int i=0; i<exps.size(); i++) {
currentPlan = new PhysicalPlan();
// translate the expression plan
- PlanWalker childWalker = new ReverseDependencyOrderWalker(exps.get(i));
+ PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(exps.get(i));
pushWalker(childWalker);
childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i),
childWalker, gen, currentPlan, logToPhyMap));
@@ -615,7 +609,7 @@ public class LogToPhyTranslationVisitor
// We spawn a new Dependency Walker and use it
// PlanWalker childWalker = currentWalker.spawnChildWalker(lp);
- PlanWalker childWalker = new ReverseDependencyOrderWalker(lp);
+ PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(lp);
// Save the old walker and use childWalker as current Walker
pushWalker(childWalker);
@@ -1316,7 +1310,7 @@ public class LogToPhyTranslationVisitor
// PlanWalker childWalker = currentWalker
// .spawnChildWalker(filter.getFilterPlan());
- PlanWalker childWalker = new ReverseDependencyOrderWalker(loSplitOutput.getFilterPlan());
+ PlanWalker childWalker = new ReverseDependencyOrderWalkerWOSeenChk(loSplitOutput.getFilterPlan());
pushWalker(childWalker);
//currentWalker.walk(this);
currentWalker.walk(
Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1024382&r1=1024381&r2=1024382&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java Tue Oct 19 19:53:37 2010
@@ -776,4 +776,31 @@ public class TestEvalPipeline2 extends T
assertFalse(iter.hasNext());
}
+
+ // See PIG-1683: inner alias d0 is referenced twice inside the nested foreach's generate
+ @Test
+ public void testDuplicateReferenceInnerPlan() throws Exception{
+ String[] input1 = {
+ "1\t1\t1",
+ };
+
+ String[] input2 = {
+ "1\t1",
+ "2\t2"
+ };
+
+ Util.createInputFile(cluster, "table_testDuplicateReferenceInnerPlan1", input1);
+ Util.createInputFile(cluster, "table_testDuplicateReferenceInnerPlan2", input2);
+ pigServer.registerQuery("a = load 'table_testDuplicateReferenceInnerPlan1' as (a0, a1, a2);");
+ pigServer.registerQuery("b = load 'table_testDuplicateReferenceInnerPlan2' as (b0, b1);");
+ pigServer.registerQuery("c = join a by a0, b by b0;");
+ pigServer.registerQuery("d = foreach c {d0 = a::a1;d1 = a::a2;generate ((d0 is not null)? d0 : d1);};");
+
+ Iterator<Tuple> iter = pigServer.openIterator("d");
+
+ Tuple t = iter.next();
+ assertTrue(t.toString().equals("(1)"));
+
+ assertFalse(iter.hasNext());
+ }
}