You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/02/18 23:20:09 UTC
svn commit: r911616 [6/7] - in /hadoop/pig/branches/load-store-redesign: ./
contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/
contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/
src/org/apach...
Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.optimizer.UidStamper;
+import org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import junit.framework.TestCase;
+
+public class TestExperimentalLogToPhyTranslationVisitor extends TestCase {
+
+ private PhysicalPlan translatePlan(OperatorPlan plan) throws IOException {
+ LogToPhyTranslationVisitor visitor = new LogToPhyTranslationVisitor(plan);
+ visitor.visit();
+ return visitor.getPhysicalPlan();
+ }
+
+ /**
+  * Converts an old-style logical plan into the experimental logical plan via
+  * {@link LogicalPlanMigrationVistor}, then runs a {@link UidStamper} pass
+  * over the converted plan.
+  *
+  * @param lp the old-style logical plan to migrate
+  * @return the migrated plan after the UidStamper visit
+  * @throws VisitorException if the migration fails, or wrapping any exception
+  *         raised while creating or running the UidStamper
+  */
+ private org.apache.pig.experimental.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException{
+     LogicalPlanMigrationVistor migrator = new LogicalPlanMigrationVistor(lp);
+     migrator.visit();
+     org.apache.pig.experimental.logical.relational.LogicalPlan migrated = migrator.getNewLogicalPlan();
+
+     try {
+         new UidStamper(migrated).visit();
+     } catch (Exception e) {
+         // Re-surface any stamping failure as the exception type this helper declares.
+         throw new VisitorException(e);
+     }
+
+     return migrated;
+ }
+
+ /**
+  * Resets the {@link LogicalExpression} uid counter before each test, so the
+  * uid values asserted by the tests do not depend on which tests ran earlier.
+  *
+  * @throws Exception propagated from the superclass set-up
+  */
+ @Override
+ protected void setUp() throws Exception {
+     super.setUp();
+     LogicalExpression.resetNextUid();
+ }
+
+ /**
+  * Translates a load / filter / store script and checks that the physical
+  * plan is a three-operator chain whose filter compares column 0 against a
+  * null constant.
+  */
+ public void testSimplePlan() throws Exception {
+     LogicalPlanTester lpt = new LogicalPlanTester();
+     lpt.buildPlan("a = load 'd.txt';");
+     lpt.buildPlan("b = filter a by $0==NULL;");
+     LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+
+     org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+     PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+
+     assertEquals( 3, phyPlan.size() );
+     assertEquals( 1, phyPlan.getRoots().size() );
+     assertEquals( 1, phyPlan.getLeaves().size() );
+
+     // Check for Load
+     PhysicalOperator load = phyPlan.getRoots().get(0);
+     assertEquals( POLoad.class, load.getClass() );
+     assertTrue( ((POLoad)load).getLFile().getFileName().contains("d.txt") );
+
+     // Check for Filter
+     PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
+     assertEquals( POFilter.class, fil.getClass() );
+     PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+     assertEquals( 2, filPlan.getRoots().size() );
+     assertEquals( 1, filPlan.getLeaves().size() );
+
+     // The comparison is the single leaf of the filter's expression plan
+     PhysicalOperator eq = filPlan.getLeaves().get(0);
+     assertEquals( EqualToExpr.class, eq.getClass() );
+
+     // Its two operands are the roots: the $0 projection and the NULL constant
+     PhysicalOperator prj1 = filPlan.getRoots().get(0);
+     assertEquals( POProject.class, prj1.getClass() );
+     assertEquals( 0, ((POProject)prj1).getColumn() );
+     PhysicalOperator constExp = filPlan.getRoots().get(1);
+     assertEquals( ConstantExpression.class, constExp.getClass() );
+     assertNull( ((ConstantExpression)constExp).getValue() );
+
+     // Check for Store
+     PhysicalOperator stor = phyPlan.getSuccessors(fil).get(0);
+     assertEquals( POStore.class, stor.getClass() );
+     assertTrue( ((POStore)stor).getSFile().getFileName().contains("empty") );
+ }
+
+ /**
+  * Translates a two-input join followed by a filter and a store, and checks
+  * the operators of the resulting physical plan: one local rearrange per
+  * load, then global rearrange, package, foreach, filter and store.
+  */
+ public void testJoinPlan() throws Exception {
+     LogicalPlanTester lpt = new LogicalPlanTester();
+     lpt.buildPlan("a = load 'd1.txt' as (id, c);");
+     lpt.buildPlan("b = load 'd2.txt'as (id, c);");
+     lpt.buildPlan("c = join a by id, b by c;");
+     lpt.buildPlan("d = filter c by a::id==NULL AND b::c==NULL;");
+     LogicalPlan plan = lpt.buildPlan("store d into 'empty';");
+
+     // check basics
+     org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+     PhysicalPlan physicalPlan = translatePlan(newPlan);
+     assertEquals( 9, physicalPlan.size() );
+     assertEquals( 2, physicalPlan.getRoots().size() );
+
+     // Root 0: load of d1.txt, locally rearranged on column 0
+     PhysicalOperator leftLrOp = physicalPlan.getSuccessors(physicalPlan.getRoots().get(0)).get(0);
+     assertEquals( POLocalRearrange.class, leftLrOp.getClass() );
+     POLocalRearrange leftLr = (POLocalRearrange) leftLrOp;
+     PhysicalOperator leftKey = leftLr.getPlans().get(0).getLeaves().get(0);
+     assertEquals( POProject.class, leftKey.getClass() );
+     assertEquals( 0, ((POProject)leftKey).getColumn() );
+     PhysicalOperator leftLoad = leftLr.getInputs().get(0);
+     assertEquals( POLoad.class, leftLoad.getClass() );
+     assertTrue( ((POLoad)leftLoad).getLFile().getFileName().contains("d1.txt") );
+
+     // Root 1: load of d2.txt, locally rearranged on column 1
+     PhysicalOperator rightLrOp = physicalPlan.getSuccessors(physicalPlan.getRoots().get(1)).get(0);
+     assertEquals( POLocalRearrange.class, rightLrOp.getClass() );
+     POLocalRearrange rightLr = (POLocalRearrange) rightLrOp;
+     PhysicalOperator rightKey = rightLr.getPlans().get(0).getLeaves().get(0);
+     assertEquals( POProject.class, rightKey.getClass() );
+     assertEquals( 1, ((POProject)rightKey).getColumn() );
+     PhysicalOperator rightLoad = rightLr.getInputs().get(0);
+     assertEquals( POLoad.class, rightLoad.getClass() );
+     assertTrue( ((POLoad)rightLoad).getLFile().getFileName().contains("d2.txt") );
+
+     // GlobalRearrange and Package follow the first local rearrange
+     PhysicalOperator globalRearrange = physicalPlan.getSuccessors(leftLrOp).get(0);
+     assertEquals( POGlobalRearrange.class, globalRearrange.getClass() );
+
+     PhysicalOperator pkg = physicalPlan.getSuccessors(globalRearrange).get(0);
+     assertEquals( POPackage.class, pkg.getClass() );
+
+     // Check for ForEach
+     PhysicalOperator forEach = physicalPlan.getSuccessors(pkg).get(0);
+     assertEquals( POForEach.class, forEach.getClass() );
+     PhysicalOperator firstProj = ((POForEach)forEach).getInputPlans().get(0).getLeaves().get(0);
+     assertEquals( POProject.class, firstProj.getClass() );
+     assertEquals( 1, ((POProject)firstProj).getColumn() );
+     PhysicalOperator secondProj = ((POForEach)forEach).getInputPlans().get(1).getLeaves().get(0);
+     assertEquals( POProject.class, secondProj.getClass() );
+     assertEquals( 2, ((POProject)secondProj).getColumn() );
+
+     // Filter Operator
+     PhysicalOperator fil = physicalPlan.getSuccessors(forEach).get(0);
+     assertEquals( POFilter.class, fil.getClass() );
+
+     PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+     List<PhysicalOperator> filRoots = filPlan.getRoots();
+
+     // Roots alternate projection / null constant: indexes 0 and 2 are
+     // projections, 1 and 3 are constants
+     assertEquals( ConstantExpression.class, filRoots.get(1).getClass() );
+     ConstantExpression ce1 = (ConstantExpression) filRoots.get(1);
+     assertNull( ce1.getValue() );
+     assertEquals( ConstantExpression.class, filRoots.get(3).getClass() );
+     ConstantExpression ce2 = (ConstantExpression) filRoots.get(3);
+     assertNull( ce2.getValue() );
+     assertEquals( POProject.class, filRoots.get(0).getClass() );
+     POProject prj1 = (POProject) filRoots.get(0);
+     assertEquals( 3, prj1.getColumn() );
+     assertEquals( POProject.class, filRoots.get(2).getClass() );
+     POProject prj2 = (POProject) filRoots.get(2);
+     assertEquals( 0, prj2.getColumn() );
+
+     // Check Store Operator
+     PhysicalOperator stor = physicalPlan.getSuccessors(fil).get(0);
+     assertEquals( POStore.class, stor.getClass() );
+     assertTrue( ((POStore)stor).getSFile().getFileName().contains("empty") );
+ }
+
+ public void testMultiStore() throws Exception {
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd1.txt' as (id, c);");
+ lpt.buildPlan("b = load 'd2.txt'as (id, c);");
+ lpt.buildPlan("c = load 'd3.txt' as (id, c);");
+ lpt.buildPlan("d = join a by id, b by c;");
+ lpt.buildPlan("e = filter d by a::id==NULL AND b::c==NULL;");
+ lpt.buildPlan("f = join e by b::c, c by id;");
+ lpt.buildPlan("g = filter f by b::id==NULL AND c::c==NULL;");
+ LogicalPlan plan = lpt.buildPlan("store g into 'empty2';");
+
+ // check basics
+ org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+ PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+ assertEquals(16, phyPlan.size());
+ assertEquals(phyPlan.getRoots().size(), 3);
+ assertEquals(phyPlan.getLeaves().size(), 1 );
+
+ // Check Load and LocalRearrange and GlobalRearrange
+ PhysicalOperator LoR = (PhysicalOperator)phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0);
+ assertEquals( POLocalRearrange.class, LoR.getClass() );
+ POLocalRearrange Lor = (POLocalRearrange) LoR;
+ PhysicalOperator prj1 = Lor.getPlans().get(0).getLeaves().get(0);
+ assertEquals( POProject.class, prj1.getClass() );
+ assertEquals(0, ((POProject)prj1).getColumn() );
+ PhysicalOperator inp1 = Lor.getInputs().get(0);
+ assertEquals( POLoad.class, inp1.getClass() );
+ assertTrue( ((POLoad)inp1).getLFile().getFileName().contains("d3.txt") );
+
+ PhysicalOperator LoR1 = (PhysicalOperator)phyPlan.getSuccessors(phyPlan.getRoots().get(1)).get(0);
+ assertEquals( POLocalRearrange.class, LoR1.getClass() );
+ POLocalRearrange Lor1 = (POLocalRearrange) LoR1;
+ PhysicalOperator prj2 = Lor1.getPlans().get(0).getLeaves().get(0);
+ assertEquals( POProject.class, prj2.getClass() );
+ assertEquals(1, ((POProject)prj2).getColumn() );
+ PhysicalOperator inp2 = Lor1.getInputs().get(0);
+ assertEquals( POLoad.class, inp2.getClass() );
+ assertTrue( ((POLoad)inp2).getLFile().getFileName().contains("d2.txt") );
+
+ PhysicalOperator GoR = (PhysicalOperator)phyPlan.getSuccessors(LoR).get(0);
+ assertEquals( POGlobalRearrange.class, GoR.getClass() );
+
+ PhysicalOperator Pack = (PhysicalOperator)phyPlan.getSuccessors(GoR).get(0);
+ assertEquals( POPackage.class, Pack.getClass() );
+
+ PhysicalOperator LoR2 = (PhysicalOperator)phyPlan.getSuccessors(phyPlan.getRoots().get(2)).get(0);
+ assertEquals( POLocalRearrange.class, LoR2.getClass() );
+ POLocalRearrange Lor2 = (POLocalRearrange) LoR2;
+ PhysicalOperator prj3 = Lor2.getPlans().get(0).getLeaves().get(0);
+ assertEquals( POProject.class, prj3.getClass() );
+ assertEquals(0, ((POProject)prj3).getColumn() );
+ PhysicalOperator inp3 = Lor2.getInputs().get(0);
+ assertEquals( POLoad.class, inp3.getClass() );
+ assertTrue( ((POLoad)inp3).getLFile().getFileName().contains("d1.txt") );
+
+ PhysicalOperator GoR2 = (PhysicalOperator)phyPlan.getSuccessors(LoR2).get(0);
+ assertEquals( POGlobalRearrange.class, GoR2.getClass() );
+
+ PhysicalOperator Pack2 = (PhysicalOperator)phyPlan.getSuccessors(GoR2).get(0);
+ assertEquals( POPackage.class, Pack2.getClass() );
+
+ // Check for ForEach
+ PhysicalOperator ForE = (PhysicalOperator)phyPlan.getSuccessors(Pack).get(0);
+ assertEquals( POForEach.class, ForE.getClass() );
+ PhysicalOperator prj4 = ((POForEach)ForE).getInputPlans().get(0).getLeaves().get(0);
+ assertEquals( POProject.class, prj4.getClass() );
+ assertEquals( 1, ((POProject)prj4).getColumn() );
+ PhysicalOperator prj5 = ((POForEach)ForE).getInputPlans().get(1).getLeaves().get(0);
+ assertEquals( POProject.class, prj5.getClass() );
+ assertEquals( 2, ((POProject)prj5).getColumn() );
+
+ PhysicalOperator ForE2 = (PhysicalOperator)phyPlan.getSuccessors(Pack2).get(0);
+ assertEquals( POForEach.class, ForE2.getClass() );
+ PhysicalOperator prj6 = ((POForEach)ForE2).getInputPlans().get(0).getLeaves().get(0);
+ assertEquals( POProject.class, prj6.getClass() );
+ assertEquals( 1, ((POProject)prj6).getColumn() );
+ PhysicalOperator prj7 = ((POForEach)ForE2).getInputPlans().get(1).getLeaves().get(0);
+ assertEquals( POProject.class, prj7.getClass() );
+ assertEquals( 2, ((POProject)prj7).getColumn() );
+
+ // Check Filter Operator
+ PhysicalOperator fil = (PhysicalOperator)phyPlan.getSuccessors(ForE).get(0);
+ assertEquals( POFilter.class, fil.getClass() );
+
+ PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+ List<PhysicalOperator> filRoots = filPlan.getRoots();
+
+ assertEquals( ConstantExpression.class, filRoots.get(0).getClass() );
+ ConstantExpression ce1 = (ConstantExpression) filRoots.get(0);
+ assertEquals( null, ce1.getValue() );
+ assertEquals( ConstantExpression.class, filRoots.get(2).getClass() );
+ ConstantExpression ce2 = (ConstantExpression) filRoots.get(2);
+ assertEquals( null, ce2.getValue() );
+ assertEquals( POProject.class, filRoots.get(1).getClass() );
+ POProject prj8 = (POProject) filRoots.get(1);
+ assertEquals( 5, prj8.getColumn() );
+ assertEquals( POProject.class, filRoots.get(3).getClass() );
+ POProject prj9 = (POProject) filRoots.get(3);
+ assertEquals( 2, prj9.getColumn() );
+
+
+ PhysicalOperator fil2 = (PhysicalOperator)phyPlan.getSuccessors(ForE2).get(0);
+ assertEquals( POFilter.class, fil2.getClass() );
+
+ PhysicalOperator LoR3 = (PhysicalOperator)phyPlan.getSuccessors(fil2).get(0);
+ assertEquals( POLocalRearrange.class, LoR3.getClass() );
+ POLocalRearrange Lor3 = (POLocalRearrange) LoR3;
+ PhysicalOperator prj12 = Lor3.getPlans().get(0).getLeaves().get(0);
+ assertEquals( POProject.class, prj12.getClass() );
+ assertEquals(3, ((POProject)prj12).getColumn() );
+
+ PhysicalPlan filPlan2 = ((POFilter)fil2).getPlan();
+ List<PhysicalOperator> filRoots2 = filPlan2.getRoots();
+
+ assertEquals( ConstantExpression.class, filRoots2.get(0).getClass() );
+ ConstantExpression ce3 = (ConstantExpression) filRoots2.get(0);
+ assertEquals( null, ce3.getValue() );
+ assertEquals( ConstantExpression.class, filRoots2.get(2).getClass() );
+ ConstantExpression ce4 = (ConstantExpression) filRoots2.get(2);
+ assertEquals( null, ce4.getValue() );
+ assertEquals( POProject.class, filRoots2.get(1).getClass() );
+ POProject prj10 = (POProject) filRoots2.get(1);
+ assertEquals( 3, prj10.getColumn() );
+ assertEquals( POProject.class, filRoots2.get(3).getClass() );
+ POProject prj11 = (POProject) filRoots2.get(3);
+ assertEquals( 0, prj11.getColumn() );
+
+ // Check Store Operator
+ PhysicalOperator stor = (PhysicalOperator)phyPlan.getLeaves().get(0);
+ assertEquals( stor, phyPlan.getSuccessors(fil).get(0));
+ assertEquals( POStore.class, stor.getClass() );
+ assertTrue( ((POStore)stor).getSFile().getFileName().contains("empty") );
+ }
+
+ /**
+  * Translates a filter of the form {@code (int)id==10} and checks that the
+  * physical filter plan is an equality whose left side is an integer cast of
+  * column 0 and whose right side is the constant 10.
+  */
+ public void testPlanWithCast() throws Exception {
+     LogicalPlanTester lpt = new LogicalPlanTester();
+     lpt.buildPlan("a = load 'd.txt' as (id, c);");
+     lpt.buildPlan("b = filter a by (int)id==10;");
+     LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+
+     // check basics
+     org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+     PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+     assertEquals( 3, phyPlan.size() );
+     assertEquals( 1, phyPlan.getRoots().size() );
+     assertEquals( 1, phyPlan.getLeaves().size() );
+
+     PhysicalOperator load = phyPlan.getRoots().get(0);
+     assertEquals( POLoad.class, load.getClass() );
+     assertTrue( ((POLoad)load).getLFile().getFileName().contains("d.txt") );
+
+     PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
+     assertEquals( POFilter.class, fil.getClass() );
+     PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+
+     // The comparison is the leaf of the filter's expression plan
+     PhysicalOperator equal = filPlan.getLeaves().get(0);
+     assertEquals( EqualToExpr.class, equal.getClass() );
+     assertEquals( DataType.BOOLEAN, ((EqualToExpr)equal).getResultType() );
+
+     // Right-hand side: the literal 10
+     PhysicalOperator constExpr = ((EqualToExpr)equal).getRhs();
+     assertEquals( ConstantExpression.class, constExpr.getClass() );
+     assertEquals( 10, ((ConstantExpression)constExpr).getValue() );
+
+     // Left-hand side: cast to int applied to the projection of column 0
+     PhysicalOperator castExpr = ((EqualToExpr)equal).getLhs();
+     assertEquals( POCast.class, castExpr.getClass() );
+     assertEquals( DataType.INTEGER, ((POCast)castExpr).getResultType() );
+
+     PhysicalOperator prj = ((POCast)castExpr).getInputs().get(0);
+     assertEquals( POProject.class, prj.getClass() );
+     assertEquals( 0, ((POProject)prj).getColumn() );
+
+     PhysicalOperator stor = phyPlan.getLeaves().get(0);
+     assertEquals( POStore.class, stor.getClass() );
+     assertTrue( ((POStore)stor).getSFile().getFileName().contains( "empty" ) );
+ }
+
+ /**
+  * Translates a filter of the form {@code (int)id>10} and checks that the
+  * physical filter plan is a greater-than comparison whose left side is an
+  * integer cast of column 0 and whose right side is the constant 10.
+  */
+ public void testPlanWithGreaterThan() throws Exception {
+     LogicalPlanTester lpt = new LogicalPlanTester();
+     lpt.buildPlan("a = load 'd.txt' as (id, c);");
+     lpt.buildPlan("b = filter a by (int)id>10;");
+     LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+
+     // check basics
+     org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+     PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+     assertEquals( 3, phyPlan.size() );
+     assertEquals( 1, phyPlan.getRoots().size() );
+     assertEquals( 1, phyPlan.getLeaves().size() );
+
+     PhysicalOperator load = phyPlan.getRoots().get(0);
+     assertEquals( POLoad.class, load.getClass() );
+     assertTrue( ((POLoad)load).getLFile().getFileName().contains("d.txt") );
+
+     PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
+     assertEquals( POFilter.class, fil.getClass() );
+     PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+
+     // The comparison is the leaf of the filter's expression plan
+     PhysicalOperator greaterThan = filPlan.getLeaves().get(0);
+     assertEquals( GreaterThanExpr.class, greaterThan.getClass() );
+     assertEquals( DataType.BOOLEAN, ((GreaterThanExpr)greaterThan).getResultType() );
+
+     // Right-hand side: the literal 10
+     PhysicalOperator constExpr = ((GreaterThanExpr)greaterThan).getRhs();
+     assertEquals( ConstantExpression.class, constExpr.getClass() );
+     assertEquals( 10, ((ConstantExpression)constExpr).getValue() );
+
+     // Left-hand side: cast to int applied to the projection of column 0
+     PhysicalOperator castExpr = ((GreaterThanExpr)greaterThan).getLhs();
+     assertEquals( POCast.class, castExpr.getClass() );
+     assertEquals( DataType.INTEGER, ((POCast)castExpr).getResultType() );
+
+     PhysicalOperator prj = ((POCast)castExpr).getInputs().get(0);
+     assertEquals( POProject.class, prj.getClass() );
+     assertEquals( 0, ((POProject)prj).getColumn() );
+
+     PhysicalOperator stor = phyPlan.getLeaves().get(0);
+     assertEquals( POStore.class, stor.getClass() );
+     assertTrue( ((POStore)stor).getSFile().getFileName().contains( "empty" ) );
+ }
+
+ /**
+  * Translates a filter of the form {@code (int)id<10} and checks that the
+  * physical filter plan is a less-than comparison whose left side is an
+  * integer cast of column 0 and whose right side is the constant 10.
+  */
+ public void testPlanWithLessThan() throws Exception {
+     LogicalPlanTester lpt = new LogicalPlanTester();
+     lpt.buildPlan("a = load 'd.txt' as (id, c);");
+     lpt.buildPlan("b = filter a by (int)id<10;");
+     LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+
+     // check basics
+     org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+     PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+     assertEquals( 3, phyPlan.size() );
+     assertEquals( 1, phyPlan.getRoots().size() );
+     assertEquals( 1, phyPlan.getLeaves().size() );
+
+     PhysicalOperator load = phyPlan.getRoots().get(0);
+     assertEquals( POLoad.class, load.getClass() );
+     assertTrue( ((POLoad)load).getLFile().getFileName().contains("d.txt") );
+
+     PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
+     assertEquals( POFilter.class, fil.getClass() );
+     PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+
+     // The comparison is the leaf of the filter's expression plan
+     PhysicalOperator lessThan = filPlan.getLeaves().get(0);
+     assertEquals( LessThanExpr.class, lessThan.getClass() );
+     assertEquals( DataType.BOOLEAN, ((LessThanExpr)lessThan).getResultType() );
+
+     // Right-hand side: the literal 10
+     PhysicalOperator constExpr = ((LessThanExpr)lessThan).getRhs();
+     assertEquals( ConstantExpression.class, constExpr.getClass() );
+     assertEquals( 10, ((ConstantExpression)constExpr).getValue() );
+
+     // Left-hand side: cast to int applied to the projection of column 0
+     PhysicalOperator castExpr = ((LessThanExpr)lessThan).getLhs();
+     assertEquals( POCast.class, castExpr.getClass() );
+     assertEquals( DataType.INTEGER, ((POCast)castExpr).getResultType() );
+
+     PhysicalOperator prj = ((POCast)castExpr).getInputs().get(0);
+     assertEquals( POProject.class, prj.getClass() );
+     assertEquals( 0, ((POProject)prj).getColumn() );
+
+     PhysicalOperator stor = phyPlan.getLeaves().get(0);
+     assertEquals( POStore.class, stor.getClass() );
+     assertTrue( ((POStore)stor).getSFile().getFileName().contains( "empty" ) );
+ }
+
+ /**
+  * Translates a {@code foreach ... generate id, c} script and checks that the
+  * physical foreach has one single-projection inner plan per generated
+  * column, each reading from the load, with neither output flattened.
+  */
+ public void testForeachPlan() throws Exception {
+     LogicalPlanTester lpt = new LogicalPlanTester();
+     lpt.buildPlan("a = load 'd.txt' as (id, c);");
+     lpt.buildPlan("b = foreach a generate id, c;");
+     LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+
+     org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+     PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+
+     assertEquals( 3, phyPlan.size() );
+     POLoad load = (POLoad)phyPlan.getRoots().get(0);
+     assertEquals( POStore.class, phyPlan.getLeaves().get(0).getClass() );
+     POForEach foreach = (POForEach)phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0);
+
+     assertEquals( 2, foreach.getInputPlans().size() );
+
+     // Inner plan i is a lone projection of column i whose input is the load
+     for (int column = 0; column < 2; column++) {
+         PhysicalPlan inner = foreach.getInputPlans().get(column);
+         assertEquals( 1, inner.size() );
+         POProject prj = (POProject)inner.getRoots().get(0);
+         assertEquals( column, prj.getColumn() );
+         assertEquals( load, prj.getInputs().get(0) );
+     }
+
+     Boolean[] flat = foreach.getToBeFlattened().toArray(new Boolean[0]);
+     assertFalse( flat[0] );
+     assertFalse( flat[1] );
+ }
+
+ /**
+  * Translates a {@code foreach ... generate id, flatten(c)} script where
+  * {@code c} is a bag of (s, v) tuples. Checks the migrated foreach schema
+  * (field uids and the id/s/v bytearray fields) and then the physical
+  * foreach, whose second output is marked as flattened.
+  */
+ public void testForeachPlan2() throws Exception {
+     LogicalPlanTester lpt = new LogicalPlanTester();
+     lpt.buildPlan("a = load 'd.txt' as (id, c:bag{t:(s,v)});");
+     lpt.buildPlan("b = foreach a generate id, flatten(c);");
+     LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+
+     // Logical side: schema of the foreach that follows the load
+     org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+     LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0);
+     LogicalRelationalOperator fe = (LogicalRelationalOperator)newLogicalPlan.getSuccessors(ld).get(0);
+     LogicalSchema ls = fe.getSchema();
+     assertEquals( 1, ls.getField(0).uid );
+     assertEquals( 4, ls.getField(1).uid );
+     assertEquals( 5, ls.getField(2).uid );
+
+     LogicalSchema expected = new LogicalSchema();
+     expected.addField(new LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+     expected.addField(new LogicalFieldSchema("s", null, DataType.BYTEARRAY));
+     expected.addField(new LogicalFieldSchema("v", null, DataType.BYTEARRAY));
+     assertTrue( expected.isEqual(ls) );
+
+     // Physical side
+     PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+
+     assertEquals( 3, phyPlan.size() );
+     POLoad load = (POLoad)phyPlan.getRoots().get(0);
+     assertEquals( POStore.class, phyPlan.getLeaves().get(0).getClass() );
+     POForEach foreach = (POForEach)phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0);
+
+     assertEquals( 2, foreach.getInputPlans().size() );
+
+     PhysicalPlan inner = foreach.getInputPlans().get(0);
+     assertEquals( 1, inner.size() );
+     POProject prj = (POProject)inner.getRoots().get(0);
+     assertEquals( 0, prj.getColumn() );
+     assertEquals( load, prj.getInputs().get(0) );
+
+     inner = foreach.getInputPlans().get(1);
+     assertEquals( 1, inner.size() );
+     prj = (POProject)inner.getRoots().get(0);
+     assertEquals( 1, prj.getColumn() );
+     assertEquals( load, prj.getInputs().get(0) );
+
+     // Only the second generated expression is wrapped in flatten(...)
+     Boolean[] flat = foreach.getToBeFlattened().toArray(new Boolean[0]);
+     assertFalse( flat[0] );
+     assertTrue( flat[1] );
+ }
+
+}
Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.EqualExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.MultiMap;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Test end to end logical optimizations.
+ */
+public class TestExperimentalLogicalOptimizer extends TestCase {
+
+ @Test
+ public void testFilterPushDown() throws IOException {
+ // A logical plan for:
+ // A = load 'bla' as (x, y);
+ // B = load 'morebla' as (a, b);
+ // C = join A on x, B on a;
+ // D = filter C by x == a and x == 0 and b == 1 and y == b;
+ // store D into 'whatever';
+
+ // A = load
+ LogicalPlan lp = new LogicalPlan();
+ {
+ LogicalSchema aschema = new LogicalSchema();
+ aschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ aschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ aschema.getField(0).uid = 1;
+ aschema.getField(1).uid = 2;
+ LOLoad A = new LOLoad(new FileSpec("bla", new FuncSpec("PigStorage", "\t")), aschema, lp);
+ lp.add(A);
+
+ // B = load
+ LogicalSchema bschema = new LogicalSchema();
+ bschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "a", null, DataType.INTEGER));
+ bschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "b", null, DataType.INTEGER));
+ bschema.getField(0).uid = 3;
+ bschema.getField(1).uid = 4;
+ LOLoad B = new LOLoad(null, bschema, lp);
+ lp.add(B);
+
+ // C = join
+ LogicalSchema cschema = new LogicalSchema();
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "a", null, DataType.INTEGER));
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "b", null, DataType.INTEGER));
+ cschema.getField(0).uid = 1;
+ cschema.getField(1).uid = 2;
+ cschema.getField(2).uid = 3;
+ cschema.getField(3).uid = 4;
+ LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+ ProjectExpression x = new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0);
+ x.neverUseForRealSetUid(1);
+ LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+ ProjectExpression y = new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0);
+ y.neverUseForRealSetUid(3);
+ MultiMap<Integer, LogicalExpressionPlan> mm =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm.put(0, aprojplan);
+ mm.put(1, bprojplan);
+ LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true});
+ C.neverUseForRealSetSchema(cschema);
+ lp.add(new LogicalRelationalOperator[] {A, B}, C, null);
+
+ // D = filter
+ LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+ ProjectExpression fx = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 0);
+ fx.neverUseForRealSetUid(1);
+ ConstantExpression fc0 = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(0));
+ EqualExpression eq1 = new EqualExpression(filterPlan, fx, fc0);
+ ProjectExpression fanotherx = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 0);
+ fanotherx.neverUseForRealSetUid(1);
+ ProjectExpression fa = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 2);
+ fa.neverUseForRealSetUid(3);
+ EqualExpression eq2 = new EqualExpression(filterPlan, fanotherx, fa);
+ AndExpression and1 = new AndExpression(filterPlan, eq1, eq2);
+ ProjectExpression fb = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 3);
+ fb.neverUseForRealSetUid(4);
+ ConstantExpression fc1 = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(1));
+ EqualExpression eq3 = new EqualExpression(filterPlan, fb, fc1);
+ AndExpression and2 = new AndExpression(filterPlan, and1, eq3);
+ ProjectExpression fanotherb = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 3);
+ fanotherb.neverUseForRealSetUid(4);
+ ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1);
+ fy.neverUseForRealSetUid(2);
+ EqualExpression eq4 = new EqualExpression(filterPlan, fy, fanotherb);
+ new AndExpression(filterPlan, and2, eq4);
+
+ LOFilter D = new LOFilter(lp, filterPlan);
+ D.neverUseForRealSetSchema(cschema);
+ // Connect D to C (the join); the optimizer run below is what pushes parts of this filter down.
+ lp.add(C, D, (LogicalRelationalOperator)null);
+ }
+
+ LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(lp, 500);
+ optimizer.optimize();
+
+ LogicalPlan expected = new LogicalPlan();
+ {
+ // A = load
+ LogicalSchema aschema = new LogicalSchema();
+ aschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ aschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ aschema.getField(0).uid = 1;
+ aschema.getField(1).uid = 2;
+ LOLoad A = new LOLoad(new FileSpec("bla", new FuncSpec("PigStorage", "\t")), aschema, expected);
+ expected.add(A);
+
+ // DA = filter
+ LogicalExpressionPlan DAfilterPlan = new LogicalExpressionPlan();
+ ProjectExpression fx = new ProjectExpression(DAfilterPlan, DataType.INTEGER, 0, 0);
+ fx.neverUseForRealSetUid(1);
+ ConstantExpression fc0 = new ConstantExpression(DAfilterPlan, DataType.INTEGER, new Integer(0));
+ new EqualExpression(DAfilterPlan, fx, fc0);
+
+ LOFilter DA = new LOFilter(expected, DAfilterPlan);
+ DA.neverUseForRealSetSchema(aschema);
+ expected.add(A, DA, (LogicalRelationalOperator)null);
+
+ // B = load
+ LogicalSchema bschema = new LogicalSchema();
+ bschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "a", null, DataType.INTEGER));
+ bschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "b", null, DataType.INTEGER));
+ bschema.getField(0).uid = 3;
+ bschema.getField(1).uid = 4;
+ LOLoad B = new LOLoad(null, bschema, expected);
+ expected.add(B);
+
+ // DB = filter
+ LogicalExpressionPlan DBfilterPlan = new LogicalExpressionPlan();
+ ProjectExpression fb = new ProjectExpression(DBfilterPlan, DataType.INTEGER, 0, 1);
+ fb.neverUseForRealSetUid(4);
+ ConstantExpression fc1 = new ConstantExpression(DBfilterPlan, DataType.INTEGER, new Integer(1));
+ new EqualExpression(DBfilterPlan, fb, fc1);
+
+ LOFilter DB = new LOFilter(expected, DBfilterPlan);
+ DB.neverUseForRealSetSchema(bschema);
+ expected.add(B, DB, (LogicalRelationalOperator)null);
+
+ // C = join
+ LogicalSchema cschema = new LogicalSchema();
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "a", null, DataType.INTEGER));
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "b", null, DataType.INTEGER));
+ cschema.getField(0).uid = 1;
+ cschema.getField(1).uid = 2;
+ cschema.getField(2).uid = 3;
+ cschema.getField(3).uid = 4;
+ LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+ ProjectExpression x = new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0);
+ x.neverUseForRealSetUid(1);
+ LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+ ProjectExpression y = new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0);
+ y.neverUseForRealSetUid(3);
+ MultiMap<Integer, LogicalExpressionPlan> mm =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm.put(0, aprojplan);
+ mm.put(1, bprojplan);
+ LOJoin C = new LOJoin(expected, mm, JOINTYPE.HASH, new boolean[] {true, true});
+ C.neverUseForRealSetSchema(cschema);
+ expected.add(new LogicalRelationalOperator[] {DA, DB}, C, null);
+
+ // D = filter
+ LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+ ProjectExpression fanotherx = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 0);
+ fanotherx.neverUseForRealSetUid(1);
+ ProjectExpression fa = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 2);
+ fa.neverUseForRealSetUid(3);
+ EqualExpression eq2 = new EqualExpression(filterPlan, fanotherx, fa);
+ ProjectExpression fanotherb = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 3);
+ fanotherb.neverUseForRealSetUid(4);
+ ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1);
+ fy.neverUseForRealSetUid(2);
+ EqualExpression eq4 = new EqualExpression(filterPlan, fy, fanotherb);
+ new AndExpression(filterPlan, eq2, eq4);
+
+ LOFilter D = new LOFilter(expected, filterPlan);
+ D.neverUseForRealSetSchema(cschema);
+ expected.add(C, D, (LogicalRelationalOperator)null);
+ }
+
+ assertTrue( lp.isEqual(expected) );
+ // assertEquals(lp, expected);
+ }
+
+}
Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalOperatorPlan.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalOperatorPlan.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalOperatorPlan.java Thu Feb 18 22:20:07 2010
@@ -22,6 +22,7 @@
import java.util.Collection;
import java.util.List;
+import org.apache.pig.FuncSpec;
import org.apache.pig.data.DataType;
import org.apache.pig.experimental.logical.expression.AndExpression;
import org.apache.pig.experimental.logical.expression.ConstantExpression;
@@ -29,10 +30,14 @@
import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
import org.apache.pig.experimental.logical.relational.LOLoad;
import org.apache.pig.experimental.logical.relational.LogicalPlan;
import org.apache.pig.experimental.logical.relational.LogicalPlanVisitor;
import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE;
import org.apache.pig.experimental.plan.BaseOperatorPlan;
import org.apache.pig.experimental.plan.DependencyOrderWalker;
import org.apache.pig.experimental.plan.DepthFirstWalker;
@@ -42,6 +47,8 @@
import org.apache.pig.experimental.plan.PlanVisitor;
import org.apache.pig.experimental.plan.PlanWalker;
import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.MultiMap;
import org.apache.pig.impl.util.Pair;
import org.junit.Test;
@@ -54,7 +61,6 @@
SillyPlan() {
super();
}
-
}
private static class SillyOperator extends Operator {
@@ -64,10 +70,6 @@
super(n, p);
name = n;
}
-
- public boolean equals(SillyOperator other) {
- return other.name == name;
- }
@Override
public void accept(PlanVisitor v) {
@@ -75,6 +77,11 @@
((SillyVisitor)v).visitSillyOperator(this);
}
}
+
+ @Override
+ public boolean isEqual(Operator operator) {
+ return ( name.compareTo(operator.getName()) == 0 );
+ }
}
private static class SillyVisitor extends PlanVisitor {
@@ -173,9 +180,9 @@
// Test that roots and leaves are empty when there are no operators in
// plan.
- List<Operator> list = plan.getRoots();
+ List<Operator> list = plan.getSources();
assertEquals(0, list.size());
- list = plan.getLeaves();
+ list = plan.getSinks();
assertEquals(0, list.size());
plan.add(fred);
@@ -185,9 +192,9 @@
plan.add(sam);
// Test that when not connected all nodes are roots and leaves.
- list = plan.getRoots();
+ list = plan.getSources();
assertEquals(5, list.size());
- list = plan.getLeaves();
+ list = plan.getSinks();
assertEquals(5, list.size());
// Connect them up
@@ -197,15 +204,15 @@
plan.connect(bob, sam);
// Check that the roots and leaves came out right
- list = plan.getRoots();
+ list = plan.getSources();
assertEquals(2, list.size());
for (Operator op : list) {
- assertTrue(fred.equals(op) || joe.equals(op));
+ assertTrue(fred.isEqual(op) || joe.isEqual(op));
}
- list = plan.getLeaves();
+ list = plan.getSinks();
assertEquals(2, list.size());
for (Operator op : list) {
- assertTrue(jim.equals(op) || sam.equals(op));
+ assertTrue(jim.isEqual(op) || sam.isEqual(op));
}
// Check each of their successors and predecessors
@@ -243,15 +250,15 @@
plan.connect(jim, p2.first, bob, p2.second);
// Check that the roots and leaves came out right
- list = plan.getRoots();
+ list = plan.getSources();
assertEquals(2, list.size());
for (Operator op : list) {
- assertTrue(jim.equals(op) || joe.equals(op));
+ assertTrue(jim.isEqual(op) || joe.isEqual(op));
}
- list = plan.getLeaves();
+ list = plan.getSinks();
assertEquals(2, list.size());
for (Operator op : list) {
- assertTrue(fred.equals(op) || sam.equals(op));
+ assertTrue(fred.isEqual(op) || sam.isEqual(op));
}
// Check each of their successors and predecessors
@@ -299,9 +306,9 @@
plan.remove(bob);
plan.disconnect(fred, joe);
- List<Operator> list = plan.getRoots();
+ List<Operator> list = plan.getSources();
assertEquals(2, list.size());
- list = plan.getLeaves();
+ list = plan.getSinks();
assertEquals(2, list.size());
plan.remove(fred);
@@ -309,9 +316,9 @@
assertEquals(0, plan.size());
- list = plan.getRoots();
+ list = plan.getSources();
assertEquals(0, list.size());
- list = plan.getLeaves();
+ list = plan.getSinks();
assertEquals(0, list.size());
}
@@ -744,5 +751,751 @@
v.visit();
assertEquals("and equal project constant constant ", v.getVisitPlan());
}
+
+ @Test
+ public void testExpressionEquality() {
+ LogicalExpressionPlan ep1 = new LogicalExpressionPlan();
+ ConstantExpression c1 = new ConstantExpression(ep1, DataType.INTEGER, new Integer(5));
+ ProjectExpression p1 = new ProjectExpression(ep1, DataType.INTEGER, 0, 0);
+ EqualExpression e1 = new EqualExpression(ep1, p1, c1);
+ ConstantExpression ca1 = new ConstantExpression(ep1, DataType.BOOLEAN, new Boolean("true"));
+ AndExpression a1 = new AndExpression(ep1, e1, ca1);
+
+ LogicalExpressionPlan ep2 = new LogicalExpressionPlan();
+ ConstantExpression c2 = new ConstantExpression(ep2, DataType.INTEGER, new Integer(5));
+ ProjectExpression p2 = new ProjectExpression(ep2, DataType.INTEGER, 0, 0);
+ EqualExpression e2 = new EqualExpression(ep2, p2, c2);
+ ConstantExpression ca2 = new ConstantExpression(ep2, DataType.BOOLEAN, new Boolean("true"));
+ AndExpression a2 = new AndExpression(ep2, e2, ca2);
+
+ assertTrue(ep1.isEqual(ep2));
+ assertTrue(c1.isEqual(c2));
+ assertTrue(p1.isEqual(p2));
+ assertTrue(e1.isEqual(e2));
+ assertTrue(ca1.isEqual(ca2));
+ assertTrue(a1.isEqual(a2));
+
+ LogicalExpressionPlan ep3 = new LogicalExpressionPlan();
+ ConstantExpression c3 = new ConstantExpression(ep3, DataType.INTEGER, new Integer(3));
+ ProjectExpression p3 = new ProjectExpression(ep3, DataType.INTEGER, 0, 1);
+ EqualExpression e3 = new EqualExpression(ep3, p3, c3);
+ ConstantExpression ca3 = new ConstantExpression(ep3, DataType.CHARARRAY, "true");
+ AndExpression a3 = new AndExpression(ep3, e3, ca3);
+
+ assertFalse(ep1.isEqual(ep3));
+ assertFalse(c1.isEqual(c3));
+ assertFalse(p1.isEqual(p3));
+ assertFalse(e1.isEqual(e3));
+ assertFalse(ca1.isEqual(ca3));
+ assertFalse(a1.isEqual(a3));
+
+ LogicalExpressionPlan ep4 = new LogicalExpressionPlan();
+ ProjectExpression p4 = new ProjectExpression(ep4, DataType.INTEGER, 1, 0);
+
+ assertFalse(ep1.isEqual(ep4));
+ assertFalse(p1.isEqual(p4));
+ }
+
+ @Test
+ public void testRelationalEquality() throws IOException {
+ // Build a plan that is the logical plan for
+ // A = load 'bla' as (x);
+ // B = load 'morebla' as (y);
+ // C = join A on x, B on y;
+ // D = filter C by y == 0;
+
+ // A = load
+ LogicalPlan lp = new LogicalPlan();
+ {
+ LogicalSchema aschema = new LogicalSchema();
+ aschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad A = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("/fooload", new String[] {"x", "y"})), aschema, lp);
+ lp.add(A);
+
+ // B = load
+ LogicalSchema bschema = new LogicalSchema();
+ bschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LOLoad B = new LOLoad(new FileSpec("/def",
+ new FuncSpec("PigStorage", "\t")), bschema, lp);
+ lp.add(B);
+
+ // C = join
+ LogicalSchema cschema = new LogicalSchema();
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+ new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0);
+ LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+ new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0);
+ MultiMap<Integer, LogicalExpressionPlan> mm =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm.put(0, aprojplan);
+ mm.put(1, bprojplan);
+ LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true});
+ C.neverUseForRealSetSchema(cschema);
+ lp.add(new LogicalRelationalOperator[] {A, B}, C, null);
+
+ // D = filter
+ LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+ ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1);
+ ConstantExpression fc = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(0));
+ new EqualExpression(filterPlan, fy, fc);
+ LOFilter D = new LOFilter(lp, filterPlan);
+ D.neverUseForRealSetSchema(cschema);
+ lp.add(C, D, (LogicalRelationalOperator)null);
+ }
+
+ // Build a second similar plan to test equality
+ // A = load
+ LogicalPlan lp1 = new LogicalPlan();
+ {
+ LogicalSchema aschema = new LogicalSchema();
+ aschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad A = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("/fooload", new String[] {"x", "y"})), aschema, lp1);
+ lp1.add(A);
+
+ // B = load
+ LogicalSchema bschema = new LogicalSchema();
+ bschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LOLoad B = new LOLoad(new FileSpec("/def",
+ new FuncSpec("PigStorage", "\t")), bschema, lp1);
+ lp1.add(B);
+
+ // C = join
+ LogicalSchema cschema = new LogicalSchema();
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+ new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0);
+ LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+ new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0);
+ MultiMap<Integer, LogicalExpressionPlan> mm =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm.put(0, aprojplan);
+ mm.put(1, bprojplan);
+ LOJoin C = new LOJoin(lp1, mm, JOINTYPE.HASH, new boolean[] {true, true});
+ C.neverUseForRealSetSchema(cschema);
+ lp1.add(new LogicalRelationalOperator[] {A, B}, C, null);
+
+ // D = filter
+ LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+ ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1);
+ ConstantExpression fc = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(0));
+ new EqualExpression(filterPlan, fy, fc);
+ LOFilter D = new LOFilter(lp1, filterPlan);
+ D.neverUseForRealSetSchema(cschema);
+ lp1.add(C, D, (LogicalRelationalOperator)null);
+ }
+
+ assertTrue( lp.isEqual(lp1));
+ }
+
+ @Test
+ public void testLoadEqualityDifferentFuncSpecCtorArgs() {
+ LogicalPlan lp = new LogicalPlan();
+
+ LogicalSchema aschema1 = new LogicalSchema();
+ aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad load1 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+ lp.add(load1);
+
+ LOLoad load2 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "z"})), aschema1, lp);
+ lp.add(load2);
+
+ assertFalse(load1.isEqual(load2));
+ }
+
+ @Test
+ public void testLoadEqualityDifferentNumFuncSpecCstorArgs() {
+ LogicalPlan lp = new LogicalPlan();
+
+ LogicalSchema aschema1 = new LogicalSchema();
+ aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad load1 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+ lp.add(load1);
+
+ LOLoad load3 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", "x")), aschema1, lp);
+ lp.add(load3);
+
+ assertFalse(load1.isEqual(load3));
+ }
+
+ @Test
+ public void testLoadEqualityDifferentFunctionNames() {
+ LogicalPlan lp = new LogicalPlan();
+
+ LogicalSchema aschema1 = new LogicalSchema();
+ aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad load1 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+ lp.add(load1);
+
+ // Different function names in FuncSpec
+ LOLoad load4 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foobar", new String[] {"x", "z"})), aschema1, lp);
+ lp.add(load4);
+
+ assertFalse(load1.isEqual(load4));
+ }
+
+ @Test
+ public void testLoadEqualityDifferentFileName() {
+ LogicalPlan lp = new LogicalPlan();
+ LogicalSchema aschema1 = new LogicalSchema();
+ aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad load1 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+ lp.add(load1);
+
+ // Different file name
+ LOLoad load5 = new LOLoad(new FileSpec("/def",
+ new FuncSpec("foo", new String[] {"x", "z"})), aschema1, lp);
+ lp.add(load5);
+
+ assertFalse(load1.isEqual(load5));
+ }
+
+ @Test
+ public void testRelationalEqualityDifferentSchema() {
+ LogicalPlan lp = new LogicalPlan();
+ LogicalSchema aschema1 = new LogicalSchema();
+ aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad load1 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+ lp.add(load1);
+
+ // Different schema
+ LogicalSchema aschema2 = new LogicalSchema();
+ aschema2.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.CHARARRAY));
+
+ LOLoad load6 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "z"})), aschema2, lp);
+ lp.add(load6);
+
+ assertFalse(load1.isEqual(load6));
+ }
+
+ @Test
+ public void testRelationalEqualityNullSchemas() {
+ LogicalPlan lp = new LogicalPlan();
+ // Test that two loads with no schema are still equal
+ LOLoad load7 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "y"})), null, lp);
+ lp.add(load7);
+
+ LOLoad load8 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "y"})), null, lp);
+ lp.add(load8);
+
+ assertTrue(load7.isEqual(load8));
+ }
+
+ @Test
+ public void testRelationalEqualityOneNullOneNotNullSchema() {
+ LogicalPlan lp = new LogicalPlan();
+ LogicalSchema aschema1 = new LogicalSchema();
+ aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad load1 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+ lp.add(load1);
+
+ // Test that one with schema and one without breaks equality
+ LOLoad load9 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("foo", new String[] {"x", "z"})), null, lp);
+ lp.add(load9);
+
+ assertFalse(load1.isEqual(load9));
+ }
+
+ @Test
+ public void testFilterDifferentPredicates() {
+ LogicalPlan lp = new LogicalPlan();
+
+ LogicalExpressionPlan fp1 = new LogicalExpressionPlan();
+ ProjectExpression fy1 = new ProjectExpression(fp1, DataType.INTEGER, 0, 1);
+ ConstantExpression fc1 = new ConstantExpression(fp1, DataType.INTEGER,
+ new Integer(0));
+ new EqualExpression(fp1, fy1, fc1);
+ LOFilter D1 = new LOFilter(lp, fp1);
+ LogicalSchema cschema = new LogicalSchema();
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ cschema.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ D1.neverUseForRealSetSchema(cschema);
+ lp.add(D1);
+
+ LogicalExpressionPlan fp2 = new LogicalExpressionPlan();
+ ProjectExpression fy2 = new ProjectExpression(fp2, DataType.INTEGER, 0, 1);
+ ConstantExpression fc2 = new ConstantExpression(fp2, DataType.INTEGER,
+ new Integer(1));
+ new EqualExpression(fp2, fy2, fc2);
+ LOFilter D2 = new LOFilter(lp, fp2);
+ D2.neverUseForRealSetSchema(cschema);
+ lp.add(D2);
+
+ assertFalse(D1.isEqual(D2));
+ }
+
+ // No tests for LOStore because it tries to actually instantiate the store
+ // func, and I don't want to mess with that here.
+
+ @Test
+ public void testJoinDifferentJoinTypes() throws IOException {
+ LogicalPlan lp = new LogicalPlan();
+ LogicalSchema jaschema1 = new LogicalSchema();
+ jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad A1 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema1, lp);
+ lp.add(A1);
+
+ // B = load
+ LogicalSchema jbschema1 = new LogicalSchema();
+ jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LOLoad B1 = new LOLoad(new FileSpec("/def",
+ new FuncSpec("PigStorage", "\t")), jbschema1, lp);
+ lp.add(B1);
+
+ // C = join
+ LogicalSchema jcschema1 = new LogicalSchema();
+ jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
+ new ProjectExpression(aprojplan1, DataType.INTEGER, 0, 0);
+ LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
+ new ProjectExpression(bprojplan1, DataType.INTEGER, 1, 0);
+ MultiMap<Integer, LogicalExpressionPlan> mm1 =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm1.put(0, aprojplan1);
+ mm1.put(1, bprojplan1);
+ LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true});
+ C1.neverUseForRealSetSchema(jcschema1);
+ lp.add(new LogicalRelationalOperator[] {A1, B1}, C1, null);
+
+ // A = load
+ LogicalSchema jaschema2 = new LogicalSchema();
+ jaschema2.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad A2 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema2, lp);
+ lp.add(A2);
+
+ // B = load
+ LogicalSchema jbschema2 = new LogicalSchema();
+ jbschema2.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LOLoad B2 = new LOLoad(new FileSpec("/def",
+ new FuncSpec("PigStorage", "\t")), jbschema2, lp);
+ lp.add(B2);
+
+ // C = join
+ LogicalSchema jcschema2 = new LogicalSchema();
+ jcschema2.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ jcschema2.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LogicalExpressionPlan aprojplan2 = new LogicalExpressionPlan();
+ new ProjectExpression(aprojplan2, DataType.INTEGER, 0, 0);
+ LogicalExpressionPlan bprojplan2 = new LogicalExpressionPlan();
+ new ProjectExpression(bprojplan2, DataType.INTEGER, 1, 0);
+ MultiMap<Integer, LogicalExpressionPlan> mm2 =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm2.put(0, aprojplan2);
+ mm2.put(1, bprojplan2);
+ LOJoin C2 = new LOJoin(lp, mm2, JOINTYPE.SKEWED, new boolean[] {true, true});
+ C2.neverUseForRealSetSchema(jcschema2);
+ lp.add(new LogicalRelationalOperator[] {A2, B2}, C2, null);
+
+ assertFalse(C1.isEqual(C2));
+ }
+
+ @Test
+ public void testJoinDifferentInner() throws IOException {
+ LogicalPlan lp = new LogicalPlan();
+ LogicalSchema jaschema1 = new LogicalSchema();
+ jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad A1 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema1, lp);
+ lp.add(A1);
+
+ // B = load
+ LogicalSchema jbschema1 = new LogicalSchema();
+ jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LOLoad B1 = new LOLoad(new FileSpec("/def",
+ new FuncSpec("PigStorage", "\t")), jbschema1, lp);
+ lp.add(B1);
+
+ // C = join
+ LogicalSchema jcschema1 = new LogicalSchema();
+ jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
+ new ProjectExpression(aprojplan1, DataType.INTEGER, 0, 0);
+ LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
+ new ProjectExpression(bprojplan1, DataType.INTEGER, 1, 0);
+ MultiMap<Integer, LogicalExpressionPlan> mm1 =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm1.put(0, aprojplan1);
+ mm1.put(1, bprojplan1);
+ LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true});
+ C1.neverUseForRealSetSchema(jcschema1);
+ lp.add(new LogicalRelationalOperator[] {A1, B1}, C1, null);
+
+ // Test different inner status
+ // A = load
+ LogicalSchema jaschema3 = new LogicalSchema();
+ jaschema3.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad A3 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema3, lp);
+ lp.add(A3);
+
+ // B = load
+ LogicalSchema jbschema3 = new LogicalSchema();
+ jbschema3.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LOLoad B3 = new LOLoad(new FileSpec("/def",
+ new FuncSpec("PigStorage", "\t")), jbschema3, lp);
+ lp.add(B3);
+
+ // C = join
+ LogicalSchema jcschema3 = new LogicalSchema();
+ jcschema3.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ jcschema3.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LogicalExpressionPlan aprojplan3 = new LogicalExpressionPlan();
+ new ProjectExpression(aprojplan3, DataType.INTEGER, 0, 0);
+ LogicalExpressionPlan bprojplan3 = new LogicalExpressionPlan();
+ new ProjectExpression(bprojplan3, DataType.INTEGER, 1, 0);
+ MultiMap<Integer, LogicalExpressionPlan> mm3 =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm3.put(0, aprojplan3);
+ mm3.put(1, bprojplan3);
+ LOJoin C3 = new LOJoin(lp, mm3, JOINTYPE.HASH, new boolean[] {true, false});
+ C3.neverUseForRealSetSchema(jcschema3);
+ lp.add(new LogicalRelationalOperator[] {A3, B3}, C3, null);
+
+ assertFalse(C1.isEqual(C3));
+ }
+
+ /**
+  * Verifies that two hash joins are not equal when they have a different
+  * number of inputs: C1 joins two loads (A1, B1) while C5 joins three
+  * (A5, B5, Beta5).
+  *
+  * @throws IOException if any plan-building call raises it
+  */
+ @Test
+ public void testJoinDifferentNumInputs() throws IOException {
+     LogicalPlan lp = new LogicalPlan();
+
+     // ---- Two-input join: C1 = join A1 by $0, B1 by $0 ----
+
+     // A = load
+     LogicalSchema jaschema1 = new LogicalSchema();
+     jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
+         "x", null, DataType.INTEGER));
+     LOLoad A1 = new LOLoad(new FileSpec("/abc",
+         new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema1, lp);
+     lp.add(A1);
+
+     // B = load
+     LogicalSchema jbschema1 = new LogicalSchema();
+     jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
+         "y", null, DataType.INTEGER));
+     LOLoad B1 = new LOLoad(new FileSpec("/def",
+         new FuncSpec("PigStorage", "\t")), jbschema1, lp);
+     lp.add(B1);
+
+     // C = join
+     LogicalSchema jcschema1 = new LogicalSchema();
+     jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+         "x", null, DataType.INTEGER));
+     jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+         "y", null, DataType.INTEGER));
+     // One key plan per input; each projects column 0 of its own input.
+     LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
+     new ProjectExpression(aprojplan1, DataType.INTEGER, 0, 0);
+     LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
+     new ProjectExpression(bprojplan1, DataType.INTEGER, 1, 0);
+     MultiMap<Integer, LogicalExpressionPlan> mm1 =
+         new MultiMap<Integer, LogicalExpressionPlan>();
+     mm1.put(0, aprojplan1);
+     mm1.put(1, bprojplan1);
+     LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true});
+     C1.neverUseForRealSetSchema(jcschema1);
+     lp.add(new LogicalRelationalOperator[] {A1, B1}, C1, null);
+
+     // ---- Three-input join: C5 = join A5 by $0, B5 by $0, Beta5 by $0 ----
+
+     // A = load
+     LogicalSchema jaschema5 = new LogicalSchema();
+     jaschema5.addField(new LogicalSchema.LogicalFieldSchema(
+         "x", null, DataType.INTEGER));
+     LOLoad A5 = new LOLoad(new FileSpec("/abc",
+         new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema5, lp);
+     lp.add(A5);
+
+     // B = load
+     LogicalSchema jbschema5 = new LogicalSchema();
+     jbschema5.addField(new LogicalSchema.LogicalFieldSchema(
+         "y", null, DataType.INTEGER));
+     LOLoad B5 = new LOLoad(new FileSpec("/def",
+         new FuncSpec("PigStorage", "\t")), jbschema5, lp);
+     lp.add(B5);
+
+     // Beta = load
+     LogicalSchema jbetaschema5 = new LogicalSchema();
+     jbetaschema5.addField(new LogicalSchema.LogicalFieldSchema(
+         "y", null, DataType.INTEGER));
+     LOLoad Beta5 = new LOLoad(new FileSpec("/ghi",
+         new FuncSpec("PigStorage", "\t")), jbetaschema5, lp);
+     lp.add(Beta5);
+
+     // C = join
+     LogicalSchema jcschema5 = new LogicalSchema();
+     jcschema5.addField(new LogicalSchema.LogicalFieldSchema(
+         "x", null, DataType.INTEGER));
+     jcschema5.addField(new LogicalSchema.LogicalFieldSchema(
+         "y", null, DataType.INTEGER));
+     LogicalExpressionPlan aprojplan5 = new LogicalExpressionPlan();
+     new ProjectExpression(aprojplan5, DataType.INTEGER, 0, 0);
+     LogicalExpressionPlan bprojplan5 = new LogicalExpressionPlan();
+     new ProjectExpression(bprojplan5, DataType.INTEGER, 1, 0);
+     // Beta5 is the third input (index 2), so its key must project from
+     // input 2; it previously projected from input 1, duplicating B5's key.
+     LogicalExpressionPlan betaprojplan5 = new LogicalExpressionPlan();
+     new ProjectExpression(betaprojplan5, DataType.INTEGER, 2, 0);
+     MultiMap<Integer, LogicalExpressionPlan> mm5 =
+         new MultiMap<Integer, LogicalExpressionPlan>();
+     mm5.put(0, aprojplan5);
+     mm5.put(1, bprojplan5);
+     mm5.put(2, betaprojplan5);
+     // NOTE(review): only two inner flags are supplied although C5 has three
+     // inputs; left unchanged so the flags stay identical to C1's - confirm
+     // whether a third flag is wanted.
+     LOJoin C5 = new LOJoin(lp, mm5, JOINTYPE.HASH, new boolean[] {true, true});
+     C5.neverUseForRealSetSchema(jcschema5);
+     lp.add(new LogicalRelationalOperator[] {A5, B5, Beta5}, C5, null);
+
+     assertFalse(C1.isEqual(C5));
+ }
+
+ @Test
+ public void testJoinDifferentJoinKeys() throws IOException {
+ LogicalPlan lp = new LogicalPlan();
+
+ // Test different join keys
+ LogicalSchema jaschema6 = new LogicalSchema();
+ jaschema6.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad A6 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema6, lp);
+ lp.add(A6);
+
+ // B = load
+ LogicalSchema jbschema6 = new LogicalSchema();
+ jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+ "z", null, DataType.LONG));
+ LOLoad B6 = new LOLoad(new FileSpec("/def",
+ new FuncSpec("PigStorage", "\t")), jbschema6, lp);
+ lp.add(B6);
+
+ // C = join
+ LogicalSchema jcschema6 = new LogicalSchema();
+ jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LogicalExpressionPlan aprojplan6 = new LogicalExpressionPlan();
+ new ProjectExpression(aprojplan6, DataType.INTEGER, 0, 0);
+ LogicalExpressionPlan bprojplan6 = new LogicalExpressionPlan();
+ new ProjectExpression(bprojplan6, DataType.INTEGER, 1, 0);
+ LogicalExpressionPlan b2projplan6 = new LogicalExpressionPlan();
+ new ProjectExpression(b2projplan6, DataType.INTEGER, 1, 1);
+ MultiMap<Integer, LogicalExpressionPlan> mm6 =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm6.put(0, aprojplan6);
+ mm6.put(1, bprojplan6);
+ mm6.put(1, b2projplan6);
+ LOJoin C6 = new LOJoin(lp, mm6, JOINTYPE.HASH, new boolean[] {true, true});
+ C6.neverUseForRealSetSchema(jcschema6);
+ lp.add(new LogicalRelationalOperator[] {A6, B6}, C6, null);
+
+ LogicalSchema jaschema7 = new LogicalSchema();
+ jaschema7.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad A7 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema7, lp);
+ lp.add(A7);
+
+ // B = load
+ LogicalSchema jbschema7 = new LogicalSchema();
+ jbschema7.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ jbschema7.addField(new LogicalSchema.LogicalFieldSchema(
+ "z", null, DataType.LONG));
+ LOLoad B7 = new LOLoad(new FileSpec("/def",
+ new FuncSpec("PigStorage", "\t")), jbschema7, lp);
+ lp.add(B7);
+
+ // C = join
+ LogicalSchema jcschema7 = new LogicalSchema();
+ jcschema7.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ jcschema7.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LogicalExpressionPlan aprojplan7 = new LogicalExpressionPlan();
+ new ProjectExpression(aprojplan7, DataType.INTEGER, 0, 0);
+ LogicalExpressionPlan bprojplan7 = new LogicalExpressionPlan();
+ new ProjectExpression(bprojplan7, DataType.INTEGER, 1, 1);
+ LogicalExpressionPlan b2projplan7 = new LogicalExpressionPlan();
+ new ProjectExpression(b2projplan7, DataType.INTEGER, 1, 0);
+ MultiMap<Integer, LogicalExpressionPlan> mm7 =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm7.put(0, aprojplan7);
+ mm7.put(1, bprojplan7);
+ mm7.put(1, b2projplan7);
+ LOJoin C7 = new LOJoin(lp, mm7, JOINTYPE.HASH, new boolean[] {true, true});
+ C7.neverUseForRealSetSchema(jcschema7);
+ lp.add(new LogicalRelationalOperator[] {A7, B7}, C7, null);
+
+ assertFalse(C6.isEqual(C7));
+ }
+
+ @Test
+ public void testJoinDifferentNumJoinKeys() throws IOException {
+ LogicalPlan lp = new LogicalPlan();
+
+ // Test different join keys
+ LogicalSchema jaschema6 = new LogicalSchema();
+ jaschema6.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad A6 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema6, lp);
+ lp.add(A6);
+
+ // B = load
+ LogicalSchema jbschema6 = new LogicalSchema();
+ jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+ "z", null, DataType.LONG));
+ LOLoad B6 = new LOLoad(new FileSpec("/def",
+ new FuncSpec("PigStorage", "\t")), jbschema6, lp);
+ lp.add(B6);
+
+ // C = join
+ LogicalSchema jcschema6 = new LogicalSchema();
+ jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LogicalExpressionPlan aprojplan6 = new LogicalExpressionPlan();
+ new ProjectExpression(aprojplan6, DataType.INTEGER, 0, 0);
+ LogicalExpressionPlan bprojplan6 = new LogicalExpressionPlan();
+ new ProjectExpression(bprojplan6, DataType.INTEGER, 1, 0);
+ LogicalExpressionPlan b2projplan6 = new LogicalExpressionPlan();
+ new ProjectExpression(b2projplan6, DataType.INTEGER, 1, 1);
+ MultiMap<Integer, LogicalExpressionPlan> mm6 =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm6.put(0, aprojplan6);
+ mm6.put(1, bprojplan6);
+ mm6.put(1, b2projplan6);
+ LOJoin C6 = new LOJoin(lp, mm6, JOINTYPE.HASH, new boolean[] {true, true});
+ C6.neverUseForRealSetSchema(jcschema6);
+ lp.add(new LogicalRelationalOperator[] {A6, B6}, C6, null);
+
+ // Test different different number of join keys
+ LogicalSchema jaschema8 = new LogicalSchema();
+ jaschema8.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ LOLoad A8 = new LOLoad(new FileSpec("/abc",
+ new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema8, lp);
+ lp.add(A8);
+
+ // B = load
+ LogicalSchema jbschema8 = new LogicalSchema();
+ jbschema8.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ jbschema8.addField(new LogicalSchema.LogicalFieldSchema(
+ "z", null, DataType.LONG));
+ LOLoad B8 = new LOLoad(new FileSpec("/def",
+ new FuncSpec("PigStorage", "\t")), jbschema8, lp);
+ lp.add(B8);
+
+ // C = join
+ LogicalSchema jcschema8 = new LogicalSchema();
+ jcschema8.addField(new LogicalSchema.LogicalFieldSchema(
+ "x", null, DataType.INTEGER));
+ jcschema8.addField(new LogicalSchema.LogicalFieldSchema(
+ "y", null, DataType.INTEGER));
+ LogicalExpressionPlan aprojplan8 = new LogicalExpressionPlan();
+ new ProjectExpression(aprojplan8, DataType.INTEGER, 0, 0);
+ LogicalExpressionPlan bprojplan8 = new LogicalExpressionPlan();
+ new ProjectExpression(bprojplan8, DataType.INTEGER, 1, 0);
+ MultiMap<Integer, LogicalExpressionPlan> mm8 =
+ new MultiMap<Integer, LogicalExpressionPlan>();
+ mm8.put(0, aprojplan8);
+ mm8.put(1, bprojplan8);
+ LOJoin C8 = new LOJoin(lp, mm8, JOINTYPE.HASH, new boolean[] {true, true});
+ C8.neverUseForRealSetSchema(jcschema8);
+ lp.add(new LogicalRelationalOperator[] {A8, B8}, C8, null);
+
+ assertFalse(C6.isEqual(C8));
+ }
+
+ /**
+  * Verifies that two filters built with the same condition and the same
+  * declared schema are not equal when the loads feeding them differ.
+  * <p>
+  * D1 reads from a load using FuncSpec "/fooload" with a one-field schema;
+  * D2 reads from a load using FuncSpec "/foo" with no schema. Each filter
+  * lives in its own plan.
+  *
+  * @throws IOException if any plan-building call raises it
+  */
+ @Test
+ public void testRelationalSameOpDifferentPreds() throws IOException {
+     // ---- First plan: A1 = load; D1 = filter A1 by (input 0, column 0) == 0 ----
+     LogicalPlan lp1 = new LogicalPlan();
+     LogicalSchema aschema1 = new LogicalSchema();
+     aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+         "x", null, DataType.INTEGER));
+     LOLoad A1 = new LOLoad(new FileSpec("/abc",
+         new FuncSpec("/fooload", new String[] {"x", "y"})), aschema1, lp1);
+     lp1.add(A1);
+
+     LogicalExpressionPlan fp1 = new LogicalExpressionPlan();
+     ProjectExpression fy1 = new ProjectExpression(fp1, DataType.INTEGER, 0, 0);
+     // Integer.valueOf replaces the deprecated boxing constructor.
+     ConstantExpression fc1 = new ConstantExpression(fp1, DataType.INTEGER,
+         Integer.valueOf(0));
+     new EqualExpression(fp1, fy1, fc1);
+     LOFilter D1 = new LOFilter(lp1, fp1);
+     // The same schema object is set on both filters, so the declared
+     // schemas of D1 and D2 cannot be what differs.
+     LogicalSchema cschema = new LogicalSchema();
+     cschema.addField(new LogicalSchema.LogicalFieldSchema(
+         "x", null, DataType.INTEGER));
+     D1.neverUseForRealSetSchema(cschema);
+     lp1.add(A1, D1, (LogicalRelationalOperator)null);
+
+     // ---- Second plan: A2 = a different load; D2 = the same filter condition ----
+     LogicalPlan lp2 = new LogicalPlan();
+     LOLoad A2 = new LOLoad(new FileSpec("/abc",
+         new FuncSpec("/foo", new String[] {"x", "z"})), null, lp2);
+     lp2.add(A2);
+
+     LogicalExpressionPlan fp2 = new LogicalExpressionPlan();
+     ProjectExpression fy2 = new ProjectExpression(fp2, DataType.INTEGER, 0, 0);
+     ConstantExpression fc2 = new ConstantExpression(fp2, DataType.INTEGER,
+         Integer.valueOf(0));
+     new EqualExpression(fp2, fy2, fc2);
+     LOFilter D2 = new LOFilter(lp2, fp2);
+     D2.neverUseForRealSetSchema(cschema);
+     lp2.add(A2, D2, (LogicalRelationalOperator)null);
+
+     assertFalse(D1.isEqual(D2));
+ }
+
}