You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/11/21 09:52:34 UTC
svn commit: r1037398 - in /pig/branches/branch-0.8: ./
src/org/apache/pig/impl/builtin/
src/org/apache/pig/newplan/logical/optimizer/
src/org/apache/pig/newplan/logical/rules/ test/org/apache/pig/test/
Author: daijy
Date: Sun Nov 21 08:52:34 2010
New Revision: 1037398
URL: http://svn.apache.org/viewvc?rev=1037398&view=rev
Log:
PIG-1732: New logical plan: logical plan get confused if we generate the same field twice in ForEach
Added:
pig/branches/branch-0.8/src/org/apache/pig/impl/builtin/IdentityColumn.java
pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java
Modified:
pig/branches/branch-0.8/CHANGES.txt
pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java
Modified: pig/branches/branch-0.8/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/CHANGES.txt?rev=1037398&r1=1037397&r2=1037398&view=diff
==============================================================================
--- pig/branches/branch-0.8/CHANGES.txt (original)
+++ pig/branches/branch-0.8/CHANGES.txt Sun Nov 21 08:52:34 2010
@@ -207,6 +207,9 @@ PIG-1309: Map-side Cogroup (ashutoshc)
BUG FIXES
+PIG-1732: New logical plan: logical plan get confused if we generate the same
+field twice in ForEach (daijy)
+
PIG-1737: New logical plan: Improve error messages when merge schema fail (daijy)
PIG-1725: New logical plan: uidOnlySchema bug in LOGenerate (daijy)
Added: pig/branches/branch-0.8/src/org/apache/pig/impl/builtin/IdentityColumn.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/impl/builtin/IdentityColumn.java?rev=1037398&view=auto
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/impl/builtin/IdentityColumn.java (added)
+++ pig/branches/branch-0.8/src/org/apache/pig/impl/builtin/IdentityColumn.java Sun Nov 21 08:52:34 2010
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.builtin;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+ * Identity UDF: returns field 0 of its input tuple unchanged. Inserted by
+ * DuplicateForEachColumnRewrite around duplicated ForEach columns so that
+ * each output column can receive its own uid. Not intended for external use.
+ */
+public class IdentityColumn extends EvalFunc<Object> {
+
+ @Override
+ public Object exec(Tuple input) throws ExecException {
+ return input.get(0); // pass the first (presumably only) argument through untouched
+ }
+
+ @Override
+ public Schema outputSchema(Schema input) {
+ return input; // the output schema is reported as exactly the input schema
+ }
+
+}
Modified: pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=1037398&r1=1037397&r2=1037398&view=diff
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Sun Nov 21 08:52:34 2010
@@ -25,6 +25,7 @@ import java.util.Set;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.rules.AddForEach;
import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
+import org.apache.pig.newplan.logical.rules.DuplicateForEachColumnRewrite;
import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
@@ -55,10 +56,20 @@ public class LogicalPlanOptimizer extend
protected List<Set<Rule>> buildRuleSets() {
List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+
+ // DuplicateForEachColumnRewrite set
+ // This inserts the IdentityColumn UDF when a foreach generates the same field more than once,
+ // because we need a unique uid for each column throughout the plan.
+ Set<Rule> s = new HashSet<Rule>();
+ Rule r = new DuplicateForEachColumnRewrite("DuplicateForEachColumnRewrite");
+ checkAndAddRule(s, r);
+ if (!s.isEmpty())
+ ls.add(s);
+
// ImplicitSplitInserter set
// This set of rules Insert Foreach dedicated for casting after load
- Set<Rule> s = new HashSet<Rule>();
- Rule r = new ImplicitSplitInserter("ImplicitSplitInserter");
+ s = new HashSet<Rule>();
+ r = new ImplicitSplitInserter("ImplicitSplitInserter");
checkAndAddRule(s, r);
if (!s.isEmpty())
ls.add(s);
Added: pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java?rev=1037398&view=auto
==============================================================================
--- pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java (added)
+++ pig/branches/branch-0.8/src/org/apache/pig/newplan/logical/rules/DuplicateForEachColumnRewrite.java Sun Nov 21 08:52:34 2010
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.impl.builtin.IdentityColumn;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.UserFuncExpression;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten.PushDownForEachFlattenTransformer;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+/*
+ * This rule rewrites each duplicate column projection in a ForEach into a call to
+ * the IdentityColumn UDF, so that a different uid can be generated for each column.
+ */
+public class DuplicateForEachColumnRewrite extends Rule {
+
+ public DuplicateForEachColumnRewrite(String n) {
+ super(n, true);
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ LogicalPlan plan = new LogicalPlan(); // pattern plan holding a single LOForEach
+ LogicalRelationalOperator foreach = new LOForEach(plan);
+ plan.add( foreach );
+ return plan;
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new DuplicateForEachColumnRewriteTransformer();
+ }
+
+ class DuplicateForEachColumnRewriteTransformer extends Transformer {
+ private List<LogicalExpressionPlan> expPlansToInsertIdentity = new ArrayList<LogicalExpressionPlan>(); // filled by check(), consumed and cleared by transform()
+
+ @Override
+ public boolean check(OperatorPlan matched) throws FrontendException {
+
+ LOForEach foreach = (LOForEach)matched.getSources().get(0);
+ LOGenerate gen = (LOGenerate)foreach.getInnerPlan().getSinks().get(0); // first sink of the inner plan is cast to LOGenerate
+
+ List<LogicalExpressionPlan> expPlans = gen.getOutputPlans();
+
+ List<Long> uidSeen = new ArrayList<Long>(); // uids already produced by an earlier output expression
+
+ for (LogicalExpressionPlan expPlan : expPlans) {
+ LogicalExpression exp = (LogicalExpression)expPlan.getSources().get(0);
+ if (exp.getFieldSchema()!=null) { // expressions without a field schema are skipped
+ long uid = exp.getFieldSchema().uid;
+ if (uidSeen.contains(uid)) {
+ expPlansToInsertIdentity.add(expPlan); // second or later occurrence of this uid: remember it for transform()
+ }
+ else {
+ uidSeen.add(uid);
+ }
+ }
+ }
+
+ if (expPlansToInsertIdentity.isEmpty())
+ return false; // no duplicate uid found, nothing to rewrite
+
+ return true;
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return currentPlan;
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws FrontendException {
+ for (LogicalExpressionPlan expPlan : expPlansToInsertIdentity) {
+ LogicalExpression oldRoot = (LogicalExpression)expPlan.getSources().get(0);
+ UserFuncExpression userFuncExpression = new UserFuncExpression(expPlan, new FuncSpec(IdentityColumn.class.getName()));
+ expPlan.connect(userFuncExpression, oldRoot); // connect the new IdentityColumn call to the previous root expression
+ }
+ expPlansToInsertIdentity.clear();
+ }
+ }
+}
Modified: pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1037398&r1=1037397&r2=1037398&view=diff
==============================================================================
--- pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/branch-0.8/test/org/apache/pig/test/TestEvalPipeline2.java Sun Nov 21 08:52:34 2010
@@ -911,4 +911,41 @@ public class TestEvalPipeline2 extends T
}
fail();
}
+
+ // See PIG-1732: a foreach that generates the same field twice, then feeds a union
+ @Test
+ public void testForEachDupColumn() throws Exception{
+ String[] input1 = {
+ "1\t2",
+ };
+
+ String[] input2 = {
+ "1\t1\t3",
+ "2\t4\t2"
+ };
+
+ Util.createInputFile(cluster, "table_testForEachDupColumn1", input1);
+ Util.createInputFile(cluster, "table_testForEachDupColumn2", input2);
+ pigServer.registerQuery("a = load 'table_testForEachDupColumn1' as (a0, a1:int);");
+ pigServer.registerQuery("b = load 'table_testForEachDupColumn2' as (b0, b1:int, b2);");
+ pigServer.registerQuery("c = foreach a generate a0, a1, a1 as a2;"); // a1 is generated twice
+ pigServer.registerQuery("d = union b, c;");
+ pigServer.registerQuery("e = foreach d generate $1;"); // keep only the second column of the union
+
+ Iterator<Tuple> iter = pigServer.openIterator("e"); // NOTE(review): assertions below assume b's rows come before c's row - confirm union ordering
+
+ Tuple t = iter.next();
+ assertTrue(t.size()==1);
+ assertTrue((Integer)t.get(0)==1); // b1 of the first row of input2
+
+ t = iter.next();
+ assertTrue(t.size()==1);
+ assertTrue((Integer)t.get(0)==4); // b1 of the second row of input2
+
+ t = iter.next();
+ assertTrue(t.size()==1);
+ assertTrue((Integer)t.get(0)==2); // a1 of the single row of input1
+
+ assertFalse(iter.hasNext());
+ }
}