You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/11/21 09:53:57 UTC

svn commit: r1037399 - in /pig/trunk: ./ src/org/apache/pig/impl/builtin/ src/org/apache/pig/newplan/logical/optimizer/ src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/

Author: daijy
Date: Sun Nov 21 08:53:57 2010
New Revision: 1037399

URL: http://svn.apache.org/viewvc?rev=1037399&view=rev
Log:
PIG-1732: New logical plan: logical plan gets confused if we generate the same field twice in ForEach

Added:
    pig/trunk/src/org/apache/pig/impl/builtin/IdentityColumn.java
    pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1037399&r1=1037398&r2=1037399&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Nov 21 08:53:57 2010
@@ -220,6 +220,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
 
 BUG FIXES
 
+PIG-1732: New logical plan: logical plan gets confused if we generate the same
+field twice in ForEach (daijy)
+
 PIG-1737: New logical plan: Improve error messages when merge schema fail (daijy)
 
 PIG-1725: New logical plan: uidOnlySchema bug in LOGenerate (daijy)

Added: pig/trunk/src/org/apache/pig/impl/builtin/IdentityColumn.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/IdentityColumn.java?rev=1037399&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/IdentityColumn.java (added)
+++ pig/trunk/src/org/apache/pig/impl/builtin/IdentityColumn.java Sun Nov 21 08:53:57 2010
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+ * Returns its first argument unchanged. DuplicateForEachColumnRewrite wraps
+ * duplicate columns of a ForEach in this UDF so that each duplicate can get
+ * a uid of its own. Not intended for external use.
+ */
+public class IdentityColumn extends EvalFunc<Object> {
+
+    /**
+     * @param input tuple holding the argument(s); only position 0 is read
+     * @return the value at position 0 of {@code input}, unchanged
+     * @throws ExecException propagated from {@code Tuple.get}
+     */
+    @Override
+    public Object exec(Tuple input) throws ExecException {
+        final Object firstArgument = input.get(0);
+        return firstArgument;
+    }
+
+    /**
+     * @param input schema of the arguments
+     * @return {@code input} itself, passed through as the output schema
+     */
+    @Override
+    public Schema outputSchema(Schema input) {
+        final Schema sameAsArgument = input;
+        return sameAsArgument;
+    }
+
+}

Modified: pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1037399&r1=1037398&r2=1037399&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Sun Nov 21 08:53:57 2010
@@ -25,6 +25,7 @@ import java.util.Set;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.rules.AddForEach;
 import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
+import org.apache.pig.newplan.logical.rules.DuplicateForEachColumnRewrite;
 import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
 import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
 import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
@@ -55,10 +56,20 @@ public class LogicalPlanOptimizer extend
     protected List<Set<Rule>> buildRuleSets() {
         List<Set<Rule>> ls = new ArrayList<Set<Rule>>();	    
 
+        
+        // DuplicateForEachColumnRewrite set
+        // Inserts an Identity UDF when a foreach generates the same field more than once,
+        // because every uid must be unique throughout the plan.
+        Set<Rule> s = new HashSet<Rule>();
+        Rule r = new DuplicateForEachColumnRewrite("DuplicateForEachColumnRewrite");
+        checkAndAddRule(s, r);
+        if (!s.isEmpty())
+            ls.add(s);
+        
         // ImplicitSplitInserter set
         // This set of rules Insert Foreach dedicated for casting after load
-        Set<Rule> s = new HashSet<Rule>();
-        Rule r = new ImplicitSplitInserter("ImplicitSplitInserter");
+        s = new HashSet<Rule>();
+        r = new ImplicitSplitInserter("ImplicitSplitInserter");
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);

Added: pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java?rev=1037399&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java (added)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java Sun Nov 21 08:53:57 2010
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.impl.builtin.IdentityColumn;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+/**
+ * Rewrites a ForEach whose LOGenerate outputs the same column (same uid) more
+ * than once: every repetition after the first is wrapped in the
+ * {@link IdentityColumn} UDF so that each generated column can get a uid of
+ * its own, keeping uids unique throughout the plan.
+ */
+public class DuplicateForEachColumnRewrite extends Rule {
+
+    /**
+     * @param n name of this rule
+     */
+    public DuplicateForEachColumnRewrite(String n) {
+        // NOTE(review): the flag presumably marks this rule as mandatory - confirm in Rule
+        super(n, true);
+    }
+
+    /** Matches any single LOForEach. */
+    @Override
+    protected OperatorPlan buildPattern() {
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator foreach = new LOForEach(plan);
+        plan.add( foreach );
+        return plan;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new DuplicateForEachColumnRewriteTransformer();
+    }
+
+    /**
+     * Detects duplicate uids among the generate outputs in check() and
+     * prepends an IdentityColumn call to each duplicate in transform().
+     */
+    class DuplicateForEachColumnRewriteTransformer extends Transformer {
+        // Output plans found by check() that transform() has to rewrite.
+        private final List<LogicalExpressionPlan> expPlansToInsertIdentity = new ArrayList<LogicalExpressionPlan>();
+
+        /**
+         * Records each output plan of the ForEach's LOGenerate whose root
+         * expression repeats the uid of an earlier output plan.
+         *
+         * @return true if at least one duplicate was found
+         */
+        @Override
+        public boolean check(OperatorPlan matched) throws FrontendException {
+            // Forget results of an earlier check() that was not followed by
+            // transform(), so they cannot leak into this match.
+            expPlansToInsertIdentity.clear();
+
+            LOForEach foreach = (LOForEach)matched.getSources().get(0);
+            LOGenerate gen = (LOGenerate)foreach.getInnerPlan().getSinks().get(0);
+
+            // HashSet rather than a List: constant-time membership checks.
+            Set<Long> uidSeen = new HashSet<Long>();
+            for (LogicalExpressionPlan expPlan : gen.getOutputPlans()) {
+                LogicalExpression exp = (LogicalExpression)expPlan.getSources().get(0);
+                // Without a field schema there is no uid to compare.
+                if (exp.getFieldSchema() == null) {
+                    continue;
+                }
+                // add() returns false for a uid seen before: the first
+                // occurrence is kept as is, later ones are rewritten.
+                if (!uidSeen.add(exp.getFieldSchema().uid)) {
+                    expPlansToInsertIdentity.add(expPlan);
+                }
+            }
+
+            return !expPlansToInsertIdentity.isEmpty();
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            // Conservatively report currentPlan (presumably the whole plan) as changed.
+            return currentPlan;
+        }
+
+        /**
+         * Puts a new IdentityColumn UDF expression above the current root of
+         * every output plan recorded by check(), then forgets those plans.
+         */
+        @Override
+        public void transform(OperatorPlan matched) throws FrontendException {
+            for (LogicalExpressionPlan expPlan : expPlansToInsertIdentity) {
+                LogicalExpression oldRoot = (LogicalExpression)expPlan.getSources().get(0);
+                UserFuncExpression userFuncExpression = new UserFuncExpression(expPlan, new FuncSpec(IdentityColumn.class.getName()));
+                expPlan.connect(userFuncExpression, oldRoot);
+            }
+            expPlansToInsertIdentity.clear();
+        }
+    }
+}

Modified: pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1037399&r1=1037398&r2=1037399&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java Sun Nov 21 08:53:57 2010
@@ -911,4 +911,39 @@ public class TestEvalPipeline2 extends T
         }
         fail();
     }
+    
+    // See PIG-1732: generating the same field twice in a foreach must not
+    // confuse the new logical plan (the duplicate needs a uid of its own).
+    @Test
+    public void testForEachDupColumn() throws Exception{
+        String[] input1 = {
+                "1\t2",
+        };
+        
+        String[] input2 = {
+                "1\t1\t3",
+                "2\t4\t2"
+        };
+        
+        Util.createInputFile(cluster, "table_testForEachDupColumn1", input1);
+        Util.createInputFile(cluster, "table_testForEachDupColumn2", input2);
+        pigServer.registerQuery("a = load 'table_testForEachDupColumn1' as (a0, a1:int);");
+        pigServer.registerQuery("b = load 'table_testForEachDupColumn2' as (b0, b1:int, b2);");
+        pigServer.registerQuery("c = foreach a generate a0, a1, a1 as a2;");
+        pigServer.registerQuery("d = union b, c;");
+        pigServer.registerQuery("e = foreach d generate $1;");
+        
+        Iterator<Tuple> iter = pigServer.openIterator("e");
+        
+        // UNION makes no ordering guarantee across its inputs, so collect
+        // the projected values and compare them as a sorted list.
+        java.util.List<Integer> actual = new java.util.ArrayList<Integer>();
+        while (iter.hasNext()) {
+            Tuple t = iter.next();
+            assertEquals(1, t.size());
+            actual.add((Integer)t.get(0));
+        }
+        java.util.Collections.sort(actual);
+        assertEquals(java.util.Arrays.asList(1, 2, 4), actual);
+    }
 }