You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/06 19:24:52 UTC
svn commit: r983062 - in /hadoop/pig/trunk:
src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
test/org/apache/pig/test/TestNewPlanImplicitSplit.java
Author: daijy
Date: Fri Aug 6 17:24:52 2010
New Revision: 983062
URL: http://svn.apache.org/viewvc?rev=983062&view=rev
Log:
PIG-1496: Mandatory rule ImplicitSplitInserter
Added:
hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java?rev=983062&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ImplicitSplitInserter.java Fri Aug 6 17:24:52 2010
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Iterator;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+
+
+/**
+ * Rule that makes implicit splits explicit. Any operator other than LOSplit or
+ * LOStore that has two or more successors gets an LOSplit placed after it, with
+ * one LOSplitOutput (carrying a constant-true filter) in front of each successor.
+ */
+public class ImplicitSplitInserter extends Rule {
+
+    /** @param n forwarded to {@code Rule(n, true)}; presumably the rule name - confirm in Rule. */
+    public ImplicitSplitInserter(String n) {
+        super(n, true);
+    }
+
+    /**
+     * Records {@code plan} as the current plan and returns one single-operator plan for
+     * each operator (LOSplit and LOStore excluded) that has two or more successors.
+     */
+    @Override
+    public List<OperatorPlan> match(OperatorPlan plan) throws FrontendException {
+        currentPlan = plan;
+        List<OperatorPlan> ll = new ArrayList<OperatorPlan>();
+        Iterator<Operator> ops = plan.getOperators();
+        while (ops.hasNext()) {
+            Operator op = ops.next();
+            if (op instanceof LOSplit || op instanceof LOStore)
+                continue;
+            List<Operator> succs = plan.getSuccessors(op);
+            if (succs != null && succs.size() >= 2) {
+                OperatorPlan match = new LogicalPlan();
+                match.add(op);
+                ll.add(match);
+            }
+        }
+        return ll;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new ImplicitSplitInserterTransformer();
+    }
+
+    public class ImplicitSplitInserterTransformer extends Transformer {
+        @Override
+        public boolean check(OperatorPlan matched) throws FrontendException {
+            return true;
+        }
+
+        /**
+         * Puts an LOSplit after the matched operator and an LOSplitOutput before each successor.
+         * @throws FrontendException 2244 for a malformed match, 2243 for fewer than two successors
+         */
+        @Override
+        public void transform(OperatorPlan matched) throws FrontendException {
+            if (matched == null || matched.size() != 1)
+                throw new FrontendException("Invalid match in ImplicitSplitInserter rule.", 2244);
+            // match() filters on the operator type, so validate the operator, not the plan object.
+            Operator op = matched.getSources().get(0);
+            if (op instanceof LOSplit || op instanceof LOStore)
+                throw new FrontendException("Invalid match in ImplicitSplitInserter rule.", 2244);
+
+            // BEFORE:          AFTER:
+            //  Succ1 Succ2        Succ1        Succ2
+            //     \   /             |            |
+            //      op          SplitOutput  SplitOutput
+            //                        \        /
+            //                          Split
+            //                            |
+            //                            op
+            List<Operator> succs = currentPlan.getSuccessors(op);
+            if (succs == null || succs.size() < 2)
+                throw new FrontendException("Invalid match in ImplicitSplitInserter rule.", 2243);
+            LOSplit splitOp = new LOSplit(currentPlan);
+            splitOp.setAlias(((LogicalRelationalOperator) op).getAlias());
+            // Copy before rewiring so the loop never sees splitOp; disconnect() presumably edits succs.
+            Operator[] sucs = succs.toArray(new Operator[0]);
+            currentPlan.add(splitOp);
+            currentPlan.connect(op, splitOp);
+            for (Operator suc : sucs) {
+                // position is remembered in order to maintain the order of the successors
+                Pair<Integer, Integer> pos = currentPlan.disconnect(op, suc);
+                LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+                LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BOOLEAN);
+                new ConstantExpression(filterPlan, Boolean.valueOf(true), fs);
+                LOSplitOutput splitOutput = new LOSplitOutput((LogicalPlan) currentPlan, filterPlan);
+                splitOutput.setAlias(splitOp.getAlias());
+                currentPlan.add(splitOutput);
+                currentPlan.connect(splitOp, splitOutput);
+                currentPlan.connect(splitOutput, pos.first, suc, pos.second);
+            }
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return currentPlan;
+        }
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        return null;
+    }
+}
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java?rev=983062&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanImplicitSplit.java Fri Aug 6 17:24:52 2010
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printMessageCollector;
+import static org.apache.pig.test.utils.TypeCheckingTestUtil.printTypeGraph;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.HashMap;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.PlanSetter;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
+import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNewPlanImplicitSplit {
+ private PigServer pigServer;
+ static MiniCluster cluster = MiniCluster.buildCluster();
+
+ @Before
+ public void setUp() throws Exception {
+ pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer.getPigContext().getProperties().setProperty("pig.usenewlogicalplan", "true");
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ }
+
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ cluster.shutDown();
+ }
+
+ @Test
+ public void testImplicitSplit() throws Exception{
+ int LOOP_SIZE = 20;
+ String[] input = new String[LOOP_SIZE];
+ for(int i = 1; i <= LOOP_SIZE; i++) {
+ input[i-1] = Integer.toString(i);
+ }
+ String inputFileName = "testImplicitSplit-input.txt";
+ Util.createInputFile(cluster, inputFileName, input);
+ pigServer.registerQuery("A = LOAD '" + inputFileName + "';");
+ pigServer.registerQuery("B = filter A by $0<=10;");
+ pigServer.registerQuery("C = filter A by $0>10;");
+ pigServer.registerQuery("D = union B,C;");
+ Iterator<Tuple> iter = pigServer.openIterator("D");
+ if(!iter.hasNext()) fail("No Output received");
+ int cnt = 0;
+ while(iter.hasNext()){
+ Tuple t = iter.next();
+ ++cnt;
+ }
+ assertEquals(20, cnt);
+ Util.deleteFile(cluster, inputFileName);
+ }
+
+ @Test
+ public void testImplicitSplitInCoGroup() throws Exception {
+ // this query is similar to the one reported in JIRA - PIG-537
+ // Create input file
+ String input1 = "testImplicitSplitInCoGroup-input1.txt";
+ String input2 = "testImplicitSplitInCoGroup-input2.txt";
+ Util.createInputFile(cluster, input1,
+ new String[] {"a:1", "b:2", "b:20", "c:3", "c:30"});
+ Util.createInputFile(cluster, input2,
+ new String[] {"a:first", "b:second", "c:third"});
+ pigServer.registerQuery("a = load '" + input1 +
+ "' using PigStorage(':') as (name:chararray, marks:int);");
+ pigServer.registerQuery("b = load '" + input2 +
+ "' using PigStorage(':') as (name:chararray, rank:chararray);");
+ pigServer.registerQuery("c = cogroup a by name, b by name;");
+ pigServer.registerQuery("d = foreach c generate group, FLATTEN(a.marks) as newmarks;");
+ pigServer.registerQuery("e = cogroup a by marks, d by newmarks;");
+ pigServer.registerQuery("f = foreach e generate group, flatten(a), flatten(d);");
+ HashMap<Integer, Object[]> results = new HashMap<Integer, Object[]>();
+ results.put(1, new Object[] { "a", 1, "a", 1 });
+ results.put(2, new Object[] { "b", 2, "b", 2 });
+ results.put(3, new Object[] { "c", 3, "c", 3 });
+ results.put(20, new Object[] { "b", 20, "b", 20 });
+ results.put(30, new Object[] { "c", 30, "c", 30 });
+ pigServer.explain("f", System.out);
+
+ Iterator<Tuple> it = pigServer.openIterator("f");
+ while(it.hasNext()) {
+ Tuple t = it.next();
+ System.err.println("Tuple:" + t);
+ Integer group = (Integer)t.get(0);
+ Object[] groupValues = results.get(group);
+ for(int i = 0; i < 4; i++) {
+ assertEquals(groupValues[i], t.get(i+1));
+ }
+ }
+ Util.deleteFile(cluster, input1);
+ Util.deleteFile(cluster, input2);
+ }
+
+ @Test
+ public void testImplicitSplitInCoGroup2() throws Exception {
+ // this query is similar to the one reported in JIRA - PIG-537
+ LogicalPlanTester planTester = new LogicalPlanTester();
+ planTester.buildPlan("a = load 'file1' using PigStorage(':') as (name:chararray, marks:int);");
+ planTester.buildPlan("b = load 'file2' using PigStorage(':') as (name:chararray, rank:chararray);");
+ planTester.buildPlan("c = cogroup a by name, b by name;");
+ planTester.buildPlan("d = foreach c generate group, FLATTEN(a.marks) as newmarks;");
+ planTester.buildPlan("e = cogroup a by marks, d by newmarks;");
+ LogicalPlan plan = planTester.buildPlan("f = foreach e generate group, flatten(a), flatten(d);");
+
+ // Set the logical plan values correctly in all the operators
+ PlanSetter ps = new PlanSetter(plan);
+ ps.visit();
+
+ // run through validator
+ CompilationMessageCollector collector = new CompilationMessageCollector() ;
+ TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+ typeValidator.validate(plan, collector) ;
+ printMessageCollector(collector) ;
+ printTypeGraph(plan) ;
+
+ if (collector.hasError()) {
+ throw new Exception("Error during type checking") ;
+ }
+
+ // this will run ImplicitSplitInserter
+ TestLogicalOptimizer.optimizePlan(plan);
+
+ // get Schema of leaf and compare:
+ Schema expectedSchema = Util.getSchemaFromString("grp: int,A::username: chararray,A::marks: int,AB::group: chararray,AB::newmarks: int");
+ assertTrue(Schema.equals(expectedSchema, plan.getLeaves().get(0).getSchema(),false, true));
+ }
+}