You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2009/07/02 22:56:22 UTC
svn commit: r790735 - in /hadoop/pig/trunk:
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/optimizer/
src/org/apache/pig/impl/plan/ test/org/apache/pig/test/
test/org/apache/pig/test/utils/
Author: sms
Date: Thu Jul 2 20:56:21 2009
New Revision: 790735
URL: http://svn.apache.org/viewvc?rev=790735&view=rev
Log:
PIG-697: Proposed improvements to pig's optimizer
Added:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java
Modified:
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectFixerUpper.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java
hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java
hadoop/pig/trunk/test/org/apache/pig/test/TestRewire.java
hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java Thu Jul 2 20:56:21 2009
@@ -86,16 +86,20 @@
for (FieldSchema schema : cSchema.getFields()) {
++i;
- if(nonDuplicates.containsKey(schema.alias))
- {
- if(nonDuplicates.get(schema.alias)!=-1) {
- nonDuplicates.remove(schema.alias);
- nonDuplicates.put(schema.alias, -1);
- }
+ FieldSchema newFS = null;
+ if(schema.alias != null) {
+ if(nonDuplicates.containsKey(schema.alias)) {
+ if(nonDuplicates.get(schema.alias)!=-1) {
+ nonDuplicates.remove(schema.alias);
+ nonDuplicates.put(schema.alias, -1);
+ }
+ } else {
+ nonDuplicates.put(schema.alias, i);
}
- else
- nonDuplicates.put(schema.alias, i);
- FieldSchema newFS = new FieldSchema(op.getAlias()+"::"+schema.alias,schema.schema,schema.type);
+ newFS = new FieldSchema(op.getAlias()+"::"+schema.alias,schema.schema,schema.type);
+ } else {
+ newFS = new Schema.FieldSchema(null, DataType.BYTEARRAY);
+ }
newFS.setParent(schema.canonicalName, op);
fss.add(newFS);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Thu Jul 2 20:56:21 2009
@@ -588,43 +588,7 @@
}
}
} else {
- //innerSchema is null; check for schema type
- if(DataType.isSchemaType(leafFS.type)) {
- //flattening a null schema results in a bytearray
- if(mapped) {
- //map each flattened column to the original column
- if (cast != null) {
- mapFields.put(outputColumn++,
- new ProjectionMap.Column(
- new Pair<Integer, Integer>(0, inputColumn), true, cast.getType()
- )
- );
- } else {
- mapFields.put(outputColumn++,
- new ProjectionMap.Column(new Pair<Integer, Integer>(0, inputColumn))
- );
- }
- } else {
- addedFields.add(outputColumn++);
- }
- } else {
- if (cast != null) {
- mapFields.put(outputColumn++,
- new ProjectionMap.Column(
- new Pair<Integer, Integer>(0, inputColumn), true, cast.getType()
- )
- );
- } else {
- mapFields.put(outputColumn++,
- new ProjectionMap.Column(new Pair<Integer, Integer>(0, inputColumn))
- );
- }
- }
- }
- } else {
- //innerSchema is null; check for schema type
- if(DataType.isSchemaType(leafFS.type)) {
- //flattening a null schema results in a bytearray
+ //innerSchema is null
if(mapped) {
//map each flattened column to the original column
if (cast != null) {
@@ -641,8 +605,12 @@
} else {
addedFields.add(outputColumn++);
}
- } else {
- if (cast != null) {
+ }
+ } else {
+ //innerSchema is null
+ if(mapped) {
+ //map each flattened column to the original column
+ if (cast != null) {
mapFields.put(outputColumn++,
new ProjectionMap.Column(
new Pair<Integer, Integer>(0, inputColumn), true, cast.getType()
@@ -653,6 +621,8 @@
new ProjectionMap.Column(new Pair<Integer, Integer>(0, inputColumn))
);
}
+ } else {
+ addedFields.add(outputColumn++);
}
}
} else {
@@ -785,5 +755,24 @@
}
}
}
+
+ /**
+ * Reports whether any entry of mFlatten is true and at which positions.
+ *
+ * @return a pair whose first element is true if at least one entry of mFlatten is true,
+ * and whose second element lists the indices of those entries (empty if none)
+ */
+ public Pair<Boolean, List<Integer>> hasFlatten() {
+ boolean hasFlatten = false;
+ List<Integer> flattenedColumns = new ArrayList<Integer>();
+ for (int i = 0; i < mFlatten.size(); ++i) {
+ Boolean b = mFlatten.get(i); // NOTE(review): a null entry would make b.equals(true) throw
+ if (b.equals(true)) {
+ hasFlatten = true;
+ flattenedColumns.add(i);
+ }
+ }
+ return new Pair<Boolean, List<Integer>>(hasFlatten, flattenedColumns);
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectFixerUpper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectFixerUpper.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectFixerUpper.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/ProjectFixerUpper.java Thu Jul 2 20:56:21 2009
@@ -133,7 +133,7 @@
if (oldNodeMap == null) {
// bail out if the projection map is null
- int errCode = 2146;
+ int errCode = 2156;
String msg = "Error while fixing projections. Projection map of node to be replaced is null.";
throw new VisitorException(msg, errCode,
PigException.BUG);
@@ -148,7 +148,7 @@
.getMappedFields();
if (oldNodeMappedFields == null) {
// there is no mapping available bail out
- int errCode = 2147;
+ int errCode = 2157;
String msg = "Error while fixing projections. No mapping available in old predecessor to replace column.";
throw new VisitorException(msg, errCode,
PigException.BUG);
@@ -160,7 +160,7 @@
if (columns == null) {
// there is no mapping for oldNodeColumn
// it could be an added field; bail out
- int errCode = 2148;
+ int errCode = 2158;
String msg = "Error during fixing projections. No mapping available in old predecessor for column to be replaced.";
throw new VisitorException(msg, errCode,
PigException.BUG);
@@ -179,7 +179,7 @@
}
if (!foundMapping) {
// did not find a mapping - bail out
- int errCode = 2149;
+ int errCode = 2159;
String msg = "Error during fixing projections. Could not locate replacement column from the old predecessor.";
throw new VisitorException(msg, errCode,
PigException.BUG);
@@ -199,7 +199,7 @@
ProjectionMap newNodeMap = mNewNode.getProjectionMap();
if (newNodeMap == null) {
// did not find a mapping - bail out
- int errCode = 2150;
+ int errCode = 2160;
String msg = "Error during fixing projections. Projection map of new predecessor is null.";
throw new VisitorException(msg, errCode,
PigException.BUG);
@@ -214,7 +214,7 @@
.getMappedFields();
if (newNodeMappedFields == null) {
// there is no mapping available bail out
- int errCode = 2151;
+ int errCode = 2161;
String msg = "Error during fixing projections. No mapping available in new predecessor to replace column.";
throw new VisitorException(msg, errCode,
PigException.BUG);
@@ -228,7 +228,7 @@
.get(key);
if (columns == null) {
// should not happen
- int errCode = 2152;
+ int errCode = 2162;
String msg = "Error during fixing projections. Could not locate mapping for column: "
+ key + " in new predecessor.";
throw new VisitorException(msg, errCode,
@@ -255,8 +255,10 @@
}
if (!foundMapping) {
// did not find a mapping - bail out
- int errCode = 2153;
- String msg = "Error during fixing projections. Could not locate replacement column in the new predecessor.";
+ int errCode = 2163;
+ String msg = "Error during fixing projections. Could not locate replacement column for column: "
+ + oldNodeColumn
+ + " in the new predecessor.";
throw new VisitorException(msg, errCode,
PigException.BUG);
}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/LogicalOptimizer.java Thu Jul 2 20:56:21 2009
@@ -24,6 +24,7 @@
import org.apache.pig.PigException;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.LOForEach;
import org.apache.pig.impl.logicalLayer.LOLimit;
import org.apache.pig.impl.logicalLayer.LOLoad;
import org.apache.pig.impl.logicalLayer.LOPrinter;
@@ -138,6 +139,15 @@
rule = new Rule<LogicalOperator, LogicalPlan>(rulePlan,
new PushUpFilter(plan), "PushUpFilter");
checkAndAddRule(rule);
+
+ // Push foreach with flatten down wherever possible
+ rulePlan = new RulePlan();
+ RuleOperator loForeach = new RuleOperator(LOForEach.class,
+ new OperatorKey(SCOPE, nodeIdGen.getNextNodeId(SCOPE)));
+ rulePlan.add(loForeach);
+ rule = new Rule<LogicalOperator, LogicalPlan>(rulePlan,
+ new PushDownForeachFlatten(plan), "PushDownForeachFlatten");
+ checkAndAddRule(rule);
}
}
Added: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=790735&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java Thu Jul 2 20:56:21 2009
@@ -0,0 +1,407 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.logicalLayer.optimizer;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.impl.logicalLayer.CastFinder;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOCross;
+import org.apache.pig.impl.logicalLayer.LOFRJoin;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LOProject;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.logicalLayer.UDFFinder;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
+import org.apache.pig.impl.plan.OperatorPlan.IndexHelper;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+import org.apache.pig.PigException;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * An optimizer rule that checks whether a foreach with flatten(s) can be pushed below its single
+ * successor: swapped with an order by, or re-created after a cross or an LOFRJoin.
+ */
+public class PushDownForeachFlatten extends LogicalTransformer {
+
+ // set by check(): true if the foreach has to be swapped with its successor
+ private boolean mSwap = false;
+
+ // set by check(): true if a new flattening foreach has to be inserted between the
+ // foreach's successor and that successor's own successor
+ private boolean mInsertBetween = false;
+
+ // map of flattened foreach column to the lowest successor output column mapped from it
+ Map<Integer, Integer> mFlattenedColumnReMap = null;
+
+ public PushDownForeachFlatten(LogicalPlan plan) {
+ super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+ }
+
+ /**
+ * Valid after check(); cleared by reset().
+ * @return true if the foreach has to be swapped; false otherwise
+ */
+ public boolean getSwap() {
+ return mSwap;
+ }
+
+ /**
+ * Valid after check(); cleared by reset().
+ * @return true if a new foreach has to be inserted after the successor; false
+ * otherwise
+ */
+ public boolean getInsertBetween() {
+ return mInsertBetween;
+ }
+
+ /**
+ * Populated by check() only for a cross or join successor; null until then and after reset().
+ * @return a map of old column position in the foreach to the column
+ * position in foreach's successor
+ */
+ public Map<Integer, Integer> getFlattenedColumnMap() {
+ return mFlattenedColumnReMap;
+ }
+ /** Decides whether the rule applies to nodes.get(0); records the decision in mSwap / mInsertBetween. */
+ @Override
+ public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
+ try {
+ LOForEach foreach = (LOForEach) getOperator(nodes);
+
+ Pair<Boolean, List<Integer>> flattenResult = foreach.hasFlatten();
+ boolean flattened = flattenResult.first;
+ List<Integer> flattenedColumns = flattenResult.second;
+ Set<Integer> flattenedColumnSet = (flattenedColumns == null? null: new HashSet<Integer>(flattenedColumns));
+
+ if(!flattened) {
+ return false;
+ }
+
+ if(flattenedColumns == null || flattenedColumns.size() == 0) {
+ return false;
+ }
+
+ ProjectionMap foreachProjectionMap = foreach.getProjectionMap();
+
+ if(foreachProjectionMap == null) {
+ return false;
+ }
+ // flattened columns that the projection map reports as added fields are not considered
+ List<Integer> foreachAddedFields = foreachProjectionMap.getAddedFields();
+ if(foreachAddedFields != null) {
+ Set<Integer> foreachAddedFieldsSet = new HashSet<Integer>(foreachAddedFields);
+ flattenedColumnSet.removeAll(foreachAddedFieldsSet);
+ }
+
+ if(flattenedColumnSet.size() == 0) {
+ return false;
+ }
+
+ for(LogicalPlan foreachPlan: foreach.getForEachPlans()) {
+ UDFFinder udfFinder = new UDFFinder(foreachPlan);
+ udfFinder.visit();
+
+ // if any of the foreach's inner plans contain a UDF then return false
+ if (udfFinder.foundAnyUDF()) {
+ return false;
+ }
+
+ CastFinder castFinder = new CastFinder(foreachPlan);
+ castFinder.visit();
+
+ // TODO
+ // if any of the foreach's inner plans contain a cast then return false
+ // in the future the cast should be moved appropriately
+ if (castFinder.foundAnyCast()) {
+ return false;
+ }
+ }
+
+ List<LogicalOperator> successors = (mPlan.getSuccessors(foreach) == null ? null
+ : new ArrayList<LogicalOperator>(mPlan
+ .getSuccessors(foreach)));
+
+ // if the foreach has no successors or more than one successor
+ // return false
+ if (successors == null || successors.size() == 0 || successors.size() > 1) {
+ return false;
+ }
+
+ LogicalOperator successor = successors.get(0);
+
+ List<LogicalOperator> peers = (mPlan.getPredecessors(successor) == null ? null
+ : new ArrayList<LogicalOperator>(mPlan.getPredecessors(successor)));
+ // NOTE(review): peers is dereferenced below without a null check; presumably a successor always has predecessors - confirm
+ // check if any of the foreach's peers is a foreach flatten
+ // if so then this rule does not apply
+ for(LogicalOperator peer: peers) {
+ if(!peer.equals(foreach)) {
+ if(peer instanceof LOForEach) {
+ LOForEach peerForeach = (LOForEach)peer;
+ if(peerForeach.hasFlatten().first) {
+ return false;
+ }
+ }
+ }
+ }
+
+ IndexHelper indexHelper = new IndexHelper(peers);
+ Integer foreachPosition = indexHelper.getIndex(foreach);
+
+ // the foreach with flatten can be swapped with an order by
+ // as the order by will have fewer records to sort
+ // also the sort does not alter the records that are processed
+
+ // the foreach with flatten can be pushed down a cross or a join
+ // for the same reason. In this case the foreach has to be first
+ // unflattened and then a new foreach has to be inserted after
+ // the cross or join. In both cross and join the actual columns
+ // from the foreach are not altered but positions might be changed
+
+ // in the case of union the column is transformed and as a result
+ // the foreach flatten cannot be pushed down
+
+ // for distinct the output before flattening and the output
+ // after flattening might be different. For example, consider
+ // {(1), (1)}. Distinct of this bag is still {(1), (1)}.
+ // distinct(flatten({(1), (1)})) is (1). However,
+ // flatten(distinct({(1), (1)})) is (1), (1)
+
+ // in both cases handled below (order by and cross/join) correctness is not affected
+ if(successor instanceof LOSort) {
+ LOSort sort = (LOSort) successor;
+ RequiredFields sortRequiredField = sort.getRequiredFields().get(0);
+
+ if(sortRequiredField.getNeedAllFields()) {
+ return false;
+ }
+
+ List<Pair<Integer, Integer>> sortInputs = sortRequiredField.getFields();
+ Set<Integer> requiredInputs = new HashSet<Integer>();
+ for(Pair<Integer, Integer> pair: sortInputs) {
+ requiredInputs.add(pair.second);
+ }
+
+ requiredInputs.retainAll(flattenedColumnSet);
+ // the intersection of the sort's required inputs
+ // and the flattened columns in the foreach should
+ // be empty, i.e., the size of required inputs == 0
+ if(requiredInputs.size() != 0) {
+ return false;
+ }
+
+ mSwap = true;
+ return true;
+ } else if (successor instanceof LOCross
+ || successor instanceof LOFRJoin) {
+
+ List<LogicalOperator> children = mPlan.getSuccessors(successor);
+
+ if(children == null || children.size() > 1) {
+ return false;
+ }
+ // NOTE(review): an empty (non-null) children list passes this guard, yet transform() calls get(0) on it
+ ProjectionMap succProjectionMap = successor.getProjectionMap();
+
+ if(succProjectionMap == null) {
+ return false;
+ }
+
+ MultiMap<Integer, ProjectionMap.Column> mappedFields = succProjectionMap.getMappedFields();
+
+ if(mappedFields == null) {
+ return false;
+ }
+
+ if(mFlattenedColumnReMap == null) {
+ mFlattenedColumnReMap = new HashMap<Integer, Integer>();
+ }
+
+ // initialize the map; Integer.MAX_VALUE marks a column as not yet remapped
+ for(Integer key: flattenedColumnSet) {
+ mFlattenedColumnReMap.put(key, Integer.MAX_VALUE);
+ }
+
+ // for each output column find the corresponding input that matches the foreach's position
+ // for each input column in the foreach check if the output column is a mapping of the flattened column
+ // due to flattening multiple output columns could be generated from the same input column
+ // find the first, i.e., the lowest, output column that is mapped from each flattened column
+ for(Integer key: mappedFields.keySet()) {
+ List<ProjectionMap.Column> columns = (List<ProjectionMap.Column>)mappedFields.get(key);
+ for(ProjectionMap.Column column: columns) {
+ Pair<Integer, Integer> inputColumn = column.getInputColumn();
+
+ // check if the input column number is the same as the
+ // position of foreach in the list of predecessors
+ if(foreachPosition.equals(inputColumn.first)) {
+ if(flattenedColumnSet.contains(inputColumn.second)) {
+ // check if the output column, i.e., key is the
+ // least column number seen till date
+ if(key < mFlattenedColumnReMap.get(inputColumn.second)) {
+ mFlattenedColumnReMap.put(inputColumn.second, key);
+ }
+ }
+ }
+ }
+ }
+
+ // check if any of the flattened columns is not remapped
+ for(Integer key: mFlattenedColumnReMap.keySet()) {
+ if(mFlattenedColumnReMap.get(key).equals(Integer.MAX_VALUE)) {
+ return false;
+ }
+ }
+ // NOTE(review): on the return false above mFlattenedColumnReMap stays populated until reset()
+ mInsertBetween = true;
+ return true;
+ }
+
+ return false;
+
+ } catch (OptimizerException oe) {
+ throw oe;
+ } catch (Exception e) {
+ int errCode = 2152;
+ String msg = "Internal error while trying to check if foreach with flatten can be pushed down.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Returns nodes.get(0) after verifying it is an LOForEach; throws an OptimizerException otherwise. */
+ private LogicalOperator getOperator(List<LogicalOperator> nodes)
+ throws FrontendException {
+ if ((nodes == null) || (nodes.size() <= 0)) {
+ int errCode = 2052;
+ String msg = "Internal error. Cannot retrieve operator from null or empty list.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
+ }
+
+ LogicalOperator lo = nodes.get(0);
+ if (lo == null || !(lo instanceof LOForEach)) {
+ // we should never be called with any other operator class name
+ int errCode = 2005;
+ String msg = "Expected " + LOForEach.class.getSimpleName()
+ + ", got "
+ + (lo == null ? lo : lo.getClass().getSimpleName());
+ throw new OptimizerException(msg, errCode, PigException.INPUT);
+ } else {
+ return lo;
+ }
+
+ }
+ /** Applies the decision recorded by check(): swaps the foreach with its successor or inserts a new foreach. */
+ @Override
+ public void transform(List<LogicalOperator> nodes)
+ throws OptimizerException {
+ try {
+ LOForEach foreach = (LOForEach) getOperator(nodes);
+ LogicalOperator successor = mPlan.getSuccessors(foreach).get(0);
+ if (mSwap) {
+ mPlan.swap(successor, foreach);
+ } else if (mInsertBetween) {
+ // mark the flattened columns as not flattened in the foreach
+ // create a new foreach operator that projects each column of the
+ // successor. Mark the remapped flattened columns as flattened
+ // in the new foreach operator
+
+ if(mFlattenedColumnReMap == null) {
+ int errCode = 2153;
+ String msg = "Internal error. The mapping for the flattened columns is empty";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
+ }
+
+ // set flatten to false for all columns in the mapping
+ // NOTE(review): this modifies the list returned by foreach.getFlatten() in place
+ ArrayList<Boolean> flattenList = (ArrayList<Boolean>)foreach.getFlatten();
+ for(Integer key: mFlattenedColumnReMap.keySet()) {
+ flattenList.set(key, false);
+ }
+
+ // rebuild schemas of the foreach and the successor after the foreach modification
+ foreach.regenerateSchema();
+ successor.regenerateSchema();
+
+ Schema successorSchema = successor.getSchema();
+
+ if(successorSchema == null) {
+ int errCode = 2154;
+ String msg = "Internal error. Schema of successor cannot be null for pushing down foreach with flatten.";
+ throw new OptimizerException(msg, errCode, PigException.BUG);
+ }
+
+ flattenList = new ArrayList<Boolean>();
+
+ ArrayList<LogicalPlan> foreachInnerPlans = new ArrayList<LogicalPlan>();
+ // one inner plan per successor column, each projecting that column, initially not flattened
+ for(int i = 0; i < successorSchema.size(); ++i) {
+ LogicalPlan innerPlan = new LogicalPlan();
+ LOProject project = new LOProject(innerPlan, OperatorKey
+ .genOpKey(foreach.getOperatorKey().scope),
+ successor, i);
+ innerPlan.add(project);
+ foreachInnerPlans.add(innerPlan);
+ flattenList.add(false);
+ }
+
+ // set the flattened remapped column to true
+ for(Integer key: mFlattenedColumnReMap.keySet()) {
+ Integer value = mFlattenedColumnReMap.get(key);
+ flattenList.set(value, true);
+ }
+
+
+ LOForEach newForeach = new LOForEach(mPlan, OperatorKey
+ .genOpKey(foreach.getOperatorKey().scope), foreachInnerPlans,
+ flattenList);
+
+ // add the new foreach to the plan
+ mPlan.add(newForeach);
+
+ // insert the new foreach between the successor and the successor's successor
+ mPlan.insertBetween(successor, newForeach, mPlan.getSuccessors(successor).get(0));
+ }
+ } catch (OptimizerException oe) {
+ throw oe;
+ } catch (Exception e) {
+ int errCode = 2155;
+ String msg = "Internal error while pushing foreach with flatten down.";
+ throw new OptimizerException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ /** Clears the decision flags and the flattened-column remap recorded by check(). */
+ @Override
+ public void reset() {
+ mInsertBetween = false;
+ mSwap = false;
+ mFlattenedColumnReMap = null;
+ }
+
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java Thu Jul 2 20:56:21 2009
@@ -289,7 +289,7 @@
LogicalOperator lo = nodes.get(0);
if (lo == null || !(lo instanceof LOFilter)) {
// we should never be called with any other operator class name
- int errCode = 1101;
+ int errCode = 2005;
String msg = "Expected " + LOFilter.class.getSimpleName()
+ ", got "
+ (lo == null ? lo : lo.getClass().getSimpleName());
Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Thu Jul 2 20:56:21 2009
@@ -1381,7 +1381,7 @@
/*
* A helper class that computes the index of each reference in a list for a quick lookup
*/
- class IndexHelper <E> {
+ public static class IndexHelper <E> {
private Map<E, Integer> mIndex = null;
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=790735&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPushDownForeachFlatten.java Thu Jul 2 20:56:21 2009
@@ -0,0 +1,929 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.FilterFunc;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.logicalLayer.*;
+import org.apache.pig.impl.logicalLayer.optimizer.*;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.test.utils.Identity;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.apache.pig.impl.plan.optimizer.OptimizerException;
+
+import org.junit.Test;
+import org.junit.Before;
+
+/**
+ * Test the logical optimizer.
+ */
+
+public class TestPushDownForeachFlatten extends junit.framework.TestCase {
+
+ final String FILE_BASE_LOCATION = "test/org/apache/pig/test/data/DotFiles/" ;
+ static final int MAX_SIZE = 100000;
+
+ private final Log log = LogFactory.getLog(getClass());
+ LogicalPlanTester planTester = new LogicalPlanTester() ;
+
+
+ private static final String simpleEchoStreamingCommand;
+ static { // choose the quoting of the perl echo command from the os.name system property
+ if (System.getProperty("os.name").toUpperCase().startsWith("WINDOWS"))
+ simpleEchoStreamingCommand = "perl -ne 'print \\\"$_\\\"'"; // double quotes are backslash-escaped
+ else
+ simpleEchoStreamingCommand = "perl -ne 'print \"$_\"'";
+ }
+
+
+ @Before // NOTE(review): @Before on a method named tearDown; the class extends junit.framework.TestCase - confirm which one is intended
+ public void tearDown() {
+ planTester.reset(); // resets the shared plan tester
+ }
+
+ /**
+ *
+ * A simple filter UDF for testing; exec ignores its input and always returns false
+ *
+ */
+ static public class MyFilterFunc extends FilterFunc {
+
+ @Override
+ public Boolean exec(Tuple input) {
+ return false;
+ }
+ }
+
+ @Test
+ // Test to ensure that error code 2052 is reported when the input list is empty
+ public void testErrorEmptyInput() throws Exception {
+ LogicalPlan lp = new LogicalPlan();
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+ try {
+ pushDownForeach.check(lp.getRoots());
+ fail("Exception Expected!");
+ } catch(Exception e) {
+ assertTrue(((OptimizerException)e).getErrorCode() == 2052);
+ }
+ }
+
+ @Test
+ // Test to ensure that error code 2005 is reported when the input is not a foreach (here a load)
+ public void testErrorNonForeachInput() throws Exception {
+ LogicalPlan lp = planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");;
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+ try {
+ pushDownForeach.check(lp.getRoots());
+ fail("Exception Expected!");
+ } catch(Exception e) {
+ assertTrue(((OptimizerException)e).getErrorCode() == 2005);
+ }
+ }
+
+ @Test
+ public void testForeachNoFlatten() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ LogicalPlan lp = planTester.buildPlan("B = foreach A generate $1;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+ // B has no flatten: check() must return false and leave the rule's state at its defaults
+ assertTrue(!pushDownForeach.check(lp.getLeaves()));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachNoSuccessors() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ LogicalPlan lp = planTester.buildPlan("B = foreach A generate flatten($1);");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+ // the flattening foreach is the leaf of the plan: check() must return false
+ assertTrue(!pushDownForeach.check(lp.getLeaves()));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachStreaming() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate flatten($1);");
+ LogicalPlan lp = planTester.buildPlan("C = stream B through `" + simpleEchoStreamingCommand + "`;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad load = (LOLoad) lp.getRoots().get(0);
+ // the foreach is followed by a stream: check() is expected to return false
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachDistinct() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate flatten($1);");
+ LogicalPlan lp = planTester.buildPlan("C = distinct B;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad load = (LOLoad) lp.getRoots().get(0);
+ // the foreach is followed by a distinct: check() is expected to return false
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachForeach() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
+ LogicalPlan lp = planTester.buildPlan("C = foreach B generate $0;");
+ // NOTE(review): B flattens the constant 1 whereas the sibling tests flatten a column ($n) - confirm this is intended
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad load = (LOLoad) lp.getRoots().get(0);
+ // the foreach is followed by another foreach: check() is expected to return false
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+ }
+
+
+ @Test
+ public void testForeachFilter() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ LogicalPlan lp = planTester.buildPlan("C = filter B by $1 < 18;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad load = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachSplitOutput() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ LogicalPlan lp = planTester.buildPlan("split B into C if $1 < 18, D if $1 >= 18;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad load = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachLimit() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ LogicalPlan lp = planTester.buildPlan("B = limit B 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad load = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+ }
+
+ @Test
+ public void testForeachUnion() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
+ LogicalPlan lp = planTester.buildPlan("D = union B, C;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad load = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+ }
+
+ @Test
+ public void testForeachCogroup() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
+ LogicalPlan lp = planTester.buildPlan("D = cogroup B by $0, C by $0;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad load = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+ }
+
+ @Test
+ public void testForeachGroupBy() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ LogicalPlan lp = planTester.buildPlan("C = group B by $0;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad load = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(load)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+ }
+
+ @Test
+ public void testForeachSort() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOSort sort = (LOSort) lp.getLeaves().get(0);
+ LOForEach foreach = (LOForEach)lp.getPredecessors(sort).get(0);
+
+ assertTrue(pushDownForeach.check(lp.getPredecessors(sort)));
+ assertTrue(pushDownForeach.getSwap() == true);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ pushDownForeach.transform(lp.getPredecessors(sort));
+
+ assertEquals(foreach, lp.getLeaves().get(0));
+ assertEquals(sort, lp.getPredecessors(foreach).get(0));
+
+ }
+
+ @Test
+ public void testForeachFlattenAddedColumnSort() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
+ LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOSort sort = (LOSort) lp.getLeaves().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getPredecessors(sort)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachUDFSort() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate $0, $1, " + Identity.class.getName() + "($2) ;");
+ LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOSort sort = (LOSort) lp.getLeaves().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getPredecessors(sort)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachCastSort() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
+ planTester.buildPlan("B = foreach A generate (chararray)$0, $1, flatten($2);");
+ LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOSort sort = (LOSort) lp.getLeaves().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getPredecessors(sort)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ /**
+  * Cross whose first input is a flattening foreach, followed by a limit.
+  * Asserts that check() accepts with insert-between (not swap) and a non-null
+  * flattened-column map. After transform() and a schema rebuild, the original
+  * foreach has every flatten flag cleared, the foreach now following the cross
+  * has the flag set at every remapped index, and the limit schema still
+  * compares equal to the one captured before the transform.
+  */
+ @Test
+ public void testForeachCross() throws Exception {
+     planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+     planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+     planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
+     planTester.buildPlan("D = cross B, C;");
+     LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+     planTester.setPlan(lp);
+     planTester.setProjectionMap(lp);
+     planTester.rebuildSchema(lp);
+
+     PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+     LOLoad load = (LOLoad) lp.getRoots().get(0);
+     LOLimit limit = (LOLimit) lp.getLeaves().get(0);
+     LOCross cross = (LOCross) lp.getPredecessors(limit).get(0);
+     LOForEach foreach = (LOForEach) lp.getPredecessors(cross).get(0);
+
+     // captured before the transform so the output schema can be compared afterwards
+     Schema limitSchema = limit.getSchema();
+
+     assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
+     assertTrue(!pushDownForeach.getSwap());
+     assertTrue(pushDownForeach.getInsertBetween());
+     assertTrue(pushDownForeach.getFlattenedColumnMap() != null);
+
+     pushDownForeach.transform(lp.getSuccessors(load));
+
+     planTester.rebuildSchema(lp);
+
+     // expected value goes first so a failure message reads correctly
+     for (Boolean flattened : foreach.getFlatten()) {
+         assertEquals(false, flattened.booleanValue());
+     }
+
+     LOForEach newForeach = (LOForEach) lp.getSuccessors(cross).get(0);
+
+     List<Boolean> newForeachFlatten = newForeach.getFlatten();
+     Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();
+     // only the mapped-to indexes matter here, so walk the values directly
+     for (Integer remappedIndex : remap.values()) {
+         assertEquals(true, newForeachFlatten.get(remappedIndex).booleanValue());
+     }
+
+     assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));
+ }
+
+ /**
+  * Cross whose second input is a flattening foreach, followed by a limit.
+  * Asserts that check() accepts with insert-between (not swap) and a non-null
+  * flattened-column map. After transform() and a schema rebuild, the original
+  * foreach has every flatten flag cleared, the foreach now following the cross
+  * has the flag set at every remapped index, and the limit schema still
+  * compares equal to the one captured before the transform.
+  */
+ @Test
+ public void testForeachCross1() throws Exception {
+     planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+     planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+     planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
+     planTester.buildPlan("D = cross A, C;");
+     LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+     planTester.setPlan(lp);
+     planTester.setProjectionMap(lp);
+     planTester.rebuildSchema(lp);
+
+     PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+     // the flattening foreach hangs off the second root / second cross input
+     LOLoad load = (LOLoad) lp.getRoots().get(1);
+     LOLimit limit = (LOLimit) lp.getLeaves().get(0);
+     LOCross cross = (LOCross) lp.getPredecessors(limit).get(0);
+     LOForEach foreach = (LOForEach) lp.getPredecessors(cross).get(1);
+
+     // captured before the transform so the output schema can be compared afterwards
+     Schema limitSchema = limit.getSchema();
+
+     assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
+     assertTrue(!pushDownForeach.getSwap());
+     assertTrue(pushDownForeach.getInsertBetween());
+     assertTrue(pushDownForeach.getFlattenedColumnMap() != null);
+
+     pushDownForeach.transform(lp.getSuccessors(load));
+
+     planTester.rebuildSchema(lp);
+
+     // expected value goes first so a failure message reads correctly
+     for (Boolean flattened : foreach.getFlatten()) {
+         assertEquals(false, flattened.booleanValue());
+     }
+
+     LOForEach newForeach = (LOForEach) lp.getSuccessors(cross).get(0);
+
+     List<Boolean> newForeachFlatten = newForeach.getFlatten();
+     Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();
+     // only the mapped-to indexes matter here, so walk the values directly
+     for (Integer remappedIndex : remap.values()) {
+         assertEquals(true, newForeachFlatten.get(remappedIndex).booleanValue());
+     }
+
+     assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));
+ }
+
+ // TODO
+ // The following test case testForeachCross2 has multiple foreach flatten
+ // A new rule should optimize this case
+ @Test
+ public void testForeachCross2() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
+ planTester.buildPlan("E = cross B, D;");
+ LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachFlattenAddedColumnCross() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = cross B, C;");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachUDFCross() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = cross B, C;");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ /**
+  * Cross whose first input is a foreach with both a cast and a flatten.
+  * Asserts that check() returns false and that the rule reports no swap, no
+  * insert-between and a null flattened-column map.
+  *
+  * The foreach flattens $2, matching testForeachCastFRJoin and
+  * testForeachCastInnerJoin. Without a flatten in the script the plan holds
+  * no flattening foreach at all, so the cast would never be what the rule
+  * rejects.
+  */
+ @Test
+ public void testForeachCastCross() throws Exception {
+     planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+     planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
+     planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+     planTester.buildPlan("D = cross B, C;");
+     LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+     planTester.setPlan(lp);
+     planTester.setProjectionMap(lp);
+     planTester.rebuildSchema(lp);
+
+     PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+     LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+     assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+     assertTrue(!pushDownForeach.getSwap());
+     assertTrue(!pushDownForeach.getInsertBetween());
+     assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+ }
+
+ /**
+  * Replicated join whose first input is a flattening foreach, followed by a
+  * limit. Asserts that check() accepts with insert-between (not swap) and a
+  * non-null flattened-column map. After transform() and a schema rebuild, the
+  * original foreach has every flatten flag cleared, the foreach now following
+  * the join has the flag set at every remapped index, and the limit schema
+  * still compares equal to the one captured before the transform.
+  */
+ @Test
+ public void testForeachFRJoin() throws Exception {
+     planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+     planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+     planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
+     planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
+     LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+     planTester.setPlan(lp);
+     planTester.setProjectionMap(lp);
+     planTester.rebuildSchema(lp);
+
+     PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+     LOLoad load = (LOLoad) lp.getRoots().get(0);
+     LOLimit limit = (LOLimit) lp.getLeaves().get(0);
+     LOFRJoin frjoin = (LOFRJoin) lp.getPredecessors(limit).get(0);
+     LOForEach foreach = (LOForEach) lp.getPredecessors(frjoin).get(0);
+
+     // captured before the transform so the output schema can be compared afterwards
+     Schema limitSchema = limit.getSchema();
+
+     assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
+     assertTrue(!pushDownForeach.getSwap());
+     assertTrue(pushDownForeach.getInsertBetween());
+     assertTrue(pushDownForeach.getFlattenedColumnMap() != null);
+
+     pushDownForeach.transform(lp.getSuccessors(load));
+
+     planTester.rebuildSchema(lp);
+
+     // expected value goes first so a failure message reads correctly
+     for (Boolean flattened : foreach.getFlatten()) {
+         assertEquals(false, flattened.booleanValue());
+     }
+
+     LOForEach newForeach = (LOForEach) lp.getSuccessors(frjoin).get(0);
+
+     List<Boolean> newForeachFlatten = newForeach.getFlatten();
+     Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();
+     // only the mapped-to indexes matter here, so walk the values directly
+     for (Integer remappedIndex : remap.values()) {
+         assertEquals(true, newForeachFlatten.get(remappedIndex).booleanValue());
+     }
+
+     assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));
+ }
+
+
+ /**
+  * Replicated join whose second input is a flattening foreach, followed by a
+  * limit. Asserts that check() accepts with insert-between (not swap) and a
+  * non-null flattened-column map. After transform() and a schema rebuild, the
+  * original foreach has every flatten flag cleared, the foreach now following
+  * the join has the flag set at every remapped index, and the limit schema
+  * still compares equal to the one captured before the transform.
+  */
+ @Test
+ public void testForeachFRJoin1() throws Exception {
+     planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+     planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+     planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
+     planTester.buildPlan("D = join A by $0, C by $0 using \"replicated\";");
+     LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+     planTester.setPlan(lp);
+     planTester.setProjectionMap(lp);
+     planTester.rebuildSchema(lp);
+
+     PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+     // the flattening foreach hangs off the second root / second join input
+     LOLoad load = (LOLoad) lp.getRoots().get(1);
+     LOLimit limit = (LOLimit) lp.getLeaves().get(0);
+     LOFRJoin frjoin = (LOFRJoin) lp.getPredecessors(limit).get(0);
+     LOForEach foreach = (LOForEach) lp.getPredecessors(frjoin).get(1);
+
+     // captured before the transform so the output schema can be compared afterwards
+     Schema limitSchema = limit.getSchema();
+
+     assertTrue(pushDownForeach.check(lp.getSuccessors(load)));
+     assertTrue(!pushDownForeach.getSwap());
+     assertTrue(pushDownForeach.getInsertBetween());
+     assertTrue(pushDownForeach.getFlattenedColumnMap() != null);
+
+     pushDownForeach.transform(lp.getSuccessors(load));
+
+     planTester.rebuildSchema(lp);
+
+     // expected value goes first so a failure message reads correctly
+     for (Boolean flattened : foreach.getFlatten()) {
+         assertEquals(false, flattened.booleanValue());
+     }
+
+     LOForEach newForeach = (LOForEach) lp.getSuccessors(frjoin).get(0);
+
+     List<Boolean> newForeachFlatten = newForeach.getFlatten();
+     Map<Integer, Integer> remap = pushDownForeach.getFlattenedColumnMap();
+     // only the mapped-to indexes matter here, so walk the values directly
+     for (Integer remappedIndex : remap.values()) {
+         assertEquals(true, newForeachFlatten.get(remappedIndex).booleanValue());
+     }
+
+     assertTrue(Schema.equals(limitSchema, limit.getSchema(), false, true));
+ }
+
+ // TODO
+ // The following test case testForeachFRJoin2 has multiple foreach flatten
+ // A new rule should optimize this case
+ @Test
+ public void testForeachFRJoin2() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
+ planTester.buildPlan("E = join B by $0, D by $0 using \"replicated\";");
+ LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachFlattenAddedColumnFRJoin() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachUDFFRJoin() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachCastFRJoin() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachInnerJoin() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = join B by $0, C by $0;");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+ }
+
+ @Test
+ public void testForeachInnerJoin1() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
+ planTester.buildPlan("D = join A by $0, C by $0;");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loadb = (LOLoad) lp.getRoots().get(1);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loadb)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+ }
+
+
+ // TODO
+ // The following test case testForeachInnerJoin2 has multiple foreach flatten
+ // A new rule should optimize this case
+ @Test
+ public void testForeachInnerJoin2() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
+ planTester.buildPlan("E = join B by $0, D by $0;");
+ LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachFlattenAddedColumnInnerJoin() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = join B by $0, C by $0;");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachUDFInnerJoin() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = join B by $0, C by $0;");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+ @Test
+ public void testForeachCastInnerJoin() throws Exception {
+ planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
+ planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
+ planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
+ planTester.buildPlan("D = join B by $0, C by $0;");
+ LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+
+ planTester.setPlan(lp);
+ planTester.setProjectionMap(lp);
+ planTester.rebuildSchema(lp);
+
+ PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+ LOLoad loada = (LOLoad) lp.getRoots().get(0);
+
+ assertTrue(!pushDownForeach.check(lp.getSuccessors(loada)));
+ assertTrue(pushDownForeach.getSwap() == false);
+ assertTrue(pushDownForeach.getInsertBetween() == false);
+ assertTrue(pushDownForeach.getFlattenedColumnMap() == null);
+
+ }
+
+}
+
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPushUpFilter.java Thu Jul 2 20:56:21 2009
@@ -88,7 +88,7 @@
pushUpFilter.check(lp.getRoots());
fail("Exception Expected!");
} catch(Exception e) {
- assertTrue(((OptimizerException)e).getErrorCode() == 1101);
+ assertTrue(((OptimizerException)e).getErrorCode() == 2005);
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestRewire.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestRewire.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestRewire.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestRewire.java Thu Jul 2 20:56:21 2009
@@ -366,7 +366,7 @@
fail("Expected failure.");
} catch (Exception e) {
PigException pe = LogUtils.getPigException(e);
- assertTrue(pe.getErrorCode() == 2146);
+ assertTrue(pe.getErrorCode() == 2156);
}
}
@@ -392,7 +392,7 @@
fail("Expected failure.");
} catch(Exception e) {
PigException pe = LogUtils.getPigException(e);
- assertTrue(pe.getErrorCode() == 2148);
+ assertTrue(pe.getErrorCode() == 2158);
}
}
Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java?rev=790735&r1=790734&r2=790735&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/LogicalPlanTester.java Thu Jul 2 20:56:21 2009
@@ -21,6 +21,8 @@
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.impl.logicalLayer.*;
import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaCalculator;
+import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
import org.apache.pig.impl.logicalLayer.validators.TypeCheckingValidator;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.PlanValidationException;
@@ -247,6 +249,19 @@
ProjectionMapCalculator pmc = new ProjectionMapCalculator(lp);
pmc.visit();
}
-
-
+
+ /**
+  * Runs a ProjectionMapRemover visitor and then a ProjectionMapCalculator
+  * visitor over the given plan (presumably clearing and then recomputing the
+  * projection maps - confirm against the visitor implementations).
+  *
+  * @param lp the logical plan to visit
+  * @throws VisitorException if either visitor fails
+  */
+ public void rebuildProjectionMap(LogicalPlan lp) throws VisitorException {
+     new ProjectionMapRemover(lp).visit();
+     new ProjectionMapCalculator(lp).visit();
+ }
+
+ /**
+  * Runs a SchemaRemover visitor and then a SchemaCalculator visitor over the
+  * given plan (presumably clearing and then recomputing operator schemas -
+  * confirm against the visitor implementations).
+  *
+  * @param lp the logical plan to visit
+  * @throws VisitorException if either visitor fails
+  */
+ public void rebuildSchema(LogicalPlan lp) throws VisitorException {
+     new SchemaRemover(lp).visit();
+     new SchemaCalculator(lp).visit();
+ }
+
}