Posted to commits@pig.apache.org by da...@apache.org on 2010/03/15 04:28:28 UTC
svn commit: r923043 [3/5] - in /hadoop/pig/trunk:
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/experimental/logical/
src/org/apache/pig/experimental/logical/expression/
src/org/apache/pig/experimental/logical/optimizer/ src/org...
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnPruneHelper.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnPruneHelper.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnPruneHelper.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,487 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOCogroup;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalPlanVisitor;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.SchemaNotDefinedException;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+
+/**
+ * Helper class used by ColumnMapKeyPrune to figure out which columns can be pruned.
+ * It doesn't make any changes to the operator plan.
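+ *
+ * For example (a sketch based on the tests in this commit), given
+ *   a = load 'd.txt' as (id, v1, v2);
+ *   b = foreach a generate id;
+ *   store b into 'out';
+ * only column 0 (id) is required, so check() annotates the load with
+ * REQUIREDCOLS = {0}; columns v1 and v2 become candidates for pruning.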
+ *
+ */
+public class ColumnPruneHelper {
+ protected static final String INPUTUIDS = "ColumnPrune:InputUids";
+ protected static final String OUTPUTUIDS = "ColumnPrune:OutputUids";
+ protected static final String REQUIREDCOLS = "ColumnPrune:RequiredColumns";
+
+ private OperatorPlan currentPlan;
+ private OperatorSubPlan subPlan;
+
+ public ColumnPruneHelper(OperatorPlan currentPlan) {
+ this.currentPlan = currentPlan;
+ }
+
+ private OperatorSubPlan getSubPlan() throws IOException {
+ OperatorSubPlan p = null;
+ if (currentPlan instanceof OperatorSubPlan) {
+ p = new OperatorSubPlan(((OperatorSubPlan)currentPlan).getBasePlan());
+ } else {
+ p = new OperatorSubPlan(currentPlan);
+ }
+ Iterator<Operator> iter = currentPlan.getOperators();
+
+ while(iter.hasNext()) {
+ Operator op = iter.next();
+ if (op instanceof LOForEach) {
+ addOperator(op, p);
+ }
+ }
+
+ return p;
+ }
+
+ private void addOperator(Operator op, OperatorSubPlan subplan) throws IOException {
+ if (op == null) {
+ return;
+ }
+
+ subplan.add(op);
+
+ List<Operator> ll = currentPlan.getPredecessors(op);
+ if (ll == null) {
+ return;
+ }
+
+ for(Operator pred: ll) {
+ addOperator(pred, subplan);
+ }
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public boolean check() throws IOException {
+ List<Operator> sources = currentPlan.getSources();
+ // if this rule has run before, just return false
+ if (sources.get(0).getAnnotation(INPUTUIDS) != null) {
+ return false;
+ }
+
+ // create sub-plan that ends with foreach
+ subPlan = getSubPlan();
+ if (subPlan.size() == 0) {
+ return false;
+ }
+
+ ColumnDependencyVisitor v = new ColumnDependencyVisitor(subPlan);
+ try {
+ v.visit();
+ }catch(SchemaNotDefinedException e) {
+ // if any operator has an unknown schema, just return false
+ return false;
+ }
+
+ List<Operator> ll = subPlan.getSources();
+ boolean found = false;
+ for(Operator op: ll) {
+ if (op instanceof LOLoad) {
+ Set<Long> uids = (Set<Long>)op.getAnnotation(INPUTUIDS);
+ LogicalSchema s = ((LOLoad) op).getSchema();
+ Set<Integer> required = getColumns(s, uids);
+
+ if (required.size() < s.size()) {
+ op.annotate(REQUIREDCOLS, required);
+ found = true;
+ }
+ }
+ }
+
+ return found;
+ }
+
+ // get a set of column indexes from a set of uids
+ protected Set<Integer> getColumns(LogicalSchema schema, Set<Long> uids) throws IOException {
+ if (schema == null) {
+ throw new SchemaNotDefinedException("Schema is not defined.");
+ }
+
+ Set<Integer> cols = new HashSet<Integer>();
+ Iterator<Long> iter = uids.iterator();
+ while(iter.hasNext()) {
+ long uid = iter.next();
+ int index = schema.findField(uid);
+ if (index == -1) {
+ throw new IOException("UID " + uid + " is not found in the schema");
+ }
+
+ cols.add(index);
+ }
+
+ return cols;
+ }
+
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+ // Visitor to calculate the input and output uids for each operator
+ // It doesn't change the plan; it only attaches the calculated info as annotations.
+ // The input and output uids are not necessarily the top level uids of
+ // a schema. They may be the uids of lower level fields of complex fields
+ // that have their own schema.
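+ // For example (illustrative), in
+ //   b = filter a by v1 != NULL; c = foreach b generate id;
+ // the filter's output uids are {uid(id)} (taken from its successor), and its
+ // input uids are {uid(id), uid(v1)}, because v1 is projected in the filter
+ // condition.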
+ private class ColumnDependencyVisitor extends LogicalPlanVisitor {
+
+ public ColumnDependencyVisitor(OperatorPlan plan) {
+ super(plan, new ReverseDependencyOrderWalker(plan));
+ }
+
+ public void visitLOLoad(LOLoad load) throws IOException {
+ Set<Long> output = setOutputUids(load);
+
+ // for load, the input uids are the same as the output uids
+ load.annotate(INPUTUIDS, output);
+ }
+
+ public void visitLOFilter(LOFilter filter) throws IOException {
+ Set<Long> output = setOutputUids(filter);
+
+ // the input uids contain all the output uids plus any
+ // projections in the filter condition
+ Set<Long> input = new HashSet<Long>(output);
+
+ LogicalExpressionPlan exp = filter.getFilterPlan();
+ collectUids(filter, exp, input);
+
+ filter.annotate(INPUTUIDS, input);
+ }
+
+ public void visitLOStore(LOStore store) throws IOException {
+ Set<Long> output = setOutputUids(store);
+
+ if (output.isEmpty()) {
+ // to deal with load-store-load-store case
+ LogicalSchema s = store.getSchema();
+ if (s == null) {
+ throw new SchemaNotDefinedException("Schema for " + store.getName() + " is not defined.");
+ }
+
+ for(int i=0; i<s.size(); i++) {
+ output.add(s.getField(i).uid);
+ }
+ }
+
+ // for store, the input uids are the same as the output uids
+ store.annotate(INPUTUIDS, output);
+ }
+
+ public void visitLOJoin(LOJoin join) throws IOException {
+ Set<Long> output = setOutputUids(join);
+
+ // the input uids contain all the output uids plus any
+ // projections in the join expressions
+ Set<Long> input = new HashSet<Long>(output);
+
+ Collection<LogicalExpressionPlan> exps = join.getExpressionPlans();
+ Iterator<LogicalExpressionPlan> iter = exps.iterator();
+ while(iter.hasNext()) {
+ LogicalExpressionPlan exp = iter.next();
+ collectUids(join, exp, input);
+ }
+
+ join.annotate(INPUTUIDS, input);
+ }
+
+ @Override
+ public void visitLOCogroup(LOCogroup cg) throws IOException {
+ Set<Long> output = setOutputUids(cg);
+
+ // the input uids contain all the output uids plus any
+ // projections in the cogroup expressions
+ Set<Long> input = new HashSet<Long>(output);
+
+ // Add all the uids required for doing the cogroup, i.e. all the
+ // keys on which the cogroup is done.
+ for( LogicalExpressionPlan plan : cg.getExpressionPlans().values() ) {
+ collectUids(cg, plan, input);
+ }
+
+ // Now check for the case where an output uid is a generated one.
+ // If that is the case we need to add the uids which generated it to
+ // the input.
+ Map<Integer,Long> generatedInputUids = cg.getGeneratedInputUids();
+ for( Map.Entry<Integer, Long> entry : generatedInputUids.entrySet() ) {
+ Long uid = entry.getValue();
+ if( output.contains(uid) ) {
+ // Hence we need to add the full schema of the bag
+ LogicalRelationalOperator pred =
+ (LogicalRelationalOperator) cg.getPlan().getPredecessors(cg).get(entry.getKey());
+ input.addAll( getAllUids( pred.getSchema() ) );
+ }
+ }
+
+ cg.annotate(INPUTUIDS, input);
+ }
+
+ /*
+ * This function returns all uids present in the given schema
+ */
+ private Set<Long> getAllUids( LogicalSchema schema ) {
+ Set<Long> uids = new HashSet<Long>();
+
+ if( schema == null ) {
+ return uids;
+ }
+
+ for( LogicalFieldSchema field : schema.getFields() ) {
+ if( ( field.type == DataType.TUPLE || field.type == DataType.BAG )
+ && field.schema != null ) {
+ uids.addAll( getAllUids( field.schema ) );
+ }
+ uids.add( field.uid );
+ }
+ return uids;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void visitLOForEach(LOForEach foreach) throws IOException {
+ Set<Long> output = setOutputUids(foreach);
+
+ LogicalPlan innerPlan = foreach.getInnerPlan();
+ LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
+ gen.annotate(OUTPUTUIDS, output);
+
+ ColumnDependencyVisitor v = new ColumnDependencyVisitor(innerPlan);
+ v.visit();
+
+ Set<Long> input = new HashSet<Long>();
+ List<Operator> sources = innerPlan.getSources();
+ for(Operator s: sources) {
+ Set<Long> in = (Set<Long>)s.getAnnotation(INPUTUIDS);
+ if (in != null) {
+ input.addAll(in);
+ }
+ }
+
+ foreach.annotate(INPUTUIDS, input);
+ }
+
+ @SuppressWarnings("unchecked")
+ public void visitLOGenerate(LOGenerate gen) throws IOException {
+ Set<Long> output = (Set<Long>)gen.getAnnotation(OUTPUTUIDS);
+
+ Set<Long> input = new HashSet<Long>();
+
+ List<LogicalExpressionPlan> ll = gen.getOutputPlans();
+
+ Iterator<Long> iter = output.iterator();
+ while(iter.hasNext()) {
+ long uid = iter.next();
+ boolean found = false;
+ for(int i=0; i<ll.size(); i++) {
+ LogicalExpressionPlan exp = ll.get(i);
+ LogicalExpression op = (LogicalExpression)exp.getSources().get(0);
+
+ if (op.getUid() == uid) {
+ collectUids(gen, exp, input);
+ found = true;
+
+ } else if (op instanceof ProjectExpression && ((ProjectExpression)op).isProjectStar()) {
+ int inputNum = ((ProjectExpression)op).getInputNum();
+ LogicalRelationalOperator pred = (LogicalRelationalOperator)gen.getPlan().getPredecessors(gen).get(inputNum);
+
+ if (pred.getSchema() == null) {
+ throw new SchemaNotDefinedException("Schema for " + pred.getName() + " is not defined.");
+ }
+ for(LogicalFieldSchema f: pred.getSchema().getFields()) {
+ if (f.uid == uid) {
+ input.add(uid);
+ found = true;
+ }
+ }
+
+ } else if (gen.getFlattenFlags()[i]) {
+ // if the expression is flattened, the uid may come from one of its original projections
+ List<Operator> ss = exp.getSinks();
+ for(Operator s: ss) {
+ if (s instanceof ProjectExpression) {
+ int inputNum = ((ProjectExpression)s).getInputNum();
+ LogicalRelationalOperator pred = (LogicalRelationalOperator)gen.getPlan().getPredecessors(gen).get(inputNum);
+
+ if (pred.getSchema() == null) {
+ throw new SchemaNotDefinedException("Schema for " + pred.getName() + " is not defined.");
+ }
+ if (pred.getSchema().findField(uid) != -1) {
+ input.add(uid);
+ found = true;
+ }
+ }
+ }
+ }
+
+ if (found) {
+ break;
+ }
+ }
+
+ if (!found) {
+ throw new IOException("uid " + uid +" is not in the schema of LOForEach");
+ }
+ }
+
+ // for each flattened bag, we need to make sure at least one of its fields is in the input
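+ // e.g. (illustrative) in "c = foreach b generate id, FLATTEN(v);" where v is
+ // a bag, flattening v changes the number of output records, so even if no
+ // field of v is used downstream we must keep at least one of its fields.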
+ for(int i=0; i<ll.size(); i++) {
+ if (!gen.getFlattenFlags()[i]) {
+ continue;
+ }
+
+ LogicalExpressionPlan exp = ll.get(i);
+ Operator s = exp.getSinks().get(0);
+
+ if (s instanceof ProjectExpression) {
+ int inputNum = ((ProjectExpression)s).getInputNum();
+ int colNum = ((ProjectExpression)s).getColNum();
+ LogicalRelationalOperator pred = (LogicalRelationalOperator)gen.getPlan().getPredecessors(gen).get(inputNum);
+
+ LogicalSchema predSchema = pred.getSchema();
+ if (predSchema == null) {
+ throw new SchemaNotDefinedException("Schema for " + pred.getName() + " is not defined.");
+ }
+
+ if (predSchema.getField(colNum).type == DataType.BAG) {
+ long fuid = predSchema.getField(colNum).uid;
+ LogicalSchema fschema = predSchema.getField(colNum).schema;
+ if (input.contains(fuid)) {
+ continue;
+ }
+
+ if (fschema == null) {
+ // the bag has no inner schema to pick a field from, so keep the bag uid itself
+ input.add(fuid);
+ continue;
+ }
+
+ boolean found = false;
+ for(long uid: input) {
+ if (fschema.findField(uid) != -1) {
+ found = true;
+ break;
+ }
+ }
+
+ // if the input uids don't contain any field from this bag, then add the first field
+ if (!found) {
+ input.add(fschema.getField(0).uid);
+ }
+ }
+ }
+ }
+
+ gen.annotate(INPUTUIDS, input);
+ }
+
+ public void visitLOInnerLoad(LOInnerLoad load) throws IOException {
+ Set<Long> output = setOutputUids(load);
+ load.annotate(INPUTUIDS, output);
+ }
+
+ private void collectUids(LogicalRelationalOperator currentOp, LogicalExpressionPlan exp, Set<Long> uids) throws IOException {
+ List<Operator> ll = exp.getSinks();
+ for(Operator op: ll) {
+ if (op instanceof ProjectExpression) {
+ if (!((ProjectExpression)op).isProjectStar()) {
+ long uid = ((ProjectExpression)op).getUid();
+ uids.add(uid);
+ } else {
+ LogicalRelationalOperator ref = ((ProjectExpression)op).findReferent(currentOp);
+ LogicalSchema s = ref.getSchema();
+ if (s == null) {
+ throw new SchemaNotDefinedException("Schema not defined for " + ref.getAlias());
+ }
+ for(LogicalFieldSchema f: s.getFields()) {
+ uids.add(f.uid);
+ }
+ }
+ }
+ }
+ }
+
+
+ @SuppressWarnings("unchecked")
+ private Set<Long> setOutputUids(LogicalRelationalOperator op) throws IOException {
+
+ List<Operator> ll = plan.getSuccessors(op);
+ Set<Long> uids = new HashSet<Long>();
+
+ LogicalSchema s = op.getSchema();
+ if (s == null) {
+ throw new SchemaNotDefinedException("Schema for " + op.getName() + " is not defined.");
+ }
+
+ if (ll != null) {
+ // if this is not a sink, the output uids are the union of the input uids of its successors
+ for(Operator succ: ll) {
+ Set<Long> inputUids = (Set<Long>)succ.getAnnotation(INPUTUIDS);
+ if (inputUids != null) {
+ Iterator<Long> iter = inputUids.iterator();
+ while(iter.hasNext()) {
+ long uid = iter.next();
+
+ if (s.findField(uid) != -1) {
+ uids.add(uid);
+ }
+ }
+ }
+ }
+ } else {
+ // if it's a leaf, take all the uids from its schema
+ for(int i=0; i<s.size(); i++) {
+ uids.add(s.getField(i).uid);
+ }
+ }
+
+ op.annotate(OUTPUTUIDS, uids);
+ return uids;
+ }
+ }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java Mon Mar 15 03:28:27 2010
@@ -67,10 +67,10 @@ public class FilterAboveForeach extends
@Override
public Transformer getNewTransformer() {
- return new FilterAboveFlattenTransformer();
+ return new FilterAboveForEachTransformer();
}
- public class FilterAboveFlattenTransformer extends Transformer {
+ public class FilterAboveForEachTransformer extends Transformer {
LOFilter filter = null;
LOForEach foreach = null;
@@ -111,11 +111,6 @@ public class FilterAboveForeach extends
for(int j=0; j< preds.size(); j++) {
LogicalRelationalOperator logRelOp = (LogicalRelationalOperator)preds.get(j);
if (hasAll( logRelOp, uids) ) {
- // If any of the uids are of complex type then we
- // cannot think about moving this filter.
- if( containsComplexType(logRelOp.getSchema(), uids ) ) {
- break;
- }
forEachPred = (LogicalRelationalOperator) preds.get(j);
return true;
}
@@ -161,39 +156,32 @@ public class FilterAboveForeach extends
* @param uids Uids to check for
* @return true if given LogicalRelationalOperator has all the given uids
*/
- private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
- LogicalSchema schema = op.getSchema();
- List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
- Set<Long> all = new HashSet<Long>();
- for(LogicalSchema.LogicalFieldSchema f:fields) {
- all.add(f.uid);
- }
+ private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
+ Set<Long> all = getAllProjectableUids(op.getSchema());
return all.containsAll(uids);
}
- /**
- * This function checks if any of the fields mentioned are a Bug or Tuple.
- * If so we cannot move the filter above the operator having the schema
- * @param schema Schema of the operator we are investigating
- * @param uids Uids of the fields we are checking for
- * @return true if one of the uid belong to a complex type
+ /*
+ * The projectable uids are the uids of all fields (including fields nested in tuples) except fields of type bag
*/
- private boolean containsComplexType(LogicalSchema schema, Set<Long> uids) {
- List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
-
- for(LogicalSchema.LogicalFieldSchema f:fields) {
- if ( ( f.type == DataType.BAG || f.type == DataType.TUPLE ) ) {
- if( uids.contains( f.uid ) ) {
- return true;
- }
- if( f.schema != null && containsComplexType(f.schema, uids) ) {
- return true;
- }
+ private Set<Long> getAllProjectableUids( LogicalSchema schema ) {
+ Set<Long> uids = new HashSet<Long>();
+
+ if( schema == null ) {
+ return uids;
+ }
+
+ for( LogicalSchema.LogicalFieldSchema field : schema.getFields() ) {
+ if( field.type == DataType.TUPLE ) {
+ uids.addAll( getAllProjectableUids(field.schema ) );
+ }
+ if( field.type != DataType.BAG ) {
+ uids.add( field.uid );
}
}
- return false;
+ return uids;
}
-
+
@Override
public OperatorPlan reportChanges() {
return subPlan;
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MapKeysPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MapKeysPruneHelper.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MapKeysPruneHelper.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MapKeysPruneHelper.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.experimental.logical.expression.MapLookupExpression;
+import org.apache.pig.experimental.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.DependencyOrderWalker;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+
+/**
+ * This helper marks every Load operator which has a map field
+ * with the REQUIRED_MAPKEYS annotation. The annotation value is a
+ * <code>Map<Integer,Set<String>></code> where the Integer is the column number
+ * of the field and the Set is the set of keys used from this field (the field is always a map field).
+ *
+ * It does this for only the top level schema in load.
+ *
+ * Algorithm:
+ * Traverse the plan in reverse dependency order (i.e. sink to source).
+ * For LogicalRelationalOperators having a MapLookupExpression in their
+ * expression plan, collect the uid and the keys related to it. This
+ * is retained in the visitor.
+ * For a ForEach having a nested LogicalPlan, use the same visitor, so
+ * no distinction is required.
+ * At the sources, find all the uids provided by each source and annotate that
+ * LogicalRelationalOperator (load) with a <code>Map<Integer,Set<String>></code>
+ * containing only the column numbers that this LogicalRelationalOperator generates.
+ *
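+ * For example (a sketch based on the tests in this commit), given
+ *   a = load 'd.txt' as (id, v1, m:map[]);
+ *   b = foreach a generate id, m#'path';
+ * the load is annotated with REQUIRED_MAPKEYS = { 2 -> {"path"} }, i.e. only
+ * the key "path" of the map in column 2 is needed.
+ *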
+ * NOTE: This is a simple map pruner. If a map key is mentioned in the script
+ * then this pruner assumes the key is needed. It is not as aggressive
+ * as the column pruner (which removes a column if it is mentioned but never used).
+ *
+ */
+public class MapKeysPruneHelper {
+
+ public static final String REQUIRED_MAPKEYS = "MapPruner:RequiredKeys";
+
+ private OperatorPlan currentPlan;
+ private OperatorSubPlan subplan;
+
+ public MapKeysPruneHelper(OperatorPlan currentPlan) {
+ this.currentPlan = currentPlan;
+
+ if (currentPlan instanceof OperatorSubPlan) {
+ subplan = new OperatorSubPlan(((OperatorSubPlan)currentPlan).getBasePlan());
+ } else {
+ subplan = new OperatorSubPlan(currentPlan);
+ }
+ }
+
+
+ @SuppressWarnings("unchecked")
+ public boolean check() throws IOException {
+
+ // First check if we have a load with a map in it or not
+ List<Operator> sources = currentPlan.getSources();
+
+ boolean hasMap = false;
+ for( Operator source : sources ) {
+ LogicalSchema schema = ((LogicalRelationalOperator)source).getSchema();
+ // If any of the loads has a null schema we don't know the ramifications here
+ // so we skip this optimization
+ if( schema == null ) {
+ return false;
+ }
+ if( hasMap( schema ) ) {
+ hasMap = true;
+ }
+ }
+
+ // We don't have any map in the first level of the schema
+ if( !hasMap ) {
+ return false;
+ }
+
+
+ // Now we check what keys are needed
+ MapMarker marker = new MapMarker(currentPlan);
+ marker.visit();
+
+ // Get all Uids from Sinks
+ List<Operator> sinks = currentPlan.getSinks();
+ Set<Long> sinkMapUids = new HashSet<Long>();
+ for( Operator sink : sinks ) {
+ LogicalSchema schema = ((LogicalRelationalOperator)sink).getSchema();
+ sinkMapUids.addAll( getMapUids( schema ) );
+ }
+
+
+ // If we have found specific keys which are needed then we return true;
+ // else if we don't have any specific keys we return false
+ boolean hasAnnotation = false;
+ for( Operator source : sources ) {
+ Map<Integer,Set<String>> annotationValue =
+ (Map<Integer, Set<String>>) ((LogicalRelationalOperator)source).getAnnotation(REQUIRED_MAPKEYS);
+
+ // Maps that reach a sink whole cannot be pruned at the source
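+ // e.g. (from the tests in this commit) if one branch projects b::m#'path'
+ // but the whole map a::m is also stored, a::m reaches a sink intact and
+ // its keys must not be pruned at its load.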
+ if( ! sinkMapUids.isEmpty() && annotationValue != null &&
+ !annotationValue.isEmpty() ) {
+ Integer[] annotationKeyArray = annotationValue.keySet().toArray( new Integer[0] );
+ LogicalSchema sourceSchema = ((LogicalRelationalOperator)source).getSchema();
+ for( Integer col : annotationKeyArray ) {
+ if( sinkMapUids.contains(sourceSchema.getField(col).uid)) {
+ annotationValue.remove( col );
+ }
+ }
+ }
+
+ if ( annotationValue != null && annotationValue.isEmpty()) {
+ ((LogicalRelationalOperator)source).removeAnnotation(REQUIRED_MAPKEYS);
+ annotationValue = null;
+ }
+
+ // Can we still prune any keys
+ if( annotationValue != null ) {
+ hasAnnotation = true;
+ subplan.add(source);
+ }
+ }
+
+ // If no source has prunable map keys left, we can't do any optimization
+ return hasAnnotation;
+ }
+
+ /**
+ * This function checks if the schema has a map.
+ * We don't check for nested structures.
+ * @param schema Schema to be checked
+ * @return true if it has a map, else false
+ * @throws NullPointerException in case the schema is null
+ */
+ private boolean hasMap(LogicalSchema schema ) throws NullPointerException {
+ for( LogicalFieldSchema field : schema.getFields() ) {
+ if( field.type == DataType.MAP ) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * This function returns the set of uids corresponding to
+ * fields of map datatype in the first level of this schema
+ * @param schema Schema having fields
+ * @return the set of uids of the first-level map fields
+ */
+ private Set<Long> getMapUids(LogicalSchema schema ) {
+ Set<Long> uids = new HashSet<Long>();
+ if( schema != null ) {
+ for( LogicalFieldSchema field : schema.getFields() ) {
+ if( field.type == DataType.MAP ) {
+ uids.add( field.uid );
+ }
+ }
+ }
+ return uids;
+ }
+
+ public OperatorPlan reportChanges() {
+ return subplan;
+ }
+
+
+ /**
+ * This class collects all the information needed to build
+ * the list of keys required for each map
+ */
+ static public class MapMarker extends AllExpressionVisitor {
+
+ Map<Long,Set<String>> inputUids = null;
+
+ protected MapMarker(OperatorPlan plan) {
+ super(plan, new ReverseDependencyOrderWalker(plan));
+ inputUids = new HashMap<Long,Set<String>>();
+ }
+
+ @Override
+ public void visitLOLoad(LOLoad load) throws IOException {
+ if( load.getSchema() != null ) {
+ Map<Integer,Set<String>> annotation = new HashMap<Integer,Set<String>>();
+ for( int i=0; i<load.getSchema().size(); i++) {
+ LogicalFieldSchema field = load.getSchema().getField(i);
+ if( inputUids.containsKey( field.uid ) ) {
+ annotation.put(i, inputUids.get( field.uid ) );
+ }
+ }
+ load.annotate(REQUIRED_MAPKEYS, annotation);
+ }
+ }
+
+ @Override
+ public void visitLOFilter(LOFilter filter) throws IOException {
+ currentOp = filter;
+ MapExprMarker v = (MapExprMarker) getVisitor(filter.getFilterPlan());
+ v.visit();
+ mergeUidKeys( v.inputUids );
+ }
+
+ @Override
+ public void visitLOJoin(LOJoin join) throws IOException {
+ currentOp = join;
+ Collection<LogicalExpressionPlan> c = join.getExpressionPlans();
+ for (LogicalExpressionPlan plan : c) {
+ MapExprMarker v = (MapExprMarker) getVisitor(plan);
+ v.visit();
+ mergeUidKeys( v.inputUids );
+ }
+ }
+
+ @Override
+ public void visitLOGenerate(LOGenerate gen) throws IOException {
+ currentOp = gen;
+ Collection<LogicalExpressionPlan> plans = gen.getOutputPlans();
+ for( LogicalExpressionPlan plan : plans ) {
+ MapExprMarker v = (MapExprMarker) getVisitor(plan);
+ v.visit();
+ mergeUidKeys( v.inputUids );
+ }
+ }
+
+ private void mergeUidKeys( Map<Long, Set<String> > inputMap ) {
+ for( Map.Entry<Long, Set<String>> entry : inputMap.entrySet() ) {
+ if( inputUids.containsKey(entry.getKey()) ) {
+ Set<String> mapKeySet = inputUids.get(entry.getKey());
+ mapKeySet.addAll(entry.getValue());
+ } else {
+ inputUids.put(entry.getKey(), inputMap.get(entry.getKey()));
+ }
+ }
+ }
+
+ @Override
+ protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+ return new MapExprMarker(expr );
+ }
+
+ static class MapExprMarker extends LogicalExpressionVisitor {
+
+ Map<Long,Set<String>> inputUids = null;
+
+ protected MapExprMarker(OperatorPlan p) {
+ super(p, new DependencyOrderWalker(p));
+ inputUids = new HashMap<Long,Set<String>>();
+ }
+
+ public void visitMapLookup(MapLookupExpression op) throws IOException {
+ Long uid = op.getMap().getUid();
+ String key = op.getLookupKey();
+
+ HashSet<String> mapKeySet = null;
+ if( inputUids.containsKey(uid) ) {
+ mapKeySet = (HashSet<String>) inputUids.get(uid);
+ } else {
+ mapKeySet = new HashSet<String>();
+ inputUids.put(uid, mapKeySet);
+ }
+ mapKeySet.add(key);
+ }
+ }
+ }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java Mon Mar 15 03:28:27 2010
@@ -172,13 +172,13 @@ public class PushUpFilter extends Rule {
// check if a relational operator contains all of the specified uids
private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
LogicalSchema schema = op.getSchema();
- List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
- Set<Long> all = new HashSet<Long>();
- for(LogicalSchema.LogicalFieldSchema f:fields) {
- all.add(f.uid);
+ for(long uid: uids) {
+ if (schema.findField(uid) == -1) {
+ return false;
+ }
}
- return all.containsAll(uids);
+ return true;
}
@Override
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/WholePlanRule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/WholePlanRule.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/WholePlanRule.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/WholePlanRule.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.rules;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+
+/**
+ * Super class for all rules that operate on the whole plan. It doesn't look for
+ * a specific pattern. An example of such a rule is ColumnMapKeyPrune.
+ *
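+ * A minimal subclass sketch (illustrative only; MyRule and MyTransformer are
+ * hypothetical, and the transformer body is elided):
+ *
+ *   public class MyRule extends WholePlanRule {
+ *       public MyRule(String n) { super(n); }
+ *       public Transformer getNewTransformer() { return new MyTransformer(); }
+ *   }
+ *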
+ */
+public abstract class WholePlanRule extends Rule {
+
+ public WholePlanRule(String n) {
+ super(n);
+ }
+
+ public List<OperatorPlan> match(OperatorPlan plan) {
+ currentPlan = plan;
+ List<OperatorPlan> ll = new ArrayList<OperatorPlan>();
+ ll.add(plan);
+ return ll;
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ return null;
+ }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java Mon Mar 15 03:28:27 2010
@@ -18,6 +18,7 @@
package org.apache.pig.experimental.plan;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
@@ -271,5 +272,17 @@ public abstract class BaseOperatorPlan i
PlanPrinter npp = new PlanPrinter(this, ps);
npp.visit();
-}
+ }
+
+ @Override
+ public String toString() {
+ ByteArrayOutputStream os = new ByteArrayOutputStream();
+ PrintStream ps = new PrintStream(os);
+ try {
+ explain(ps,"",false);
+ } catch (IOException e) {
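+ // toString() cannot propagate the exception, so fall back to an
+ // empty string if explain() fails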
+ return "";
+ }
+ return os.toString();
+ }
}
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalColumnPrune.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalColumnPrune.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalColumnPrune.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalColumnPrune.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,452 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.*;
+
+import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.experimental.logical.optimizer.UidStamper;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.rules.AddForEach;
+import org.apache.pig.experimental.logical.rules.ColumnMapKeyPrune;
+import org.apache.pig.experimental.logical.rules.MapKeysPruneHelper;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.optimizer.PlanOptimizer;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import junit.framework.TestCase;
+
+public class TestExperimentalColumnPrune extends TestCase {
+
+ LogicalPlan plan = null;
+
+ private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
+ LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);
+ visitor.visit();
+ org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+
+ try {
+ UidStamper stamper = new UidStamper(newPlan);
+ stamper.visit();
+
+ return newPlan;
+ }catch(Exception e) {
+ throw new VisitorException(e);
+ }
+ }
+
+
+ public void testNoPrune() throws Exception {
+ // no foreach
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+ lpt.buildPlan("b = filter a by v1==NULL;");
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+ LogicalPlan newLogicalPlan = migratePlan(plan);
+
+ PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+ lpt.buildPlan("b = filter a by v1==NULL;");
+ plan = lpt.buildPlan("store b into 'empty';");
+ LogicalPlan expected = migratePlan(plan);
+
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ // no schema
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt';");
+ lpt.buildPlan("b = foreach a generate $0, $1;");
+ plan = lpt.buildPlan("store b into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt';");
+ lpt.buildPlan("b = foreach a generate $0, $1;");
+ plan = lpt.buildPlan("store b into 'empty';");
+ expected = migratePlan(plan);
+ assertTrue(expected.isEqual(newLogicalPlan));
+ }
+
+ public void testPrune() throws Exception {
+ // only foreach
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+ lpt.buildPlan("b = foreach a generate id;");
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+ LogicalPlan newLogicalPlan = migratePlan(plan);
+
+ PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id);");
+ lpt.buildPlan("b = foreach a generate id;");
+ plan = lpt.buildPlan("store b into 'empty';");
+ LogicalPlan expected = migratePlan(plan);
+
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ // with filter
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+ lpt.buildPlan("b = filter a by v1 != NULL AND (v2+v3)<100;");
+ lpt.buildPlan("c = foreach b generate id;");
+ plan = lpt.buildPlan("store c into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v3, v2);");
+ lpt.buildPlan("b = filter a by v1 != NULL AND (v2+v3)<100;");
+ lpt.buildPlan("c = foreach b generate id;");
+ plan = lpt.buildPlan("store c into 'empty';");
+ expected = migratePlan(plan);
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ // with 2 foreach
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+ lpt.buildPlan("b = foreach a generate v2, v5, v4;");
+ lpt.buildPlan("c = foreach b generate v5, v4;");
+ plan = lpt.buildPlan("store c into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (v5, v4);");
+ lpt.buildPlan("b = foreach a generate v5, v4;");
+ lpt.buildPlan("c = foreach b generate v5, v4;");
+ plan = lpt.buildPlan("store c into 'empty';");
+ expected = migratePlan(plan);
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ // with 2 foreach
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+ lpt.buildPlan("b = foreach a generate id, v1, v5, v3, v4;");
+ lpt.buildPlan("c = foreach b generate v5, v4;");
+ plan = lpt.buildPlan("store c into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (v5, v4);");
+ lpt.buildPlan("b = foreach a generate v5, v4;");
+ lpt.buildPlan("c = foreach b generate v5, v4;");
+ plan = lpt.buildPlan("store c into 'empty';");
+ expected = migratePlan(plan);
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ // with 2 foreach and filter in between
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v5, v3, v4, v2);");
+ lpt.buildPlan("b = foreach a generate v2, v5, v4;");
+ lpt.buildPlan("c = filter b by v2 != NULL;");
+ lpt.buildPlan("d = foreach c generate v5, v4;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (v5, v4, v2);");
+ lpt.buildPlan("b = foreach a generate v2, v5, v4;");
+ lpt.buildPlan("c = filter b by v2 != NULL;");
+ lpt.buildPlan("d = foreach c generate v5, v4;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ expected = migratePlan(plan);
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ // with 2 foreach after join
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v2, v3);");
+ lpt.buildPlan("b = load 'c.txt' as (id, v4, v5, v6);");
+ lpt.buildPlan("c = join a by id, b by id;");
+ lpt.buildPlan("d = foreach c generate a::id, v5, v3, v4;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v3);");
+ lpt.buildPlan("b = load 'c.txt' as (id, v4, v5);");
+ lpt.buildPlan("c = join a by id, b by id;");
+ lpt.buildPlan("d = foreach c generate a::id, v5, v3, v4;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ expected = migratePlan(plan);
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ // with BinStorage, insert foreach after load
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+ lpt.buildPlan("c = filter a by v2 != NULL;");
+ lpt.buildPlan("d = foreach c generate v5, v4;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+ lpt.buildPlan("b = foreach a generate v5, v4, v2;");
+ lpt.buildPlan("c = filter b by v2 != NULL;");
+ lpt.buildPlan("d = foreach c generate v5, v4;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ expected = migratePlan(plan);
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ // with BinStorage, do not insert foreach after load if there is already one
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+ lpt.buildPlan("b = foreach a generate v5, v4, v2;");
+ lpt.buildPlan("c = filter b by v2 != NULL;");
+ lpt.buildPlan("d = foreach c generate v5;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+ lpt.buildPlan("b = foreach a generate v5, v2;");
+ lpt.buildPlan("c = filter b by v2 != NULL;");
+ lpt.buildPlan("d = foreach c generate v5;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ expected = migratePlan(plan);
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ // with BinStorage, do not insert foreach after load if there is already one
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+ lpt.buildPlan("b = foreach a generate v5, v4, v2, 10;");
+ lpt.buildPlan("c = filter b by v2 != NULL;");
+ lpt.buildPlan("d = foreach c generate v5;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' using BinStorage() as (id, v1, v5, v3, v4, v2);");
+ lpt.buildPlan("b = foreach a generate v5, v2, 10;");
+ lpt.buildPlan("c = filter b by v2 != NULL;");
+ lpt.buildPlan("d = foreach c generate v5;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ expected = migratePlan(plan);
+ assertTrue(expected.isEqual(newLogicalPlan));
+ }
+
+ public void testPruneWithMapKey() throws Exception {
+ // only foreach
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, m:map[]);");
+ lpt.buildPlan("b = foreach a generate id, m#'path';");
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+ LogicalPlan newLogicalPlan = migratePlan(plan);
+
+ PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, m:map[]);");
+ lpt.buildPlan("b = foreach a generate id, m#'path';");
+ plan = lpt.buildPlan("store b into 'empty';");
+ LogicalPlan expected = migratePlan(plan);
+
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ LOLoad op = (LOLoad)newLogicalPlan.getSources().get(0);
+ Map<Integer,Set<String>> annotation =
+ (Map<Integer, Set<String>>) op.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+ assertEquals(annotation.size(), 1);
+ Set<String> s = new HashSet<String>();
+ s.add("path");
+ assertEquals(annotation.get(2), s);
+
+ // foreach with join
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, m:map[]);");
+ lpt.buildPlan("b = load 'd.txt' as (id, v1, m:map[]);");
+ lpt.buildPlan("c = join a by id, b by id;");
+ lpt.buildPlan("d = filter c by a::m#'path' != NULL;");
+ lpt.buildPlan("e = foreach d generate a::id, b::id, b::m#'path', a::m;");
+ plan = lpt.buildPlan("store e into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, m:map[]);");
+ lpt.buildPlan("b = load 'd.txt' as (id, m:map[]);");
+ lpt.buildPlan("c = join a by id, b by id;");
+ lpt.buildPlan("d = filter c by a::m#'path' != NULL;");
+ lpt.buildPlan("e = foreach d generate a::id, b::id, b::m#'path', a::m;");
+ plan = lpt.buildPlan("store e into 'empty';");
+ expected = migratePlan(plan);
+
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ List<Operator> ll = newLogicalPlan.getSources();
+ assertEquals(ll.size(), 2);
+ LOLoad loada = null;
+ LOLoad loadb = null;
+ for(Operator opp: ll) {
+ if (((LogicalRelationalOperator)opp).getAlias().equals("a")) {
+ loada = (LOLoad)opp;
+ continue;
+ }
+
+ if (((LogicalRelationalOperator)opp).getAlias().equals("b")) {
+ loadb = (LOLoad)opp;
+ continue;
+ }
+ }
+
+ annotation =
+ (Map<Integer, Set<String>>) loada.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+ assertNull(annotation);
+
+ annotation =
+ (Map<Integer, Set<String>>) loadb.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+ assertEquals(annotation.size(), 1);
+
+ s = new HashSet<String>();
+ s.add("path");
+ assertEquals(annotation.get(2), s);
+ }
+
+ public void testPruneWithBag() throws Exception {
+ // filter above foreach
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});");
+ lpt.buildPlan("b = filter a by id>10;");
+ lpt.buildPlan("c = foreach b generate id, FLATTEN(v);");
+ lpt.buildPlan("d = foreach c generate id, v::s2;");
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store d into 'empty';");
+ LogicalPlan newLogicalPlan = migratePlan(plan);
+
+ PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v:bag{t:(s1,s2,s3)});");
+ lpt.buildPlan("b = filter a by id>10;");
+ lpt.buildPlan("c = foreach b generate id, FLATTEN(v);");
+ lpt.buildPlan("d = foreach c generate id, v::s2;");
+ plan = lpt.buildPlan("store d into 'empty';");
+ LogicalPlan expected = migratePlan(plan);
+
+ assertTrue(expected.isEqual(newLogicalPlan));
+ }
+
+ public void testAddForeach() throws Exception {
+ // filter above foreach
+ LogicalPlanTester lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+ lpt.buildPlan("b = filter a by v1>10;");
+ lpt.buildPlan("c = foreach b generate id;");
+ org.apache.pig.impl.logicalLayer.LogicalPlan plan = lpt.buildPlan("store c into 'empty';");
+ LogicalPlan newLogicalPlan = migratePlan(plan);
+
+ PlanOptimizer optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1);");
+ lpt.buildPlan("b = filter a by v1>10;");
+ lpt.buildPlan("c = foreach b generate id;");
+ plan = lpt.buildPlan("store c into 'empty';");
+ LogicalPlan expected = migratePlan(plan);
+
+ assertTrue(expected.isEqual(newLogicalPlan));
+
+ // join with foreach
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1, v2);");
+ lpt.buildPlan("b = load 'd.txt' as (id, v1, v2);");
+ lpt.buildPlan("c = join a by id, b by id;");
+ lpt.buildPlan("d = filter c by a::v1>b::v1;");
+ lpt.buildPlan("e = foreach d generate a::id;");
+ plan = lpt.buildPlan("store e into 'empty';");
+ newLogicalPlan = migratePlan(plan);
+
+ optimizer = new MyPlanOptimizer(newLogicalPlan, 3);
+ optimizer.optimize();
+
+ lpt = new LogicalPlanTester();
+ lpt.buildPlan("a = load 'd.txt' as (id, v1);");
+ lpt.buildPlan("b = load 'd.txt' as (id, v1);");
+ lpt.buildPlan("c = join a by id, b by id;");
+ lpt.buildPlan("d = foreach c generate a::id, a::v1, b::v1;");
+ lpt.buildPlan("e = filter d by a::v1>b::v1;");
+ lpt.buildPlan("f = foreach e generate a::id;");
+ plan = lpt.buildPlan("store f into 'empty';");
+ expected = migratePlan(plan);
+
+ assertTrue(expected.isEqual(newLogicalPlan));
+ }
+
+ public class MyPlanOptimizer extends LogicalPlanOptimizer {
+
+ protected MyPlanOptimizer(OperatorPlan p, int iterations) {
+ super(p, iterations);
+ }
+
+ protected List<Set<Rule>> buildRuleSets() {
+ List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+
+ Rule r = new ColumnMapKeyPrune("ColumnMapKeyPrune");
+ Set<Rule> s = new HashSet<Rule>();
+ s.add(r);
+ ls.add(s);
+
+ r = new AddForEach("AddForEach");
+ s = new HashSet<Rule>();
+ s.add(r);
+ ls.add(s);
+
+ return ls;
+ }
+ }
+}