You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/02/11 23:12:43 UTC

svn commit: r909165 [3/6] - in /hadoop/pig/trunk: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logica...

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,812 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Stack;
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.experimental.logical.expression.ExpToPhyTranslationVisitor;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.DependencyOrderWalker;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanWalker;
+import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.experimental.plan.SubtreeDependencyOrderWalker;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.CompilerUtils;
+import org.apache.pig.impl.util.LinkedMultiMap;
+import org.apache.pig.impl.util.MultiMap;
+
+public class LogToPhyTranslationVisitor extends LogicalPlanVisitor {
+    
+    public LogToPhyTranslationVisitor(OperatorPlan plan) {
+        super(plan, new DependencyOrderWalker(plan));
+        currentPlan = new PhysicalPlan();
+        logToPhyMap = new HashMap<Operator, PhysicalOperator>();
+        currentPlans = new Stack<PhysicalPlan>();
+    }
+
+    protected Map<Operator, PhysicalOperator> logToPhyMap;
+
+    protected Stack<PhysicalPlan> currentPlans;
+
+    protected PhysicalPlan currentPlan;
+
+    protected NodeIdGenerator nodeGen = NodeIdGenerator.getGenerator();
+
+    protected PigContext pc;
+    
+    public void setPigContext(PigContext pc) {
+        this.pc = pc;
+    }
+
+    public PhysicalPlan getPhysicalPlan() {
+        return currentPlan;
+    }
+    
+    @Override
+    public void visitLOLoad(LOLoad loLoad) throws IOException {
+        String scope = DEFAULT_SCOPE;
+//        System.err.println("Entering Load");
+        // The last parameter here is set to true as we assume all files are 
+        // splittable due to LoadStore Refactor
+        POLoad load = new POLoad(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), true);
+        load.setAlias(loLoad.getAlias());
+        load.setLFile(loLoad.getFileSpec());
+        load.setPc(pc);
+        load.setResultType(DataType.BAG);
+        load.setSignature(loLoad.getAlias());
+        currentPlan.add(load);
+        logToPhyMap.put(loLoad, load);
+
+        // Load is typically a root operator, but in the multiquery
+        // case it might have a store as a predecessor.
+        List<Operator> op = loLoad.getPlan().getPredecessors(loLoad);
+        PhysicalOperator from;
+        
+        if(op != null) {
+            from = logToPhyMap.get(op.get(0));
+            try {
+                currentPlan.connect(from, load);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+//        System.err.println("Exiting Load");
+    }
+    
+    @Override
+    public void visitLOFilter(LOFilter filter) throws IOException {
+        String scope = DEFAULT_SCOPE;
+//        System.err.println("Entering Filter");
+        POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), filter.getRequestedParallelisam());
+        poFilter.setAlias(filter.getAlias());
+        poFilter.setResultType(DataType.BAG);
+        currentPlan.add(poFilter);
+        logToPhyMap.put(filter, poFilter);
+        currentPlans.push(currentPlan);
+
+        currentPlan = new PhysicalPlan();
+
+//        PlanWalker childWalker = currentWalker
+//                .spawnChildWalker(filter.getFilterPlan());
+        PlanWalker childWalker = new ReverseDependencyOrderWalker(filter.getFilterPlan());
+        pushWalker(childWalker);
+        //currentWalker.walk(this);
+        currentWalker.walk(
+                new ExpToPhyTranslationVisitor( currentWalker.getPlan(), 
+                        childWalker, filter, currentPlan, logToPhyMap ) );
+        popWalker();
+
+        poFilter.setPlan(currentPlan);
+        currentPlan = currentPlans.pop();
+
+        List<Operator> op = filter.getPlan().getPredecessors(filter);
+
+        PhysicalOperator from;
+        if(op != null) {
+            from = logToPhyMap.get(op.get(0));
+        } else {
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for Filter." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+        }
+        
+        try {
+            currentPlan.connect(from, poFilter);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+//        System.err.println("Exiting Filter");
+    }
+    
+    public void visitLOInnerLoad(LOInnerLoad load) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        
+        POProject exprOp = new POProject(new OperatorKey(scope, nodeGen
+              .getNextNodeId(scope)));
+             
+        LogicalSchema s = load.getSchema();
+        if (s != null) {
+            exprOp.setResultType(s.getField(0).type);
+        }
+        exprOp.setColumn(load.getColNum());
+        
+        // set input to POProject to the predecessor of foreach
+        List<PhysicalOperator> l = new ArrayList<PhysicalOperator>();
+        LOForEach foreach = load.getLOForEach();        
+        Operator pred = foreach.getPlan().getPredecessors(foreach).get(0);
+        l.add(logToPhyMap.get(pred));
+        exprOp.setInputs(l);
+        
+        logToPhyMap.put(load, exprOp);
+        currentPlan.add(exprOp);
+    }
+    
+    public void visitLOForEach(LOForEach foreach) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        
+        List<PhysicalPlan> innerPlans = new ArrayList<PhysicalPlan>();
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan inner = foreach.getInnerPlan();
+        LOGenerate gen = (LOGenerate)inner.getSinks().get(0);
+       
+        List<LogicalExpressionPlan> exps = gen.getOutputPlans();
+        List<Operator> preds = inner.getPredecessors(gen);
+
+        currentPlans.push(currentPlan);
+        
+        // we need to translate each predecessor of LOGenerate into a physical plan.
+        // The physical plan should contain the expression plan for this predecessor plus
+        // the subtree starting with this predecessor
+        for (int i=0; i<preds.size(); i++) {
+            currentPlan = new PhysicalPlan();
+            // translate the predecessors into a physical plan
+            PlanWalker childWalker = new SubtreeDependencyOrderWalker(inner, preds.get(i));
+            pushWalker(childWalker);
+            childWalker.walk(this);
+            popWalker();
+            
+            // get the leaf of partially translated plan
+            PhysicalOperator leaf = currentPlan.getLeaves().get(0);
+            
+            // add up the expressions
+            childWalker = new ReverseDependencyOrderWalker(exps.get(i));
+            pushWalker(childWalker);
+            childWalker.walk(new ExpToPhyTranslationVisitor(exps.get(i),
+                    childWalker, gen, currentPlan, logToPhyMap ));            
+            popWalker();
+            
+            List<Operator> leaves = exps.get(i).getSinks();
+            for(Operator l: leaves) {
+                PhysicalOperator op = logToPhyMap.get(l);
+                if (l instanceof ProjectExpression) {
+                    int input = ((ProjectExpression)l).getInputNum();
+                    Operator pred = preds.get(input);
+                    if (pred instanceof LOInnerLoad) {
+                        List<PhysicalOperator> ll = currentPlan.getSuccessors(op);     
+                        PhysicalOperator[] ll2 = null;
+                        if (ll != null) {
+                            ll2 = ll.toArray(new PhysicalOperator[0]);
+                        }
+                        currentPlan.remove(op);
+                        if (ll2 != null) {                        	
+                            for(PhysicalOperator suc: ll2) {
+                                currentPlan.connect(leaf, suc);
+                            }
+                        }
+                        
+                        innerPlans.add(currentPlan);
+                        
+                        continue;
+                    }
+                }
+                
+                currentPlan.connect(leaf, op);                
+                innerPlans.add(currentPlan);
+            }                        
+        }
+        
+        currentPlan = currentPlans.pop();
+
+        // PhysicalOperator poGen = new POGenerate(new OperatorKey("",
+        // r.nextLong()), inputs, toBeFlattened);
+        boolean[] flatten = gen.getFlattenFlags();
+        List<Boolean> flattenList = new ArrayList<Boolean>();
+        for(boolean fl: flatten) {
+            flattenList.add(fl);
+        }
+        POForEach poFE = new POForEach(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), foreach.getRequestedParallelisam(), innerPlans, flattenList);
+        poFE.setAlias(foreach.getAlias());
+        poFE.setResultType(DataType.BAG);
+        logToPhyMap.put(foreach, poFE);
+        currentPlan.add(poFE); 
+
+        // generate cannot have multiple inputs
+        List<Operator> op = foreach.getPlan().getPredecessors(foreach);
+
+        // generate may not have any predecessors
+        if (op == null)
+            return;
+
+        PhysicalOperator from = logToPhyMap.get(op.get(0));
+        try {
+           currentPlan.connect(from, poFE);
+        } catch (Exception e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+
+    }
+    
+
+    /**
+     * This function translates the new LogicalSchema into old Schema format required
+     * by PhysicalOperators
+     * @param lSchema LogicalSchema to be converted to Schema
+     * @return Schema that is converted from LogicalSchema
+     * @throws FrontendException 
+     */
+    private static Schema translateSchema( LogicalSchema lSchema ) throws FrontendException {
+        if( lSchema == null ) {
+            return null;
+        }
+        Schema schema = new Schema();
+        List<LogicalFieldSchema> lFields = lSchema.getFields();
+        for( LogicalFieldSchema lField : lFields ) {
+            FieldSchema field = new FieldSchema( lField.alias, translateSchema(lField.schema),lField.type );
+            field.canonicalName = ((Long)lField.uid).toString();
+            schema.add(field);
+        }        
+        return schema;
+    }
+    
+    /**
+     * This function takes in a List of LogicalExpressionPlan and converts them to 
+     * a list of PhysicalPlans
+     * @param plans
+     * @return
+     * @throws IOException 
+     */
+    private List<PhysicalPlan> translateExpressionPlans(LogicalRelationalOperator loj,
+            List<LogicalExpressionPlan> plans ) throws IOException {
+        List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+        if( plans == null || plans.size() == 0 ) {
+            return exprPlans;
+        }
+        
+        // Save the current plan onto stack
+        currentPlans.push(currentPlan);
+        
+        for( LogicalExpressionPlan lp : plans ) {
+            currentPlan = new PhysicalPlan();
+            
+            // We spawn a new Dependency Walker and use it 
+            // PlanWalker childWalker = currentWalker.spawnChildWalker(lp);
+            PlanWalker childWalker = new ReverseDependencyOrderWalker(lp);
+            
+            // Save the old walker and use childWalker as current Walker
+            pushWalker(childWalker);
+            
+            // We create a new ExpToPhyTranslationVisitor to walk the ExpressionPlan
+            currentWalker.walk(
+                    new ExpToPhyTranslationVisitor( 
+                            currentWalker.getPlan(), 
+                            childWalker, loj, currentPlan, logToPhyMap ) );
+            
+            exprPlans.add(currentPlan);
+            popWalker();
+        }
+        
+        // Pop the current plan back out
+        currentPlan = currentPlans.pop();
+
+        return exprPlans;
+    }
+    
+    @Override
+    public void visitLOStore(LOStore loStore) throws IOException {
+        String scope = DEFAULT_SCOPE;
+//        System.err.println("Entering Store");
+        POStore store = new POStore(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)));
+        store.setAlias(((LogicalRelationalOperator)loStore.getPlan().
+                getPredecessors(loStore).get(0)).getAlias());
+        store.setSFile(loStore.getOutputSpec());
+        // TODO Implement this
+        //store.setInputSpec(loStore.getInputSpec());
+//        try {
+            // create a new schema for ourselves so that when
+            // we serialize we are not serializing objects that
+            // contain the schema - apparently Java tries to
+            // serialize the object containing the schema if
+            // we are trying to serialize the schema reference in
+            // the containing object. The schema here will be serialized
+            // in JobControlCompiler
+            store.setSchema(translateSchema( loStore.getSchema() ));
+//        } catch (FrontendException e1) {
+//            int errorCode = 1060;
+//            String message = "Cannot resolve Store output schema";  
+//            throw new VisitorException(message, errorCode, PigException.BUG, e1);    
+//        }
+        currentPlan.add(store);
+        
+        List<Operator> op = loStore.getPlan().getPredecessors(loStore); 
+        PhysicalOperator from = null;
+        
+        if(op != null) {
+            from = logToPhyMap.get(op.get(0));
+            // TODO Implement sorting when we have a LOSort (new) and LOLimit (new) operator ready
+//            SortInfo sortInfo = null;
+//            // if store's predecessor is limit,
+//            // check limit's predecessor
+//            if(op.get(0) instanceof LOLimit) {
+//                op = loStore.getPlan().getPredecessors(op.get(0));
+//            }
+//            PhysicalOperator sortPhyOp = logToPhyMap.get(op.get(0));
+//            // if this predecessor is a sort, get
+//            // the sort info.
+//            if(op.get(0) instanceof LOSort) {
+//                sortInfo = ((POSort)sortPhyOp).getSortInfo();
+//            }
+//            store.setSortInfo(sortInfo);
+//        } else {
+//            int errCode = 2051;
+//            String msg = "Did not find a predecessor for Store." ;
+//            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);            
+        }        
+
+        try {
+            currentPlan.connect(from, store);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+        logToPhyMap.put(loStore, store);
+//        System.err.println("Exiting Store");
+    }
+    
+    @Override
+    public void visitLOJoin(LOJoin loj) throws IOException {
+        String scope = DEFAULT_SCOPE;
+//        System.err.println("Entering Join");
+        
+        // List of join predicates
+        List<Operator> inputs = plan.getPredecessors(loj);
+        
+        // mapping of inner join physical plans corresponding to inner physical operators.
+        MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = new LinkedMultiMap<PhysicalOperator, PhysicalPlan>();
+        
+        // Outer list corresponds to join predicates. Inner list is inner physical plan of each predicate.
+        List<List<PhysicalPlan>> ppLists = new ArrayList<List<PhysicalPlan>>();
+        
+        // List of physical operator corresponding to join predicates.
+        List<PhysicalOperator> inp = new ArrayList<PhysicalOperator>();
+        
+        // Outer list corresponds to join predicates and inner list corresponds to type of keys for each predicate.
+        List<List<Byte>> keyTypes = new ArrayList<List<Byte>>();
+        
+        for (int i=0; i<inputs.size(); i++) {
+            Operator op = inputs.get(i);
+            if( ! ( op instanceof LogicalRelationalOperator ) ) {
+                continue;
+            }
+            LogicalRelationalOperator lop = (LogicalRelationalOperator)op;
+            PhysicalOperator physOp = logToPhyMap.get(op);
+            inp.add(physOp);
+            List<LogicalExpressionPlan> plans = (List<LogicalExpressionPlan>) loj.getJoinPlan(i);
+            
+            // Convert the expression plan into physical Plan
+            List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
+
+//            currentPlans.push(currentPlan);
+//            for (LogicalExpressionPlan lp : plans) {
+//                currentPlan = new PhysicalPlan();
+//                
+//                // We spawn a new Dependency Walker and use it 
+//                PlanWalker childWalker = currentWalker.spawnChildWalker(lp);
+//                pushWalker(childWalker);
+//                // We create a new ExpToPhyTranslationVisitor to walk the ExpressionPlan
+//                currentWalker.walk(
+//                        new ExpToPhyTranslationVisitor(currentWalker.getPlan(), 
+//                                childWalker) );
+//                
+//                exprPlans.add(currentPlan);
+//                popWalker();
+//            }
+//            currentPlan = currentPlans.pop();
+            
+            ppLists.add(exprPlans);
+            joinPlans.put(physOp, exprPlans);
+            
+            // Key could potentially be a tuple. So, we visit all exprPlans to get types of members of tuples.
+            List<Byte> tupleKeyMemberTypes = new ArrayList<Byte>();
+            for(PhysicalPlan exprPlan : exprPlans)
+                tupleKeyMemberTypes.add(exprPlan.getLeaves().get(0).getResultType());
+            keyTypes.add(tupleKeyMemberTypes);
+        }
+
+        if (loj.getJoinType() == LOJoin.JOINTYPE.SKEWED) {
+            POSkewedJoin skj;
+            try {
+                skj = new POSkewedJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(),
+                                            inp, loj.getInnerFlags());
+                skj.setAlias(loj.getAlias());
+                skj.setJoinPlans(joinPlans);
+            }
+            catch (Exception e) {
+                int errCode = 2015;
+                String msg = "Skewed Join creation failed";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+            skj.setResultType(DataType.TUPLE);
+            
+            boolean[] innerFlags = loj.getInnerFlags();
+            for (int i=0; i < inputs.size(); i++) {
+                LogicalRelationalOperator op = (LogicalRelationalOperator) inputs.get(i);
+                if (!innerFlags[i]) {
+                    try {
+                        LogicalSchema s = op.getSchema();
+                        // if the schema cannot be determined
+                        if (s == null) {
+                            throw new FrontendException();
+                        }
+                        skj.addSchema(translateSchema(s));
+                    } catch (FrontendException e) {
+                        int errCode = 2015;
+                        String msg = "Couldn't set the schema for outer join" ;
+                        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                    }
+                } else {
+                    // This will never be retrieved. It just guarantees that the index will be valid when
+                    // MRCompiler is trying to read the schema
+                    skj.addSchema(null);
+                }
+            }
+            
+            currentPlan.add(skj);
+
+            for (Operator op : inputs) {
+                try {
+                    currentPlan.connect(logToPhyMap.get(op), skj);
+                } catch (PlanException e) {
+                    int errCode = 2015;
+                    String msg = "Invalid physical operators in the physical plan" ;
+                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                }
+            }
+            logToPhyMap.put(loj, skj);
+        }
+        else if(loj.getJoinType() == LOJoin.JOINTYPE.REPLICATED) {
+            
+            int fragment = 0;
+            POFRJoin pfrj;
+            try {
+                boolean []innerFlags = loj.getInnerFlags();
+                boolean isLeftOuter = false;
+                // We dont check for bounds issue as we assume that a join 
+                // involves atleast two inputs
+                isLeftOuter = !innerFlags[1];
+                
+                Tuple nullTuple = null;
+                if( isLeftOuter ) {
+                    try {
+                        // We know that in a Left outer join its only a two way 
+                        // join, so we assume index of 1 for the right input                        
+                        LogicalSchema inputSchema = ((LogicalRelationalOperator)inputs.get(1)).getSchema();                     
+                        
+                        // We check if we have a schema before the join
+                        if(inputSchema == null) {
+                            int errCode = 1109;
+                            String msg = "Input (" + ((LogicalRelationalOperator) inputs.get(1)).getAlias() + ") " +
+                            "on which outer join is desired should have a valid schema";
+                            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
+                        }
+                        
+                        // Using the schema we decide the number of columns/fields 
+                        // in the nullTuple
+                        nullTuple = TupleFactory.getInstance().newTuple(inputSchema.size());
+                        for(int j = 0; j < inputSchema.size(); j++) {
+                            nullTuple.set(j, null);
+                        }
+                        
+                    } catch( FrontendException e ) {
+                        int errCode = 2104;
+                        String msg = "Error while determining the schema of input";
+                        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                    }
+                }
+                
+                pfrj = new POFRJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(),
+                                            inp, ppLists, keyTypes, null, fragment, isLeftOuter, nullTuple);
+                pfrj.setAlias(loj.getAlias());
+            } catch (ExecException e1) {
+                int errCode = 2058;
+                String msg = "Unable to set index on newly create POLocalRearrange.";
+                throw new VisitorException(msg, errCode, PigException.BUG, e1);
+            }
+            pfrj.setResultType(DataType.TUPLE);
+            currentPlan.add(pfrj);
+            for (Operator op : inputs) {
+                try {
+                    currentPlan.connect(logToPhyMap.get(op), pfrj);
+                } catch (PlanException e) {
+                    int errCode = 2015;
+                    String msg = "Invalid physical operators in the physical plan" ;
+                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                }
+            }
+            logToPhyMap.put(loj, pfrj);
+        }
+        
+        else if (loj.getJoinType() == LOJoin.JOINTYPE.MERGE && validateMergeJoin(loj)) {
+            
+            POMergeJoin smj;
+            try {
+                smj = new POMergeJoin(new OperatorKey(scope,nodeGen.getNextNodeId(scope)),loj.getRequestedParallelisam(),inp,joinPlans,keyTypes);
+            }
+            catch (Exception e) {
+                int errCode = 2042;
+                String msg = "Merge Join creation failed";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+
+            smj.setResultType(DataType.TUPLE);
+            currentPlan.add(smj);
+
+            for (Operator op : inputs) {
+                try {
+                    currentPlan.connect(logToPhyMap.get(op), smj);
+                } catch (PlanException e) {
+                    int errCode = 2015;
+                    String msg = "Invalid physical operators in the physical plan" ;
+                    throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                }
+            }
+            logToPhyMap.put(loj, smj);
+            return;
+        }
+        else if (loj.getJoinType() == LOJoin.JOINTYPE.HASH){
+            POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+                    scope, nodeGen.getNextNodeId(scope)), loj
+                    .getRequestedParallelisam());
+            poGlobal.setAlias(loj.getAlias());
+            POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
+                    .getNextNodeId(scope)), loj.getRequestedParallelisam());
+            poPackage.setAlias(loj.getAlias());
+            currentPlan.add(poGlobal);
+            currentPlan.add(poPackage);
+            
+            int count = 0;
+            Byte type = null;
+            
+            try {
+                currentPlan.connect(poGlobal, poPackage);
+                for (int i=0; i<inputs.size(); i++) {       
+                    Operator op = inputs.get(i);
+                    List<LogicalExpressionPlan> plans = 
+                        (List<LogicalExpressionPlan>) loj.getJoinPlan(i);
+                    POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                            scope, nodeGen.getNextNodeId(scope)), loj
+                            .getRequestedParallelisam());
+                    List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
+//                    currentPlans.push(currentPlan);
+//                    for (LogicalExpressionPlan lp : plans) {
+//                        currentPlan = new PhysicalPlan();
+//                        PlanWalker childWalker = currentWalker
+//                                .spawnChildWalker(lp);
+//                        pushWalker(childWalker);
+//                        //currentWalker.walk(this);
+//                        currentWalker.walk(
+//                                new ExpToPhyTranslationVisitor(currentWalker.getPlan(), 
+//                                        childWalker) );
+//                        exprPlans.add(currentPlan);
+//                        popWalker();
+//
+//                    }
+//                    currentPlan = currentPlans.pop();
+                    try {
+                        physOp.setPlans(exprPlans);
+                    } catch (PlanException pe) {
+                        int errCode = 2071;
+                        String msg = "Problem with setting up local rearrange's plans.";
+                        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe);
+                    }
+                    try {
+                        physOp.setIndex(count++);
+                    } catch (ExecException e1) {
+                        int errCode = 2058;
+                        String msg = "Unable to set index on newly create POLocalRearrange.";
+                        throw new VisitorException(msg, errCode, PigException.BUG, e1);
+                    }
+                    if (plans.size() > 1) {
+                        type = DataType.TUPLE;
+                        physOp.setKeyType(type);
+                    } else {
+                        type = exprPlans.get(0).getLeaves().get(0).getResultType();
+                        physOp.setKeyType(type);
+                    }
+                    physOp.setResultType(DataType.TUPLE);
+
+                    currentPlan.add(physOp);
+
+                    try {
+                        currentPlan.connect(logToPhyMap.get(op), physOp);
+                        currentPlan.connect(physOp, poGlobal);
+                    } catch (PlanException e) {
+                        int errCode = 2015;
+                        String msg = "Invalid physical operators in the physical plan" ;
+                        throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+                    }
+
+                }
+                
+            } catch (PlanException e1) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+            }
+
+            poPackage.setKeyType(type);
+            poPackage.setResultType(DataType.TUPLE);
+            poPackage.setNumInps(count);
+            
+            boolean[] innerFlags = loj.getInnerFlags();
+            poPackage.setInner(innerFlags);
+            
+            List<PhysicalPlan> fePlans = new ArrayList<PhysicalPlan>();
+            List<Boolean> flattenLst = new ArrayList<Boolean>();
+            
+            try{
+                for(int i=0;i< count;i++){
+                    PhysicalPlan fep1 = new PhysicalPlan();
+                    POProject feproj1 = new POProject(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), 
+                            loj.getRequestedParallelisam(), i+1); //i+1 since the first column is the "group" field
+                    feproj1.setAlias(loj.getAlias());
+                    feproj1.setResultType(DataType.BAG);
+                    feproj1.setOverloaded(false);
+                    fep1.add(feproj1);
+                    fePlans.add(fep1);
+                    // the parser would have marked the side
+                    // where we need to keep empty bags on
+                    // non matched as outer (innerFlags[i] would be
+                    // false)
+                    if(!(innerFlags[i])) {
+                        Operator joinInput = inputs.get(i);
+                        // for outer join add a bincond
+                        // which will project nulls when bag is
+                        // empty
+                        updateWithEmptyBagCheck(fep1, joinInput);
+                    }
+                    flattenLst.add(true);
+                }
+                
+                POForEach fe = new POForEach(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), 
+                        loj.getRequestedParallelisam(), fePlans, flattenLst );
+                fe.setAlias(loj.getAlias());
+                currentPlan.add(fe);
+                currentPlan.connect(poPackage, fe);
+                logToPhyMap.put(loj, fe);
+            }catch (PlanException e1) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+            }
+        }
+//        System.err.println("Exiting Join");
+    }
+    
+    /**
+     * updates plan with check for empty bag and if bag is empty to flatten a bag
+     * with as many null's as dictated by the schema
+     * @param fePlan the plan to update
+     * @param joinInput the relation for which the corresponding bag is being checked
+     * @throws FrontendException 
+     */
+    public static void updateWithEmptyBagCheck(PhysicalPlan fePlan, Operator joinInput) throws FrontendException {
+        LogicalSchema inputSchema = null;
+        try {
+            inputSchema = ((LogicalRelationalOperator) joinInput).getSchema();
+         
+          
+            if(inputSchema == null) {
+                int errCode = 1109;
+                String msg = "Input (" + ((LogicalRelationalOperator) joinInput).getAlias() + ") " +
+                        "on which outer join is desired should have a valid schema";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.INPUT);
+            }
+        } catch (FrontendException e) {
+            int errCode = 2104;
+            String msg = "Error while determining the schema of input";
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+        
+        CompilerUtils.addEmptyBagOuterJoin(fePlan, translateSchema(inputSchema));
+        
+    }
+
+    private boolean validateMergeJoin(LOJoin loj) throws IOException{
+        
+        List<Operator> preds = plan.getPredecessors(loj);
+
+        int errCode = 1101;
+        String errMsg = "Merge Join must have exactly two inputs.";
+        if(preds.size() != 2)
+            throw new LogicalToPhysicalTranslatorException(errMsg+" Found: "+preds.size(),errCode);
+        
+        return mergeJoinValidator(preds,loj.getPlan());
+    }
+    
+    private boolean mergeJoinValidator(List<Operator> preds,OperatorPlan lp) throws IOException {
+        
+        int errCode = 1103;
+        String errMsg = "Merge join only supports Filter, Foreach, filter and Load as its predecessor. Found : ";
+        if(preds != null && !preds.isEmpty()){
+            for(Operator lo : preds){
+                // TODO Need to add LOForEach in this statement
+                if (!(lo instanceof LOFilter || lo instanceof LOLoad)) // || lo instanceof LOForEach
+                 throw new LogicalToPhysicalTranslatorException(errMsg, errCode);
+                // All is good at this level. Visit predecessors now.
+                mergeJoinValidator(lp.getPredecessors(lo),lp);
+            }
+        }
+        // We visited everything and all is good.
+        return true;
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java Thu Feb 11 22:12:36 2010
@@ -19,11 +19,13 @@
 package org.apache.pig.experimental.logical.relational;
 
 import java.io.IOException;
+import java.util.Iterator;
 import java.util.List;
 
 import org.apache.pig.experimental.plan.BaseOperatorPlan;
 import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
 
 /**
  * LogicalPlan is the logical view of relational operations Pig will execute 
@@ -47,7 +49,7 @@
      * be in the plan.
      * @param after operator  that will be after the new operator.  This
      * operator should already be in the plan.  If after is null, then the
-     * new operator will be a root.
+     * new operator will be a leaf.
      * @throws IOException if add is already in the plan, or before or after
      * are not in the plan.
      */
@@ -65,11 +67,11 @@
      * be in the plan.
      * @param after operator  that will be after the new operator.  This
      * operator should already be in the plan.  If after is null, then the
-     * new operator will be a root.
+     * new operator will be a leaf.
      * @throws IOException if add is already in the plan, or before or after
      * are not in the plan.
      */
-    public void add(List<LogicalRelationalOperator> before,
+    public void add(LogicalRelationalOperator[] before,
                     LogicalRelationalOperator newOper,
                     LogicalRelationalOperator after) throws IOException {
         doAdd(null, newOper, after);
@@ -94,7 +96,7 @@
      */
     public void add(LogicalRelationalOperator before,
                     LogicalRelationalOperator newOper,
-                    List<LogicalRelationalOperator> after) throws IOException {
+                    LogicalRelationalOperator[] after) throws IOException {
         doAdd(before, newOper, null);
         
         for (LogicalRelationalOperator op : after) {
@@ -117,7 +119,7 @@
      * @param afterFromPos Position in newOps's edges to connect after at.
      * @param after operator  that will be after the new operator.  This
      * operator should already be in the plan.  If after is null, then the
-     * new operator will be a root.
+     * new operator will be a leaf.
      * @throws IOException if add is already in the plan, or before or after
      * are not in the plan.
      */
@@ -181,6 +183,28 @@
         
     }
     
+    /**
+     * Equality is checked by calling equals on every leaf in the plan.  This
+     * assumes that plans are always connected graphs.  It is somewhat 
+     * inefficient since every leaf will test equality all the way to 
+     * every root.  But it is only intended for use in testing, so that
+     * should be ok.  Checking predecessors (as opposed to successors) was
+     * chosen because splits (which have multiple successors) do not depend
+     * on order of outputs for correctness, whereas joins (with multiple
+     * predecessors) do.  That is, reversing the outputs of split in the
+     * graph has no correctness implications, whereas reversing the inputs
+     * of join can.  This method of doing equals will detect predecessors
+     * in different orders but not successors in different orders.
+     */
+    @Override
+    public boolean isEqual(OperatorPlan other) {
+        if (other == null || !(other instanceof LogicalPlan)) {
+            return false;
+        }
+        
+        return super.isEqual(other);   
+    }
+    
     private void doAdd(LogicalRelationalOperator before,
                        LogicalRelationalOperator newOper,
                        LogicalRelationalOperator after) throws IOException {
@@ -205,6 +229,5 @@
             throw new IOException("Attempt to add operator " + op.getName() + 
                 " which is already in the plan.");
         }
-    }
-        
+     }           
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java Thu Feb 11 22:12:36 2010
@@ -18,6 +18,10 @@
 
 package org.apache.pig.experimental.logical.relational;
 
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.experimental.plan.OperatorPlan;
 import org.apache.pig.experimental.plan.PlanVisitor;
 import org.apache.pig.experimental.plan.PlanWalker;
@@ -30,13 +34,32 @@
     protected LogicalPlanVisitor(OperatorPlan plan, PlanWalker walker) {
         super(plan, walker);
         
-        if (!(plan instanceof LogicalPlan)) {
-            throw new RuntimeException(
-                "LogicalPlanVisitor expects to visit logical plans");
+        Iterator<Operator> iter = plan.getOperators();
+        while(iter.hasNext()) {
+            if (!(iter.next() instanceof LogicalRelationalOperator)) {
+                throw new RuntimeException("LogicalPlanVisitor can only visit logical plan");
+            }
         }
     }
     
-    public void visitLOLoad(LOLoad load) {
+    public void visitLOLoad(LOLoad load) throws IOException {
     }
 
+    public void visitLOFilter(LOFilter filter) throws IOException {
+    }
+    
+    public void visitLOStore(LOStore store) throws IOException {
+    }
+    
+    public void visitLOJoin(LOJoin join) throws IOException {
+    }
+    
+    public void visitLOForEach(LOForEach foreach) throws IOException {
+    }
+    
+    public void visitLOGenerate(LOGenerate gen) throws IOException {
+    }
+    
+    public void visitLOInnerLoad(LOInnerLoad load) throws IOException {
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java Thu Feb 11 22:12:36 2010
@@ -28,7 +28,9 @@
 abstract public class LogicalRelationalOperator extends Operator {
     
     protected LogicalSchema schema;
-    int requestedParallelism;
+    protected int requestedParallelism;
+    protected String alias;
+    protected int lineNum;
 
     /**
      * 
@@ -51,16 +53,92 @@
         super(name, plan);
         requestedParallelism = rp;
     }
+
+    /**
+     * Get the schema for the output of this relational operator.  This does
+     * not merely return the schema variable.  If schema is not yet set, this
+     * will attempt to construct it.  Therefore it is abstract since each
+     * operator will need to construct its schema differently.
+     * @return the schema
+     */
+    abstract public LogicalSchema getSchema();
     
+    /**
+     * Reset the schema to null so that the next time getSchema is called
+     * the schema will be regenerated from scratch.
+     */
+    public void resetSchema() {
+        schema = null;
+    }
+ 
 
+    /**
+     * Get the requestedParallelism for this operator.
+     * @return requestedParallelsim
+     */
     public int getRequestedParallelisam() {
         return requestedParallelism;
     } 
 
     /**
-     * Get the schema for the output of this relational operator.
-     * @return the schema
+     * Get the alias of this operator.  That is, if the Pig Latin for this operator
+     * was 'X = sort W by $0' then the alias will be X.  For store and split it will
+     * be the alias being stored or split.  Note that because of this this alias
+     * is not guaranteed to be unique to a single operator.
+     * @return alias
      */
-    abstract public LogicalSchema getSchema();
 
+    public String getAlias() {
+        return alias;
+    }
+    
+    public void setAlias(String alias) {
+        this.alias = alias;
+    }
+    
+    public void setRequestedParallelism(int parallel) {
+        this.requestedParallelism = parallel;
+    }
+
+    /**
+     * Get the line number in the submitted Pig Latin script where this operator
+     * occurred.
+     * @return line number
+     */
+    public int getLineNumber() {
+        return lineNum;
+    }
+    
+    /**
+     * Only to be used by unit tests.  This is a back door cheat to set the schema
+     * without having to calculate it.  This should never be called by production
+     * code, only by tests.
+     * @param schema to set
+     */
+    public void neverUseForRealSetSchema(LogicalSchema schema) {
+        this.schema = schema;
+    }
+    
+    /**
+     * Do some basic equality checks on two relational operators.  Equality
+     * is defined here as having equal schemas and  predecessors that are equal.
+     * This is intended to be used by operators' equals methods.
+     * @param other LogicalRelationalOperator to compare predecessors against
+     * @return true if the equals() methods of this node's predecessor(s) returns
+     * true when invoked with other's predecessor(s).
+     */
+    protected boolean checkEquality(LogicalRelationalOperator other) {
+        if (other == null) return false;
+        LogicalSchema s = getSchema();
+        LogicalSchema os = other.getSchema();
+        if (s == null && os == null) {
+            // intentionally blank
+        } else if (s == null || os == null) {
+            // one of them is null and one isn't
+            return false;
+        } else {
+            if (!s.isEqual(os)) return false;
+        }
+        return true;
+    } 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java Thu Feb 11 22:12:36 2010
@@ -23,8 +23,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pig.data.DataType;
-
 /**
  * Schema, from a logical perspective.
  */
@@ -32,10 +30,39 @@
 
     public static class LogicalFieldSchema {
         public String alias;
-        public DataType type;
+        public byte type;
         public long uid;
         public LogicalSchema schema;
+
+        public LogicalFieldSchema(String alias, LogicalSchema schema,  byte type) {
+            this(alias, schema, type, -1);
+        }
+        
+        public LogicalFieldSchema(String alias, LogicalSchema schema,  byte type, long uid) {
+            this.alias = alias;
+            this.type = type;
+            this.schema = schema;
+            this.uid = uid;
+        }
+        
+        /**
+         * Equality is defined as having the same type and either the same schema
+         * or both null schema.  Alias and uid are not checked.
+         */
+        public boolean isEqual(Object other) {
+            if (other instanceof LogicalFieldSchema) {
+                LogicalFieldSchema ofs = (LogicalFieldSchema)other;
+                if (type != ofs.type) return false;
+                if (schema == null && ofs.schema == null) return true;
+                if (schema == null) return false;
+                else return schema.isEqual(ofs.schema);
+            } else {
+                return false;
+            }
+        }
     }
+
+    
     
     private List<LogicalFieldSchema> fields;
     private Map<String, Integer> aliases;
@@ -51,8 +78,22 @@
      */
     public void addField(LogicalFieldSchema field) {
         fields.add(field);
-        if (field.alias != null && field.alias.equals("")) {
+        if (field.alias != null && !field.alias.equals("")) {
             aliases.put(field.alias, fields.size() - 1);
+            int index = 0;
+            while(index != -1) {
+                index = field.alias.indexOf("::", index);
+                if (index != -1) {
+                    String a = field.alias.substring(index+2);
+                    if (aliases.containsKey(a)) {
+                        aliases.remove(a);
+                    }else{
+                        aliases.put(a, fields.size()-1);                       
+                    }
+
+                    index = index +2;
+                }
+            }
         }
     }
     
@@ -62,9 +103,12 @@
      * @return field associated with alias, or null if no such field
      */
     public LogicalFieldSchema getField(String alias) {
-        Integer i = aliases.get(alias);
-        if (i == null) return null;
-        else return fields.get(i);
+        Integer index = aliases.get(alias);
+        if (index == null) {
+            return null;
+        }
+
+        return fields.get(index);
     }
 
     /**
@@ -88,8 +132,26 @@
      * Get the size of the schema.
      * @return size
      */
-    public Integer size() {
-       return null;
+    public int size() {
+       return fields.size();
+    }
+    
+    /**
+     * Two schemas are equal if they are of equal size and their fields
+     * schemas considered in order are equal.
+     */
+    public boolean isEqual(Object other) {
+        if (other != null && other instanceof LogicalSchema) {
+            LogicalSchema os = (LogicalSchema)other;
+            if (size() != os.size()) return false;
+            for (int i = 0; i < size(); i++) {
+                if (!getField(i).isEqual(os.getField(i))) return false;
+            }
+            return true;
+        } else {
+            return false;
+        }
+        
     }
     
     /**

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/FilterAboveForeach.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class FilterAboveForeach extends Rule {
+
+    public FilterAboveForeach(String n) {
+        super(n);
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {
+        // the pattern that this rule looks for
+        // is foreach -> flatten -> filter
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator foreach = new LOForEach(plan);
+        LogicalRelationalOperator filter = new LOFilter(plan);
+        
+        plan.add(foreach);
+        plan.add(filter);
+        plan.connect(foreach, filter);
+
+        return plan;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new FilterAboveFlattenTransformer();
+    }
+    
+    public class FilterAboveFlattenTransformer extends Transformer {
+
+        LOFilter filter = null;
+        LOForEach foreach = null;
+        LogicalRelationalOperator forEachPred = null;
+        OperatorSubPlan subPlan = null;
+        
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            Iterator<Operator> iter = matched.getOperators();
+            while( iter.hasNext() ) {
+                Operator op = iter.next();
+                if( op instanceof LOForEach ) {
+                    foreach = (LOForEach)op;
+                    break;
+                }
+            }
+            
+            // This would be a strange case
+            if( foreach == null ) return false;
+            
+            List<Operator> sinks = foreach.getInnerPlan().getSinks();            
+            if( ! ( sinks.size() == 1 && (sinks.get(0) instanceof LOGenerate ) ) ) {
+                return false;
+            }
+
+//            LOGenerate generate = (LOGenerate)sinks.get(0);
+//            // We check if we have any flatten
+//            // Other cases are handled by other Optimizers
+//            boolean hasFlatten = false;            
+//            for( boolean flattenFlag : generate.getFlattenFlags() ) {
+//                if( flattenFlag ) {
+//                    hasFlatten = true;
+//                    break;
+//                }
+//            }
+//
+//            if( !hasFlatten )
+//                return false;             
+            
+            iter = matched.getOperators();
+            while( iter.hasNext() ) {
+                Operator op = iter.next();
+                if( ( op instanceof LOFilter ) ) {
+                    filter = (LOFilter)op;
+                    break;
+                }
+            }
+            
+            // This is for cheating, we look up more than one filter in the plan
+            while( filter != null ) {
+                // Get uids of Filter
+                Set<Long> uids = getFilterProjectionUids(filter);
+
+
+                // See if the previous operators have uids from project
+                List<Operator> preds = currentPlan.getPredecessors(foreach);            
+                for(int j=0; j< preds.size(); j++) {
+                    LogicalRelationalOperator logRelOp = (LogicalRelationalOperator)preds.get(j);
+                    if (hasAll( logRelOp, uids) ) {
+                        // If any of the uids are of complex type then we 
+                        // cannot think about moving this filter.
+                        if( containsComplexType(logRelOp.getSchema(), uids ) ) {
+                            break;
+                        }
+                        forEachPred = (LogicalRelationalOperator) preds.get(j);
+                        return true;
+                    }
+                }
+                
+                // Chances are there are filters below this filter which can be
+                // moved up. So searching for those filters
+                List<Operator> successors = currentPlan.getSuccessors(filter);
+                if( successors != null && successors.size() > 0 && 
+                        successors.get(0) instanceof LOFilter ) {
+                    filter = (LOFilter)successors.get(0);
+                } else {
+                    filter = null;
+                }
+            }
+            return false;            
+        }
+        
+        private Set<Long> getFilterProjectionUids( LOFilter filter ) {
+            Set<Long> uids = new HashSet<Long>();
+            if( filter != null ) {
+                LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+                Iterator<Operator> iter = filterPlan.getOperators();            
+                Operator op = null;
+                while( iter.hasNext() ) {
+                    op = iter.next();
+                    if( op instanceof ProjectExpression ) {
+                        uids.add(((ProjectExpression)op).getUid() );
+                    }
+                }
+            }
+            return uids;
+        }
+        
+        // check if a relational operator contains all of the specified uids
+        private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
+            LogicalSchema schema = op.getSchema();
+            List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
+            Set<Long> all = new HashSet<Long>();
+            for(LogicalSchema.LogicalFieldSchema f:fields) {
+                all.add(f.uid);
+            }
+            return all.containsAll(uids);
+        }
+        
+        /**
+         * This function checks if any of the fields mentioned are a Bug or Tuple.
+         * If so we cannot move the filter above the operator having the schema
+         * @param schema Schema of the operator we are investigating
+         * @param uids Uids of the fields we are checking for
+         * @return true if one of the uid belong to a complex type
+         */
+        private boolean containsComplexType(LogicalSchema schema, Set<Long> uids) {
+            List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
+
+            for(LogicalSchema.LogicalFieldSchema f:fields) {
+                if ( ( f.type == DataType.BAG || f.type == DataType.TUPLE ) ) {
+                    if( uids.contains( f.uid ) ) {
+                        return true;
+                    }
+                    if( f.schema != null && containsComplexType(f.schema, uids) ) {
+                        return true;
+                    }
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {            
+            return subPlan;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            
+            List<Operator> opSet = currentPlan.getPredecessors(filter);
+            if( ! ( opSet != null && opSet.size() > 0 ) ) {
+                return;
+            }
+            Operator filterPred = opSet.get(0);
+            
+            opSet = currentPlan.getSuccessors(filter);
+            if( ! ( opSet != null && opSet.size() > 0 ) ) {
+                return;
+            }
+            Operator filterSuc = opSet.get(0);
+            
+            subPlan = new OperatorSubPlan(currentPlan);
+            
+            // Steps below do the following
+            /*
+             *          ForEachPred
+             *               |
+             *            ForEach
+             *               |
+             *             Filter*
+             *               |
+             *           FilterPred                 
+             *  ( has to be a Filter or ForEach )
+             *               |
+             *             Filter
+             *               |
+             *            FilterSuc
+             *              
+             *               |
+             *               |
+             *        Transforms into 
+             *               |
+             *              \/            
+             *                      
+             *            ForEachPred
+             *               |
+             *            Filter
+             *               |
+             *            ForEach
+             *               |
+             *             Filter*
+             *               |
+             *           FilterPred                 
+             *  ( has to be a Filter or ForEach )
+             *               |
+             *            FilterSuc
+             */
+            
+            Pair<Integer, Integer> forEachPredPlaces = currentPlan.disconnect(forEachPred, foreach);
+            Pair<Integer, Integer> filterPredPlaces = currentPlan.disconnect(filterPred, filter);
+            Pair<Integer, Integer> filterSucPlaces = currentPlan.disconnect(filter, filterSuc);
+            
+            currentPlan.connect(forEachPred, forEachPredPlaces.first, filter, filterPredPlaces.second);
+            currentPlan.connect(filter, filterSucPlaces.first, foreach, forEachPredPlaces.second);
+            currentPlan.connect(filterPred, filterPredPlaces.first, filterSuc, filterSucPlaces.second);
+            
+            subPlan.add(forEachPred);
+            subPlan.add(foreach);
+            subPlan.add(filterPred);
+            subPlan.add(filter);
+            subPlan.add(filterSuc);
+        }
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MergeFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MergeFilter.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MergeFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/MergeFilter.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class MergeFilter extends Rule {
+
+    public MergeFilter(String n) {
+        super(n);       
+    }
+
+    @Override
+    public Transformer getNewTransformer() {        
+        return new MergeFilterTransformer();
+    }
+
+    public class MergeFilterTransformer extends Transformer {
+
+        private OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {           
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+            List<Operator> succeds = currentPlan.getSuccessors(filter);
+            // if this filter is followed by another filter, we should combine them
+            if (succeds != null && succeds.size() == 1) {
+                if (succeds.get(0) instanceof LOFilter) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {     
+            subPlan = new OperatorSubPlan(currentPlan);
+            
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+
+            subPlan.add(filter);
+            
+            List<Operator> succeds = currentPlan.getSuccessors(filter);
+            if (succeds != null && succeds.size()== 1 && (succeds.get(0) instanceof LOFilter)) {
+                LOFilter next = (LOFilter)succeds.get(0);
+                combineFilterCond(filter, next);
+                Pair<Integer, Integer> p1 = currentPlan.disconnect(filter, next);
+                List<Operator> ll = currentPlan.getSuccessors(next);
+                if (ll!= null && ll.size()>0) {
+                    Operator op = ll.get(0);
+                    Pair<Integer, Integer> p2 = currentPlan.disconnect(next, op);
+                    currentPlan.connect(filter, p1.first, op, p2.second);
+                    subPlan.add(op);
+                }
+                
+                currentPlan.remove(next);
+            }                            
+        }        
+        
+        @Override
+        public OperatorPlan reportChanges() {          
+            return subPlan;
+        }
+        
+        // combine the condition of two filters. The condition of second filter
+        // is added into the condition of first filter with an AND operator.
+        private void combineFilterCond(LOFilter f1, LOFilter f2) throws IOException {
+            LogicalExpressionPlan p1 = f1.getFilterPlan();
+            LogicalExpressionPlan p2 = f2.getFilterPlan();
+            LogicalExpressionPlan andPlan = new LogicalExpressionPlan();
+            
+            // add existing operators          
+            Iterator<Operator> iter = p1.getOperators();
+            while(iter.hasNext()) {
+                andPlan.add(iter.next());
+            }
+            
+            iter = p2.getOperators();
+            while(iter.hasNext()) {
+                andPlan.add(iter.next());
+            }
+            
+            // add all connections
+            iter = p1.getOperators();
+            while(iter.hasNext()) {
+                Operator n = iter.next();
+                List<Operator> l = p1.getPredecessors(n);
+                if (l != null) {
+                    for(Operator op: l) {
+                        andPlan.connect(op, n);
+                    }
+                }
+            }
+            
+            iter = p2.getOperators();
+            while(iter.hasNext()) {
+                Operator n = iter.next();
+                List<Operator> l = p2.getPredecessors(n);
+                if (l != null) {
+                    for(Operator op: l) {
+                        andPlan.connect(op, n);
+                    }
+                }
+            }          
+            
+            // create an AND
+            new AndExpression(andPlan, (LogicalExpression)p1.getSources().get(0), (LogicalExpression)p2.getSources().get(0));          
+            
+            f1.setFilterPlan(andPlan);
+        }
+
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {        
+        // the pattern that this rule looks for
+        // is filter operator
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator op = new LOFilter(plan);
+        plan.add(op);        
+        
+        return plan;
+    }
+}
+

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/PushUpFilter.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class PushUpFilter extends Rule {
+    
+    public PushUpFilter(String n) {
+        super(n);       
+    }
+
+    @Override
+    public Transformer getNewTransformer() {        
+        return new PushUpFilterTransformer();
+    }
+
+    public class PushUpFilterTransformer extends Transformer {
+
+        private OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {   
+            // check if it is inner join
+            LOJoin join = (LOJoin)matched.getSources().get(0);
+            boolean[] innerFlags = join.getInnerFlags();
+            for(boolean inner: innerFlags) {
+                if (!inner){
+                    return false;
+                }
+            }
+           
+            Operator next = matched.getSinks().get(0);
+            while(next != null && next instanceof LOFilter) {
+                LOFilter filter = (LOFilter)next;            
+                LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+                
+                // collect all uids used in the filter plan
+                Set<Long> uids = new HashSet<Long>();
+                Iterator<Operator> iter = filterPlan.getOperators();
+                while(iter.hasNext()) {
+                    Operator op = iter.next();
+                    if (op instanceof ProjectExpression) {
+                        long uid = ((ProjectExpression)op).getUid();
+                        uids.add(uid);
+                    }
+                }
+                                
+                List<Operator> preds = currentPlan.getPredecessors(join);
+                            
+                for(int j=0; j<preds.size(); j++) {
+                    if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) {                            
+                        return true;
+                    }
+                }                       
+             
+                // if current filter can not move up, check next filter
+                List<Operator> l = currentPlan.getSuccessors(filter);
+                if (l != null) {
+                    next = l.get(0);
+                } else {
+                    next = null;
+                }
+            }
+            
+            return false;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            subPlan = new OperatorSubPlan(currentPlan);
+
+            LOJoin join = (LOJoin)matched.getSources().get(0);
+            subPlan.add(join);     
+            
+            Operator next = matched.getSinks().get(0);
+            while(next != null && next instanceof LOFilter) {
+                LOFilter filter = (LOFilter)next;                
+                subPlan.add(filter);
+                
+                LogicalExpressionPlan filterPlan = filter.getFilterPlan();
+                
+                // collect all uids used in the filter plan
+                Set<Long> uids = new HashSet<Long>();
+                Iterator<Operator> iter = filterPlan.getOperators();
+                while(iter.hasNext()) {
+                    Operator op = iter.next();
+                    if (op instanceof ProjectExpression) {
+                        long uid = ((ProjectExpression)op).getUid();
+                        uids.add(uid);
+                    }
+                }
+                
+                // find the farthest predecessor that has all the fields
+                LogicalRelationalOperator input = join;
+                List<Operator> preds = currentPlan.getPredecessors(input);
+                while(preds != null) {                
+                    boolean found = false;
+                    for(int j=0; j<preds.size(); j++) {
+                        if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) {
+                            input = (LogicalRelationalOperator)preds.get(j);   
+                            subPlan.add(input);
+                            found = true;
+                            break;
+                        }
+                    }
+                    if (!found) {
+                        break;
+                    }
+                    preds = currentPlan.getPredecessors(input);
+                }
+                            
+                if (input != join) {                           
+                    Operator pred = currentPlan.getPredecessors(filter).get(0);
+                    Operator succed = currentPlan.getSuccessors(filter).get(0);
+                    subPlan.add(succed);
+                    
+                    Pair<Integer, Integer> p1 = currentPlan.disconnect(pred, filter);
+                    Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed);
+                    currentPlan.connect(pred, p1.first, succed, p2.second);
+                    
+                    succed = currentPlan.getSuccessors(input).get(0);
+                    Pair<Integer, Integer> p3 = currentPlan.disconnect(input, succed);
+                    currentPlan.connect(input, p3.first, filter, 0);
+                    currentPlan.connect(filter, 0, succed, p3.second);                                        
+                    
+                    return;
+                }  
+                
+                List<Operator> l = currentPlan.getSuccessors(filter);
+                if (l != null) {
+                    next = l.get(0);
+                } else {
+                    next = null;
+                }                         
+            }
+        }
+        
+        // check if a relational operator contains all of the specified uids
+        private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) {
+            LogicalSchema schema = op.getSchema();
+            List<LogicalSchema.LogicalFieldSchema> fields = schema.getFields();
+            Set<Long> all = new HashSet<Long>();
+            for(LogicalSchema.LogicalFieldSchema f:fields) {
+                all.add(f.uid);
+            }
+            
+            return all.containsAll(uids);
+        }
+           
+        @Override
+        public OperatorPlan reportChanges() {            
+            return subPlan;
+        }          
+
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {        
+        // the pattern that this rule looks for
+        // is join -> filter
+        LogicalPlan plan = new LogicalPlan();
+        LogicalRelationalOperator op1 = new LOJoin(plan);
+        LogicalRelationalOperator op2 = new LOFilter(plan);
+        plan.add(op1);
+        plan.add(op2);
+        plan.connect(op1, op2);
+        
+        return plan;
+    }
+}
+

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/SplitFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/SplitFilter.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/SplitFilter.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/SplitFilter.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class SplitFilter extends Rule {    
+
+    public SplitFilter(String n) {
+        super(n);       
+    }
+
+    @Override
+    public Transformer getNewTransformer() {        
+        return new SplitFilterTransformer();
+    }
+
+    public class SplitFilterTransformer extends Transformer {
+        private OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+            LogicalExpressionPlan cond = filter.getFilterPlan();
+            LogicalExpression root = (LogicalExpression) cond.getSources().get(0);
+            if (root instanceof AndExpression) {
+                return true;
+            }
+            
+            return false;
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {
+            subPlan = new OperatorSubPlan(currentPlan);
+            
+            // split one LOFilter into 2 by "AND"
+            LOFilter filter = (LOFilter)matched.getSources().get(0);
+            LogicalExpressionPlan cond = filter.getFilterPlan();
+            LogicalExpression root = (LogicalExpression) cond.getSources().get(0);
+            if (!(root instanceof AndExpression)) {
+                return;
+            }
+            LogicalExpressionPlan op1 = new LogicalExpressionPlan();
+            op1.add((LogicalExpression)cond.getSuccessors(root).get(0));
+            fillSubPlan(cond, op1, (LogicalExpression)cond.getSuccessors(root).get(0));
+            
+            LogicalExpressionPlan op2 = new LogicalExpressionPlan();
+            op2.add((LogicalExpression)cond.getSuccessors(root).get(1));
+            fillSubPlan(cond, op2, (LogicalExpression)cond.getSuccessors(root).get(1));
+            
+            filter.setFilterPlan(op1);
+            LOFilter filter2 = new LOFilter((LogicalPlan)currentPlan, op2);
+            currentPlan.add(filter2);
+            
+            Operator succed = null;
+            try {
+                List<Operator> succeds = currentPlan.getSuccessors(filter);
+                if (succeds != null) {
+                    succed = succeds.get(0);
+                    subPlan.add(succed);
+                    Pair<Integer, Integer> p = currentPlan.disconnect(filter, succed);
+                    currentPlan.connect(filter2, 0, succed, p.second);
+                    currentPlan.connect(filter, p.first, filter2, 0); 
+                } else {
+                    currentPlan.connect(filter, 0, filter2, 0); 
+                }
+            }catch(Exception e) {
+                throw new IOException(e);
+            }                       
+            
+            subPlan.add(filter);
+            subPlan.add(filter2);            
+        }
+        
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+        
+        private void fillSubPlan(OperatorPlan origPlan, 
+                OperatorPlan subPlan, Operator startOp) throws IOException {
+                       
+            List<Operator> l = origPlan.getSuccessors(startOp);
+            if (l != null) {
+                for(Operator le: l) {
+                    subPlan.add(le);
+                    subPlan.connect(startOp, le);
+                    fillSubPlan(origPlan, subPlan, le);
+                }            
+            }
+        }
+
+    }
+
+    @Override
+    protected OperatorPlan buildPattern() {        
+        // the pattern that this rule looks for
+        // is filter
+        LogicalPlan plan = new LogicalPlan();      
+        LogicalRelationalOperator op2 = new LOFilter(plan);
+        plan.add(op2);
+        
+        return plan;
+    }
+}
+