You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/07 02:15:44 UTC
svn commit: r1100420 [10/19] - in /pig/branches/branch-0.9: ./ src/
src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/impl/logicalLayer/
src/org/apache/pig/impl/logicalLayer/optimizer/ src/org/apach...
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PruneColumns.java Sat May 7 00:15:40 2011
@@ -1,829 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.impl.logicalLayer.optimizer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadPushDown;
-import org.apache.pig.PigException;
-import org.apache.pig.LoadPushDown.RequiredField;
-import org.apache.pig.LoadPushDown.RequiredFieldList;
-import org.apache.pig.LoadPushDown.RequiredFieldResponse;
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.ColumnPruner;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOCast;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOCross;
-import org.apache.pig.impl.logicalLayer.LODistinct;
-import org.apache.pig.impl.logicalLayer.LOFilter;
-import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOJoin;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LOMapLookup;
-import org.apache.pig.impl.logicalLayer.LOProject;
-import org.apache.pig.impl.logicalLayer.LOSort;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOSplitOutput;
-import org.apache.pig.impl.logicalLayer.LOStore;
-import org.apache.pig.impl.logicalLayer.LOStream;
-import org.apache.pig.impl.logicalLayer.LOUnion;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.RelationalOperator;
-import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.MapKeysInfo;
-import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.RequiredFields;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.ProjectionMap.Column;
-import org.apache.pig.impl.plan.optimizer.OptimizerException;
-import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.util.Pair;
-
-class RequiredInfo {
- List<RequiredFields> requiredFieldsList;
-
- RequiredInfo(List<RequiredFields> requiredFieldsList)
- {
- this.requiredFieldsList = requiredFieldsList;
- }
-}
-
-public class PruneColumns extends LogicalTransformer {
- private boolean safeToPrune = true;
- private static Log log = LogFactory.getLog(PruneColumns.class);
- Map<RelationalOperator, RequiredInfo> cachedRequiredInfo = new HashMap<RelationalOperator, RequiredInfo>();
- private Map<LOLoad, RequiredFields> prunedLoaderColumnsMap = new HashMap<LOLoad, RequiredFields>();
- ColumnPruner pruner;
- public PruneColumns(LogicalPlan plan) {
- super(plan);
- pruner = new ColumnPruner(plan);
- }
-
- @Override
- public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
- if((nodes == null) || (nodes.size() <= 0)) {
- int errCode = 2177;
- String msg = "Cannot retrieve operator from null or empty list.";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
-
- try {
- LogicalOperator lo = nodes.get(0);
- if (lo == null) {
- int errCode = 2178;
- String msg = "The matching node from the optimizor framework is null";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
- if ((lo instanceof LOForEach||lo instanceof LOSplit)&&lo.getSchema()!=null)
- return true;
- return false;
- } catch (Exception e) {
- int errCode = 2179;
- String msg = "Error while performing checks to prune columns.";
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
- }
-
- @Override
- // transform will pick every LOForEach and LOSplit
- public void transform(List<LogicalOperator> nodes)
- throws OptimizerException {
- if((nodes == null) || (nodes.size() <= 0)) {
- int errCode = 2177;
- String msg = "Cannot retrieve operator from null or empty list.";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
- try {
- LogicalOperator lo = nodes.get(0);
- if (lo == null || !(lo instanceof LOForEach || lo instanceof LOSplit)) {
- int errCode = 2178;
- String msg = "Expected " + LOForEach.class.getSimpleName() + " or " + LOSplit.class.getSimpleName();
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
-
- // Check if we have saved requiredInfo, if so, we will use that as required output fields for that operator;
- // Otherwise means we require every output field
- RequiredInfo requiredOutputInfo = cachedRequiredInfo.get(lo);
-
- if (requiredOutputInfo==null)
- {
- List<RequiredFields> requiredOutputFieldsList = new ArrayList<RequiredFields>();
- List<LogicalOperator> successors = mPlan.getSuccessors(lo);
- if (successors==null)
- {
- requiredOutputFieldsList.add(new RequiredFields(true));
- }
- else
- {
- // The only case requiredOutputFieldsList more than 1 element is when the current
- // operator is LOSplit
- for (int i=0;i<successors.size();i++)
- {
- requiredOutputFieldsList.add(new RequiredFields(true));
- }
- }
- requiredOutputInfo = new RequiredInfo(requiredOutputFieldsList);
- }
- processNode(lo, requiredOutputInfo);
- } catch (OptimizerException oe) {
- throw oe;
- } catch (Exception e) {
- int errCode = 2181;
- String msg = "Unable to prune columns.";
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
- }
-
- // We recursively collect required fields from forEach from bottom to top, until one of the following conditions occurs:
- // 1. If we see another LOForEach, we simply stop because optimizor will pick that foreach later and start from there
- // 2. If we see LOStore, LOStream, LODistinct, we stop, LOStore, LOStream, LODistinct require all fields, we cannot push upward
- // 3. If we see LOLoad, we set required fields and stop, LOLoad suppose to read only required fields
- // 4. If we see LOSplit, we save requiredInfo and quit. optimizor will pick that split after all its successors are visited
- // For all other operators, we recursively call processNode for all its parents
- //
- // Inside processNode, we will collect required input columns from required output columns. Required input/output columns
- // also include required map keys referred by the logical plan beneath. Required input columns come from two sources:
- // 1. Relevant input fields of required output fields
- // 2. Required input fields of the logic operator
- //
- // lo: logical operator to process
- // requiredOutputFields: requiredFieldsList below this operator
- public void processNode(LogicalOperator lo, RequiredInfo requiredOutputInfo) throws OptimizerException
- {
- try
- {
- if (!safeToPrune)
- return;
- if (!(lo instanceof RelationalOperator))
- {
- int errCode = 2182;
- String msg = "Only relational operator can be used in column prune optimization.";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
- if (lo.getSchema()==null)
- {
- safeToPrune = false;
- return;
- }
- RelationalOperator rlo = (RelationalOperator)lo;
- List<LogicalOperator> predecessors = (mPlan.getPredecessors(rlo) == null ? null
- : new ArrayList<LogicalOperator>(mPlan.getPredecessors(rlo)));
-
- // Now we have collected required output fields of LOLoad (include requried map keys).
- // We need to push these into the loader
- if (rlo instanceof LOLoad)
- {
- // LOLoad has only one output
- RequiredFields loaderRequiredFields = requiredOutputInfo.requiredFieldsList.get(0);
- prunedLoaderColumnsMap.put((LOLoad)rlo, loaderRequiredFields);
- return;
- }
-
- // If the predecessor is one of LOStore/LOStream/LODistinct, we stop to trace up.
- // We require all input fields. We stop processing here. The optimizer will
- // pick the next ForEach and start processing from there
- if (rlo instanceof LOStore || rlo instanceof LOStream || rlo instanceof LODistinct) {
- return;
- }
-
- // merge requiredOutputFields and process the predecessor
- if (rlo instanceof LOSplit)
- {
- List<RequiredFields> requiredInputFieldsList = new ArrayList<RequiredFields>();
- RequiredFields requiredFields = new RequiredFields(false);
- for (int i=0;i<mPlan.getSuccessors(rlo).size();i++)
- {
- RequiredFields rf = null;
- try {
- rf = requiredOutputInfo.requiredFieldsList.get(i);
- } catch (Exception e) {
- }
- if (rf!=null)
- {
- rf.reIndex(0);
- requiredFields.merge(rf);
- } else {
- // need all fields
- List<Pair<Integer, Integer>> l = new ArrayList<Pair<Integer, Integer>>();
- for (int j=0;j<rlo.getSchema().size();j++)
- l.add(new Pair<Integer, Integer>(0, j));
- rf = new RequiredFields(l);
- requiredFields.merge(rf);
- break;
- }
- }
- requiredInputFieldsList.add(requiredFields);
- if (predecessors.get(0) instanceof LOForEach || predecessors.get(0) instanceof LOSplit)
- cachedRequiredInfo.put((RelationalOperator)predecessors.get(0), new RequiredInfo(requiredInputFieldsList));
- else
- processNode(predecessors.get(0), new RequiredInfo(requiredInputFieldsList));
- return;
- }
-
- // Initialize requiredInputFieldsList
- List<RequiredFields> requiredInputFieldsList = new ArrayList<RequiredFields>();
- for (int i=0;i<predecessors.size();i++)
- requiredInputFieldsList.add(null);
-
- // Map required output columns to required input columns.
- // We also collect required output map keys into input map keys.
- // Since we have already processed Split, so every remaining operator
- // have only one element in requiredOutputFieldList, so we get the first
- // element and process
- RequiredFields requiredOutputFields = requiredOutputInfo.requiredFieldsList.get(0);
-
- // needAllFields means we require every individual output column and all map keys of that output.
- // We convert needAllFields to individual fields here to facilitate further processing
- if (requiredOutputFields.needAllFields())
- {
- List<Pair<Integer, Integer>> outputList = new ArrayList<Pair<Integer, Integer>>();
- for (int j=0;j<rlo.getSchema().size();j++)
- outputList.add(new Pair<Integer, Integer>(0, j));
- requiredOutputFields = new RequiredFields(outputList);
- for (int i=0;i<requiredOutputFields.size();i++)
- requiredOutputFields.setMapKeysInfo(i, new MapKeysInfo(true));
- }
-
- if (requiredOutputFields.getFields()==null)
- {
- int errCode = 2184;
- String msg = "Fields list inside RequiredFields is null.";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
-
- for (int i=0;i<requiredOutputFields.size();i++)
- {
- Pair<Integer, Integer> requiredOutputField = requiredOutputFields.getField(i);
- MapKeysInfo outputMapKeysInfo = requiredOutputFields.getMapKeysInfo(i);
-
- List<RequiredFields> relevantFieldsList = rlo.getRelevantInputs(requiredOutputField.first, requiredOutputField.second);
-
- // We do not have any relevant input fields for this output, continue to next output
- if (relevantFieldsList==null)
- continue;
-
- for (int j=0;j<relevantFieldsList.size();j++)
- {
- RequiredFields relevantFields = relevantFieldsList.get(j);
-
- if (relevantFields!=null && relevantFields.needAllFields())
- {
- requiredInputFieldsList.set(j, new RequiredFields(true));
- continue;
- }
-
- // Mapping output map keys to input map keys
- if (rlo instanceof LOCogroup)
- {
- if (j!=0 && relevantFields!=null && !relevantFields.needAllFields())
- {
- for (Pair<Integer, Integer> pair : relevantFields.getFields())
- relevantFields.setMapKeysInfo(pair.first, pair.second,
- new MapKeysInfo(true));
- }
- }
- else if (rlo instanceof LOForEach)
- {
- // Relay map keys from output to input
- LogicalPlan forEachPlan = ((LOForEach)rlo).getRelevantPlan(requiredOutputField.second);
- if (relevantFields.getFields()!=null && relevantFields.getFields().size()!=0)
- {
- int index = ((LOForEach)rlo).getForEachPlans().indexOf(forEachPlan);
- // We check if the field get flattened, if it does, then we do not relay output map keys to input map keys.
- // There are two situations:
- // 1. input column is tuple, bag, or other simple type, there is no concept of map key, so we do not relay
- // 2. input column is map, flatten does not do anything, we can still relay
- boolean nonflatten = false;
- if (!((LOForEach)rlo).getFlatten().get(index))
- {
- nonflatten = true;
- }
- else
- {
- // Foreach plan is flattened, check if there is only one input for this foreach plan
- // and input schema for that input is not map, if so, it is a dummy flatten
- if (forEachPlan.getRoots().size()==1 && forEachPlan.getRoots().get(0) instanceof LOProject)
- {
- LOProject loProj = (LOProject)forEachPlan.getRoots().get(0);
- if (loProj.getExpression().getSchema()!=null &&
- loProj.getExpression().getSchema().getField(loProj.getCol()).type!=DataType.BAG)
- nonflatten = true;
- }
- }
- if (nonflatten && outputMapKeysInfo!=null && isSimpleProjectCast(forEachPlan))
- {
- Pair<Integer, Integer> inputColumn = relevantFields.getFields().get(0);
- relevantFields.setMapKeysInfo(inputColumn.first, inputColumn.second, outputMapKeysInfo);
- }
- }
-
- // Collect required map keys in foreach plan here.
- // This is the only logical operator that we collect map keys
- // which are introduced by the operator here.
- // For all other logical operators, it is attached to required fields
- // of that logical operator, will process in required fields processing
- // section
- for (Pair<Integer, Integer> relevantField : relevantFields.getFields())
- {
- MapKeysInfo mapKeysInfo = getMapKeysInPlan(forEachPlan, relevantField.second);
- relevantFields.mergeMapKeysInfo(0, relevantField.second, mapKeysInfo);
- }
- }
- else
- {
- // For all other logical operators, we have one output column mapping to one or more input column.
- // We copy the output map keys from the output column to the according input column
- if (relevantFields!=null && relevantFields.getFields()!=null && outputMapKeysInfo!=null)
- {
- for (Pair<Integer, Integer> pair : relevantFields.getFields())
- relevantFields.setMapKeysInfo(pair.first, pair.second,
- outputMapKeysInfo);
- }
- }
-
- // Now we aggregate the input columns of this output column to the required input columns
- if (requiredInputFieldsList.get(j)==null)
- requiredInputFieldsList.set(j, relevantFields);
- else
- {
- requiredInputFieldsList.get(j).merge(relevantFields);
- }
- }
- }
-
- // Merge with required input fields of this logical operator.
- // RequiredInputFields come from two sources, one is mapping from required output to input,
- // the other is from the operator itself. Here we use getRequiredFields to get the second part,
- // and merge with the first part
- List<RequiredFields> requiredFieldsListOfLOOp;
-
- // For LOForEach, requiredFields all flattened fields. Even the flattened fields get pruned,
- // it may expand the number of rows in the result. So flattened fields shall not be pruned.
- // LOForEach.getRequiredFields does not give the required fields. RequiredFields means that field
- // is required by all the outputs. The pipeline does not work correctly without that field.
- // LOForEach.getRequiredFields give all the input fields referred in the LOForEach statement, but those
- // fields can still be pruned (which means, not required)
- // Eg:
- // B = foreach A generate a0, a1, a2+a3;
- // LOForEach.getRequiredFields gives (a0, a1, a2, a3);
- // However, a2,a3 can be pruned if we do not need the a2+a3 for LOForEach.
- // So here, we do not use LOForEach.getRequiredFields, instead, any flattened fields are required fields
- if (rlo instanceof LOForEach) {
- List<Pair<Integer, Integer>> flattenedInputs = new ArrayList<Pair<Integer, Integer>>();
- for (int i=0;i<rlo.getSchema().size();i++) {
- if (((LOForEach)rlo).isInputFlattened(i)) {
- flattenedInputs.add(new Pair<Integer, Integer>(0, i));
- }
- }
- if (!flattenedInputs.isEmpty()) {
- requiredFieldsListOfLOOp = new ArrayList<RequiredFields>();
- requiredFieldsListOfLOOp.add(new RequiredFields(flattenedInputs));
- }
- else
- requiredFieldsListOfLOOp = null;
- }
- // For LOCross/LOUnion, actually we do not require any field here
- else if (rlo instanceof LOCross || rlo instanceof LOUnion)
- requiredFieldsListOfLOOp = null;
- else
- requiredFieldsListOfLOOp = rlo.getRequiredFields();
-
- if (requiredFieldsListOfLOOp!=null)
- {
- for (int i=0;i<requiredFieldsListOfLOOp.size();i++)
- {
- RequiredFields requiredFieldsOfLOOp = requiredFieldsListOfLOOp.get(i);
- if (requiredInputFieldsList.get(i)==null)
- requiredInputFieldsList.set(i, requiredFieldsOfLOOp);
- else
- {
- requiredInputFieldsList.get(i).merge(requiredFieldsOfLOOp);
- }
- }
-
- // Collect required map keys of this operator
- // Cases are:
- // 1. Single predecessor: LOFilter, LOSplitOutput, LOSort
- // 2. Multiple predecessors: LOJoin
- // 3. LOForEach do not have operator-wise required fields, we
- // have already processed it
- // 4. LOCogroup require all map keys (even if we cogroup by a0#'k1', a0 itself will be in bag a
- // and we have no way to figure out which keys are referenced for a0. So we do not process it and
- // simply require all map keys)
- // 5. Other operators do not have required fields, no need to process
- if (rlo instanceof LOFilter || rlo instanceof LOSplitOutput || rlo instanceof LOSort)
- {
- List<LogicalPlan> innerPlans = new ArrayList<LogicalPlan>();
- if (rlo instanceof LOFilter)
- {
- innerPlans.add(((LOFilter)rlo).getComparisonPlan());
- }
- else if (rlo instanceof LOSplitOutput)
- {
- innerPlans.add(((LOSplitOutput)rlo).getConditionPlan());
- }
- else if (rlo instanceof LOSort)
- {
- innerPlans.addAll(((LOSort)rlo).getSortColPlans());
- }
- for (LogicalPlan p : innerPlans)
- {
- for (RequiredFields rf : requiredFieldsListOfLOOp)
- {
- if (rf.getFields()==null)
- continue;
- for (Pair<Integer, Integer> pair : rf.getFields())
- {
- MapKeysInfo mapKeysInfo = getMapKeysInPlan(p, pair.second);
- if (mapKeysInfo!=null && !mapKeysInfo.needAllKeys() && mapKeysInfo.getKeys()!=null)
- requiredInputFieldsList.get(0).mergeMapKeysInfo(0, pair.second,
- mapKeysInfo);
- }
- }
- }
- }
- else if (rlo instanceof LOJoin)
- {
- for (int i=0;i<predecessors.size();i++)
- {
- Collection<LogicalPlan> joinPlans = ((LOJoin)rlo).getJoinPlans().get(predecessors.get(i));
- if (joinPlans==null)
- continue;
- for (LogicalPlan p : joinPlans)
- {
- RequiredFields rf = requiredFieldsListOfLOOp.get(i);
- if (rf.getFields()==null)
- continue;
- for (Pair<Integer, Integer> pair : rf.getFields())
- {
- MapKeysInfo mapKeysInfo = getMapKeysInPlan(p, pair.second);
- if (mapKeysInfo!=null && !mapKeysInfo.needAllKeys() && mapKeysInfo.getKeys()!=null)
- requiredInputFieldsList.get(i).mergeMapKeysInfo(i, pair.second,
- mapKeysInfo);
- }
- }
- }
- }
- }
-
- // Now we finish the current logical operator, we need to process next logical operator. There are two cases:
- // 1. If the predecessor is LOForEach or LOSplit, we put requiredOutputFieldsList into cache and exit, the optimizer
- // will invoke transform() on LOForEach or LOSplit and continue to process
- // 2. If the predecessor is otherwise, we then recursively collect required fields for the predecessor
- for (int i=0;i<predecessors.size();i++)
- {
- RelationalOperator predecessor = (RelationalOperator)predecessors.get(i);
-
- List<RequiredFields> newRequiredOutputFieldsList = new ArrayList<RequiredFields>();
-
- // In this optimization, we only prune columns and do not change structure of logical plan
- // So if we do not require anything from the input, we change it to require the first field
- if (requiredInputFieldsList.get(i)==null || requiredInputFieldsList.get(i).getNeedNoFields())
- {
- List<Pair<Integer, Integer>> dummyFields = new ArrayList<Pair<Integer, Integer>>();
- dummyFields.add(new Pair<Integer, Integer>(i, 0));
- requiredInputFieldsList.set(i, new RequiredFields(dummyFields));
- }
- // For all logical operator with one output, reindex the output to 0
- if (!(predecessor instanceof LOSplit))
- {
- if (requiredInputFieldsList.get(i)!=null)
- requiredInputFieldsList.get(i).reIndex(0);
- newRequiredOutputFieldsList.add(requiredInputFieldsList.get(i));
- }
- if (predecessor instanceof LOForEach)
- {
- cachedRequiredInfo.put(predecessor, new RequiredInfo(newRequiredOutputFieldsList));
- continue;
- }
-
- if (predecessor instanceof LOSplit)
- {
- int outputIndex = mPlan.getSuccessors(predecessor).indexOf(rlo);
- if (outputIndex==-1)
- {
- int errCode = 2186;
- String msg = "Cannot locate node from successor";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
- if (requiredInputFieldsList.get(i)!=null)
- {
- requiredInputFieldsList.get(i).reIndex(outputIndex);
- }
- if (cachedRequiredInfo.containsKey(predecessor))
- newRequiredOutputFieldsList = cachedRequiredInfo.get(predecessor).requiredFieldsList;
-
- while (newRequiredOutputFieldsList.size()<=outputIndex)
- newRequiredOutputFieldsList.add(null);
- newRequiredOutputFieldsList.set(outputIndex, requiredInputFieldsList.get(i));
-
- cachedRequiredInfo.put(predecessor, new RequiredInfo(newRequiredOutputFieldsList));
- continue;
- }
- processNode(predecessors.get(i), new RequiredInfo(newRequiredOutputFieldsList));
- }
- } catch (FrontendException e) {
- int errCode = 2211;
- String msg = "Unable to prune columns when processing node " + lo;
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
- }
-
- // Get map keys in an inner plan of a particular input.
- private MapKeysInfo getMapKeysInPlan(LogicalPlan plan, int column) throws OptimizerException
- {
- // Determine if this foreach/cogroup plan can relate map keys from output columns to input columns.
- // For criteria of this, see the comment of method isMapKeyRelayableInInnerPlan
- // If this is true, the reference of the column here does not actually used by the logical operator:
- // eg: B = foreach A generate a0;
- // We relay map key of a0 to B.$0. Appearance of a0 on its own here does not mean we need all map keys of a0
- // So once we see this situation, we stop collecting required map keys of this logical operator
- if (isSimpleProjectCast(plan))
- return null;
-
- boolean requireAll = false;
- List<String> mapKeys = null;
- TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(plan);
- try {
- projectFinder.visit();
- } catch (VisitorException ve) {
- int errCode = 2200;
- String msg = "Error getting top level project ";
- throw new OptimizerException(msg, errCode, PigException.BUG, ve);
- }
- for (LOProject project : projectFinder.getProjectSet())
- {
- if (!project.isStar() && project.getCol()==column) // LOProject for that column
- {
- List<LogicalOperator> successors = plan.getSuccessors(project);
- // If there are LOCast(s) in the middle (can only be cast to map, otherwise, there will not be maplookup below)
- // it is fine, we can ignore LOCast and continue to look for LOMapLookup
- while (successors!=null && successors.size()==1 && successors.get(0) instanceof LOCast)
- {
- LOCast cast = (LOCast)successors.get(0);
- successors = plan.getSuccessors(cast);
- }
- if (successors!=null && successors.size()==1 && successors.get(0) instanceof LOMapLookup)
- {
- LOMapLookup loMapLookup = (LOMapLookup)successors.get(0);
- if (loMapLookup.getLookUpKey()!=null)
- {
- if (mapKeys==null)
- mapKeys = new ArrayList<String>();
- if (!mapKeys.contains(loMapLookup.getLookUpKey()))
- mapKeys.add(loMapLookup.getLookUpKey());
- }
- requireAll = false;
- }
- else
- {
- requireAll = true;
- }
- }
- }
- return new MapKeysInfo(requireAll, mapKeys);
- }
-
- // Figure if we need to relay output map keys to input map keys. It is used for inner plan for LOForEach
- // and groupByPlan for LOCogroup
- // There are several cases:
- // 1. UDF, we cannot figure out how each input field is used in UDF, so for each input field,
- // we require everything
- // 2. Map constant, which do not requires any data input
- // 3. BinCond (B = foreach A generate a0==0?a1:a2; when a1, a2 is map)
- // This situation is complex. Two branches (a1, a2) is relayed independently, if it relays, then we cannot
- // collect map keys of that branches as the required operator map keys.
- // However, it is unlikely that user will refer a map key of that output, we simply say we need all map keys of
- // both map inputs
- // 4. Cast (B = foreach A generate a0 as (map[]);)
- // The only cases we can cast a field to a map is the input field is a map already or the field is a byte array
- // If the input field is a byte array, then the input field do not have concept of map keys. So we do not need
- // to figure out the map key for this input. If the input field is a map, then this cast is just a 1 to 1 mapping.
- // It is case 5
- // 5. 1 to 1 mapping (B = foreach A generate a0;)
- // 6. Map resolution (B = foreach A generate a0#'key1' as b0)
- // Since we only trace one level map keys, so relay no key (since all the key in the following script refer to b0#'k'
- // actually refer to a0#'key1'#'k', we do not relay second level map key 'k'), but require 'key1' for a0
- //
- // Based on the above observation, the algorithm to map output map keys to input map keys are:
- // 1. If a output column map to multiple input column, then we do not relay this output
- // 2. Find top level project for that plan, we collect map key only when we have only one input
- // associate with it
- // 3. Check if the predecessor of the project is null, if not, stop relaying input map keys
- // 4. Check the successors of that project, if it is not a null or cast, stop relaying input map keys
- // The qualifying logical plan should takes one project as root, optionally followed by one or more casts:
- //
- // Project
- // |
- // Cast*
- private boolean isSimpleProjectCast(LogicalPlan innerPlan) throws OptimizerException
- {
- TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(innerPlan);
-
- try {
- projectFinder.visit();
- } catch (VisitorException ve) {
- throw new OptimizerException();
- }
- boolean relayingMapKeys = false;
- if (projectFinder.getProjectSet()!=null && projectFinder.getProjectSet().size()==1)
- {
- LOProject project = projectFinder.getProjectSet().iterator().next();
- if (innerPlan.getPredecessors(project)==null)
- {
- relayingMapKeys = true;
- LogicalOperator pred = project;
- while (innerPlan.getSuccessors(pred)!=null)
- {
- if (innerPlan.getSuccessors(pred).size()!=1)
- return false;
- if (!(innerPlan.getSuccessors(pred).get(0) instanceof LOCast))
- {
- return false;
- }
- pred = innerPlan.getSuccessors(pred).get(0);
- }
- }
- if (relayingMapKeys)
- return true;
- }
- return false;
- }
-
- // Prune fields of LOLoad, and use ColumePruner to prune all the downstream logical operators
- private void pruneLoader(LOLoad load, RequiredFields loaderRequiredFields) throws FrontendException
- {
- RequiredFieldList requiredFieldList = new RequiredFieldList();
-
- if (loaderRequiredFields==null || loaderRequiredFields.needAllFields())
- return;
- Schema loadSchema = load.getSchema();
- for (int i=0;i<loaderRequiredFields.size();i++)
- {
- Pair<Integer, Integer> pair = loaderRequiredFields.getField(i);
- MapKeysInfo mapKeysInfo = loaderRequiredFields.getMapKeysInfo(i);
- RequiredField requiredField = new RequiredField();
- requiredField.setIndex(pair.second);
- requiredField.setAlias(loadSchema.getField(pair.second).alias);
- requiredField.setType(loadSchema.getField(pair.second).type);
- if (mapKeysInfo!=null && !mapKeysInfo.needAllKeys())
- {
- List<RequiredField> subFieldList = new ArrayList<RequiredField>();
- for (String key : mapKeysInfo.getKeys())
- {
- RequiredField mapKeyField = new RequiredField();
- mapKeyField.setIndex(-1);
- mapKeyField.setType(DataType.UNKNOWN);
- mapKeyField.setAlias(key);
- subFieldList.add(mapKeyField);
- }
- requiredField.setSubFields(subFieldList);
- }
- // Sort requiredFieldList, loader expect required field list sorted by index
- int j=0;
- while (requiredFieldList.getFields().size()>j && requiredFieldList.getFields().get(j).getIndex()<pair.second)
- j++;
- requiredFieldList.getFields().add(j, requiredField);
- }
-
- boolean[] columnRequired = new boolean[load.getSchema().size()];
- RequiredFieldResponse response = null;
- try {
- response = load.pushProjection(requiredFieldList);
-
- } catch (FrontendException e) {
- log.warn("fieldsToRead on "+load+" throw an exception, skip it");
- }
-
- // If the request is not granted, probably the loader support position prune only,
- // and do not prune map key pruning (such as PigStorage). Drop all map keys (means
- // we do not prune map keys) and try again
- if (response==null || !response.getRequiredFieldResponse())
- {
- for (RequiredField rf : requiredFieldList.getFields())
- {
- if (rf.getType() == DataType.MAP)
- rf.setSubFields(null);
- }
- try {
- response = load.pushProjection(requiredFieldList);
- } catch (FrontendException e) {
- log.warn("fieldsToRead on "+load+" throw an exception, skip it");
- }
- }
-
- // Loader does not support column pruning, insert foreach
- LogicalOperator forEach = null;
- if (response==null || !response.getRequiredFieldResponse())
- {
- List<Integer> columnsToProject = new ArrayList<Integer>();
- for (RequiredField rf : requiredFieldList.getFields())
- columnsToProject.add(rf.getIndex());
-
- forEach = load.insertPlainForEachAfter(columnsToProject);
- }
-
- // Begin to prune
- for (Pair<Integer, Integer> pair: loaderRequiredFields.getFields())
- columnRequired[pair.second] = true;
-
- List<Pair<Integer, Integer>> pruneList = new ArrayList<Pair<Integer, Integer>>();
- for (int i=0;i<columnRequired.length;i++)
- {
- if (!columnRequired[i])
- pruneList.add(new Pair<Integer, Integer>(0, i));
- }
-
- StringBuffer message = new StringBuffer();
- if (pruneList.size()!=0)
- {
- if (forEach == null)
- pruner.addPruneMap(load, pruneList);
- else
- pruner.addPruneMap(forEach, pruneList);
-
- message.append("Columns pruned for " + load.getAlias() + ": ");
- for (int i=0;i<pruneList.size();i++)
- {
- message.append("$"+pruneList.get(i).second);
- if (i!=pruneList.size()-1)
- message.append(", ");
- }
- log.info(message);
- }
- else
- log.info("No column pruned for " + load.getAlias());
- message = new StringBuffer();;
- for (RequiredField rf : requiredFieldList.getFields())
- {
- if (rf.getSubFields()!=null)
- {
- message.append("Map key required for " + load.getAlias()+": ");
- if (rf.getIndex()!=-1)
- message.append("$"+rf.getIndex());
- else
- message.append(rf.getAlias());
- message.append("->[");
- for (int i=0;i<rf.getSubFields().size();i++)
- {
- RequiredField keyrf = rf.getSubFields().get(i);
- message.append(keyrf);
- if (i!=rf.getSubFields().size()-1)
- message.append(",");
- }
- message.append("] ");
- }
- }
- if (message.length()!=0)
- log.info(message);
- else
- log.info("No map keys pruned for " + load.getAlias());
- }
-
- public void prune() throws OptimizerException {
- try {
- if (!safeToPrune)
- return;
-
- for (LOLoad load : prunedLoaderColumnsMap.keySet())
- pruneLoader(load, prunedLoaderColumnsMap.get(load));
-
- if (!pruner.isEmpty())
- pruner.visit();
- }
- catch (FrontendException e) {
- int errCode = 2212;
- String msg = "Unable to prune plan";
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
- }
-}
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PushDownForeachFlatten.java Sat May 7 00:15:40 2011
@@ -1,431 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer.optimizer;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.pig.impl.logicalLayer.CastFinder;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOCross;
-import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOJoin;
-import org.apache.pig.impl.logicalLayer.LOProject;
-import org.apache.pig.impl.logicalLayer.LOSort;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.RelationalOperator;
-import org.apache.pig.impl.logicalLayer.UDFFinder;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.ProjectionMap;
-import org.apache.pig.impl.plan.RequiredFields;
-import org.apache.pig.impl.plan.OperatorPlan.IndexHelper;
-import org.apache.pig.impl.plan.ProjectionMap.Column;
-import org.apache.pig.impl.plan.optimizer.OptimizerException;
-import org.apache.pig.PigException;
-import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.util.Pair;
-
-/**
- * A visitor to discover if a foreach with flatten(s) can be pushed as low down the tree as
- * possible.
- */
-public class PushDownForeachFlatten extends LogicalTransformer {
-
- // boolean to remember if the foreach has to be swapped
- private boolean mSwap = false;
-
- // boolean to remember if the foreach has to be cloned and pushed into one
- // of the foreach's successor's outputs
- private boolean mInsertBetween = false;
-
- // map of flattened column to its new position in the output
- Map<Integer, Integer> mFlattenedColumnReMap = null;
-
- public PushDownForeachFlatten(LogicalPlan plan) {
- super(plan);
- }
-
- /**
- *
- * @return true if the foreach has to swapped; false otherwise
- */
- public boolean getSwap() {
- return mSwap;
- }
-
- /**
- *
- * @return true if the foreach has to be inserted after its successor; false
- * otherwise
- */
- public boolean getInsertBetween() {
- return mInsertBetween;
- }
-
- /**
- *
- * @return a map of old column position in the foreach to the column
- * position in foreach's successor
- */
- public Map<Integer, Integer> getFlattenedColumnMap() {
- return mFlattenedColumnReMap;
- }
-
- @Override
- public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
- try {
- LOForEach foreach = (LOForEach) getOperator(nodes);
-
- Pair<Boolean, List<Integer>> flattenResult = foreach.hasFlatten();
- boolean flattened = flattenResult.first;
- List<Integer> flattenedColumns = flattenResult.second;
- Set<Integer> flattenedColumnSet = (flattenedColumns == null? null: new HashSet<Integer>(flattenedColumns));
-
- if(!flattened) {
- return false;
- }
-
- if(flattenedColumns == null || flattenedColumns.size() == 0) {
- return false;
- }
-
- ProjectionMap foreachProjectionMap = foreach.getProjectionMap();
-
- if(foreachProjectionMap == null) {
- return false;
- }
-
- List<Integer> foreachAddedFields = foreachProjectionMap.getAddedFields();
- if(foreachAddedFields != null) {
- Set<Integer> foreachAddedFieldsSet = new HashSet<Integer>(foreachAddedFields);
- flattenedColumnSet.removeAll(foreachAddedFieldsSet);
- }
-
- if(flattenedColumnSet.size() == 0) {
- return false;
- }
-
- for(LogicalPlan foreachPlan: foreach.getForEachPlans()) {
- UDFFinder udfFinder = new UDFFinder(foreachPlan);
- udfFinder.visit();
-
- // if any of the foreach's inner plans contain a UDF then return false
- if (udfFinder.foundAnyUDF()) {
- return false;
- }
-
- CastFinder castFinder = new CastFinder(foreachPlan);
- castFinder.visit();
-
- // TODO
- // if any of the foreach's inner plans contain a cast then return false
- // in the future the cast should be moved appropriately
- if (castFinder.foundAnyCast()) {
- return false;
- }
- }
-
- List<LogicalOperator> successors = (mPlan.getSuccessors(foreach) == null ? null
- : new ArrayList<LogicalOperator>(mPlan
- .getSuccessors(foreach)));
-
- // if the foreach has no successors or more than one successor
- // return false
- if (successors == null || successors.size() == 0 || successors.size() > 1) {
- return false;
- }
-
- LogicalOperator successor = successors.get(0);
-
- List<LogicalOperator> peers = (mPlan.getPredecessors(successor) == null ? null
- : new ArrayList<LogicalOperator>(mPlan.getPredecessors(successor)));
-
- // check if any of the foreach's peers is a foreach flatten
- // if so then this rule does not apply
- if (peers != null){
- for(LogicalOperator peer: peers) {
- if(!peer.equals(foreach)) {
- if(peer instanceof LOForEach) {
- LOForEach peerForeach = (LOForEach)peer;
- if(peerForeach.hasFlatten().first) {
- return false;
- }
- }
- }
- }
- }
-
- IndexHelper<LogicalOperator> indexHelper = new IndexHelper<LogicalOperator>(peers);
- Integer foreachPosition = indexHelper.getIndex(foreach);
-
- // Check if flattened fields is required by successor, if so, don't optimize
- List<RequiredFields> requiredFieldsList = ((RelationalOperator)successor).getRequiredFields();
- RequiredFields requiredFields = requiredFieldsList.get(foreachPosition.intValue());
-
- MultiMap<Integer, Column> foreachMappedFields = foreachProjectionMap.getMappedFields();
-
- if (requiredFields.getFields()!=null) {
- for (Pair<Integer, Integer> pair : requiredFields.getFields()) {
- Collection<Column> columns = foreachMappedFields.get(pair.second);
- if (columns!=null) {
- for (Column column : columns) {
- Pair<Integer, Integer> foreachInputColumn = column.getInputColumn();
- if (foreach.isInputFlattened(foreachInputColumn.second))
- return false;
- }
- }
- }
- }
-
- // the foreach with flatten can be swapped with an order by
- // as the order by will have lesser number of records to sort
- // also the sort does not alter the records that are processed
-
- // the foreach with flatten can be pushed down a cross or a join
- // for the same reason. In this case the foreach has to be first
- // unflattened and then a new foreach has to be inserted after
- // the cross or join. In both cross and foreach the actual columns
- // from the foreach are not altered but positions might be changed
-
- // in the case of union the column is transformed and as a result
- // the foreach flatten cannot be pushed down
-
- // for distinct the output before flattening and the output
- // after flattening might be different. For example, consider
- // {(1), (1)}. Distinct of this bag is still {(1), (1)}.
- // distinct(flatten({(1), (1)})) is (1). However,
- // flatten(distinct({(1), (1)})) is (1), (1)
-
- // in both cases correctness is not affected
- if(successor instanceof LOSort) {
- LOSort sort = (LOSort) successor;
- RequiredFields sortRequiredField = sort.getRequiredFields().get(0);
-
- if(sortRequiredField.getNeedAllFields()) {
- return false;
- }
-
- List<Pair<Integer, Integer>> sortInputs = sortRequiredField.getFields();
- Set<Integer> requiredInputs = new HashSet<Integer>();
- for(Pair<Integer, Integer> pair: sortInputs) {
- requiredInputs.add(pair.second);
- }
-
- requiredInputs.retainAll(flattenedColumnSet);
- // the intersection of the sort's required inputs
- // and the flattened columns in the foreach should
- // be null, i.e., the size of required inputs == 0
- if(requiredInputs.size() != 0) {
- return false;
- }
-
- mSwap = true;
- return true;
- } else if (successor instanceof LOCross
- || successor instanceof LOJoin) {
-
- List<LogicalOperator> children = mPlan.getSuccessors(successor);
-
- if(children == null || children.size() > 1) {
- return false;
- }
-
- ProjectionMap succProjectionMap = successor.getProjectionMap();
-
- if(succProjectionMap == null) {
- return false;
- }
-
- MultiMap<Integer, ProjectionMap.Column> mappedFields = succProjectionMap.getMappedFields();
-
- if(mappedFields == null) {
- return false;
- }
-
- if(mFlattenedColumnReMap == null) {
- mFlattenedColumnReMap = new HashMap<Integer, Integer>();
- }
-
- // initialize the map
- for(Integer key: flattenedColumnSet) {
- mFlattenedColumnReMap.put(key, Integer.MAX_VALUE);
- }
-
- // for each output column find the corresponding input that matches the foreach's position
- // for each input column in the foreach check if the output column is a mapping of the flattened column
- // due to flattenning multiple output columns could be generated from the same input column
- // find the first or the lowest column that is a result of the
- for(Integer key: mappedFields.keySet()) {
- List<ProjectionMap.Column> columns = mappedFields.get(key);
- for(ProjectionMap.Column column: columns) {
- Pair<Integer, Integer> inputColumn = column.getInputColumn();
-
- // check if the input column number is the same as the
- // position of foreach in the list of predecessors
- if(foreachPosition.equals(inputColumn.first)) {
- if(flattenedColumnSet.contains(inputColumn.second)) {
- // check if the output column, i.e., key is the
- // least column number seen till date
- if(key < mFlattenedColumnReMap.get(inputColumn.second)) {
- mFlattenedColumnReMap.put(inputColumn.second, key);
- }
- }
- }
- }
- }
-
- // check if any of the flattened columns is not remapped
- for(Integer key: mFlattenedColumnReMap.keySet()) {
- if(mFlattenedColumnReMap.get(key).equals(Integer.MAX_VALUE)) {
- return false;
- }
- }
-
- mInsertBetween = true;
- return true;
- }
-
- return false;
-
- } catch (OptimizerException oe) {
- throw oe;
- } catch (Exception e) {
- int errCode = 2152;
- String msg = "Internal error while trying to check if foreach with flatten can be pushed down.";
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
- }
-
- private LogicalOperator getOperator(List<LogicalOperator> nodes)
- throws FrontendException {
- if ((nodes == null) || (nodes.size() <= 0)) {
- int errCode = 2052;
- String msg = "Internal error. Cannot retrieve operator from null or empty list.";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
-
- LogicalOperator lo = nodes.get(0);
- if (lo == null || !(lo instanceof LOForEach)) {
- // we should never be called with any other operator class name
- int errCode = 2005;
- String msg = "Expected " + LOForEach.class.getSimpleName()
- + ", got "
- + (lo == null ? lo : lo.getClass().getSimpleName());
- throw new OptimizerException(msg, errCode, PigException.INPUT);
- } else {
- return lo;
- }
-
- }
-
- @Override
- public void transform(List<LogicalOperator> nodes)
- throws OptimizerException {
- try {
- LOForEach foreach = (LOForEach) getOperator(nodes);
- LogicalOperator successor = mPlan.getSuccessors(foreach).get(0);
- if (mSwap) {
- mPlan.swap(successor, foreach);
- } else if (mInsertBetween) {
- // mark the flattened columns as not flattened in the foreach
- // create a new foreach operator that projects each column of the
- // successor. Mark the remapped flattened columns as flattened
- // in the new foreach operator
-
- if(mFlattenedColumnReMap == null) {
- int errCode = 2153;
- String msg = "Internal error. The mapping for the flattened columns is empty";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
-
- // set flatten to false for all columns in the mapping
-
- ArrayList<Boolean> flattenList = (ArrayList<Boolean>)foreach.getFlatten();
- for(Integer key: mFlattenedColumnReMap.keySet()) {
- flattenList.set(key, false);
- }
-
- // rebuild schemas of the foreach and the successor after the foreach modification
- foreach.regenerateSchema();
- successor.regenerateSchema();
-
- Schema successorSchema = successor.getSchema();
-
- if(successorSchema == null) {
- int errCode = 2154;
- String msg = "Internal error. Schema of successor cannot be null for pushing down foreach with flatten.";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
-
- flattenList = new ArrayList<Boolean>();
-
- ArrayList<LogicalPlan> foreachInnerPlans = new ArrayList<LogicalPlan>();
-
- for(int i = 0; i < successorSchema.size(); ++i) {
- LogicalPlan innerPlan = new LogicalPlan();
- LOProject project = new LOProject(innerPlan, OperatorKey
- .genOpKey(foreach.getOperatorKey().scope),
- successor, i);
- innerPlan.add(project);
- foreachInnerPlans.add(innerPlan);
- flattenList.add(false);
- }
-
- // set the flattened remapped column to true
- for(Integer key: mFlattenedColumnReMap.keySet()) {
- Integer value = mFlattenedColumnReMap.get(key);
- flattenList.set(value, true);
- }
-
-
- LOForEach newForeach = new LOForEach(mPlan, OperatorKey
- .genOpKey(foreach.getOperatorKey().scope), foreachInnerPlans,
- flattenList);
-
- // add the new foreach to the plan
- mPlan.add(newForeach);
-
- // insert the new foreach between the successor and the successor's successor
- mPlan.insertBetween(successor, newForeach, mPlan.getSuccessors(successor).get(0));
- }
- } catch (OptimizerException oe) {
- throw oe;
- } catch (Exception e) {
- int errCode = 2155;
- String msg = "Internal error while pushing foreach with flatten down.";
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
- }
-
- @Override
- public void reset() {
- mInsertBetween = false;
- mSwap = false;
- mFlattenedColumnReMap = null;
- }
-
-}
Modified: pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java?rev=1100420&r1=1100419&r2=1100420&view=diff
==============================================================================
--- pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java (original)
+++ pig/branches/branch-0.9/src/org/apache/pig/impl/logicalLayer/optimizer/PushUpFilter.java Sat May 7 00:15:40 2011
@@ -1,511 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.impl.logicalLayer.optimizer;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.pig.data.DataType;
-import org.apache.pig.impl.logicalLayer.CastFinder;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOCast;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOCross;
-import org.apache.pig.impl.logicalLayer.LOIsNull;
-import org.apache.pig.impl.logicalLayer.LOJoin;
-import org.apache.pig.impl.logicalLayer.LOFilter;
-import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOLimit;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LONative;
-import org.apache.pig.impl.logicalLayer.LOProject;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOStore;
-import org.apache.pig.impl.logicalLayer.LOStream;
-import org.apache.pig.impl.logicalLayer.LOSplitOutput;
-import org.apache.pig.impl.logicalLayer.LOUnion;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.TopLevelProjectFinder;
-import org.apache.pig.impl.logicalLayer.UDFFinder;
-import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.ProjectionMap;
-import org.apache.pig.impl.plan.RequiredFields;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.optimizer.OptimizerException;
-import org.apache.pig.PigException;
-import org.apache.pig.impl.util.MultiMap;
-import org.apache.pig.impl.util.Pair;
-
-/**
- * A visitor to discover if a filter can be pushed as high up the tree as
- * possible.
- */
-public class PushUpFilter extends LogicalTransformer {
-
- // boolean to remember if the filter has to be swapped
- private boolean mSwap = false;
-
- // boolean to remember if the filter has to be pushed into one of the
- // filter's predecessor's inputs
- private boolean mPushBefore = false;
-
- // the input of the predecessor where the filter has to be pushed
- private int mPushBeforeInput = -1;
-
- public PushUpFilter(LogicalPlan plan) {
- super(plan);
- }
-
- /**
- *
- * @return true if the filter has to swapped; false otherwise
- */
- public boolean getSwap() {
- return mSwap;
- }
-
- /**
- *
- * @return true if the filter has to be pushed before its predecessor; false
- * otherwise
- */
- public boolean getPushBefore() {
- return mPushBefore;
- }
-
- /**
- *
- * @return return the input of the predecessor where the filter has to be
- * pushed
- */
- public int getPushBeforeInput() {
- return mPushBeforeInput;
- }
-
- @Override
- public boolean check(List<LogicalOperator> nodes) throws OptimizerException {
- try {
- LOFilter filter = (LOFilter) getOperator(nodes);
- List<LogicalOperator> predecessors = (mPlan.getPredecessors(filter) == null ? null
- : new ArrayList<LogicalOperator>(mPlan
- .getPredecessors(filter)));
-
- // if there are no predecessors return false
- if (predecessors == null) {
- return false;
- }
-
- // if the filter has no predecessors or more than one predecessor
- // return false
- if (predecessors.size() == 0 || predecessors.size() > 1) {
- return false;
- }
-
- LogicalOperator predecessor = predecessors.get(0);
-
- // if the predecessor is one of LOLoad/LOStore/LOStream/LOLimit/LONative
- // return false
- if (predecessor instanceof LOLoad || predecessor instanceof LOStore
- || predecessor instanceof LOStream
- || predecessor instanceof LOLimit
- || predecessor instanceof LONative) {
- return false;
- }
-
- // TODO
- // for now filters cannot be combined
- // remove this check when filters can be combined
- if (predecessor instanceof LOFilter)
- return false;
-
- // TODO
- // same rule as filters
- if (predecessor instanceof LOSplitOutput) {
- return false;
- }
- if (predecessor instanceof LOSplit) {
- return false;
- }
-
- UDFFinder udfFinder = new UDFFinder(filter.getComparisonPlan());
- udfFinder.visit();
-
- // if the filter's inner plan contains any UDF then return false
- if (udfFinder.foundAnyUDF()) {
- return false;
- }
-
- CastFinder castFinder = new CastFinder(filter.getComparisonPlan());
- castFinder.visit();
-
- // if the filter's inner plan contains any casts then return false
- if (castFinder.foundAnyCast()) {
- return false;
- }
-
- List<RequiredFields> filterRequiredFields = filter
- .getRequiredFields();
- if (filterRequiredFields == null) {
- return false;
- }
- RequiredFields requiredField = filterRequiredFields.get(0);
-
- // the filter's conditions contain constant expression
- // return false
- if (requiredField.needNoFields()) {
- return false;
- }
-
- // if the predecessor is a multi-input operator then detailed
- // checks are required
- if (predecessor instanceof LOCross
- || predecessor instanceof LOUnion
- || predecessor instanceof LOCogroup
- || predecessor instanceof LOJoin) {
-
- // check if the filter's required fields in conjunction with the
- // predecessor's projection map. If the filter needs more than
- // one input then the filter's expressions have to be split
-
- List<LogicalOperator> grandParents = mPlan
- .getPredecessors(predecessor);
-
- // if the predecessor does not have predecessors return false
- if (grandParents == null || grandParents.size() == 0) {
- return false;
- }
-
- // check if the predecessor is a group by
- if (grandParents.size() == 1) {
- if (predecessor instanceof LOCogroup) {
- mSwap = true;
- return true;
- } else {
- // only a group by can have a single input
- return false;
- }
- }
-
- if (requiredField.needAllFields()) {
- return false;
- }
-
- Pair<Boolean, Set<Integer>> mappingResult = isRequiredFieldMapped(requiredField, predecessor.getProjectionMap());
- boolean mapped = mappingResult.first;
- Set<Integer> grandParentIndexes = mappingResult.second;
- if (!mapped) {
- return false;
- }
-
- // TODO
- // the filter's conditions requires more than one input of its
- // predecessor
- // when the filter's conditions are splittable return true
- if ((grandParentIndexes == null)
- || (grandParentIndexes.size() == 0)
- || (grandParentIndexes.size() > 1)) {
- return false;
- }
-
- if (predecessor instanceof LOCogroup) {
- // check for outer
- if (isAnyOuter((LOCogroup) predecessor)) {
- return false;
- }
- }
-
- mPushBeforeInput = grandParentIndexes.iterator().next();
-
- if (predecessor instanceof LOJoin) {
- boolean otherBranchContainOuter = false;
- boolean sawInner = false;
- for (int i=0;i<=mPlan.getSuccessors(predecessor).size();i++) {
- // We do not push filter if any other branch is outer
- // See PIG-1289
- // Also in LOJoin, innerFlag==true indicate that branch is the outer join side
- // which has the exact opposite semantics
- // If all innerFlag is true, that implies a regular join
- // If all innerFlag is false, means a outer join, in this case, we can not push up filter for any path (See PIG-1507)
- if (i!=mPushBeforeInput && ((LOJoin)predecessor).getInnerFlags()[i]) {
- otherBranchContainOuter = true;
- }
- if (((LOJoin)predecessor).getInnerFlags()[i]==false) {
- sawInner = true;
- }
- }
- if (!otherBranchContainOuter && ((LOJoin)predecessor).getInnerFlags()[mPushBeforeInput]==false) // all innerFlag is false, implies an outer join
- {
- mPushBeforeInput = -1;
- return false;
- }
- if (otherBranchContainOuter && sawInner) // If it is not a regular join and the path we push is on inner side
- {
- mPushBeforeInput = -1;
- return false;
- }
- }
-
- mPushBefore = true;
- return true;
-
- } else if (predecessor instanceof LOForEach) {
-
- LOForEach loForEach = (LOForEach) predecessor;
- List<Boolean> mFlatten = loForEach.getFlatten();
- boolean hasFlatten = false;
- for (Boolean b : mFlatten) {
- if (b.equals(true)) {
- hasFlatten = true;
- }
- }
-
- // TODO
- // A better check is to examine each column in the filter's
- // required fields. If the column is the result of a flatten
- // then
- // return false else return true
-
- // for now if the foreach has a flatten then return false
- if (hasFlatten) {
- return false;
- }
-
- Pair<Boolean, Set<Integer>> mappingResult = isRequiredFieldMapped(requiredField, predecessor.getProjectionMap());
- boolean mapped = mappingResult.first;
-
- // Check if it is a direct mapping, that is, project optionally followed by cast, so if project->project, it is not
- // considered as a mapping
- for (Pair<Integer, Integer> pair : requiredField.getFields())
- {
- if (!isFieldSimple(loForEach.getForEachPlans().get(pair.second)))
- {
- mapped = false;
- break;
- }
- }
-
- if (!mapped) {
- return false;
- }
- }
-
- mSwap = true;
- return true;
- } catch (OptimizerException oe) {
- throw oe;
- } catch (Exception e) {
- int errCode = 2149;
- String msg = "Internal error while trying to check if filters can be pushed up.";
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
- }
-
- private LogicalOperator getOperator(List<LogicalOperator> nodes)
- throws FrontendException {
- if ((nodes == null) || (nodes.size() <= 0)) {
- int errCode = 2052;
- String msg = "Internal error. Cannot retrieve operator from null or empty list.";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
-
- LogicalOperator lo = nodes.get(0);
- if (lo == null || !(lo instanceof LOFilter)) {
- // we should never be called with any other operator class name
- int errCode = 2005;
- String msg = "Expected " + LOFilter.class.getSimpleName()
- + ", got "
- + (lo == null ? lo : lo.getClass().getSimpleName());
- throw new OptimizerException(msg, errCode, PigException.INPUT);
- } else {
- return lo;
- }
-
- }
-
- @Override
- public void transform(List<LogicalOperator> nodes)
- throws OptimizerException {
- try {
- LOFilter filter = (LOFilter) getOperator(nodes);
- LogicalOperator predecessor = mPlan.getPredecessors(filter).get(0);
- if (mSwap) {
- mPlan.swap(predecessor, filter);
- } else if (mPushBefore) {
- if (mPushBeforeInput == -1) {
- // something is wrong!
- int errCode = 2150;
- String msg = "Internal error. The push before input is not set.";
- throw new OptimizerException(msg, errCode, PigException.BUG);
- }
- mPlan.pushBefore(predecessor, filter, mPushBeforeInput);
- }
- } catch (OptimizerException oe) {
- throw oe;
- } catch (Exception e) {
- int errCode = 2151;
- String msg = "Internal error while pushing filters up.";
- throw new OptimizerException(msg, errCode, PigException.BUG, e);
- }
- }
-
- @Override
- public void reset() {
- mPushBefore = false;
- mPushBeforeInput = -1;
- mSwap = false;
- }
-
- /**
- *
- * A method to check if there any grouping column has the outer clause in a
- * grouping operator
- *
- * @param cogroup
- * the cogroup operator to be examined for presence of outer
- * clause
- * @return true if the cogroup contains any input that has an outer clause;
- * false otherwise
- */
- private boolean isAnyOuter(LOCogroup cogroup) {
- boolean[] innerList = cogroup.getInner();
- for (boolean inner : innerList) {
- if (!inner) {
- return true;
- }
- }
- return false;
- }
-
- /**
- * A method to check if the required field contains elements that are mapped
- * in the predecessor's inputs without a cast
- *
- * @param requiredField
- * the required field of the operator
- * @param predProjectionMap
- * the projection map of the predecessor
- * @return a pair of boolean and a set of integers; the first element of the
- * pair is true if the field is mapped without a cast; false
- * otherwise; the second element of the pair is the set of
- * predecessor's inputs that are required for the mapping
- */
- private Pair<Boolean, Set<Integer>> isRequiredFieldMapped(RequiredFields requiredField,
- ProjectionMap predProjectionMap) {
-
- if(requiredField == null) {
- return new Pair<Boolean, Set<Integer>>(false, null);
- }
-
- // if predecessor projection map is null then return false
- if (predProjectionMap == null) {
- return new Pair<Boolean, Set<Integer>>(false, null);
- }
-
- // if the predecessor does not change its output return true
- if (!predProjectionMap.changes()) {
- return new Pair<Boolean, Set<Integer>>(true, null);
- }
-
- MultiMap<Integer, ProjectionMap.Column> mappedFields = predProjectionMap
- .getMappedFields();
- // if there is no mapping in the predecessor then return false
- if (mappedFields == null) {
- return new Pair<Boolean, Set<Integer>>(false, null);
- }
-
- Set<Integer> predInputs = new HashSet<Integer>();
- for (Pair<Integer, Integer> pair : requiredField.getFields()) {
- predInputs.add(pair.second);
- }
-
- boolean mapped = false;
- Set<Integer> grandParentIndexes = new HashSet<Integer>();
-
- for (Integer input : predInputs) {
- List<ProjectionMap.Column> inputList = mappedFields.get(input);
- // inputList is null -> the required field is added
- if(inputList == null) {
- return new Pair<Boolean, Set<Integer>>(false, null);
- }
- for (ProjectionMap.Column column : inputList) {
- // TODO
- // Check if the column has a cast
- // if a cast is not used then consider it as mapped
- // in the future this should go away and the cast
- // type should be used to move around the projections
- if (!column.cast()) {
- mapped = true;
- }
- Pair<Integer, Integer> pair = column.getInputColumn();
- grandParentIndexes.add(pair.first);
- }
- }
-
- if (!mapped) {
- return new Pair<Boolean, Set<Integer>>(false, null);
- }
-
- return new Pair<Boolean, Set<Integer>>(true, grandParentIndexes);
- }
-
- /**
- * Check if the inner plan is simple
- *
- * @param lp
- * logical plan to check
- * @return Whether if the logical plan is a simple project optionally followed by cast
- */
- boolean isFieldSimple(LogicalPlan lp) throws OptimizerException
- {
- TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(lp);
-
- try {
- projectFinder.visit();
- } catch (VisitorException ve) {
- throw new OptimizerException();
- }
- if (projectFinder.getProjectSet()!=null && projectFinder.getProjectSet().size()==1)
- {
- LOProject project = projectFinder.getProjectSet().iterator().next();
- if (lp.getPredecessors(project)==null)
- {
- LogicalOperator pred = project;
- while (lp.getSuccessors(pred)!=null)
- {
- if (lp.getSuccessors(pred).size()!=1)
- return false;
- if (!(lp.getSuccessors(pred).get(0) instanceof LOCast))
- {
- return false;
- }
- pred = lp.getSuccessors(pred).get(0);
- }
- return true;
- }
- return false;
- }
- else
- return true;
- }
-}