You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/02/11 23:12:43 UTC
svn commit: r909165 [3/6] - in /hadoop/pig/trunk: src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/experimental/logical/
src/org/apache/pig/experimental/logical/expression/
src/org/apache/pig/experimental/logica...
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.experimental.logical.expression.ExpToPhyTranslationVisitor;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.DependencyOrderWalker;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanWalker;
+import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.experimental.plan.SubtreeDependencyOrderWalker;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.CompilerUtils;
+import org.apache.pig.impl.util.LinkedMultiMap;
+import org.apache.pig.impl.util.MultiMap;
+
+public class LogToPhyTranslationVisitor extends LogicalPlanVisitor {
+
+    /** Physical operator produced for each logical operator translated so far. */
+    protected Map<Operator, PhysicalOperator> logToPhyMap = new HashMap<Operator, PhysicalOperator>();
+
+    /** Enclosing physical plans saved while a nested (inner/expression) plan is built. */
+    protected Stack<PhysicalPlan> currentPlans = new Stack<PhysicalPlan>();
+
+    /** Physical plan that newly translated operators are added to. */
+    protected PhysicalPlan currentPlan = new PhysicalPlan();
+
+    protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
+
+    /** Pig context passed to the POLoad operators created by visitLOLoad; null until set. */
+    protected PigContext pc;
+
+    /**
+     * Creates a translator that walks the given logical plan in dependency order.
+     *
+     * @param plan the logical plan to translate
+     */
+    public LogToPhyTranslationVisitor(OperatorPlan plan) {
+        super(plan, new DependencyOrderWalker(plan));
+    }
+
+    /**
+     * Sets the Pig context handed on to translated load operators.
+     *
+     * @param pc the Pig context to use
+     */
+    public void setPigContext(PigContext pc) {
+        this.pc = pc;
+    }
+
+    /**
+     * Returns the physical plan currently being built; once the walk has
+     * finished this is the complete translated plan.
+     *
+     * @return the current physical plan
+     */
+    public PhysicalPlan getPhysicalPlan() {
+        return currentPlan;
+    }
+
+    /**
+     * Translates a logical load into a {@link POLoad} and adds it to the
+     * current physical plan.
+     * <p>
+     * A load is normally a root. In the multiquery case, however, it may have a
+     * store as a predecessor. When a predecessor exists, the POLoad is connected
+     * to that predecessor's physical operator.
+     *
+     * @param loLoad the logical load to translate
+     * @throws LogicalToPhysicalTranslatorException (error 2015) if the load
+     *         cannot be connected to its predecessor
+     */
+    @Override
+    public void visitLOLoad(LOLoad loLoad) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        // The last argument is true because, after the LoadStore refactor,
+        // all files are assumed to be splittable.
+        POLoad load = new POLoad(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), true);
+        load.setAlias(loLoad.getAlias());
+        load.setLFile(loLoad.getFileSpec());
+        load.setPc(pc);
+        load.setResultType(DataType.BAG);
+        load.setSignature(loLoad.getAlias());
+        currentPlan.add(load);
+        logToPhyMap.put(loLoad, load);
+
+        // No predecessor (null or empty list): the load is a root and there
+        // is nothing to connect. Treating an empty list the same as null avoids
+        // an IndexOutOfBoundsException on get(0).
+        List<Operator> preds = loLoad.getPlan().getPredecessors(loLoad);
+        if (preds == null || preds.isEmpty()) {
+            return;
+        }
+
+        PhysicalOperator from = logToPhyMap.get(preds.get(0));
+        try {
+            currentPlan.connect(from, load);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /**
+     * Translates a logical filter into a {@link POFilter}.
+     * <p>
+     * The filter condition (a logical expression plan) is translated into a
+     * separate physical plan, which becomes the POFilter's inner plan. The
+     * POFilter is then connected to its predecessor's physical operator.
+     *
+     * @param filter the logical filter to translate
+     * @throws LogicalToPhysicalTranslatorException (error 2051) if the filter
+     *         has no predecessor, or (error 2015) if it cannot be connected
+     */
+    @Override
+    public void visitLOFilter(LOFilter filter) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
+                filter.getRequestedParallelisam());
+        poFilter.setAlias(filter.getAlias());
+        poFilter.setResultType(DataType.BAG);
+        currentPlan.add(poFilter);
+        logToPhyMap.put(filter, poFilter);
+
+        // Translate the condition into its own plan. The enclosing plan and
+        // walker are restored in finally, so a failed translation does not
+        // leave the visitor pointing at the half-built condition plan.
+        PhysicalPlan conditionPlan = new PhysicalPlan();
+        currentPlans.push(currentPlan);
+        currentPlan = conditionPlan;
+        PlanWalker childWalker = new ReverseDependencyOrderWalker(filter.getFilterPlan());
+        pushWalker(childWalker);
+        try {
+            currentWalker.walk(
+                    new ExpToPhyTranslationVisitor(currentWalker.getPlan(),
+                            childWalker, filter, currentPlan, logToPhyMap));
+        } finally {
+            popWalker();
+            currentPlan = currentPlans.pop();
+        }
+        poFilter.setPlan(conditionPlan);
+
+        // A filter needs exactly one input. Checking for an empty list as well
+        // as null makes both cases report error 2051 instead of an
+        // IndexOutOfBoundsException.
+        List<Operator> preds = filter.getPlan().getPredecessors(filter);
+        if (preds == null || preds.isEmpty()) {
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for Filter." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+        }
+
+        PhysicalOperator from = logToPhyMap.get(preds.get(0));
+        try {
+            currentPlan.connect(from, poFilter);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
+    /**
+     * Translates an LOInnerLoad into a {@link POProject} on the inner load's
+     * column number.
+     * <p>
+     * The projection reads from the physical operator of the first predecessor
+     * of the foreach that owns this inner load.
+     *
+     * @param load the inner load to translate
+     */
+    public void visitLOInnerLoad(LOInnerLoad load) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        POProject project = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+
+        // With a schema available, the projection's result type is the type
+        // of the schema's first field.
+        LogicalSchema schema = load.getSchema();
+        if (schema != null) {
+            project.setResultType(schema.getField(0).type);
+        }
+        project.setColumn(load.getColNum());
+
+        // The projection's single input is the foreach's predecessor.
+        LOForEach owner = load.getLOForEach();
+        Operator ownerInput = owner.getPlan().getPredecessors(owner).get(0);
+        List<PhysicalOperator> projectInputs = new ArrayList<PhysicalOperator>();
+        projectInputs.add(logToPhyMap.get(ownerInput));
+        project.setInputs(projectInputs);
+
+        logToPhyMap.put(load, project);
+        currentPlan.add(project);
+    }
+
+ public void visitLOForEach(LOForEach foreach) throws IOException {
+ String scope = DEFAULT_SCOPE;
+
+ List<PhysicalPlan> innerPlans = new ArrayList<PhysicalPlan>();
+
+ org.apache.pig.experimental.logical.relational.LogicalPlan inner = foreach.getInnerPlan();
+ LOGenerate gen = (LOGenerate)inner.getSinks().get(0);
+
+ List<LogicalExpressionPlan> exps = gen.getOutputPlans();
+ List<Operator> preds = inner.getPredecessors(gen);
+
+ currentPlans.push(currentPlan);
+
+ // we need to translate each predecessor of LOGenerate into a physical plan.
+ // The physical plan should contain the expression plan for this predecessor plus
+ // the subtree starting with this predecessor
+ for (int i=0; i<preds.size(); i++) {
+ currentPlan = new PhysicalPlan();
+ // translate the predecessors into a physical plan
+ PlanWalker childWalker = new SubtreeDependencyOrderWalker(inner, preds.get(i));
+ pushWalker(childWalker);
+ childWalker.walk(this);
+ popWalker();
+
+ // get the leaf of partially translated plan
+ PhysicalOperator leaf = currentPlan.getLeaves().get(0);
+
+ // add up the expressions
+ childWalker = new ReverseDependencyOrderWalker(exps.get(i));
+ pushWalker(childWalker);
+ childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i),
+ childWalker, gen, currentPlan, logToPhyMap ));
+ popWalker();
+
+ List<Operator> leaves = exps.get(i).getSinks();
+ for(Operator l: leaves) {
+ PhysicalOperator op = logToPhyMap.get(l);
+ if (l instanceof ProjectExpression) {
+ int input = ((ProjectExpression)l).getInputNum();
+ Operator pred = preds.get(input);
+ if (pred instanceof LOInnerLoad) {
+ List<PhysicalOperator> ll = currentPlan.getSuccessors(op);
+ PhysicalOperator[] ll2 = null;
+ if (ll != null) {
+ ll2 = ll.toArray(new PhysicalOperator[0]);
+ }
+ currentPlan.remove(op);
+ if (ll2 != null) {
+ for(PhysicalOperator suc: ll2) {
+ currentPlan.connect(leaf, suc);
+ }
+ }
+
+ innerPlans.add(currentPlan);
+
+ continue;
+ }
+ }
+
+ currentPlan.connect(leaf, op);
+ innerPlans.add(currentPlan);
+ }
+ }
+
+ currentPlan = currentPlans.pop();
+
+ // PhysicalOperator poGen = new POGenerate(new OperatorKey("",
+ // r.nextLong()), inputs, toBeFlattened);
+ boolean[] flatten = gen.getFlattenFlags();
+ List<Boolean> flattenList = new ArrayList<Boolean>();
+ for(boolean fl: flatten) {
+ flattenList.add(fl);
+ }
+ POForEach poFE = new POForEach(new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), foreach.getRequestedParallelisam(), innerPlans, flattenList);
+ poFE.setAlias(foreach.getAlias());
+ poFE.setResultType(DataType.BAG);
+ logToPhyMap.put(foreach, poFE);
+ currentPlan.add(poFE);
+
+ // generate cannot have multiple inputs
+ List<Operator> op = foreach.getPlan().getPredecessors(foreach);
+
+ // generate may not have any predecessors
+ if (op == null)
+ return;
+
+ PhysicalOperator from = logToPhyMap.get(op.get(0));
+ try {
+ currentPlan.connect(from, poFE);
+ } catch (Exception e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+
+ }
+
+
+    /**
+     * Converts a new-style {@link LogicalSchema} into the old {@link Schema}
+     * format that physical operators expect.
+     * <p>
+     * Nested schemas are converted recursively. Each field's uid becomes the
+     * converted field's canonical name.
+     *
+     * @param lSchema the logical schema to convert; may be null
+     * @return the converted Schema, or null when {@code lSchema} is null
+     * @throws FrontendException propagated from building the old-style schema
+     */
+    private static Schema translateSchema( LogicalSchema lSchema ) throws FrontendException {
+        if (lSchema == null) {
+            return null;
+        }
+        Schema result = new Schema();
+        for (LogicalFieldSchema logicalField : lSchema.getFields()) {
+            Schema nested = translateSchema(logicalField.schema);
+            FieldSchema converted = new FieldSchema(logicalField.alias, nested, logicalField.type);
+            converted.canonicalName = Long.toString(logicalField.uid);
+            result.add(converted);
+        }
+        return result;
+    }
+
+    /**
+     * Translates each logical expression plan into its own physical plan.
+     * <p>
+     * Each expression plan is walked in reverse dependency order by an
+     * {@link ExpToPhyTranslationVisitor}. The visitor is given a fresh physical
+     * plan and {@code logToPhyMap}. The visitor's current plan is pushed before
+     * the translations and popped afterwards.
+     *
+     * @param loj the relational operator that owns the expressions
+     * @param plans the expression plans to translate; may be null or empty
+     * @return one physical plan per expression plan, in the same order; an
+     *         empty list when {@code plans} is null or empty
+     * @throws IOException if translating an expression fails
+     */
+    private List<PhysicalPlan> translateExpressionPlans(LogicalRelationalOperator loj,
+            List<LogicalExpressionPlan> plans ) throws IOException {
+        List<PhysicalPlan> translated = new ArrayList<PhysicalPlan>();
+        if (plans == null || plans.isEmpty()) {
+            return translated;
+        }
+
+        currentPlans.push(currentPlan);
+        for (LogicalExpressionPlan expressionPlan : plans) {
+            PhysicalPlan physicalPlan = new PhysicalPlan();
+            currentPlan = physicalPlan;
+
+            PlanWalker expressionWalker = new ReverseDependencyOrderWalker(expressionPlan);
+            pushWalker(expressionWalker);
+            ExpToPhyTranslationVisitor expressionVisitor = new ExpToPhyTranslationVisitor(
+                    currentWalker.getPlan(), expressionWalker, loj, physicalPlan, logToPhyMap);
+            currentWalker.walk(expressionVisitor);
+            translated.add(physicalPlan);
+            popWalker();
+        }
+        currentPlan = currentPlans.pop();
+
+        return translated;
+    }
+
+    /**
+     * Translates a logical store into a {@link POStore}.
+     * <p>
+     * The POStore takes its alias from the store's predecessor and is connected
+     * to that predecessor's physical operator.
+     *
+     * @param loStore the logical store to translate
+     * @throws LogicalToPhysicalTranslatorException (error 2051) if the store
+     *         has no predecessor, or (error 2015) if it cannot be connected
+     */
+    @Override
+    public void visitLOStore(LOStore loStore) throws IOException {
+        String scope = DEFAULT_SCOPE;
+
+        // The predecessor supplies both the alias and the physical input, so
+        // look it up once and fail clearly if it is missing. Previously the
+        // alias lookup dereferenced a null or empty list before any check.
+        List<Operator> preds = loStore.getPlan().getPredecessors(loStore);
+        if (preds == null || preds.isEmpty()) {
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for Store." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+        }
+        Operator pred = preds.get(0);
+
+        POStore store = new POStore(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        store.setAlias(((LogicalRelationalOperator) pred).getAlias());
+        store.setSFile(loStore.getOutputSpec());
+        // TODO: propagate the input spec (store.setInputSpec) once it is available.
+
+        // Give the store a schema object of its own (translateSchema builds a
+        // new one). Otherwise serializing the store would also serialize the
+        // objects that contain the logical schema. This schema is serialized in
+        // JobControlCompiler.
+        store.setSchema(translateSchema(loStore.getSchema()));
+        currentPlan.add(store);
+
+        // TODO: once the new LOSort and LOLimit operators exist, set
+        // store.setSortInfo(...) from a preceding sort, which may sit behind a
+        // limit.
+        PhysicalOperator from = logToPhyMap.get(pred);
+        try {
+            currentPlan.connect(from, store);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+        logToPhyMap.put(loStore, store);
+    }
+
+ @Override
+ public void visitLOJoin(LOJoin loj) throws IOException {
+ String scope = DEFAULT_SCOPE;
+// System.err.println("Entering Join");
+
+ // List of join predicates
+ List<Operator> inputs = plan.getPredecessors(loj);
+
+ // mapping of inner join physical plans corresponding to inner physical operators.
+ MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new LinkedMultiMap<PhysicalOperator, PhysicalPlan>();
+
+ // Outer list corresponds to join predicates. Inner list is inner physical plan of each predicate.
+ List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
+
+ // List of physical operator corresponding to join predicates.
+ List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
+
+ // Outer list corresponds to join predicates and inner list corresponds to type of keys for each predicate.
+ List<List<Byte>> keyTypes = new ArrayList<List<Byte>>();
+
+ for (int i=0; i<inputs.size(); i++) {
+ Operator op = inputs.get(i);
+ if( ! ( op instanceof LogicalRelationalOperator ) ) {
+ continue;
+ }
+ LogicalRelationalOperator lop = (LogicalRelationalOperator)op;
+ PhysicalOperator physOp = logToPhyMap.get(op);
+ inp.add(physOp);
+ List<LogicalExpressionPlan> plans = (List<LogicalExpressionPlan>) loj.getJoinPlan(i);
+
+ // Convert the expression plan into physical Plan
+ List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
+
+// currentPlans.push(currentPlan);
+// for (LogicalExpressionPlan lp : plans) {
+// currentPlan = new PhysicalPlan();
+//
+// // We spawn a new Dependency Walker and use it
+// PlanWalker childWalker = currentWalker.spawnChildWalker(lp);
+// pushWalker(childWalker);
+// // We create a new ExpToPhyTranslationVisitor to walk the ExpressionPlan
+// currentWalker.walk(
+// new ExpToPhyTranslationVisitor(currentWalker.getPlan(),
+// childWalker) );
+//
+// exprPlans.add(currentPlan);
+// popWalker();
+// }
+// currentPlan = currentPlans.pop();
+
+ ppLists.add(exprPlans);
+ joinPlans.put(physOp, exprPlans);
+
+ // Key could potentially be a tuple. So, we visit all exprPlans to get types of members of tuples.
+ List<Byte> tupleKeyMemberTypes = new ArrayList<Byte>();
+ for(PhysicalPlan exprPlan : exprPlans)
+ tupleKeyMemberTypes.add(exprPlan.getLeaves().get(0).getResultType());
+ keyTypes.add(tupleKeyMemberTypes);
+ }
+
+ if (loj.getJoinType() == LOJoin.JOINTYPE.SKEWED) {
+ POSkewedJoin skj;
+ try {
+ skj = new POSkewedJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(),
+ inp, loj.getInnerFlags());
+ skj.setAlias(loj.getAlias());
+ skj.setJoinPlans(joinPlans);
+ }
+ catch (Exception e) {
+ int errCode = 2015;
+ String msg = "Skewed Join creation failed";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+ skj.setResultType(DataType.TUPLE);
+
+ boolean[] innerFlags = loj.getInnerFlags();
+ for (int i=0; i < inputs.size(); i++) {
+ LogicalRelationalOperator op = (LogicalRelationalOperator) inputs.get(i);
+ if (!innerFlags[i]) {
+ try {
+ LogicalSchema s = op.getSchema();
+ // if the schema cannot be determined
+ if (s == null) {
+ throw new FrontendException();
+ }
+ skj.addSchema(translateSchema(s));
+ } catch (FrontendException e) {
+ int errCode = 2015;
+ String msg = "Couldn't set the schema for outer join" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+ } else {
+ // This will never be retrieved. It just guarantees that the index will be valid when
+ // MRCompiler is trying to read the schema
+ skj.addSchema(null);
+ }
+ }
+
+ currentPlan.add(skj);
+
+ for (Operator op : inputs) {
+ try {
+ currentPlan.connect(logToPhyMap.get(op), skj);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ logToPhyMap.put(loj, skj);
+ }
+ else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
+
+ int fragment = 0;
+ POFRJoin pfrj;
+ try {
+ boolean []innerFlags = loj.getInnerFlags();
+ boolean isLeftOuter = false;
+ // We dont check for bounds issue as we assume that a join
+ // involves atleast two inputs
+ isLeftOuter = !innerFlags[1];
+
+ Tuple nullTuple = null;
+ if( isLeftOuter ) {
+ try {
+ // We know that in a Left outer join its only a two way
+ // join, so we assume index of 1 for the right input
+ LogicalSchema inputSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
+
+ // We check if we have a schema before the join
+ if(inputSchema == null) {
+ int errCode = 1109;
+ String msg = "Input (" + ((LogicalRelationalOperator) inputs.get(1)).getAlias() + ") " +
+ "on which outer join is desired should have a valid schema";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
+ }
+
+ // Using the schema we decide the number of columns/fields
+ // in the nullTuple
+ nullTuple = TupleFactory.getInstance().newTuple(inputSchema.size());
+ for(int j = 0; j < inputSchema.size(); j++) {
+ nullTuple.set(j, null);
+ }
+
+ } catch( FrontendException e ) {
+ int errCode = 2104;
+ String msg = "Error while determining the schema of input";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+ }
+
+ pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(),
+ inp, ppLists, keyTypes, null, fragment, isLeftOuter, nullTuple);
+ pfrj.setAlias(loj.getAlias());
+ } catch (ExecException e1) {
+ int errCode = 2058;
+ String msg = "Unable to set index on newly create POLocalRearrange.";
+ throw new VisitorException(msg, errCode, PigException.BUG, e1);
+ }
+ pfrj.setResultType(DataType.TUPLE);
+ currentPlan.add(pfrj);
+ for (Operator op : inputs) {
+ try {
+ currentPlan.connect(logToPhyMap.get(op), pfrj);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ logToPhyMap.put(loj, pfrj);
+ }
+
+ else if (loj.getJoinType() == LOJoin.JOINTYPE.MERGE && validateMergeJoin(loj)) {
+
+ POMergeJoin smj;
+ try {
+ smj = new POMergeJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(),inp,joinPlans,keyTypes);
+ }
+ catch (Exception e) {
+ int errCode = 2042;
+ String msg = "Merge Join creation failed";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+
+ smj.setResultType(DataType.TUPLE);
+ currentPlan.add(smj);
+
+ for (Operator op : inputs) {
+ try {
+ currentPlan.connect(logToPhyMap.get(op), smj);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+ }
+ logToPhyMap.put(loj, smj);
+ return;
+ }
+ else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
+ POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+ scope, nodeGen.getNextNodeId(scope)), loj
+ .getRequestedParallelisam());
+ poGlobal.setAlias(loj.getAlias());
+ POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
+ .getNextNodeId(scope)), loj.getRequestedParallelisam());
+ poPackage.setAlias(loj.getAlias());
+ currentPlan.add(poGlobal);
+ currentPlan.add(poPackage);
+
+ int count = 0;
+ Byte type = null;
+
+ try {
+ currentPlan.connect(poGlobal, poPackage);
+ for (int i=0; i<inputs.size(); i++) {
+ Operator op = inputs.get(i);
+ List<LogicalExpressionPlan> plans =
+ (List<LogicalExpressionPlan>) loj.getJoinPlan(i);
+ POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+ scope, nodeGen.getNextNodeId(scope)), loj
+ .getRequestedParallelisam());
+ List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
+// currentPlans.push(currentPlan);
+// for (LogicalExpressionPlan lp : plans) {
+// currentPlan = new PhysicalPlan();
+// PlanWalker childWalker = currentWalker
+// .spawnChildWalker(lp);
+// pushWalker(childWalker);
+// //currentWalker.walk(this);
+// currentWalker.walk(
+// new ExpToPhyTranslationVisitor(currentWalker.getPlan(),
+// childWalker) );
+// exprPlans.add(currentPlan);
+// popWalker();
+//
+// }
+// currentPlan = currentPlans.pop();
+ try {
+ physOp.setPlans(exprPlans);
+ } catch (PlanException pe) {
+ int errCode = 2071;
+ String msg = "Problem with setting up local rearrange's plans.";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe);
+ }
+ try {
+ physOp.setIndex(count++);
+ } catch (ExecException e1) {
+ int errCode = 2058;
+ String msg = "Unable to set index on newly create POLocalRearrange.";
+ throw new VisitorException(msg, errCode, PigException.BUG, e1);
+ }
+ if (plans.size() > 1) {
+ type = DataType.TUPLE;
+ physOp.setKeyType(type);
+ } else {
+ type = exprPlans.get(0).getLeaves().get(0).getResultType();
+ physOp.setKeyType(type);
+ }
+ physOp.setResultType(DataType.TUPLE);
+
+ currentPlan.add(physOp);
+
+ try {
+ currentPlan.connect(logToPhyMap.get(op), physOp);
+ currentPlan.connect(physOp, poGlobal);
+ } catch (PlanException e) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+
+ }
+
+ } catch (PlanException e1) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+ }
+
+ poPackage.setKeyType(type);
+ poPackage.setResultType(DataType.TUPLE);
+ poPackage.setNumInps(count);
+
+ boolean[] innerFlags = loj.getInnerFlags();
+ poPackage.setInner(innerFlags);
+
+ List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
+ List<Boolean> flattenLst = new ArrayList<Boolean>();
+
+ try{
+ for(int i=0;i< count;i++){
+ PhysicalPlan fep1 = new PhysicalPlan();
+ POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
+ loj.getRequestedParallelisam(), i+1); //i+1 since the first column is the "group" field
+ feproj1.setAlias(loj.getAlias());
+ feproj1.setResultType(DataType.BAG);
+ feproj1.setOverloaded(false);
+ fep1.add(feproj1);
+ fePlans.add(fep1);
+ // the parser would have marked the side
+ // where we need to keep empty bags on
+ // non matched as outer (innerFlags[i] would be
+ // false)
+ if(!(innerFlags[i])) {
+ Operator joinInput = inputs.get(i);
+ // for outer join add a bincond
+ // which will project nulls when bag is
+ // empty
+ updateWithEmptyBagCheck(fep1, joinInput);
+ }
+ flattenLst.add(true);
+ }
+
+ POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)),
+ loj.getRequestedParallelisam(), fePlans, flattenLst );
+ fe.setAlias(loj.getAlias());
+ currentPlan.add(fe);
+ currentPlan.connect(poPackage, fe);
+ logToPhyMap.put(loj, fe);
+ }catch (PlanException e1) {
+ int errCode = 2015;
+ String msg = "Invalid physical operators in the physical plan" ;
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+ }
+ }
+// System.err.println("Exiting Join");
+ }
+
+ /**
+ * updates plan with check for empty bag and if bag is empty to flatten a bag
+ * with as many null's as dictated by the schema
+ * @param fePlan the plan to update
+ * @param joinInput the relation for which the corresponding bag is being checked
+ * @throws FrontendException
+ */
+ public static void updateWithEmptyBagCheck(PhysicalPlan fePlan, Operator joinInput) throws FrontendException {
+ LogicalSchema inputSchema = null;
+ try {
+ inputSchema = ((LogicalRelationalOperator) joinInput).getSchema();
+
+
+ if(inputSchema == null) {
+ int errCode = 1109;
+ String msg = "Input (" + ((LogicalRelationalOperator) joinInput).getAlias() + ") " +
+ "on which outer join is desired should have a valid schema";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
+ }
+ } catch (FrontendException e) {
+ int errCode = 2104;
+ String msg = "Error while determining the schema of input";
+ throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+ }
+
+ CompilerUtils.addEmptyBagOuterJoin(fePlan, translateSchema(inputSchema));
+
+ }
+
+    /**
+     * Checks that a merge join has exactly two inputs and that every operator
+     * upstream of them is supported by merge join.
+     *
+     * @param loj the merge join to validate
+     * @return always true; an invalid plan is reported by throwing
+     * @throws LogicalToPhysicalTranslatorException error 1101 if the join
+     *         does not have exactly two inputs, or error 1103 if an
+     *         unsupported operator is found upstream
+     */
+    private boolean validateMergeJoin(LOJoin loj) throws IOException {
+        List<Operator> preds = plan.getPredecessors(loj);
+
+        // A null predecessor list means no inputs. Report it like any other
+        // wrong count instead of throwing a NullPointerException.
+        int numInputs = (preds == null) ? 0 : preds.size();
+        if (numInputs != 2) {
+            int errCode = 1101;
+            String errMsg = "Merge Join must have exactly two inputs.";
+            throw new LogicalToPhysicalTranslatorException(errMsg + " Found: " + numInputs, errCode);
+        }
+
+        return mergeJoinValidator(preds, loj.getPlan());
+    }
+
+    /**
+     * Recursively checks that every operator in {@code preds}, and everything
+     * upstream of them in {@code lp}, is a filter or a load.
+     *
+     * @param preds the operators to check; may be null or empty
+     * @param lp the plan used to find further predecessors
+     * @return always true; an unsupported operator is reported by throwing
+     * @throws LogicalToPhysicalTranslatorException error 1103, naming the
+     *         first unsupported operator found
+     */
+    private boolean mergeJoinValidator(List<Operator> preds, OperatorPlan lp) throws IOException {
+        if (preds == null || preds.isEmpty()) {
+            return true;
+        }
+
+        int errCode = 1103;
+        String errMsg = "Merge join only supports Filter and Load as its predecessor. Found : ";
+        for (Operator lo : preds) {
+            // TODO: also accept LOForEach once merge join supports it.
+            if (!(lo instanceof LOFilter || lo instanceof LOLoad)) {
+                // Name the offending operator; the message used to end at "Found : ".
+                throw new LogicalToPhysicalTranslatorException(
+                        errMsg + lo.getClass().getSimpleName(), errCode);
+            }
+            // This level is fine; continue with its predecessors.
+            mergeJoinValidator(lp.getPredecessors(lo), lp);
+        }
+        return true;
+    }
+}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java Thu Feb 11 22:12:36 2010
@@ -19,11 +19,13 @@
package org.apache.pig.experimental.logical.relational;
import java.io.IOException;
+import java.util.Iterator;
import java.util.List;
import org.apache.pig.experimental.plan.BaseOperatorPlan;
import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
/**
* LogicalPlan is the logical view of relational operations Pig will execute
@@ -47,7 +49,7 @@
* be in the plan.
* @param after operator that will be after the new operator. This
* operator should already be in the plan. If after is null, then the
- * new operator will be a root.
+ * new operator will be a leaf.
* @throws IOException if add is already in the plan, or before or after
* are not in the plan.
*/
@@ -65,11 +67,11 @@
* be in the plan.
* @param after operator that will be after the new operator. This
* operator should already be in the plan. If after is null, then the
- * new operator will be a root.
+ * new operator will be a leaf.
* @throws IOException if add is already in the plan, or before or after
* are not in the plan.
*/
- public void add(List<LogicalRelationalOperator> before,
+ public void add(LogicalRelationalOperator[] before,
LogicalRelationalOperator newOper,
LogicalRelationalOperator after) throws IOException {
doAdd(null, newOper, after);
@@ -94,7 +96,7 @@
*/
public void add(LogicalRelationalOperator before,
LogicalRelationalOperator newOper,
- List<LogicalRelationalOperator> after) throws IOException {
+ LogicalRelationalOperator[] after) throws IOException {
doAdd(before, newOper, null);
for (LogicalRelationalOperator op : after) {
@@ -117,7 +119,7 @@
* @param afterFromPos Position in newOps's edges to connect after at.
* @param after operator that will be after the new operator. This
* operator should already be in the plan. If after is null, then the
- * new operator will be a root.
+ * new operator will be a leaf.
* @throws IOException if add is already in the plan, or before or after
* are not in the plan.
*/
@@ -181,6 +183,28 @@
}
+ /**
+ * Equality is checked by calling equals on every leaf in the plan. This
+ * assumes that plans are always connected graphs. It is somewhat
+ * inefficient since every leaf will test equality all the way to
+ * every root. But it is only intended for use in testing, so that
+ * should be ok. Checking predecessors (as opposed to successors) was
+ * chosen because splits (which have multiple successors) do not depend
+ * on order of outputs for correctness, whereas joins (with multiple
+ * predecessors) do. That is, reversing the outputs of split in the
+ * graph has no correctness implications, whereas reversing the inputs
+ * of join can. This method of doing equals will detect predecessors
+ * in different orders but not successors in different orders.
+ */
+ @Override
+ public boolean isEqual(OperatorPlan other) {
+ if (other == null || !(other instanceof LogicalPlan)) {
+ return false;
+ }
+
+ return super.isEqual(other);
+ }
+
private void doAdd(LogicalRelationalOperator before,
LogicalRelationalOperator newOper,
LogicalRelationalOperator after) throws IOException {
@@ -205,6 +229,5 @@
throw new IOException("Attempt to add operator " + op.getName() +
" which is already in the plan.");
}
- }
-
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java Thu Feb 11 22:12:36 2010
@@ -18,6 +18,10 @@
package org.apache.pig.experimental.logical.relational;
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.experimental.plan.OperatorPlan;
import org.apache.pig.experimental.plan.PlanVisitor;
import org.apache.pig.experimental.plan.PlanWalker;
@@ -30,13 +34,32 @@
protected LogicalPlanVisitor(OperatorPlan plan, PlanWalker walker) {
super(plan, walker);
- if (!(plan instanceof LogicalPlan)) {
- throw new RuntimeException(
- "LogicalPlanVisitor expects to visit logical plans");
+ Iterator<Operator> iter = plan.getOperators();
+ while(iter.hasNext()) {
+ if (!(iter.next() instanceof LogicalRelationalOperator)) {
+ throw new RuntimeException("LogicalPlanVisitor can only visit logical plan");
+ }
}
}
- public void visitLOLoad(LOLoad load) {
+ public void visitLOLoad(LOLoad load) throws IOException {
}
+ /**
+ * Visit hook for an {@link LOFilter}. This default implementation does
+ * nothing; presumably subclasses override it for the operators they handle.
+ * @param filter the filter operator being visited
+ * @throws IOException declared so overriding implementations may throw it
+ */
+ public void visitLOFilter(LOFilter filter) throws IOException {
+ }
+
+ /**
+ * Visit hook for an {@link LOStore}. This default implementation does nothing.
+ * @param store the store operator being visited
+ * @throws IOException declared so overriding implementations may throw it
+ */
+ public void visitLOStore(LOStore store) throws IOException {
+ }
+
+ /**
+ * Visit hook for an {@link LOJoin}. This default implementation does nothing.
+ * @param join the join operator being visited
+ * @throws IOException declared so overriding implementations may throw it
+ */
+ public void visitLOJoin(LOJoin join) throws IOException {
+ }
+
+ /**
+ * Visit hook for an {@link LOForEach}. This default implementation does nothing.
+ * @param foreach the foreach operator being visited
+ * @throws IOException declared so overriding implementations may throw it
+ */
+ public void visitLOForEach(LOForEach foreach) throws IOException {
+ }
+
+ /**
+ * Visit hook for an {@link LOGenerate}. This default implementation does nothing.
+ * @param gen the generate operator being visited
+ * @throws IOException declared so overriding implementations may throw it
+ */
+ public void visitLOGenerate(LOGenerate gen) throws IOException {
+ }
+
+ /**
+ * Visit hook for an {@link LOInnerLoad}. This default implementation does nothing.
+ * @param load the inner-load operator being visited
+ * @throws IOException declared so overriding implementations may throw it
+ */
+ public void visitLOInnerLoad(LOInnerLoad load) throws IOException {
+ }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java Thu Feb 11 22:12:36 2010
@@ -28,7 +28,9 @@
abstract public class LogicalRelationalOperator extends Operator {
protected LogicalSchema schema;
- int requestedParallelism;
+ protected int requestedParallelism;
+ protected String alias;
+ protected int lineNum;
/**
*
@@ -51,16 +53,92 @@
super(name, plan);
requestedParallelism = rp;
}
+
+ /**
+ * Get the schema for the output of this relational operator. This does
+ * not merely return the schema variable. If schema is not yet set, this
+ * will attempt to construct it. Therefore it is abstract since each
+ * operator will need to construct its schema differently.
+ * @return the schema
+ */
+ abstract public LogicalSchema getSchema();
+ /**
+ * Reset the schema to null so that the next time getSchema is called
+ * the schema will be regenerated from scratch.
+ */
+ public void resetSchema() {
+ schema = null;
+ }
+
+ /**
+ * Get the requestedParallelism for this operator.
+ * @return requestedParallelism
+ */
public int getRequestedParallelisam() {
return requestedParallelism;
}
/**
- * Get the schema for the output of this relational operator.
- * @return the schema
+ * Get the alias of this operator. That is, if the Pig Latin for this operator
+ * was 'X = sort W by $0' then the alias will be X. For store and split it will
+ * be the alias being stored or split. Note that because of this the alias
+ * is not guaranteed to be unique to a single operator.
+ * @return alias
*/
- abstract public LogicalSchema getSchema();
+ public String getAlias() {
+ return alias;
+ }
+
+ /**
+ * Set the alias of this operator, i.e. the value later returned by
+ * {@link #getAlias()}.
+ * @param alias the Pig Latin alias; may be shared with other operators
+ */
+ public void setAlias(String alias) {
+ this.alias = alias;
+ }
+
+ /**
+ * Set the requested parallelism for this operator, i.e. the value later
+ * returned by {@link #getRequestedParallelisam()}.
+ * @param parallel requested parallelism
+ */
+ public void setRequestedParallelism(int parallel) {
+ this.requestedParallelism = parallel;
+ }
+
+ /**
+ * Get the line number in the submitted Pig Latin script where this operator
+ * occurred.
+ * @return line number
+ */
+ public int getLineNumber() {
+ return lineNum;
+ }
+
+ /**
+ * Only to be used by unit tests. This is a back door cheat to set the schema
+ * without having to calculate it. This should never be called by production
+ * code, only by tests.
+ * @param schema to set
+ */
+ public void neverUseForRealSetSchema(LogicalSchema schema) {
+ this.schema = schema;
+ }
+
+    /**
+     * Do some basic equality checks on two relational operators. Currently
+     * this compares only the operators' schemas, as returned by
+     * {@link #getSchema()} (which may construct them). The check passes when
+     * both schemas are null, or when both are non-null and
+     * {@link LogicalSchema#isEqual(Object)} reports them equal.
+     * <p>
+     * Predecessors are NOT compared here. Callers that need predecessor
+     * comparison must perform it themselves. This is intended to be used
+     * by operators' equality methods.
+     * @param other LogicalRelationalOperator to compare against
+     * @return false if other is null; otherwise true iff the two schemas
+     * match as described above
+     */
+    protected boolean checkEquality(LogicalRelationalOperator other) {
+        if (other == null) {
+            return false;
+        }
+        LogicalSchema s = getSchema();
+        LogicalSchema os = other.getSchema();
+        if (s == null || os == null) {
+            // equal only when both schemas are missing
+            return s == os;
+        }
+        return s.isEqual(os);
+    }
}
Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java Thu Feb 11 22:12:36 2010
@@ -23,8 +23,6 @@
import java.util.List;
import java.util.Map;
-import org.apache.pig.data.DataType;
-
/**
* Schema, from a logical perspective.
*/
@@ -32,10 +30,39 @@
public static class LogicalFieldSchema {
public String alias;
- public DataType type;
+ public byte type;
public long uid;
public LogicalSchema schema;
+
+ public LogicalFieldSchema(String alias, LogicalSchema schema, byte type) {
+ this(alias, schema, type, -1);
+ }
+
+ public LogicalFieldSchema(String alias, LogicalSchema schema, byte type, long uid) {
+ this.alias = alias;
+ this.type = type;
+ this.schema = schema;
+ this.uid = uid;
+ }
+
+ /**
+ * Equality is defined as having the same type and either the same schema
+ * or both null schema. Alias and uid are not checked.
+ */
+ public boolean isEqual(Object other) {
+ if (other instanceof LogicalFieldSchema) {
+ LogicalFieldSchema ofs = (LogicalFieldSchema)other;
+ if (type != ofs.type) return false;
+ if (schema == null && ofs.schema == null) return true;
+ if (schema == null) return false;
+ else return schema.isEqual(ofs.schema);
+ } else {
+ return false;
+ }
+ }
}
+
+
private List<LogicalFieldSchema> fields;
private Map<String, Integer> aliases;
@@ -51,8 +78,22 @@
*/
public void addField(LogicalFieldSchema field) {
fields.add(field);
- if (field.alias != null && field.alias.equals("")) {
+ if (field.alias != null && !field.alias.equals("")) {
aliases.put(field.alias, fields.size() - 1);
+ // Also index every suffix that follows a "::" in the alias (for
+ // "a::b::c" that is "b::c" and then "c"), so the field can be looked
+ // up by a shorter name. If a suffix is already a key, the existing
+ // mapping is removed instead, treating the shorter name as ambiguous.
+ // NOTE(review): the removal leaves no record of the ambiguity, so a
+ // third field with the same suffix puts the mapping back (pointing at
+ // that third field). It can also drop an exact alias of an earlier
+ // field (e.g. "a" followed by "x::a"). Confirm whether this is intended.
+ int index = 0;
+ while(index != -1) {
+ index = field.alias.indexOf("::", index);
+ if (index != -1) {
+ String a = field.alias.substring(index+2);
+ if (aliases.containsKey(a)) {
+ aliases.remove(a);
+ }else{
+ aliases.put(a, fields.size()-1);
+ }
+
+ index = index +2;
+ }
+ }
}
}
@@ -62,9 +103,12 @@
* @return field associated with alias, or null if no such field
*/
public LogicalFieldSchema getField(String alias) {
- Integer i = aliases.get(alias);
- if (i == null) return null;
- else return fields.get(i);
+ Integer index = aliases.get(alias);
+ if (index == null) {
+ return null;
+ }
+
+ return fields.get(index);
}
/**
@@ -88,8 +132,26 @@
* Get the size of the schema.
* @return size
*/
- public Integer size() {
- return null;
+ public int size() {
+ return fields.size();
+ }
+
+ /**
+ * Two schemas are equal if they are of equal size and their fields
+ * schemas considered in order are equal.
+ */
+ public boolean isEqual(Object other) {
+ if (other != null && other instanceof LogicalSchema) {
+ LogicalSchema os = (LogicalSchema)other;
+ if (size() != os.size()) return false;
+ for (int i = 0; i < size(); i++) {
+ if (!getField(i).isEqual(os.getField(i))) return false;
+ }
+ return true;
+ } else {
+ return false;
+ }
+
}
/**
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class FilterAboveForeach extends Rule {
+
+    public FilterAboveForeach(String n) {
+        super(n);
+    }
+
+    /**
+     * Build the pattern this rule matches: a foreach directly followed by
+     * a filter.
+     */
+    @Override
+    protected OperatorPlan buildPattern() {
+        // the pattern that this rule looks for is foreach -> filter
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator foreach = new LOForEach(plan);
+        LogicalRelationalOperator filter = new LOFilter(plan);
+
+        plan.add(foreach);
+        plan.add(filter);
+        plan.connect(foreach, filter);
+
+        return plan;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new FilterAboveFlattenTransformer();
+    }
+
+    /**
+     * Moves a filter that follows a foreach (possibly after a chain of other
+     * filters) above that foreach. This happens when one of the foreach's
+     * inputs provides every uid the filter projects, and none of those
+     * fields is, or is nested inside, a bag or tuple.
+     */
+    public class FilterAboveFlattenTransformer extends Transformer {
+
+        LOFilter filter = null;
+        LOForEach foreach = null;
+        LogicalRelationalOperator forEachPred = null;
+        OperatorSubPlan subPlan = null;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            // Reset state left over from any previous match so that a
+            // failed check can never act on stale operators.
+            filter = null;
+            foreach = null;
+            forEachPred = null;
+            subPlan = null;
+
+            Iterator<Operator> iter = matched.getOperators();
+            while (iter.hasNext()) {
+                Operator op = iter.next();
+                if (op instanceof LOForEach) {
+                    foreach = (LOForEach)op;
+                    break;
+                }
+            }
+
+            // This would be a strange case
+            if (foreach == null) {
+                return false;
+            }
+
+            List<Operator> sinks = foreach.getInnerPlan().getSinks();
+            if (!(sinks.size() == 1 && (sinks.get(0) instanceof LOGenerate))) {
+                return false;
+            }
+
+            // Note: the rule currently fires whether or not the generate
+            // flattens anything.
+
+            iter = matched.getOperators();
+            while (iter.hasNext()) {
+                Operator op = iter.next();
+                if (op instanceof LOFilter) {
+                    filter = (LOFilter)op;
+                    break;
+                }
+            }
+
+            // Walk down the chain of consecutive filters below the foreach,
+            // looking for the first one that can be moved above it.
+            while (filter != null) {
+                // Get uids of Filter
+                Set<Long> uids = getFilterProjectionUids(filter);
+
+                // See if a predecessor of the foreach has all uids the
+                // filter projects
+                List<Operator> preds = currentPlan.getPredecessors(foreach);
+                if (preds != null) {
+                    for (Operator pred : preds) {
+                        LogicalRelationalOperator logRelOp = (LogicalRelationalOperator)pred;
+                        if (hasAll(logRelOp, uids)) {
+                            // If any of the uids are of complex type then we
+                            // cannot think about moving this filter.
+                            if (containsComplexType(logRelOp.getSchema(), uids)) {
+                                break;
+                            }
+                            forEachPred = logRelOp;
+                            return true;
+                        }
+                    }
+                }
+
+                // Chances are there are filters below this filter which can be
+                // moved up. So searching for those filters
+                List<Operator> successors = currentPlan.getSuccessors(filter);
+                if (successors != null && successors.size() > 0
+                        && successors.get(0) instanceof LOFilter) {
+                    filter = (LOFilter)successors.get(0);
+                } else {
+                    filter = null;
+                }
+            }
+            return false;
+        }
+
+        /** Collect the uids of every field projected in the filter's condition. */
+        private Set<Long> getFilterProjectionUids(LOFilter filter) {
+            Set<Long> uids = new HashSet<Long>();
+            if (filter != null) {
+                LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+                Iterator<Operator> iter = filterPlan.getOperators();
+                while (iter.hasNext()) {
+                    Operator op = iter.next();
+                    if (op instanceof ProjectExpression) {
+                        uids.add(((ProjectExpression)op).getUid());
+                    }
+                }
+            }
+            return uids;
+        }
+
+        // check if a relational operator contains all of the specified uids
+        private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
+            LogicalSchema schema = op.getSchema();
+            if (schema == null) {
+                // without a schema we cannot show the fields are available
+                return false;
+            }
+            Set<Long> all = new HashSet<Long>();
+            for (LogicalSchema.LogicalFieldSchema f : schema.getFields()) {
+                all.add(f.uid);
+            }
+            return all.containsAll(uids);
+        }
+
+        /**
+         * This function checks if any of the fields mentioned are a Bag or Tuple,
+         * or are nested inside one. If so we cannot move the filter above the
+         * operator having the schema.
+         * @param schema Schema of the operator we are investigating
+         * @param uids Uids of the fields we are checking for
+         * @return true if one of the uids belongs to a complex type
+         */
+        private boolean containsComplexType(LogicalSchema schema, Set<Long> uids) {
+            if (schema == null) {
+                return false;
+            }
+            for (LogicalSchema.LogicalFieldSchema f : schema.getFields()) {
+                if (f.type == DataType.BAG || f.type == DataType.TUPLE) {
+                    if (uids.contains(f.uid)) {
+                        return true;
+                    }
+                    if (f.schema != null && containsComplexType(f.schema, uids)) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+
+            List<Operator> opSet = currentPlan.getPredecessors(filter);
+            if (opSet == null || opSet.isEmpty()) {
+                return;
+            }
+            Operator filterPred = opSet.get(0);
+
+            // The filter may be a leaf; then there is no successor to splice
+            // back onto filterPred.
+            Operator filterSuc = null;
+            opSet = currentPlan.getSuccessors(filter);
+            if (opSet != null && !opSet.isEmpty()) {
+                filterSuc = opSet.get(0);
+            }
+
+            subPlan = new OperatorSubPlan(currentPlan);
+
+            // Steps below do the following
+            /*
+             *          ForEachPred
+             *               |
+             *            ForEach
+             *               |
+             *            Filter*
+             *               |
+             *           FilterPred
+             *    ( has to be a Filter or ForEach )
+             *               |
+             *             Filter
+             *               |
+             *           FilterSuc   (optional)
+             *
+             *               |
+             *        Transforms into
+             *               |
+             *              \/
+             *
+             *          ForEachPred
+             *               |
+             *             Filter
+             *               |
+             *            ForEach
+             *               |
+             *            Filter*
+             *               |
+             *           FilterPred
+             *    ( has to be a Filter or ForEach )
+             *               |
+             *           FilterSuc   (optional)
+             */
+
+            Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach);
+            Pair<Integer, Integer> filterPredPlaces = currentPlan.disconnect(filterPred, filter);
+            Pair<Integer, Integer> filterSucPlaces = null;
+            if (filterSuc != null) {
+                filterSucPlaces = currentPlan.disconnect(filter, filterSuc);
+            }
+            int filterOutPos = (filterSucPlaces == null) ? 0 : filterSucPlaces.first;
+
+            currentPlan.connect(forEachPred, forEachPredPlaces.first, filter, filterPredPlaces.second);
+            currentPlan.connect(filter, filterOutPos, foreach, forEachPredPlaces.second);
+            if (filterSuc != null) {
+                currentPlan.connect(filterPred, filterPredPlaces.first, filterSuc, filterSucPlaces.second);
+            }
+
+            subPlan.add(forEachPred);
+            subPlan.add(foreach);
+            subPlan.add(filterPred);
+            subPlan.add(filter);
+            if (filterSuc != null) {
+                subPlan.add(filterSuc);
+            }
+        }
+    }
+
+}
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MergeFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MergeFilter.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MergeFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MergeFilter.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class MergeFilter extends Rule {
+
+    public MergeFilter(String n) {
+        super(n);
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new MergeFilterTransformer();
+    }
+
+    /**
+     * Merges a filter with the single filter that immediately follows it.
+     * The second filter's condition is ANDed into the first, and the second
+     * filter is removed from the plan.
+     */
+    public class MergeFilterTransformer extends Transformer {
+
+        private OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+            List<Operator> succeds = currentPlan.getSuccessors(filter);
+            // if this filter is followed by another filter, we should combine them
+            return succeds != null && succeds.size() == 1
+                    && succeds.get(0) instanceof LOFilter;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            subPlan = new OperatorSubPlan(currentPlan);
+
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+
+            subPlan.add(filter);
+
+            List<Operator> succeds = currentPlan.getSuccessors(filter);
+            if (succeds != null && succeds.size() == 1 && (succeds.get(0) instanceof LOFilter)) {
+                LOFilter next = (LOFilter)succeds.get(0);
+                combineFilterCond(filter, next);
+                Pair<Integer, Integer> p1 = currentPlan.disconnect(filter, next);
+                List<Operator> ll = currentPlan.getSuccessors(next);
+                if (ll != null && ll.size() > 0) {
+                    Operator op = ll.get(0);
+                    Pair<Integer, Integer> p2 = currentPlan.disconnect(next, op);
+                    currentPlan.connect(filter, p1.first, op, p2.second);
+                    subPlan.add(op);
+                }
+
+                currentPlan.remove(next);
+            }
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+
+        // combine the condition of two filters. The condition of second filter
+        // is added into the condition of first filter with an AND operator.
+        private void combineFilterCond(LOFilter f1, LOFilter f2) throws IOException {
+            LogicalExpressionPlan p1 = f1.getFilterPlan();
+            LogicalExpressionPlan p2 = f2.getFilterPlan();
+            LogicalExpressionPlan andPlan = new LogicalExpressionPlan();
+
+            // add existing operators and connections of both conditions
+            copyInto(p1, andPlan);
+            copyInto(p2, andPlan);
+
+            // create an AND over the roots (sources) of the two conditions
+            new AndExpression(andPlan, (LogicalExpression)p1.getSources().get(0),
+                    (LogicalExpression)p2.getSources().get(0));
+
+            f1.setFilterPlan(andPlan);
+        }
+
+        /**
+         * Add every operator and edge of src to dest. Edges are recreated by
+         * walking each operator's successors in their existing order. In
+         * this plan representation a condition's root is a source and its
+         * operands hang below it as successors. Recreating edges from the
+         * predecessor side would order a parent's operands by operator
+         * iteration order, which could swap the operands of non-commutative
+         * expressions.
+         */
+        private void copyInto(LogicalExpressionPlan src, LogicalExpressionPlan dest) throws IOException {
+            Iterator<Operator> iter = src.getOperators();
+            while (iter.hasNext()) {
+                dest.add(iter.next());
+            }
+
+            iter = src.getOperators();
+            while (iter.hasNext()) {
+                Operator op = iter.next();
+                List<Operator> succs = src.getSuccessors(op);
+                if (succs != null) {
+                    for (Operator s : succs) {
+                        dest.connect(op, s);
+                    }
+                }
+            }
+        }
+
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        // the pattern that this rule looks for
+        // is filter operator
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator op = new LOFilter(plan);
+        plan.add(op);
+
+        return plan;
+    }
+}
+
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class PushUpFilter extends Rule {
+
+    public PushUpFilter(String n) {
+        super(n);
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new PushUpFilterTransformer();
+    }
+
+    /**
+     * Moves a filter that follows an inner join (possibly after a chain of
+     * other filters) above the join. The filter goes to the farthest
+     * ancestor, along the first matching predecessor at each level, whose
+     * schema still contains every uid the filter projects.
+     */
+    public class PushUpFilterTransformer extends Transformer {
+
+        private OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            // check if it is inner join
+            LOJoin join = (LOJoin)matched.getSources().get(0);
+            for (boolean inner : join.getInnerFlags()) {
+                if (!inner) {
+                    return false;
+                }
+            }
+
+            List<Operator> preds = currentPlan.getPredecessors(join);
+            Operator next = matched.getSinks().get(0);
+            while (next instanceof LOFilter) {
+                LOFilter filter = (LOFilter)next;
+                Set<Long> uids = collectUids(filter);
+
+                if (preds != null) {
+                    for (Operator pred : preds) {
+                        if (hasAll((LogicalRelationalOperator)pred, uids)) {
+                            return true;
+                        }
+                    }
+                }
+
+                // if current filter can not move up, check next filter
+                next = firstSuccessor(filter);
+            }
+
+            return false;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            subPlan = new OperatorSubPlan(currentPlan);
+
+            LOJoin join = (LOJoin)matched.getSources().get(0);
+            subPlan.add(join);
+
+            Operator next = matched.getSinks().get(0);
+            while (next instanceof LOFilter) {
+                LOFilter filter = (LOFilter)next;
+                subPlan.add(filter);
+
+                Set<Long> uids = collectUids(filter);
+
+                // Find the farthest predecessor that has all the fields, and
+                // remember the operator directly below it on the path to the
+                // join. The filter must be spliced onto exactly that edge,
+                // not onto whichever successor of input happens to be first.
+                LogicalRelationalOperator input = join;
+                Operator child = null;
+                List<Operator> preds = currentPlan.getPredecessors(input);
+                while (preds != null) {
+                    LogicalRelationalOperator found = null;
+                    for (Operator pred : preds) {
+                        if (hasAll((LogicalRelationalOperator)pred, uids)) {
+                            found = (LogicalRelationalOperator)pred;
+                            break;
+                        }
+                    }
+                    if (found == null) {
+                        break;
+                    }
+                    child = input;
+                    input = found;
+                    subPlan.add(input);
+                    preds = currentPlan.getPredecessors(input);
+                }
+
+                if (input != join) {
+                    // take the filter out of its current position ...
+                    Operator pred = currentPlan.getPredecessors(filter).get(0);
+                    Operator succed = firstSuccessor(filter);
+                    Pair<Integer, Integer> p1 = currentPlan.disconnect(pred, filter);
+                    if (succed != null) {
+                        subPlan.add(succed);
+                        Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed);
+                        currentPlan.connect(pred, p1.first, succed, p2.second);
+                    }
+
+                    // ... and insert it between input and the operator below it
+                    Pair<Integer, Integer> p3 = currentPlan.disconnect(input, child);
+                    currentPlan.connect(input, p3.first, filter, 0);
+                    currentPlan.connect(filter, 0, child, p3.second);
+
+                    return;
+                }
+
+                next = firstSuccessor(filter);
+            }
+        }
+
+        /** Collect the uids of every field projected in the filter's condition. */
+        private Set<Long> collectUids(LOFilter filter) {
+            LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+            Set<Long> uids = new HashSet<Long>();
+            Iterator<Operator> iter = filterPlan.getOperators();
+            while (iter.hasNext()) {
+                Operator op = iter.next();
+                if (op instanceof ProjectExpression) {
+                    uids.add(((ProjectExpression)op).getUid());
+                }
+            }
+            return uids;
+        }
+
+        /** Return the first successor of op in the current plan, or null if none. */
+        private Operator firstSuccessor(Operator op) {
+            List<Operator> succs = currentPlan.getSuccessors(op);
+            return (succs == null || succs.isEmpty()) ? null : succs.get(0);
+        }
+
+        // check if a relational operator contains all of the specified uids
+        private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
+            LogicalSchema schema = op.getSchema();
+            if (schema == null) {
+                // without a schema we cannot show the fields are available
+                return false;
+            }
+            Set<Long> all = new HashSet<Long>();
+            for (LogicalSchema.LogicalFieldSchema f : schema.getFields()) {
+                all.add(f.uid);
+            }
+
+            return all.containsAll(uids);
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        // the pattern that this rule looks for
+        // is join -> filter
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator op1 = new LOJoin(plan);
+        LogicalRelationalOperator op2 = new LOFilter(plan);
+        plan.add(op1);
+        plan.add(op2);
+        plan.connect(op1, op2);
+
+        return plan;
+    }
+}
+
Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/SplitFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/SplitFilter.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/SplitFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/SplitFilter.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class SplitFilter extends Rule {
+
+ public SplitFilter(String n) {
+ super(n);
+ }
+
+ @Override
+ public Transformer getNewTransformer() {
+ return new SplitFilterTransformer();
+ }
+
+ public class SplitFilterTransformer extends Transformer {
+ private OperatorSubPlan subPlan;
+
+ @Override
+ public boolean check(OperatorPlan matched) throws IOException {
+ LOFilter filter = (LOFilter)matched.getSources().get(0);
+ LogicalExpressionPlan cond = filter.getFilterPlan();
+ LogicalExpression root = (LogicalExpression) cond.getSources().get(0);
+ if (root instanceof AndExpression) {
+ return true;
+ }
+
+ return false;
+ }
+
+ @Override
+ public void transform(OperatorPlan matched) throws IOException {
+ subPlan = new OperatorSubPlan(currentPlan);
+
+ // split one LOFilter into 2 by "AND"
+ LOFilter filter = (LOFilter)matched.getSources().get(0);
+ LogicalExpressionPlan cond = filter.getFilterPlan();
+ LogicalExpression root = (LogicalExpression) cond.getSources().get(0);
+ if (!(root instanceof AndExpression)) {
+ return;
+ }
+ LogicalExpressionPlan op1 = new LogicalExpressionPlan();
+ op1.add((LogicalExpression)cond.getSuccessors(root).get(0));
+ fillSubPlan(cond, op1, (LogicalExpression)cond.getSuccessors(root).get(0));
+
+ LogicalExpressionPlan op2 = new LogicalExpressionPlan();
+ op2.add((LogicalExpression)cond.getSuccessors(root).get(1));
+ fillSubPlan(cond, op2, (LogicalExpression)cond.getSuccessors(root).get(1));
+
+ filter.setFilterPlan(op1);
+ LOFilter filter2 = new LOFilter((LogicalPlan)currentPlan, op2);
+ currentPlan.add(filter2);
+
+ Operator succed = null;
+ try {
+ List<Operator> succeds = currentPlan.getSuccessors(filter);
+ if (succeds != null) {
+ succed = succeds.get(0);
+ subPlan.add(succed);
+ Pair<Integer, Integer> p = currentPlan.disconnect(filter, succed);
+ currentPlan.connect(filter2, 0, succed, p.second);
+ currentPlan.connect(filter, p.first, filter2, 0);
+ } else {
+ currentPlan.connect(filter, 0, filter2, 0);
+ }
+ }catch(Exception e) {
+ throw new IOException(e);
+ }
+
+ subPlan.add(filter);
+ subPlan.add(filter2);
+ }
+
+ @Override
+ public OperatorPlan reportChanges() {
+ return subPlan;
+ }
+
+ private void fillSubPlan(OperatorPlan origPlan,
+ OperatorPlan subPlan, Operator startOp) throws IOException {
+
+ List<Operator> l = origPlan.getSuccessors(startOp);
+ if (l != null) {
+ for(Operator le: l) {
+ subPlan.add(le);
+ subPlan.connect(startOp, le);
+ fillSubPlan(origPlan, subPlan, le);
+ }
+ }
+ }
+
+ }
+
+ @Override
+ protected OperatorPlan buildPattern() {
+ // the pattern that this rule looks for
+ // is filter
+ LogicalPlan plan = new LogicalPlan();
+ LogicalRelationalOperator op2 = new LOFilter(plan);
+ plan.add(op2);
+
+ return plan;
+ }
+}
+