You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/04 19:46:48 UTC
svn commit: r982345 [7/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/experimental/ src/org/apache/pig/newplan/
src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log...
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.Util;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class AddForEach extends WholePlanRule {
+ protected static final String STATUS = "AddForEach:Status";
+ protected static final int STATUSADDED = 1;
+ protected static final int STATUSNEWFOREACH = 2;
+
+ public AddForEach(String n) {
+ super(n);
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new AddForEachTransformer();
+ }
+
+ public class AddForEachTransformer extends Transformer {
+ LogicalRelationalOperator opForAdd;
+ OperatorSubPlan subPlan;
+
+ @Override
+ public boolean check(OperatorPlan matched) throws IOException {
+ Iterator<Operator> iter = matched.getOperators();
+ while(iter.hasNext()) {
+ LogicalRelationalOperator op = (LogicalRelationalOperator)iter.next();
+ if ((op instanceof LOFilter||op instanceof LOSort||op instanceof LOSplitOutput) && shouldAdd(op)) {
+ opForAdd = op;
+ return true;
+ }
+ }
+
+ return false;
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+ private void addSuccessors(Operator op) throws IOException {
+ subPlan.add(op);
+ List<Operator> ll = op.getPlan().getSuccessors(op);
+ if (ll != null) {
+ for(Operator suc: ll) {
+ addSuccessors(suc);
+ }
+ }
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws IOException {
+ addForeach(opForAdd);
+
+ subPlan = new OperatorSubPlan(currentPlan);
+ addSuccessors(opForAdd);
+ }
+
+ @SuppressWarnings("unchecked")
+ // check if an LOForEach should be added after the logical operator
+ private boolean shouldAdd(LogicalRelationalOperator op) throws IOException {
+ Integer status = (Integer)op.getAnnotation(STATUS);
+ if (status!=null && (status==STATUSADDED ||status==STATUSNEWFOREACH))
+ return false;
+
+ Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
+ if (outputUids==null)
+ return false;
+
+ LogicalSchema schema = op.getSchema();
+ if (schema==null)
+ return false;
+
+ Set<Integer> columnsToDrop = new HashSet<Integer>();
+
+ for (int i=0;i<schema.size();i++) {
+ if (!outputUids.contains(schema.getField(i).uid))
+ columnsToDrop.add(i);
+ }
+
+ if (!columnsToDrop.isEmpty()) return true;
+
+ return false;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void addForeach(LogicalRelationalOperator op) throws IOException {
+ Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
+ LogicalSchema schema = op.getSchema();
+ Set<Integer> columnsToDrop = new HashSet<Integer>();
+
+ for (int i=0;i<schema.size();i++) {
+ if (!outputUids.contains(schema.getField(i).uid))
+ columnsToDrop.add(i);
+ }
+
+ if (!columnsToDrop.isEmpty()) {
+ LOForEach foreach = Util.addForEachAfter((LogicalPlan)op.getPlan(), op, columnsToDrop);
+ op.annotate(STATUS, STATUSADDED);
+ foreach.annotate(STATUS, STATUSNEWFOREACH);
+ }
+ }
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnMapKeyPrune.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnMapKeyPrune.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnMapKeyPrune.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnMapKeyPrune.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+/**
+ * This rule prunes columns and map keys and sets the result on the loader. It
+ * depends on MapKeysPruneHelper to calculate which map keys are required from a
+ * loader, and on ColumnPruneHelper to calculate which columns are required from
+ * a loader. It then combines the map-key and column information and sets it on the loader.
+ */
+public class ColumnMapKeyPrune extends WholePlanRule {
+ private boolean hasRun;
+
+ public ColumnMapKeyPrune(String n) {
+ super(n);
+ hasRun = false;
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new ColumnMapKeyPruneTransformer();
+ }
+
+ public class ColumnMapKeyPruneTransformer extends Transformer {
+ private MapKeysPruneHelper mapKeyHelper;
+ private ColumnPruneHelper columnHelper;
+ private boolean columnPrune;
+ private boolean mapKeyPrune;
+
+ /*
+ * This is a map of of required columns and map keys for each LOLoad
+ * RequiredMapKeys --> Map<Integer, Set<String> >
+ * RequiredColumns --> Set<Integer>
+ *
+ * The integer are column indexes.
+ */
+ private Map<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>> requiredItems =
+ new HashMap<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>>();
+
+ @Override
+ public boolean check(OperatorPlan matched) throws IOException {
+ // only run this rule once
+ if (hasRun) {
+ return false;
+ }
+
+ hasRun = true;
+ mapKeyHelper = new MapKeysPruneHelper(matched);
+ columnHelper = new ColumnPruneHelper(matched);
+
+ // check if map keys can be pruned
+ mapKeyPrune = mapKeyHelper.check();
+ // check if columns can be pruned
+ columnPrune = columnHelper.check();
+
+ return mapKeyPrune || columnPrune;
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return currentPlan;
+ }
+
+ @SuppressWarnings("unchecked")
+ private void merge() {
+ // combine annotations
+ for( Operator source : currentPlan.getSources() ) {
+ Map<Integer,Set<String>> mapKeys =
+ (Map<Integer, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+ Set<Integer> requiredColumns = null;
+ if (source.getAnnotation(ColumnPruneHelper.REQUIREDCOLS) != null) {
+ requiredColumns = new HashSet<Integer>((Set<Integer>) source.getAnnotation(ColumnPruneHelper.REQUIREDCOLS));
+ }
+
+ // We dont have any information so skip
+ if( requiredColumns == null && mapKeys == null ) {
+ continue;
+ }
+
+ if( requiredColumns != null && mapKeys != null ) {
+
+ Set<Integer> duplicatedCols = new HashSet<Integer>();
+
+ // Remove the columns already marked by MapKeys
+ for( Integer col : requiredColumns ) {
+ if( mapKeys.containsKey(col) ) {
+ duplicatedCols.add(col);
+ }
+ }
+ requiredColumns.removeAll(duplicatedCols);
+ } else if ( mapKeys != null && requiredColumns == null ) {
+ // This is the case where only mapKeys can be pruned. And none
+ // of the columns can be pruned. So we add all columns to the
+ // requiredcolumns part
+ requiredColumns = new HashSet<Integer>();
+ for(int i = 0; i < ((LogicalRelationalOperator)source).getSchema().size(); i++ ) {
+ if( !mapKeys.containsKey(i) ) {
+ requiredColumns.add(i);
+ }
+ }
+ }
+
+ requiredItems.put((LOLoad) source, new Pair<Map<Integer,Set<String>>,Set<Integer>>(mapKeys, requiredColumns));
+ }
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws IOException {
+ merge();
+
+ ColumnPruneVisitor columnPruneVisitor = new ColumnPruneVisitor(currentPlan, requiredItems, columnPrune);
+ columnPruneVisitor.visit();
+ }
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,573 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.SchemaNotDefinedException;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+/**
+ * Helper class used by ColumnMapKeyPrune to figure out what columns can be pruned.
+ * It doesn't make any changes to the operator plan
+ *
+ */
+public class ColumnPruneHelper {
+ // Annotation key: set of uids an operator requires from its input.
+ protected static final String INPUTUIDS = "ColumnPrune:InputUids";
+ // Annotation key: set of uids of an operator's own schema that its successors require.
+ public static final String OUTPUTUIDS = "ColumnPrune:OutputUids";
+ // Annotation key put on an LOLoad by check(): indexes of the columns that are required.
+ protected static final String REQUIREDCOLS = "ColumnPrune:RequiredColumns";
+
+ // The plan to analyse, as handed to the constructor.
+ private OperatorPlan currentPlan;
+ // Sub-plan built by check(); returned by reportChanges().
+ private OperatorSubPlan subPlan;
+
+ // Creates a helper that analyses the given plan.
+ public ColumnPruneHelper(OperatorPlan currentPlan) {
+ this.currentPlan = currentPlan;
+ }
+
+ /**
+  * Builds the sub-plan to analyse: every LOForEach found in currentPlan
+  * together with all of its direct and indirect predecessors.
+  */
+ private OperatorSubPlan getSubPlan() throws IOException {
+     // when currentPlan is itself a sub-plan, root the new one at its base plan
+     OperatorPlan base = currentPlan;
+     if (currentPlan instanceof OperatorSubPlan) {
+         base = ((OperatorSubPlan) currentPlan).getBasePlan();
+     }
+     OperatorSubPlan result = new OperatorSubPlan(base);
+
+     for (Iterator<Operator> it = currentPlan.getOperators(); it.hasNext();) {
+         Operator candidate = it.next();
+         if (candidate instanceof LOForEach) {
+             addOperator(candidate, result);
+         }
+     }
+
+     return result;
+ }
+
+ // Adds op to subplan and then, recursively, every predecessor it has in currentPlan.
+ private void addOperator(Operator op, OperatorSubPlan subplan) throws IOException {
+     if (op == null) {
+         return;
+     }
+     subplan.add(op);
+
+     List<Operator> predecessors = currentPlan.getPredecessors(op);
+     if (predecessors != null) {
+         for (Operator upstream : predecessors) {
+             addOperator(upstream, subplan);
+         }
+     }
+ }
+
+
+ /**
+  * Runs the column dependency analysis and records, on every LOLoad that is a
+  * source of the analysed sub-plan, the indexes of the columns still needed.
+  *
+  * @return true if at least one LOLoad needs fewer columns than its schema
+  *         has (that load is then annotated with REQUIREDCOLS); false if the
+  *         plan has no sources, its first source already carries INPUTUIDS,
+  *         the plan contains no LOForEach, an operator without a schema is
+  *         met, or no load can be pruned
+  * @throws IOException if the visitor fails or a uid cannot be mapped to a column
+  */
+ @SuppressWarnings("unchecked")
+ public boolean check() throws IOException {
+     List<Operator> sources = currentPlan.getSources();
+     // a plan without sources has nothing to prune; this also keeps the
+     // get(0) below from failing on an empty list
+     if (sources == null || sources.isEmpty()) {
+         return false;
+     }
+     // if this rule has run before, just return false
+     if (sources.get(0).getAnnotation(INPUTUIDS) != null) {
+         return false;
+     }
+
+     // create sub-plan that ends with foreach
+     subPlan = getSubPlan();
+     if (subPlan.size() == 0) {
+         return false;
+     }
+
+     ColumnDependencyVisitor v = new ColumnDependencyVisitor(subPlan);
+     try {
+         v.visit();
+     } catch (SchemaNotDefinedException e) {
+         // if any operator has an unknown schema, just return false
+         return false;
+     }
+
+     boolean found = false;
+     for (Operator op : subPlan.getSources()) {
+         if (!(op instanceof LOLoad)) {
+             continue;
+         }
+         Set<Long> uids = (Set<Long>) op.getAnnotation(INPUTUIDS);
+         LogicalSchema s = ((LOLoad) op).getSchema();
+         Set<Integer> required = getColumns(s, uids);
+
+         // only annotate loads from which at least one column can be dropped
+         if (required.size() < s.size()) {
+             op.annotate(REQUIREDCOLS, required);
+             found = true;
+         }
+     }
+
+     return found;
+ }
+
+ /**
+  * Translates a set of uids into the set of their column indexes in schema.
+  *
+  * @throws SchemaNotDefinedException if schema is null
+  * @throws IOException if schema.findField reports -1 for one of the uids
+  */
+ protected Set<Integer> getColumns(LogicalSchema schema, Set<Long> uids) throws IOException {
+     if (schema == null) {
+         throw new SchemaNotDefinedException("Schema is not defined.");
+     }
+
+     Set<Integer> indexes = new HashSet<Integer>();
+     for (long uid : uids) {
+         int position = schema.findField(uid);
+         if (position == -1) {
+             throw new IOException("UID " + uid + " is not found in the schema");
+         }
+         indexes.add(position);
+     }
+
+     return indexes;
+ }
+
+ // Returns the sub-plan built by check(); null if check() has not built one yet.
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+ // Visitor to calculate the input and output uids for each operator
+ // It doesn't change the plan, only put calculated info as annotations
+ // The input and output uids are not necessarily the top level uids of
+ // a schema. They may be the uids of lower level fields of complex fields
+ // that have their own schema.
+ static private class ColumnDependencyVisitor extends LogicalRelationalNodesVisitor {
+
+ // Walks the plan with a ReverseDependencyOrderWalker; setOutputUids relies on
+ // successors having been annotated with INPUTUIDS before their predecessors.
+ public ColumnDependencyVisitor(OperatorPlan plan) {
+ super(plan, new ReverseDependencyOrderWalker(plan));
+ }
+
+ @Override
+ public void visit(LOLoad load) throws IOException {
+     // a load gets the same uid set for input as for output
+     load.annotate(INPUTUIDS, setOutputUids(load));
+ }
+
+ @Override
+ public void visit(LOFilter filter) throws IOException {
+     // start from a copy of what the successors need ...
+     Set<Long> required = new HashSet<Long>(setOutputUids(filter));
+
+     // ... and add every field projected by the filter condition
+     collectUids(filter, filter.getFilterPlan(), required);
+
+     filter.annotate(INPUTUIDS, required);
+ }
+
+ @Override
+ public void visit(LOStore store) throws IOException {
+ Set<Long> output = setOutputUids(store);
+
+ if (output.isEmpty()) {
+ // to deal with load-store-load-store case
+ LogicalSchema s = store.getSchema();
+ if (s == null) {
+ throw new SchemaNotDefinedException("Schema for " + store.getName() + " is not defined.");
+ }
+
+ for(int i=0; i<s.size(); i++) {
+ output.add(s.getField(i).uid);
+ }
+ }
+
+ // for store, input uids are same as output uids
+ store.annotate(INPUTUIDS, output);
+ }
+
+ @Override
+ public void visit(LOJoin join) throws IOException {
+     // input = copy of the output uids + every field projected by a join expression
+     Set<Long> required = new HashSet<Long>(setOutputUids(join));
+
+     Collection<LogicalExpressionPlan> joinPlans = join.getExpressionPlans();
+     for (LogicalExpressionPlan joinPlan : joinPlans) {
+         collectUids(join, joinPlan, required);
+     }
+
+     join.annotate(INPUTUIDS, required);
+ }
+
+ @Override
+ public void visit(LOCogroup cg) throws IOException {
+     Set<Long> needed = setOutputUids(cg);
+
+     // unlike filter or join, the input set starts out empty: it is
+     // built from the group keys and the bags that are actually used
+     Set<Long> required = new HashSet<Long>();
+
+     // every field projected by a cogroup key expression is required
+     for (LogicalExpressionPlan keyPlan : cg.getExpressionPlans().values()) {
+         collectUids(cg, keyPlan, required);
+     }
+
+     // a generated output uid that is needed downstream pulls in the
+     // full schema of the predecessor it was generated for
+     Map<Integer,Long> generatedInputUids = cg.getGeneratedInputUids();
+     for (Map.Entry<Integer, Long> generated : generatedInputUids.entrySet()) {
+         if (!needed.contains(generated.getValue())) {
+             continue;
+         }
+         LogicalRelationalOperator pred =
+             (LogicalRelationalOperator) cg.getPlan().getPredecessors(cg).get(generated.getKey());
+         required.addAll(getAllUids(pred.getSchema()));
+     }
+
+     cg.annotate(INPUTUIDS, required);
+ }
+
+ @Override
+ public void visit(LOLimit limit) throws IOException {
+     // limit has no expression of its own: input uids are the output uids
+     limit.annotate(INPUTUIDS, setOutputUids(limit));
+ }
+
+ @Override
+ public void visit(LOStream stream) throws IOException {
+ Set<Long> input = new HashSet<Long>();
+
+ // Every field is required
+ LogicalSchema s = stream.getSchema();
+ if (s == null) {
+ throw new SchemaNotDefinedException("Schema for " + stream.getName() + " is not defined.");
+ }
+
+ for(int i=0; i<s.size(); i++) {
+ input.add(s.getField(i).uid);
+ }
+ stream.annotate(INPUTUIDS, input);
+ }
+
+ @Override
+ public void visit(LODistinct distinct) throws IOException {
+ Set<Long> input = new HashSet<Long>();
+
+ // Every field is required
+ LogicalSchema s = distinct.getSchema();
+ if (s == null) {
+ throw new SchemaNotDefinedException("Schema for " + distinct.getName() + " is not defined.");
+ }
+
+ for(int i=0; i<s.size(); i++) {
+ input.add(s.getField(i).uid);
+ }
+ distinct.annotate(INPUTUIDS, input);
+ }
+
+ @Override
+ public void visit(LOCross cross) throws IOException {
+     Set<Long> required = setOutputUids(cross);
+
+     // Since we do not change the topology of the plan, we keep
+     // at least one input for each predecessor.
+     for (Operator pred : plan.getPredecessors(cross)) {
+         LogicalSchema predSchema = ((LogicalRelationalOperator) pred).getSchema();
+
+         boolean contributes = false;
+         for (Long uid : getAllUids(predSchema)) {
+             if (required.contains(uid)) {
+                 contributes = true;
+                 break;
+             }
+         }
+
+         // nothing of this predecessor is used: keep its first field
+         if (!contributes) {
+             required.add(predSchema.getField(0).uid);
+         }
+     }
+     cross.annotate(INPUTUIDS, required);
+ }
+
+ @Override
+ public void visit(LOUnion union) throws IOException {
+     Set<Long> needed = setOutputUids(union);
+
+     // map each needed output uid back to the input uids the union reports for it
+     Set<Long> required = new HashSet<Long>();
+     for (long outputUid : needed) {
+         required.addAll(union.getInputUids(outputUid));
+     }
+     union.annotate(INPUTUIDS, required);
+ }
+
+ @Override
+ public void visit(LOSplit split) throws IOException {
+     // split has no expression of its own: input uids are the output uids
+     split.annotate(INPUTUIDS, setOutputUids(split));
+ }
+
+ @Override
+ public void visit(LOSplitOutput splitOutput) throws IOException {
+     // start from a copy of what the successors need ...
+     Set<Long> required = new HashSet<Long>(setOutputUids(splitOutput));
+
+     // ... and add every field projected by the splitOutput condition
+     collectUids(splitOutput, splitOutput.getFilterPlan(), required);
+
+     splitOutput.annotate(INPUTUIDS, required);
+ }
+
+ @Override
+ public void visit(LOSort sort) throws IOException {
+     // input = copy of the output uids + every field projected by a sort column plan
+     Set<Long> required = new HashSet<Long>(setOutputUids(sort));
+
+     for (LogicalExpressionPlan sortColPlan : sort.getSortColPlans()) {
+         collectUids(sort, sortColPlan, required);
+     }
+
+     sort.annotate(INPUTUIDS, required);
+ }
+
+ /*
+  * Returns the uid of every field in the given schema, descending into
+  * the nested schema of tuple and bag fields. A null schema gives an
+  * empty set.
+  */
+ private Set<Long> getAllUids( LogicalSchema schema ) {
+     Set<Long> collected = new HashSet<Long>();
+
+     if( schema != null ) {
+         for( LogicalFieldSchema field : schema.getFields() ) {
+             boolean isComplex = field.type == DataType.TUPLE || field.type == DataType.BAG;
+             if( isComplex && field.schema != null ) {
+                 collected.addAll( getAllUids( field.schema ) );
+             }
+             collected.add( field.uid );
+         }
+     }
+
+     return collected;
+ }
+
+ @Override
+ public void visit(LOForEach foreach) throws IOException {
+     Set<Long> needed = setOutputUids(foreach);
+
+     // the first sink of the inner plan is treated as the LOGenerate; hand it
+     // the foreach's output uids and let its visit method work out the inputs
+     LOGenerate generate = (LOGenerate) foreach.getInnerPlan().getSinks().get(0);
+     generate.annotate(OUTPUTUIDS, needed);
+     visit(generate);
+
+     // the foreach needs exactly what its generate needs
+     foreach.annotate(INPUTUIDS, generate.getAnnotation(INPUTUIDS));
+ }
+
+ /**
+  * Computes the INPUTUIDS annotation of a generate from its OUTPUTUIDS
+  * annotation (set by the LOForEach visit method before it calls this one).
+  *
+  * For every required output uid, the output plans producing that uid are
+  * located and the uids read by the inner loads reachable from their
+  * project sinks are added to the input. In addition, every flattened
+  * output plan contributes the uids of its inner loads regardless of the
+  * required output, so that at least one field of a flattened bag stays
+  * in the input.
+  *
+  * @throws IOException propagated from the expression and plan accessors
+  */
+ @Override
+ @SuppressWarnings("unchecked")
+ public void visit(LOGenerate gen) throws IOException {
+     Set<Long> output = (Set<Long>)gen.getAnnotation(OUTPUTUIDS);
+
+     Set<Long> input = new HashSet<Long>();
+
+     List<LogicalExpressionPlan> ll = gen.getOutputPlans();
+
+     for (long uid : output) {
+         for(int i=0; i<ll.size(); i++) {
+             boolean found = false;
+             LogicalExpressionPlan exp = ll.get(i);
+             LogicalExpression op = (LogicalExpression)exp.getSources().get(0);
+             LogicalFieldSchema opField = op.getFieldSchema();
+
+             if (gen.getFlattenFlags()[i] && (opField.type==DataType.TUPLE ||
+                     opField.type==DataType.BAG)) {
+                 // flattened tuple/bag: the output uids are those of the inner fields
+                 // NOTE(review): opField.schema is dereferenced without a null
+                 // check, exactly as before - confirm it cannot be null here
+                 for (LogicalSchema.LogicalFieldSchema fs : opField.schema.getFields()) {
+                     if (fs.uid==uid) {
+                         found = true;
+                         break;
+                     }
+                 }
+             }
+             else {
+                 // No flatten, collect outer uid
+                 if (opField.uid == uid) {
+                     found = true;
+                 }
+             }
+
+             if (found) {
+                 for (Operator src : exp.getSinks()) {
+                     if (src instanceof ProjectExpression) {
+                         addInnerLoadUids((ProjectExpression)src, input);
+                     }
+                 }
+             }
+         }
+     }
+
+     // for the flatten bag, we need to make sure at least one field is in the input
+     for(int i=0; i<ll.size(); i++) {
+         if (!gen.getFlattenFlags()[i]) {
+             continue;
+         }
+         for (Operator src : ll.get(i).getSinks()) {
+             // a sink of a flattened expression need not be a projection;
+             // casting it unconditionally raised a ClassCastException
+             if (!(src instanceof ProjectExpression)) {
+                 continue;
+             }
+             addInnerLoadUids((ProjectExpression)src, input);
+         }
+     }
+     gen.annotate(INPUTUIDS, input);
+ }
+
+ // Adds to input the uids read by every inner load reachable from the given
+ // boundary projection: all fields of the referent's schema for a project-star,
+ // otherwise the single projected column. Referents without a schema add nothing.
+ private void addInnerLoadUids(ProjectExpression boundary, Set<Long> input) throws IOException {
+     List<LOInnerLoad> innerLoads = LOForEach.findReacheableInnerLoadFromBoundaryProject(boundary);
+     for (LOInnerLoad innerLoad : innerLoads) {
+         ProjectExpression prj = innerLoad.getProjection();
+         LogicalSchema referentSchema = prj.findReferent().getSchema();
+         if (referentSchema == null) {
+             continue;
+         }
+         if (prj.isProjectStar()) {
+             for (LogicalSchema.LogicalFieldSchema fs : referentSchema.getFields()) {
+                 input.add(fs.uid);
+             }
+         }
+         else {
+             input.add(referentSchema.getField(prj.getColNum()).uid);
+         }
+     }
+ }
+
+ @Override
+ public void visit(LOInnerLoad load) throws IOException {
+     // an inner load gets the same uid set for input as for output
+     load.annotate(INPUTUIDS, setOutputUids(load));
+ }
+
+ /**
+  * Adds to uids the uid of every field projected by the sinks of exp. A
+  * project-star contributes every top-level field of its referent's schema.
+  * Sinks that are not projections are ignored; currentOp is not used.
+  *
+  * @throws SchemaNotDefinedException if a project-star referent has no schema
+  */
+ private void collectUids(LogicalRelationalOperator currentOp, LogicalExpressionPlan exp, Set<Long> uids) throws IOException {
+     for (Operator sink : exp.getSinks()) {
+         if (!(sink instanceof ProjectExpression)) {
+             continue;
+         }
+         ProjectExpression projection = (ProjectExpression) sink;
+
+         if (projection.isProjectStar()) {
+             LogicalRelationalOperator ref = projection.findReferent();
+             LogicalSchema refSchema = ref.getSchema();
+             if (refSchema == null) {
+                 throw new SchemaNotDefinedException("Schema not defined for " + ref.getAlias());
+             }
+             for (LogicalFieldSchema field : refSchema.getFields()) {
+                 uids.add(field.uid);
+             }
+         } else {
+             uids.add(projection.getFieldSchema().uid);
+         }
+     }
+ }
+
+ /**
+  * Computes the output uids of op, stores them on op under OUTPUTUIDS and
+  * returns that same set. For an operator with successors these are the
+  * successors' INPUTUIDS that can be found in op's schema; for an operator
+  * without successors they are all top-level fields of its schema.
+  *
+  * @throws SchemaNotDefinedException if op has no schema
+  */
+ @SuppressWarnings("unchecked")
+ private Set<Long> setOutputUids(LogicalRelationalOperator op) throws IOException {
+     List<Operator> successors = plan.getSuccessors(op);
+
+     LogicalSchema schema = op.getSchema();
+     if (schema == null) {
+         throw new SchemaNotDefinedException("Schema for " + op.getName() + " is not defined.");
+     }
+
+     Set<Long> result = new HashSet<Long>();
+     if (successors == null) {
+         // if it's leaf, set to its schema
+         int fieldCount = schema.size();
+         for (int col = 0; col < fieldCount; col++) {
+             result.add(schema.getField(col).uid);
+         }
+     } else {
+         // if this is not sink, the output uids are union of input uids of its successors
+         for (Operator succ : successors) {
+             Set<Long> wanted = (Set<Long>) succ.getAnnotation(INPUTUIDS);
+             if (wanted == null) {
+                 continue;
+             }
+             for (long uid : wanted) {
+                 // keep only the uids that this operator's schema can resolve
+                 if (schema.findField(uid) != -1) {
+                     result.add(uid);
+                 }
+             }
+         }
+     }
+
+     op.annotate(OUTPUTUIDS, result);
+     return result;
+ }
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,405 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.Util;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplitOutput;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+
+public class ColumnPruneVisitor extends LogicalRelationalNodesVisitor {
+ protected static final Log log = LogFactory.getLog(ColumnPruneVisitor.class);
+ private Map<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>> requiredItems =
+ new HashMap<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>>();
+ private boolean columnPrune;
+
+    /**
+     * Creates a visitor that walks the plan in reverse dependency order.
+     *
+     * @param plan the plan to visit
+     * @param requiredItems for each load, a pair whose first element maps a column
+     *        index to the map keys needed from that column and whose second element
+     *        is the set of column indexes needed whole; when null, the visitor starts
+     *        with an empty map and entries can be added through
+     *        {@link #addRequiredItems}
+     * @param columnPrune whether column pruning (as opposed to map-key pruning only)
+     *        is enabled
+     */
+    public ColumnPruneVisitor(OperatorPlan plan, Map<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>> requiredItems,
+            boolean columnPrune) {
+        super(plan, new ReverseDependencyOrderWalker(plan));
+        this.columnPrune = columnPrune;
+        // Keep the empty map from the field initializer when no map is supplied, so
+        // visit(LOLoad) and addRequiredItems never dereference a null map.
+        if (requiredItems != null) {
+            this.requiredItems = requiredItems;
+        }
+    }
+
+ /**
+ * Records (or replaces) the required items for a load.
+ *
+ * @param load the load operator the items belong to
+ * @param requiredItem pair read by visit(LOLoad): first maps a column index to the
+ * map keys required from that column, second is the set of required column indexes
+ */
+ public void addRequiredItems(LOLoad load, Pair<Map<Integer,Set<String>>,Set<Integer>> requiredItem) {
+ requiredItems.put(load, requiredItem);
+ }
+
+ @Override
+ public void visit(LOLoad load) throws IOException {
+ if(! requiredItems.containsKey( load ) ) {
+ return;
+ }
+
+ Pair<Map<Integer,Set<String>>,Set<Integer>> required =
+ requiredItems.get(load);
+
+ RequiredFieldList requiredFields = new RequiredFieldList();
+
+ LogicalSchema s = load.getSchema();
+ for (int i=0;i<s.size();i++) {
+ RequiredField requiredField = null;
+ // As we have done processing ahead, we assume that
+ // a column is not present in both ColumnPruner and
+ // MapPruner
+ if( required.first != null && required.first.containsKey(i) ) {
+ requiredField = new RequiredField();
+ requiredField.setIndex(i);
+ requiredField.setType(s.getField(i).type);
+ List<RequiredField> subFields = new ArrayList<RequiredField>();
+ for( String key : required.first.get(i) ) {
+ RequiredField subField = new RequiredField(key,-1,null,DataType.BYTEARRAY);
+ subFields.add(subField);
+ }
+ requiredField.setSubFields(subFields);
+ requiredFields.add(requiredField);
+ }
+ if( required.second != null && required.second.contains(i) ) {
+ requiredField = new RequiredField();
+ requiredField.setIndex(i);
+ requiredField.setType(s.getField(i).type);
+ requiredFields.add(requiredField);
+ }
+ }
+
+ log.info("Loader for " + load.getAlias() + " is pruned. Load fields " + requiredFields);
+ for(RequiredField rf: requiredFields.getFields()) {
+ List<RequiredField> sub = rf.getSubFields();
+ if (sub != null) {
+ // log.info("For column " + rf.getIndex() + ", set map keys: " + sub.toString());
+ log.info("Map key required for " + load.getAlias() + ": $" + rf.getIndex() + "->" + sub);
+ }
+ }
+
+ LoadPushDown.RequiredFieldResponse response = null;
+ try {
+ LoadFunc loadFunc = load.getLoadFunc();
+ if (loadFunc instanceof LoadPushDown) {
+ response = ((LoadPushDown)loadFunc).pushProjection(requiredFields);
+ }
+
+ } catch (FrontendException e) {
+ log.warn("pushProjection on "+load+" throw an exception, skip it");
+ }
+
+ // Loader does not support column pruning, insert foreach
+ if (columnPrune) {
+ if (response==null || !response.getRequiredFieldResponse()) {
+ LogicalPlan p = (LogicalPlan)load.getPlan();
+ Operator next = p.getSuccessors(load).get(0);
+ // if there is already a LOForEach after load, we don't need to
+ // add another LOForEach
+ if (next instanceof LOForEach) {
+ return;
+ }
+
+ LOForEach foreach = new LOForEach(load.getPlan());
+
+ // add foreach to the base plan
+ p.add(foreach);
+
+ Pair<Integer,Integer> disconnectedPos = p.disconnect(load, next);
+ p.connect(load, disconnectedPos.first.intValue(), foreach, 0 );
+ p.connect(foreach, 0, next, disconnectedPos.second.intValue());
+
+ LogicalPlan innerPlan = new LogicalPlan();
+ foreach.setInnerPlan(innerPlan);
+
+ // build foreach inner plan
+ List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();
+ LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[requiredFields.getFields().size()]);
+ innerPlan.add(gen);
+
+ for (int i=0; i<requiredFields.getFields().size(); i++) {
+ LoadPushDown.RequiredField rf = requiredFields.getFields().get(i);
+ LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, rf.getIndex());
+ innerPlan.add(innerLoad);
+ innerPlan.connect(innerLoad, gen);
+
+ LogicalExpressionPlan exp = new LogicalExpressionPlan();
+ ProjectExpression prj = new ProjectExpression(exp, i, -1, gen);
+ exp.add(prj);
+ exps.add(exp);
+ }
+
+ } else {
+ // columns are pruned, reset schema for LOLoader
+ List<Integer> requiredIndexes = new ArrayList<Integer>();
+ List<LoadPushDown.RequiredField> fieldList = requiredFields.getFields();
+ for (int i=0; i<fieldList.size(); i++) {
+ requiredIndexes.add(fieldList.get(i).getIndex());
+ }
+
+ load.setRequiredFields(requiredIndexes);
+
+ LogicalSchema newSchema = new LogicalSchema();
+ for (int i=0; i<fieldList.size(); i++) {
+ newSchema.addField(s.getField(fieldList.get(i).getIndex()));
+ }
+
+ load.setSchema(newSchema);
+ }
+ }
+ }
+
+ // This visitor performs no action on a filter.
+ @Override
+ public void visit(LOFilter filter) throws IOException {
+ }
+
+ // This visitor performs no action on a split output.
+ @Override
+ public void visit(LOSplitOutput splitOutput) throws IOException {
+ }
+
+ // This visitor performs no action on a sort.
+ @Override
+ public void visit(LOSort sort) throws IOException {
+ }
+
+ // This visitor performs no action on a store.
+ @Override
+ public void visit(LOStore store) throws IOException {
+ }
+
+ // A foreach is added after the cogroup when some of its columns are not in its
+ // OUTPUTUIDS annotation (see addForEachIfNecessary).
+ @Override
+ public void visit( LOCogroup cg ) throws IOException {
+ addForEachIfNecessary(cg);
+ }
+
+ // This visitor performs no action on a join.
+ @Override
+ public void visit(LOJoin join) throws IOException {
+ }
+
+ // This visitor performs no action on a cross.
+ @Override
+ public void visit(LOCross cross) throws IOException {
+ }
+
+ /**
+ * Prunes the inner plan of a foreach. Does nothing when columnPrune is false.
+ *
+ * Steps, in order:
+ * 1. find the inner loads whose projected uid is not in the INPUTUIDS annotation;
+ * 2. for each, find the operator of its branch that feeds LOGenerate;
+ * 3. mark the generate output plans that project from such a branch;
+ * 4. rebuild the flatten flags from the plans that are kept and work out which
+ * generate inputs are no longer referenced by any kept plan;
+ * 5. drop the marked plans and renumber the inputs of the remaining projections;
+ * 6. remove the sub-trees feeding the unreferenced inputs.
+ */
+ @Override
+ @SuppressWarnings("unchecked")
+ public void visit(LOForEach foreach) throws IOException {
+ if (!columnPrune) {
+ return;
+ }
+
+ // get column numbers from input uids
+ // NOTE(review): inputUids is dereferenced below without a null check; presumably
+ // the annotation is always set before this visitor runs - confirm.
+ Set<Long> inputUids = (Set<Long>)foreach.getAnnotation(ColumnPruneHelper.INPUTUIDS);
+
+ // Get all top level projects
+ LogicalPlan innerPlan = foreach.getInnerPlan();
+ List<LOInnerLoad> innerLoads= new ArrayList<LOInnerLoad>();
+ List<Operator> sources = innerPlan.getSources();
+ for (Operator s : sources) {
+ if (s instanceof LOInnerLoad)
+ innerLoads.add((LOInnerLoad)s);
+ }
+
+ // If project of the innerLoad is not in INPUTUIDS, remove this innerLoad
+ Set<LOInnerLoad> innerLoadsToRemove = new HashSet<LOInnerLoad>();
+ for (LOInnerLoad innerLoad: innerLoads) {
+ ProjectExpression project = innerLoad.getProjection();
+ if (project.isProjectStar()) {
+ LogicalSchema.LogicalFieldSchema tupleFS = project.getFieldSchema();
+ // Check the first component of the star projection
+ long uid = tupleFS.schema.getField(0).uid;
+ if (!inputUids.contains(uid))
+ innerLoadsToRemove.add(innerLoad);
+ }
+ else {
+ if (!inputUids.contains(project.getFieldSchema().uid))
+ innerLoadsToRemove.add(innerLoad);
+ }
+ }
+
+ // Find the logical operator immediate precede LOGenerate which should be removed (the whole branch)
+ Set<LogicalRelationalOperator> branchHeadToRemove = new HashSet<LogicalRelationalOperator>();
+ for (LOInnerLoad innerLoad : innerLoadsToRemove) {
+ Operator op = innerLoad;
+ // Follow the first successor until the next operator is the LOGenerate.
+ while (!(innerPlan.getSuccessors(op).get(0) instanceof LOGenerate)) {
+ op = innerPlan.getSuccessors(op).get(0);
+ }
+ branchHeadToRemove.add((LogicalRelationalOperator)op);
+ }
+
+ // Find the expression plan to remove
+ LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
+ List<LogicalExpressionPlan> genPlansToRemove = new ArrayList<LogicalExpressionPlan>();
+
+ List<LogicalExpressionPlan> genPlans = gen.getOutputPlans();
+ for (int i=0;i<genPlans.size();i++) {
+ LogicalExpressionPlan expPlan = genPlans.get(i);
+ // Despite the name, expSources holds the sinks of the expression plan.
+ List<Operator> expSources = expPlan.getSinks();
+
+ for (Operator expSrc : expSources) {
+ if (expSrc instanceof ProjectExpression) {
+ LogicalRelationalOperator reference = ((ProjectExpression)expSrc).findReferent();
+ // A plan is added once per matching projection, so it can appear more than once.
+ if (branchHeadToRemove.contains(reference)) {
+ genPlansToRemove.add(expPlan);
+ }
+ }
+ }
+ }
+
+ // Build the temporary structure based on genPlansToRemove, which include:
+ // * flattenList
+ // * inputsRemoved
+ // We first construct inputsNeeded, and inputsRemoved = (all inputs) - inputsNeeded.
+ // We cannot figure out inputsRemoved directly since the inputs may be used by other output plan.
+ // We can only get inputsRemoved after visiting all output plans.
+ List<Boolean> flattenList = new ArrayList<Boolean>();
+ Set<Integer> inputsNeeded = new HashSet<Integer>();
+ Set<Integer> inputsRemoved = new HashSet<Integer>();
+
+ for (int i=0;i<genPlans.size();i++) {
+ LogicalExpressionPlan genPlan = genPlans.get(i);
+ if (!genPlansToRemove.contains(genPlan)) {
+ flattenList.add(gen.getFlattenFlags()[i]);
+ List<Operator> sinks = genPlan.getSinks();
+ for(Operator s: sinks) {
+ if (s instanceof ProjectExpression) {
+ inputsNeeded.add(((ProjectExpression)s).getInputNum());
+ }
+ }
+ }
+ }
+
+ // Every predecessor position of the generate not referenced by a kept plan is removed.
+ List<Operator> preds = innerPlan.getPredecessors(gen);
+ for (int i=0;i<preds.size();i++) {
+ if (!inputsNeeded.contains(i))
+ inputsRemoved.add(i);
+ }
+
+ // Change LOGenerate: remove unneeded output expression plan
+ // change flatten flag
+ boolean[] flatten = new boolean[flattenList.size()];
+ for (int i=0;i<flattenList.size();i++)
+ flatten[i] = flattenList.get(i);
+
+ gen.setFlattenFlags(flatten);
+
+ for (LogicalExpressionPlan genPlanToRemove : genPlansToRemove) {
+ genPlans.remove(genPlanToRemove);
+ }
+
+ // shift project input
+ if (!inputsRemoved.isEmpty()) {
+ for (LogicalExpressionPlan genPlan : genPlans) {
+ List<Operator> sinks = genPlan.getSinks();
+ for(Operator s: sinks) {
+ if (s instanceof ProjectExpression) {
+ int input = ((ProjectExpression)s).getInputNum();
+ // The new input number is the old one minus the removed inputs that preceded it.
+ int numToShift = 0;
+ for (int i :inputsRemoved) {
+ if (i<input)
+ numToShift++;
+ }
+ ((ProjectExpression)s).setInputNum(input-numToShift);
+ }
+ }
+ }
+ }
+
+ // Prune unneeded LOInnerLoad
+ // The operators are collected first because removeSubTree disconnects them from the plan.
+ List<LogicalRelationalOperator> predToRemove = new ArrayList<LogicalRelationalOperator>();
+ for (int i : inputsRemoved) {
+ predToRemove.add((LogicalRelationalOperator)preds.get(i));
+ }
+ for (LogicalRelationalOperator pred : predToRemove) {
+ removeSubTree(pred);
+ }
+ }
+
+    /**
+     * Gives every input of a union the chance to have its unneeded columns dropped
+     * by a foreach placed after that input.
+     *
+     * @param union the union operator being visited
+     * @throws IOException if adding a foreach fails
+     */
+    @Override
+    public void visit(LOUnion union) throws IOException {
+        // Work on a copy of the predecessor list: addForEachIfNecessary hands the
+        // plan to Util.addForEachAfter, which may rewire it.
+        List<Operator> inputs = new ArrayList<Operator>(plan.getPredecessors(union));
+
+        for (Operator input : inputs) {
+            addForEachIfNecessary((LogicalRelationalOperator) input);
+        }
+    }
+
+    /**
+     * Removes an operator and, recursively, everything that feeds it from the plan
+     * the operator belongs to.
+     * <p>
+     * Predecessors are removed first. The operator is then disconnected from each
+     * of its successors and removed itself.
+     *
+     * @param op root of the sub-tree to remove
+     * @throws IOException if the plan rejects a disconnect or removal
+     */
+    private void removeSubTree(LogicalRelationalOperator op) throws IOException {
+        LogicalPlan p = (LogicalPlan)op.getPlan();
+
+        List<Operator> preds = p.getPredecessors(op);
+        if (preds != null) {
+            // Iterate over a snapshot: the recursive call disconnects each predecessor
+            // from op, which would otherwise change the list while it is being walked.
+            for (Operator pred : preds.toArray(new Operator[0])) {
+                removeSubTree((LogicalRelationalOperator)pred);
+            }
+        }
+
+        List<Operator> succs = p.getSuccessors(op);
+        if (succs != null) {
+            // Same precaution for the successor list, which disconnect() edits.
+            for (Operator succ : succs.toArray(new Operator[0])) {
+                p.disconnect(op, succ);
+            }
+        }
+
+        p.remove(op);
+    }
+
+    /**
+     * Adds a foreach after an operator to drop the columns whose uid is not in the
+     * operator's {@code OUTPUTUIDS} annotation.
+     * <p>
+     * Nothing is added when every column is needed. Nothing is added either when
+     * the operator carries no {@code OUTPUTUIDS} annotation or has no schema, since
+     * in both cases there is no basis for deciding which columns to drop.
+     *
+     * @param op the operator whose output may be pruned
+     * @throws IOException if the foreach cannot be added or its schema computed
+     */
+    @SuppressWarnings("unchecked")
+    private void addForEachIfNecessary(LogicalRelationalOperator op) throws IOException {
+        Set<Long> outputUids = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
+        if (outputUids == null) {
+            return;
+        }
+
+        LogicalSchema schema = op.getSchema();
+        if (schema == null) {
+            return;
+        }
+
+        Set<Integer> columnsToDrop = new HashSet<Integer>();
+        for (int i=0;i<schema.size();i++) {
+            if (!outputUids.contains(schema.getField(i).uid)) {
+                columnsToDrop.add(i);
+            }
+        }
+
+        if (!columnsToDrop.isEmpty()) {
+            LOForEach foreach = Util.addForEachAfter((LogicalPlan)op.getPlan(), op, columnsToDrop);
+            // Ask for the schema of the new foreach right away, as the original code did.
+            foreach.getSchema();
+        }
+    }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,265 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+/**
+ * This rule moves a Filter above a ForEach.
+ * It checks whether every uid the filter works on
+ * is present in a predecessor of the ForEach.
+ * If so, it moves the filter above the ForEach.
+ */
+public class FilterAboveForeach extends Rule {
+
+ /**
+ * @param n the rule name, passed on to the Rule constructor
+ */
+ public FilterAboveForeach(String n) {
+ super(n);
+ }
+
+    /**
+     * Builds the pattern this rule matches: a foreach directly followed by a filter.
+     *
+     * @return a two-operator plan with the foreach connected to the filter
+     */
+    @Override
+    protected OperatorPlan buildPattern() {
+        LogicalPlan pattern = new LogicalPlan();
+
+        LogicalRelationalOperator foreachNode = new LOForEach(pattern);
+        LogicalRelationalOperator filterNode = new LOFilter(pattern);
+        pattern.add(foreachNode);
+        pattern.add(filterNode);
+
+        // foreach -> filter
+        pattern.connect(foreachNode, filterNode);
+        return pattern;
+    }
+
+ /**
+ * @return a fresh FilterAboveForEachTransformer on every call
+ */
+ @Override
+ public Transformer getNewTransformer() {
+ return new FilterAboveForEachTransformer();
+ }
+
+ public class FilterAboveForEachTransformer extends Transformer {
+
+ LOFilter filter = null;
+ LOForEach foreach = null;
+ LogicalRelationalOperator forEachPred = null;
+ OperatorSubPlan subPlan = null;
+
+ @Override
+ public boolean check(OperatorPlan matched) throws IOException {
+ Iterator<Operator> iter = matched.getOperators();
+ while( iter.hasNext() ) {
+ Operator op = iter.next();
+ if( op instanceof LOForEach ) {
+ foreach = (LOForEach)op;
+ break;
+ }
+ }
+
+ // This would be a strange case
+ if( foreach == null ) return false;
+
+ iter = matched.getOperators();
+ while( iter.hasNext() ) {
+ Operator op = iter.next();
+ if( ( op instanceof LOFilter ) ) {
+ filter = (LOFilter)op;
+ break;
+ }
+ }
+
+ // This is for cheating, we look up more than one filter in the plan
+ while( filter != null ) {
+ // Get uids of Filter
+ Pair<List<Long>, List<Byte>> uidWithTypes = getFilterProjectionUids(filter);
+
+ // See if the previous operators have uids from project
+ List<Operator> preds = currentPlan.getPredecessors(foreach);
+ for(int j=0; j< preds.size(); j++) {
+ LogicalRelationalOperator logRelOp = (LogicalRelationalOperator)preds.get(j);
+ if (hasAll(logRelOp, uidWithTypes)) {
+ forEachPred = (LogicalRelationalOperator) preds.get(j);
+ return true;
+ }
+ }
+
+ // Chances are there are filters below this filter which can be
+ // moved up. So searching for those filters
+ List<Operator> successors = currentPlan.getSuccessors(filter);
+ if( successors != null && successors.size() > 0 &&
+ successors.get(0) instanceof LOFilter ) {
+ filter = (LOFilter)successors.get(0);
+ } else {
+ filter = null;
+ }
+ }
+ return false;
+ }
+
+        /**
+         * Collects the uid and type of every projection in a filter's condition.
+         *
+         * @param filter the filter to inspect; may be null
+         * @return a pair of parallel lists: uids in {@code first}, the matching types
+         *         in {@code second}; both are empty when {@code filter} is null
+         */
+        private Pair<List<Long>, List<Byte>> getFilterProjectionUids(LOFilter filter) throws IOException {
+            List<Long> projectedUids = new ArrayList<Long>();
+            List<Byte> projectedTypes = new ArrayList<Byte>();
+
+            if (filter != null) {
+                Iterator<Operator> it = filter.getFilterPlan().getOperators();
+                while (it.hasNext()) {
+                    Operator node = it.next();
+                    if (!(node instanceof ProjectExpression)) {
+                        continue;
+                    }
+                    LogicalSchema.LogicalFieldSchema fs = ((ProjectExpression) node).getFieldSchema();
+                    projectedUids.add(fs.uid);
+                    projectedTypes.add(fs.type);
+                }
+            }
+
+            return new Pair<List<Long>, List<Byte>>(projectedUids, projectedTypes);
+        }
+
+        /**
+         * Checks whether the schema of an operator contains, for every requested
+         * (uid, type) pair, a field with that uid and that type.
+         *
+         * @param op the relational operator whose schema is searched
+         * @param uidWithTypes parallel lists of uids ({@code first}) and types
+         *        ({@code second}) to look for
+         * @return true if every pair is found; false if one is missing or the
+         *         operator has no schema
+         */
+        private boolean hasAll(LogicalRelationalOperator op, Pair<List<Long>,
+                List<Byte>> uidWithTypes) {
+            LogicalSchema schema = op.getSchema();
+            if (schema == null) {
+                return false;
+            }
+
+            List<Long> uids = uidWithTypes.first;
+            List<Byte> types = uidWithTypes.second;
+
+            for (int i = 0; i < uids.size(); i++) {
+                boolean present = false;
+                for (LogicalSchema.LogicalFieldSchema fs : schema.getFields()) {
+                    if (fs.uid == uids.get(i) && fs.type == types.get(i)) {
+                        present = true;
+                        break;
+                    }
+                }
+                if (!present) {
+                    return false;
+                }
+            }
+            return true;
+        }
+
+ /**
+ * @return the sub-plan filled in by transform(); null if transform() has not
+ * created one
+ */
+ @Override
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+        /**
+         * Moves the filter selected by {@code check} from below the foreach to
+         * directly above it, between {@code forEachPred} and the foreach.
+         * <p>
+         * Returns without changing anything when the filter has no predecessor or no
+         * successor in the current plan.
+         *
+         * @param matched the plan fragment that matched the rule pattern (not read here)
+         */
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+
+            List<Operator> neighbours = currentPlan.getPredecessors(filter);
+            if (neighbours == null || neighbours.size() == 0) {
+                return;
+            }
+            Operator filterPred = neighbours.get(0);
+
+            neighbours = currentPlan.getSuccessors(filter);
+            if (neighbours == null || neighbours.size() == 0) {
+                return;
+            }
+            Operator filterSuc = neighbours.get(0);
+
+            subPlan = new OperatorSubPlan(currentPlan);
+
+            /*
+             * Before:
+             *
+             *   ForEachPred -> ForEach -> Filter* -> FilterPred -> Filter -> FilterSuc
+             *
+             * After:
+             *
+             *   ForEachPred -> Filter -> ForEach -> Filter* -> FilterPred -> FilterSuc
+             *
+             * "Filter*" stands for zero or more filters that cannot be moved, and
+             * "Filter" is the one being moved. When the moved filter is the first
+             * one after the foreach, there are no filters in between and FilterPred
+             * is the ForEach itself.
+             */
+
+            // Cut the three edges around the foreach input and the filter ...
+            Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach);
+            Pair<Integer, Integer> filterPredPlaces = currentPlan.disconnect(filterPred, filter);
+            Pair<Integer, Integer> filterSucPlaces = currentPlan.disconnect(filter, filterSuc);
+
+            // ... and reconnect with the filter sitting above the foreach, reusing the
+            // edge positions reported by disconnect.
+            currentPlan.connect(forEachPred, forEachPredPlaces.first, filter, filterPredPlaces.second);
+            currentPlan.connect(filter, filterSucPlaces.first, foreach, forEachPredPlaces.second);
+            currentPlan.connect(filterPred, filterPredPlaces.first, filterSuc, filterSucPlaces.second);
+
+            // Report every operator whose edges were touched.
+            subPlan.add(forEachPred);
+            subPlan.add(foreach);
+            subPlan.add(filterPred);
+            subPlan.add(filter);
+            subPlan.add(filterSuc);
+        }
+ }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.MapLookupExpression;
+import org.apache.pig.newplan.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+/**
+ * This helper marks every Load operator which has a Map
+ * with the REQUIRED_MAPKEYS annotation. The annotation value is a
+ * <code>Map&lt;Integer,Set&lt;String&gt;&gt;</code> where Integer is the column number
+ * of the field and Set is the set of keys required from this field ( map fields only ).
+ *
+ * It does this for only the top level schema in load.
+ *
+ * Algorithm:
+ * Traverse the Plan in ReverseDependency order ( ie. Sink to Source )
+ * For LogicalRelationalOperators having MapLookupExpression in their
+ * expressionPlan collect uid and keys related to it. This is
+ * retained in the visitor
+ * For ForEach having nested LogicalPlan use the same visitor hence
+ * there is no distinction required
+ * At Sources find all the uids provided by this source and annotate this
+ * LogicalRelationalOperator ( load ) with <code>Map<Integer,Set<String>></code>
+ * containing only the column numbers that this LogicalRelationalOperator generates
+ *
+ * NOTE: This is a simple Map Pruner. If a map key is mentioned in the script
+ * then this pruner assumes you need the key. This pruner is not as optimized
+ * as column pruner ( which removes a column if it is mentioned but never used )
+ *
+ */
+public class MapKeysPruneHelper {
+
+ public static final String REQUIRED_MAPKEYS = "MapPruner:RequiredKeys";
+
+ private OperatorPlan currentPlan;
+ private OperatorSubPlan subplan;
+
+    /**
+     * Creates a helper for the given plan.
+     * <p>
+     * The sub-plan used to report changes is created over the base plan when
+     * {@code currentPlan} is itself an {@link OperatorSubPlan}, and over
+     * {@code currentPlan} otherwise.
+     *
+     * @param currentPlan the plan to analyse
+     */
+    public MapKeysPruneHelper(OperatorPlan currentPlan) {
+        this.currentPlan = currentPlan;
+
+        OperatorPlan base = (currentPlan instanceof OperatorSubPlan)
+                ? ((OperatorSubPlan) currentPlan).getBasePlan()
+                : currentPlan;
+        subplan = new OperatorSubPlan(base);
+    }
+
+
+    /**
+     * Determines whether any source of the plan can have map keys pruned.
+     * <p>
+     * The plan is walked with a {@link MapMarker}, which annotates loads with the
+     * map keys required per column. Columns whose map reaches a sink as a whole
+     * map are then taken out of the annotation, and an annotation left empty is
+     * removed. Every source that still carries an annotation is added to the
+     * sub-plan returned by {@link #reportChanges()}.
+     *
+     * @return true if at least one source still has required map keys recorded;
+     *         false otherwise, or if any source has no schema
+     * @throws IOException if visiting the plan fails
+     */
+    @SuppressWarnings("unchecked")
+    public boolean check() throws IOException {
+
+        List<Operator> sources = currentPlan.getSources();
+
+        // A source without a schema makes the effect of pruning unknown: give up.
+        for (Operator source : sources) {
+            if (((LogicalRelationalOperator) source).getSchema() == null) {
+                return false;
+            }
+        }
+
+        // Record which map keys are looked up anywhere in the plan.
+        MapMarker marker = new MapMarker(currentPlan);
+        marker.visit();
+
+        // Maps that appear in a sink schema are needed in full.
+        Set<Long> wholeMapUids = new HashSet<Long>();
+        for (Operator sink : currentPlan.getSinks()) {
+            wholeMapUids.addAll(getMapUids(((LogicalRelationalOperator) sink).getSchema()));
+        }
+
+        boolean prunable = false;
+        for (Operator source : sources) {
+            LogicalRelationalOperator load = (LogicalRelationalOperator) source;
+            Map<Integer,Set<String>> requiredKeys =
+                (Map<Integer, Set<String>>) load.getAnnotation(REQUIRED_MAPKEYS);
+            if (requiredKeys == null) {
+                continue;
+            }
+
+            if (!wholeMapUids.isEmpty() && !requiredKeys.isEmpty()) {
+                LogicalSchema sourceSchema = load.getSchema();
+                // Iterate over a copy of the keys because entries are removed on the way.
+                for (Integer col : requiredKeys.keySet().toArray(new Integer[0])) {
+                    if (wholeMapUids.contains(sourceSchema.getField(col).uid)) {
+                        requiredKeys.remove(col);
+                    }
+                }
+            }
+
+            if (requiredKeys.isEmpty()) {
+                // Nothing left to prune for this source.
+                load.removeAnnotation(REQUIRED_MAPKEYS);
+            } else {
+                prunable = true;
+                subplan.add(source);
+            }
+        }
+
+        return prunable;
+    }
+
+    /**
+     * Tells whether a schema has a map among its top-level fields.
+     * Nested schemas are not inspected.
+     *
+     * @param schema schema to be checked
+     * @return true if a top-level field has type {@code DataType.MAP}, else false
+     * @throws NullPointerException in case {@code schema} is null
+     */
+    private boolean hasMap(LogicalSchema schema ) throws NullPointerException {
+        boolean found = false;
+        for (LogicalFieldSchema field : schema.getFields()) {
+            if (field.type == DataType.MAP) {
+                found = true;
+                break;
+            }
+        }
+        return found;
+    }
+
+    /**
+     * Returns the uids of the top-level fields of a schema whose type is
+     * {@code DataType.MAP}.
+     *
+     * @param schema schema to inspect; may be null
+     * @return the uids of the map fields; empty when there are none or when
+     *         {@code schema} is null
+     */
+    private Set<Long> getMapUids(LogicalSchema schema ) {
+        Set<Long> mapUids = new HashSet<Long>();
+        if (schema == null) {
+            return mapUids;
+        }
+
+        for (LogicalFieldSchema field : schema.getFields()) {
+            if (field.type != DataType.MAP) {
+                continue;
+            }
+            mapUids.add(field.uid);
+        }
+        return mapUids;
+    }
+
+ /**
+ * @return the sub-plan created in the constructor; check() adds to it every source
+ * that still carries a REQUIRED_MAPKEYS annotation
+ */
+ public OperatorPlan reportChanges() {
+ return subplan;
+ }
+
+
+ /**
+ * This class collects all the information required to create
+ * the list of keys required for a map
+ */
+ static public class MapMarker extends AllExpressionVisitor {
+
+ Map<Long,Set<String>> inputUids = null;
+
+ protected MapMarker(OperatorPlan plan) {
+ super(plan, new ReverseDependencyOrderWalker(plan));
+ inputUids = new HashMap<Long,Set<String>>();
+ }
+
+ @Override
+ public void visit(LOLoad load) throws IOException {
+ if( load.getSchema() != null ) {
+ Map<Integer,Set<String>> annotation = new HashMap<Integer,Set<String>>();
+ for( int i=0; i<load.getSchema().size(); i++) {
+ LogicalFieldSchema field = load.getSchema().getField(i);
+ if( inputUids.containsKey( field.uid ) ) {
+ annotation.put(i, inputUids.get( field.uid ) );
+ }
+ }
+ load.annotate(REQUIRED_MAPKEYS, annotation);
+ }
+ }
+
+ @Override
+ public void visit(LOFilter filter) throws IOException {
+ currentOp = filter;
+ MapExprMarker v = (MapExprMarker) getVisitor(filter.getFilterPlan());
+ v.visit();
+ mergeUidKeys( v.inputUids );
+ }
+
+ @Override
+ public void visit(LOJoin join) throws IOException {
+ currentOp = join;
+ Collection<LogicalExpressionPlan> c = join.getExpressionPlans();
+ for (LogicalExpressionPlan plan : c) {
+ MapExprMarker v = (MapExprMarker) getVisitor(plan);
+ v.visit();
+ mergeUidKeys( v.inputUids );
+ }
+ }
+
+ @Override
+ public void visit(LOGenerate gen) throws IOException {
+ currentOp = gen;
+ Collection<LogicalExpressionPlan> plans = gen.getOutputPlans();
+ for( LogicalExpressionPlan plan : plans ) {
+ MapExprMarker v = (MapExprMarker) getVisitor(plan);
+ v.visit();
+ mergeUidKeys( v.inputUids );
+ }
+ }
+
+ @Override
+ public void visit(LOSort sort) throws IOException {
+ currentOp = sort;
+ Collection<LogicalExpressionPlan> c = sort.getSortColPlans();
+ for (LogicalExpressionPlan plan : c) {
+ MapExprMarker v = (MapExprMarker) getVisitor(plan);
+ v.visit();
+ mergeUidKeys( v.inputUids );
+ }
+ }
+
+ private void mergeUidKeys( Map<Long, Set<String> > inputMap ) {
+ for( Map.Entry<Long, Set<String>> entry : inputMap.entrySet() ) {
+ if( inputUids.containsKey(entry.getKey()) ) {
+ Set<String> mapKeySet = inputUids.get(entry.getKey());
+ mapKeySet.addAll(entry.getValue());
+ } else {
+ inputUids.put(entry.getKey(), inputMap.get(entry.getKey()));
+ }
+ }
+ }
+
+ @Override
+ protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+ return new MapExprMarker(expr );
+ }
+
+ static class MapExprMarker extends LogicalExpressionVisitor {
+
+ Map<Long,Set<String>> inputUids = null;
+
+ protected MapExprMarker(OperatorPlan p) {
+ super(p, new DependencyOrderWalker(p));
+ inputUids = new HashMap<Long,Set<String>>();
+ }
+
+ @Override
+ public void visit(MapLookupExpression op) throws IOException {
+ Long uid = op.getMap().getFieldSchema().uid;
+ String key = op.getLookupKey();
+
+ HashSet<String> mapKeySet = null;
+ if( inputUids.containsKey(uid) ) {
+ mapKeySet = (HashSet<String>) inputUids.get(uid);
+ } else {
+ mapKeySet = new HashSet<String>();
+ inputUids.put(uid, mapKeySet);
+ }
+ mapKeySet.add(key);
+ }
+ }
+ }
+}
Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java Wed Aug 4 17:46:42 2010
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.rules;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.OperatorSubPlan;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+public class MergeFilter extends Rule {
+
+ public MergeFilter(String n) {
+ super(n);
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new MergeFilterTransformer();
+ }
+
+ public class MergeFilterTransformer extends Transformer {
+
+ private OperatorSubPlan subPlan;
+
+ @Override
+ public boolean check(OperatorPlan matched) throws IOException {
+ LOFilter filter = (LOFilter)matched.getSources().get(0);
+ List<Operator> succeds = currentPlan.getSuccessors(filter);
+ // if this filter is followed by another filter, we should combine them
+ if (succeds != null && succeds.size() == 1) {
+ if (succeds.get(0) instanceof LOFilter) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws IOException {
+ subPlan = new OperatorSubPlan(currentPlan);
+
+ LOFilter filter = (LOFilter)matched.getSources().get(0);
+
+ subPlan.add(filter);
+
+ List<Operator> succeds = currentPlan.getSuccessors(filter);
+ if (succeds != null && succeds.size()== 1 && (succeds.get(0) instanceof LOFilter)) {
+ LOFilter next = (LOFilter)succeds.get(0);
+ combineFilterCond(filter, next);
+ Pair<Integer, Integer> p1 = currentPlan.disconnect(filter, next);
+ List<Operator> ll = currentPlan.getSuccessors(next);
+ if (ll!= null && ll.size()>0) {
+ Operator op = ll.get(0);
+ Pair<Integer, Integer> p2 = currentPlan.disconnect(next, op);
+ currentPlan.connect(filter, p1.first, op, p2.second);
+ subPlan.add(op);
+ }
+
+ currentPlan.remove(next);
+ }
+
+ Iterator<Operator> iter = filter.getFilterPlan().getOperators();
+ while (iter.hasNext()) {
+ Operator oper = iter.next();
+ if (oper instanceof ProjectExpression) {
+ ((ProjectExpression)oper).setAttachedRelationalOp(filter);
+ }
+ }
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+ // combine the condition of two filters. The condition of second filter
+ // is added into the condition of first filter with an AND operator.
+ private void combineFilterCond(LOFilter f1, LOFilter f2) throws IOException {
+ LogicalExpressionPlan p1 = f1.getFilterPlan();
+ LogicalExpressionPlan p2 = f2.getFilterPlan();
+ LogicalExpressionPlan andPlan = new LogicalExpressionPlan();
+
+ // add existing operators
+ Iterator<Operator> iter = p1.getOperators();
+ while(iter.hasNext()) {
+ andPlan.add(iter.next());
+ }
+
+ iter = p2.getOperators();
+ while(iter.hasNext()) {
+ andPlan.add(iter.next());
+ }
+
+ // add all connections
+ iter = p1.getOperators();
+ while(iter.hasNext()) {
+ Operator n = iter.next();
+ List<Operator> l = p1.getPredecessors(n);
+ if (l != null) {
+ for(Operator op: l) {
+ andPlan.connect(op, n);
+ }
+ }
+ }
+
+ iter = p2.getOperators();
+ while(iter.hasNext()) {
+ Operator n = iter.next();
+ List<Operator> l = p2.getPredecessors(n);
+ if (l != null) {
+ for(Operator op: l) {
+ andPlan.connect(op, n);
+ }
+ }
+ }
+
+ // create an AND
+ new AndExpression(andPlan, (LogicalExpression)p1.getSources().get(0), (LogicalExpression)p2.getSources().get(0));
+
+ f1.setFilterPlan(andPlan);
+ }
+
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ // the pattern that this rule looks for
+ // is filter operator
+ LogicalPlan plan = new LogicalPlan();
+ LogicalRelationalOperator op = new LOFilter(plan);
+ plan.add(op);
+
+ return plan;
+ }
+}
+