You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/03/15 04:28:28 UTC

svn commit: r923043 [2/5] - in /hadoop/pig/trunk: src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logical/optimizer/ src/org...

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOCogroup.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOCogroup.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOCogroup.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,319 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.util.MultiMap;
+
+/**
+ * Logical (co)group operator. Groups one or more inputs by per-input key
+ * expression plans and produces a "group" key column followed by one bag
+ * column per input.
+ */
+public class LOCogroup extends LogicalRelationalOperator {
+
+    // One flag per input; true means that input of the cogroup is inner.
+    // May be null if the constructor was given a null array.
+    private boolean[] mIsInner;
+
+    // Key expression plans, keyed by input number
+    private MultiMap<Integer,LogicalExpressionPlan> mExpressionPlans;
+
+    /**
+     * Enum for the type of group
+     */
+    public static enum GROUPTYPE {
+        REGULAR,    // Regular (co)group
+        COLLECTED   // Collected group
+    };
+
+    private GROUPTYPE mGroupType;
+
+    /*
+     * Schema of the group key. It is stored so that uid numbers are
+     * retained between resetSchema and getSchema.
+     */
+    private LogicalFieldSchema groupKeySchema = null;
+    /*
+     * Map from input number to the uid generated for that input's bag.
+     * It is required to keep the uids persistent between calls of
+     * resetSchema and getSchema.
+     */
+    private Map<Integer,Long> generatedInputUids = null;
+
+    final static String GROUP_COL_NAME = "group";
+
+    /**
+     * Creates a REGULAR cogroup with a requested parallelism of -1.
+     *
+     * @param plan plan this operator belongs to
+     * @param expressionPlans key expression plans keyed by input number
+     * @param isInner per-input inner flags; copied, may be null
+     */
+    public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan> 
+    expressionPlans, boolean[] isInner ) {
+        this( plan, expressionPlans, GROUPTYPE.REGULAR, isInner, -1 );
+    }
+
+    /**
+     * @param plan plan this operator belongs to
+     * @param expressionPlans key expression plans keyed by input number
+     *        (stored by reference, not copied)
+     * @param groupType REGULAR or COLLECTED
+     * @param isInner per-input inner flags; copied defensively, may be null
+     * @param requestedParrellism NOTE(review): currently ignored by this
+     *        constructor; confirm whether it should be forwarded to the
+     *        base class
+     */
+    public LOCogroup(OperatorPlan plan, MultiMap<Integer,LogicalExpressionPlan> 
+    expressionPlans, GROUPTYPE groupType, boolean[] isInner, int requestedParrellism) {
+        super("LOCogroup", plan);
+        this.mExpressionPlans = expressionPlans;
+        if( isInner != null ) {
+            mIsInner = Arrays.copyOf(isInner, isInner.length);
+        }
+        this.mGroupType = groupType;
+        this.generatedInputUids = new HashMap<Integer,Long>();
+    }
+
+    /**
+     * Given an expression plan, returns the LogicalFieldSchema that the
+     * plan's first source expression generates.
+     *
+     * @param exprPlan expression plan which generates this field
+     * @return a new field schema carrying the source's type and uid (and,
+     *         for a projection with a known input schema, the projected
+     *         field's alias and inner schema); null if the source is a bag,
+     *         since bags are not supported as cogroup keys
+     */
+    private LogicalFieldSchema getPlanSchema( LogicalExpressionPlan exprPlan ) {
+        LogicalExpression sourceExp = (LogicalExpression) exprPlan.getSources().get(0);
+        byte sourceType = sourceExp.getType();
+        // We don't support bags for Cogroup
+        if( sourceType == DataType.BAG ) {
+            return null;
+        }
+        LogicalSchema fieldSchema = null;
+        String alias = null;
+
+        // If we have a projection then calculate the schema of the projection
+        if (sourceExp instanceof ProjectExpression) {
+            ProjectExpression project = (ProjectExpression) sourceExp;
+            LogicalRelationalOperator op = null;
+            try{
+                op = project.findReferent(this);
+            }catch(Exception e) {
+                throw new RuntimeException(e);
+            }
+            LogicalSchema s = op.getSchema();
+            if (s != null) {
+                // Look the projected field up once and read both attributes
+                LogicalFieldSchema projected = s.getField(project.getColNum());
+                fieldSchema = projected.schema;
+                alias = projected.alias;
+            }
+        }
+
+        return new LogicalFieldSchema(alias, fieldSchema, sourceType, sourceExp.getUid());
+    }
+
+    /**
+     * Computes (and caches) the schema of this cogroup: a "group" field
+     * followed by one bag field per input, in predecessor order.
+     * <p>
+     * The group field is a tuple when any input has more than one key
+     * expression plan; otherwise it is the schema of the single key
+     * expression. In both cases only the plans of the lowest-numbered
+     * input are used. Each bag field takes the input's alias and schema.
+     * <p>
+     * The group key schema and the per-input bag uids are remembered across
+     * calls so that uids stay stable when the schema is recomputed.
+     *
+     * @return the schema; null if the predecessors are unknown, a key
+     *         schema cannot be determined (e.g. a bag-typed key), or any
+     *         input schema is null
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        // if schema is calculated before, just return
+        if (schema != null) {
+            return schema;
+        }
+
+        List<Operator> inputs = null;
+        try {
+            inputs = plan.getPredecessors(this);
+            if (inputs == null) {
+                return null;
+            }
+        }catch(Exception e) {
+            throw new RuntimeException("Unable to get predecessors of " + name 
+                    + " operator. ", e);
+        }
+
+        List<LogicalFieldSchema> fieldSchemaList = new ArrayList<LogicalFieldSchema>();
+
+        // We only calculate this if we haven't. This would not be null in a case
+        // where we calculate the schema and then reset it.
+        if( groupKeySchema == null ) {
+            // See if we have more than one expression plan, if so the
+            // schema of the group column will be a tuple
+            boolean hasMultipleKeys = false;
+            for( Integer key : mExpressionPlans.keySet() ) {
+                if( mExpressionPlans.get(key).size() > 1 ) {
+                    hasMultipleKeys = true;
+                    break;
+                }
+            }
+
+            // Generate the groupField Schema
+            if( hasMultipleKeys ) {
+                LogicalSchema keySchema = new LogicalSchema();
+                // We sort here to maintain the correct order of inputs
+                TreeSet<Integer> keySet = new TreeSet<Integer>();
+                keySet.addAll( mExpressionPlans.keySet() );
+                for( Integer key : keySet ) {
+                    Collection<LogicalExpressionPlan> plans = 
+                        mExpressionPlans.get(key);
+
+                    for( LogicalExpressionPlan plan : plans ) {
+                        LogicalFieldSchema fieldSchema = getPlanSchema(plan);
+                        // if any plan schema is null, that means we can't calculate
+                        // further schemas so we bail out
+                        if( fieldSchema == null ) {
+                            schema = null;
+                            return schema;
+                        }
+                        // Change the uid of this field
+                        fieldSchema.uid = LogicalExpression.getNextUid();
+                        keySchema.addField(fieldSchema);
+                    }
+                    // We only need fields from one input and not all
+                    break;
+                }
+                groupKeySchema = new LogicalFieldSchema(GROUP_COL_NAME, keySchema, DataType.TUPLE, 
+                        LogicalExpression.getNextUid() );
+            } else {
+                // We sort here to maintain the correct order of inputs
+                TreeSet<Integer> keySet = new TreeSet<Integer>();
+                keySet.addAll( mExpressionPlans.keySet() );
+                for( Integer key : keySet ) {
+                    Collection<LogicalExpressionPlan> plans = mExpressionPlans.get(key);
+                    for( LogicalExpressionPlan plan : plans ) {
+                        groupKeySchema = getPlanSchema(plan);
+                        // if any plan schema is null, that means we can't calculate
+                        // further schemas so we bail out
+                        if( groupKeySchema == null ) {
+                            schema = null;
+                            return schema;
+                        }
+                        // Rename the field to "group" and give it a fresh uid
+                        groupKeySchema.alias = GROUP_COL_NAME;
+                        groupKeySchema.uid = LogicalExpression.getNextUid();
+                        break;
+                    }
+                    break;
+                }
+            }
+        }
+
+        fieldSchemaList.add( groupKeySchema );
+
+        // Generate the Bag Schema
+        int counter = 0;
+        for (Operator op : inputs) {
+            LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema();
+            // the schema of one input is unknown, so the cogroup schema is unknown, just return 
+            if (inputSchema == null) {
+                schema = null;
+                return schema;
+            }
+
+            // Check if we already have calculated Uid for this bag for given 
+            // input operator
+            long bagUid = -1;
+            if( generatedInputUids.containsKey(counter) ) {
+                bagUid = generatedInputUids.get(counter);
+            } else {
+                bagUid = LogicalExpression.getNextUid();
+                generatedInputUids.put( counter, bagUid );
+            }
+
+            LogicalFieldSchema newBagSchema = new LogicalFieldSchema(
+                    ((LogicalRelationalOperator)op).getAlias(), inputSchema, 
+                    DataType.BAG, bagUid);
+
+            fieldSchemaList.add( newBagSchema );
+            counter ++;
+        }
+
+        schema = new LogicalSchema();
+        for(LogicalFieldSchema fieldSchema: fieldSchemaList) {
+            schema.addField(fieldSchema);
+        }
+
+        return schema;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOCogroup(this);
+    }
+
+    /**
+     * Two cogroups are equal when they have the same group type, the same
+     * inner flags, pairwise-equal key expression plans for every input (same
+     * count, compared by position), and equal base-operator state.
+     *
+     * @throws RuntimeException if either operator's plans for some input are
+     *         not stored in an ArrayList (positional comparison needs a list)
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOCogroup)) {
+            return false;
+        }
+        LOCogroup oc = (LOCogroup)other;
+        // Arrays.equals is null-safe; mIsInner is null when the constructor
+        // was given a null isInner array
+        if( mGroupType != oc.mGroupType
+                || !Arrays.equals(mIsInner, oc.mIsInner)
+                || mExpressionPlans.size() != oc.mExpressionPlans.size() ) {
+            return false;
+        }
+        for( Integer key : mExpressionPlans.keySet() ) {
+            if( ! oc.mExpressionPlans.containsKey(key) ) {
+                return false;
+            }
+            Collection<LogicalExpressionPlan> exp1 = 
+                mExpressionPlans.get(key);
+            Collection<LogicalExpressionPlan> exp2 = 
+                oc.mExpressionPlans.get(key);
+
+            // Both sides must be lists before they can be cast and
+            // compared by position
+            if( !( exp1 instanceof ArrayList<?> && exp2 instanceof ArrayList<?> ) ) {
+                throw new RuntimeException( "Expected an ArrayList " +
+                "of Expression Plans" );
+            }
+
+            ArrayList<LogicalExpressionPlan> expList1 = 
+                (ArrayList<LogicalExpressionPlan>) exp1;
+            ArrayList<LogicalExpressionPlan> expList2 = 
+                (ArrayList<LogicalExpressionPlan>) exp2;
+
+            // Different key counts can never be equal (and would otherwise
+            // index past the end of the shorter list)
+            if (expList1.size() != expList2.size()) {
+                return false;
+            }
+            for (int i = 0; i < expList1.size(); i++) {
+                if (!expList1.get(i).isEqual(expList2.get(i))) {
+                    return false;
+                }
+            }
+        }
+        return checkEquality((LogicalRelationalOperator) other);
+    }
+
+    public GROUPTYPE getGroupType() {
+        return mGroupType;
+    }
+
+    /**
+     * Returns an unmodifiable view of the map from input number to the uid
+     * generated for that input's bag.
+     * @return Unmodifiable Map<Integer,Long>
+     */
+    public Map<Integer,Long> getGeneratedInputUids() {
+        return Collections.unmodifiableMap( generatedInputUids );
+    }
+
+    /**
+     * @return the key expression plans keyed by input number (the internal
+     *         map, not a copy)
+     */
+    public MultiMap<Integer,LogicalExpressionPlan> getExpressionPlans() {
+        return mExpressionPlans;
+    }
+
+    /**
+     * @return the per-input inner flags; this is the internal array (not a
+     *         copy) and may be null
+     */
+    public boolean[] getInner() {
+        return mIsInner;
+    }
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOGenerate.java Mon Mar 15 03:28:27 2010
@@ -63,6 +63,13 @@ public class LOGenerate extends LogicalR
                 }
                 LogicalSchema s = op.getSchema();
                 if (s != null) {
+                    if (((ProjectExpression)exp).isProjectStar()) {
+                        for(LogicalFieldSchema f: s.getFields()) {
+                            schema.addField(f);
+                        }
+                        continue;
+                    }
+                    
                     fieldSchema = s.getField(((ProjectExpression)exp).getColNum()).schema;
                     alias = s.getField(((ProjectExpression)exp).getColNum()).alias;
                 }
@@ -114,6 +121,10 @@ public class LOGenerate extends LogicalR
         return flattenFlags;
     }
     
+    public void setFlattenFlags(boolean[] flatten) {
+        flattenFlags = flatten;
+    }
+    
     @Override
     public boolean isEqual(Operator other) {
         if (!(other instanceof LOGenerate)) {

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOInnerLoad.java Mon Mar 15 03:28:27 2010
@@ -57,27 +57,27 @@ public class LOInnerLoad extends Logical
             LogicalRelationalOperator op = (LogicalRelationalOperator)p.getPredecessors(foreach).get(0);
             LogicalSchema s = op.getSchema();
             if (s != null) {
-                schema = new LogicalSchema();      
-                long uid = prj.getUid();
-                for(int i=0; i<s.size(); i++) {
-                    if (uid == s.getField(i).uid) {
-                        schema.addField(s.getField(i));
-                    }
+                if (prj.isProjectStar()) {
+                    schema = s;
+                } else {
+                    schema = new LogicalSchema();                  
+                    schema.addField(s.getField(getColNum()));           
                 }
-            }
+            }            
             
             if ( schema != null && schema.size() == 0) {
                 schema = null;
             }
         }catch(Exception e) {
+            e.printStackTrace();
             throw new RuntimeException(e);
         }
         
         return schema;
     }
     
-    public LogicalExpressionPlan getExpression() {
-        return (LogicalExpressionPlan)prj.getPlan();
+    public ProjectExpression getProjection() {
+        return prj;
     }
 
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOLoad.java Mon Mar 15 03:28:27 2010
@@ -31,7 +31,7 @@ public class LOLoad extends LogicalRelat
     
     private LogicalSchema scriptSchema;
     private FileSpec fs;
-    private transient LoadPushDown loadFunc;
+    private transient LoadFunc loadFunc;
 
     /**
      * 
@@ -46,13 +46,11 @@ public class LOLoad extends LogicalRelat
        fs = loader;
     }
     
-    public LoadPushDown getLoadPushDown() {
+    public LoadFunc getLoadFunc() {
         try { 
             if (loadFunc == null) {
-                Object obj = PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
-                if (obj instanceof LoadPushDown) {
-                    loadFunc = (LoadPushDown)obj;
-                }
+                loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
+                loadFunc.setUDFContextSignature(getAlias());               
             }
             
             return loadFunc;

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplit.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplit.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplit.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Logical split operator. It passes its single input through unchanged,
+ * so its schema is the schema of that input.
+ */
+public class LOSplit extends LogicalRelationalOperator {
+
+    /**
+     * @param plan plan this operator belongs to
+     */
+    public LOSplit(OperatorPlan plan) {
+        super("LOSplit", plan);
+    }
+
+    /**
+     * Copies the schema of the first predecessor into this operator and
+     * returns it.
+     *
+     * @return the first predecessor's schema (possibly null)
+     * @throws RuntimeException if the predecessor cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator source;
+        try {
+            List<Operator> preds = plan.getPredecessors(this);
+            source = (LogicalRelationalOperator) preds.get(0);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOSplit.", e);
+        }
+
+        LogicalSchema inherited = source.getSchema();
+        schema = inherited;
+        return inherited;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalPlanVisitor) {
+            ((LogicalPlanVisitor) v).visitLOSplit(this);
+            return;
+        }
+        throw new IOException("Expected LogicalPlanVisitor");
+    }
+
+    /**
+     * @return true if other is an LOSplit with equal base-operator state
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        // instanceof is false for null, so no separate null check is needed
+        return other instanceof LOSplit && checkEquality((LOSplit) other);
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplitOutput.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplitOutput.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOSplitOutput.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * One output branch of a split. It carries a filter expression plan and
+ * passes through the schema of its input.
+ */
+public class LOSplitOutput extends LogicalRelationalOperator {
+    // Condition selecting rows for this branch; null until set when the
+    // single-argument constructor is used
+    private LogicalExpressionPlan filterPlan;
+
+    /**
+     * Creates a split output without a filter plan; set it later with
+     * {@link #setFilterPlan}.
+     *
+     * @param plan plan this operator belongs to
+     */
+    public LOSplitOutput(LogicalPlan plan) {
+        super("LOSplitOutput", plan);
+    }
+
+    /**
+     * @param plan plan this operator belongs to
+     * @param filterPlan filter condition for this branch
+     */
+    public LOSplitOutput(LogicalPlan plan, LogicalExpressionPlan filterPlan) {
+        super("LOSplitOutput", plan);
+        this.filterPlan = filterPlan;
+    }
+
+    public LogicalExpressionPlan getFilterPlan() {
+        return filterPlan;
+    }
+
+    public void setFilterPlan(LogicalExpressionPlan filterPlan) {
+        this.filterPlan = filterPlan;
+    }
+
+    /**
+     * Copies the schema of the first predecessor into this operator and
+     * returns it.
+     *
+     * @return the first predecessor's schema (possibly null)
+     * @throws RuntimeException if the predecessor cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator input = null;
+        try {
+            input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
+        }catch(Exception e) {
+            // Name this operator (not LOSplit) so the failure is attributable
+            throw new RuntimeException("Unable to get predecessor of LOSplitOutput.", e);
+        }
+
+        schema = input.getSchema();
+        return schema;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOSplitOutput(this);
+    }
+
+    /**
+     * Two split outputs are equal when their filter plans are equal (or
+     * both absent) and their base-operator state is equal.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOSplitOutput)) {
+            return false;
+        }
+        LOSplitOutput os = (LOSplitOutput)other;
+        // filterPlan may be null when the single-argument constructor was used
+        boolean sameFilter = (filterPlan == null)
+                ? os.filterPlan == null
+                : os.filterPlan != null && filterPlan.isEqual(os.filterPlan);
+        return sameFilter && checkEquality(os);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOUnion.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOUnion.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LOUnion.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Logical union operator. It concatenates its inputs, so it has a schema
+ * only when all inputs agree on one.
+ */
+public class LOUnion extends LogicalRelationalOperator {
+
+    /**
+     * @param plan plan this operator belongs to
+     */
+    public LOUnion(OperatorPlan plan) {
+        super("LOUnion", plan);
+    }
+
+    /**
+     * Derives the union's schema from its inputs.
+     *
+     * @return the first input's schema when every input has a non-null
+     *         schema of the same size and all schemas are equal according to
+     *         LogicalSchema.isEqual; null otherwise, including when the
+     *         union has no inputs
+     * @throws RuntimeException if the predecessors cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        List<Operator> inputs = null;
+        try {
+            inputs = plan.getPredecessors(this);
+        }catch(Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOUnion.", e);
+        }
+        // Without inputs there is no schema to derive
+        if (inputs == null || inputs.isEmpty()) {
+            return null;
+        }
+
+        // If any predecessor's schema is null, or the sizes of the
+        // predecessors' schemas differ, then the schema for union is null
+        int length = -1;
+        for (Operator input : inputs) {
+            LogicalSchema inputSchema = ((LogicalRelationalOperator)input).getSchema();
+            if (inputSchema == null) {
+                return null;
+            }
+            if (length == -1) {
+                // Remember the first input's size as the reference length
+                length = inputSchema.size();
+            } else if (inputSchema.size() != length) {
+                return null;
+            }
+        }
+
+        // Check if all predecessor's schema are compatible.
+        // TODO: Migrate all existing schema merging rules
+        LogicalSchema mergedSchema = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
+        for (int i=1;i<inputs.size();i++) {
+            LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
+            if (!mergedSchema.isEqual(otherSchema))
+                return null;
+        }
+        return mergedSchema;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOUnion(this);
+    }
+
+    /**
+     * @return true if other is an LOUnion with equal base-operator state
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other instanceof LOUnion) {
+            return checkEquality((LOUnion)other);
+        } else {
+            return false;
+        }
+    }
+
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogToPhyTranslationVisitor.java Mon Mar 15 03:28:27 2010
@@ -23,12 +23,15 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Stack;
+
+import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogicalToPhysicalTranslatorException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
@@ -38,10 +41,14 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.builtin.BinStorage;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
+import org.apache.pig.experimental.logical.expression.BagDereferenceExpression;
 import org.apache.pig.experimental.logical.expression.ExpToPhyTranslationVisitor;
 import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.experimental.logical.expression.ProjectExpression;
@@ -53,7 +60,10 @@ import org.apache.pig.experimental.plan.
 import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
 import org.apache.pig.experimental.plan.SubtreeDependencyOrderWalker;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -185,6 +195,7 @@ public class LogToPhyTranslationVisitor 
             exprOp.setResultType(s.getField(0).type);
         }
         exprOp.setColumn(load.getColNum());
+        exprOp.setStar(load.getProjection().isProjectStar());        
         
         // set input to POProject to the predecessor of foreach
         
@@ -221,8 +232,8 @@ public class LogToPhyTranslationVisitor 
             List<Operator> leaves = exps.get(i).getSinks();
             for(Operator l: leaves) {
                 PhysicalOperator op = logToPhyMap.get(l);
-                if (l instanceof ProjectExpression) {
-                    int input = ((ProjectExpression)l).getInputNum();
+                if (l instanceof ProjectExpression ) {
+                    int input = ((ProjectExpression)l).getInputNum();                    
                     
                     // for each sink projection, get its input logical plan and translate it
                     Operator pred = preds.get(input);
@@ -240,16 +251,15 @@ public class LogToPhyTranslationVisitor 
                         // comes from expression plan
                         currentPlan.remove(leaf);
                         logToPhyMap.remove(pred);
+
                         ((POProject)op).setColumn( ((POProject)leaf).getColumn() );
-                           
+                        ((POProject)op).setStar(((POProject)leaf).isStar());
+
                     }else{                    
                         currentPlan.connect(leaf, op);
                     }
                 }
             }  
-           
-            
-            
             innerPlans.add(currentPlan);
         }
         
@@ -416,6 +426,127 @@ public class LogToPhyTranslationVisitor 
     }
     
     @Override
+    public void visitLOCogroup( LOCogroup cg ) throws IOException {
+        if (cg.getGroupType() == LOCogroup.GROUPTYPE.COLLECTED) {
+            translateCollectedCogroup(cg);
+        } else {
+            translateRegularCogroup(cg);
+        }
+    }
+    
+    /**
+     * Translates a regular (non-collected) cogroup. Each input gets its own
+     * POLocalRearrange computing the group key; all of them feed one
+     * POGlobalRearrange, which feeds the POPackage that becomes the physical
+     * counterpart of {@code cg} in logToPhyMap.
+     *
+     * @param cg the logical cogroup being translated
+     * @throws IOException if the physical operators cannot be configured or
+     *         wired together
+     */
+    private void translateRegularCogroup(LOCogroup cg) throws IOException {
+        List<Operator> preds = plan.getPredecessors(cg);
+        
+        POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
+                DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam() );
+        poGlobal.setAlias(cg.getAlias());
+        POPackage poPackage = new POPackage(new OperatorKey(DEFAULT_SCOPE, nodeGen
+                .getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam());
+        poPackage.setAlias(cg.getAlias());
+        currentPlan.add(poGlobal);
+        currentPlan.add(poPackage);
+
+        try {
+            currentPlan.connect(poGlobal, poPackage);
+        } catch (PlanException e1) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+        }
+
+        // Key type of the last input translated; it also becomes the package's
+        // key type (assumes all inputs share one key type -- TODO confirm).
+        Byte type = null;
+        for( int i = 0 ; i < preds.size(); i++ ) {
+            // Key expressions for input i. Declared as List so this does not
+            // depend on the concrete collection type getExpressionPlans() uses.
+            List<LogicalExpressionPlan> exprPlans = 
+                (List<LogicalExpressionPlan>) cg.getExpressionPlans().get(i);
+            
+            POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
+                    DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE)), cg.getRequestedParallelisam() );
+            physOp.setAlias(cg.getAlias());
+            
+            List<PhysicalPlan> pExprPlans = translateExpressionPlans( cg, exprPlans );
+            
+            try {
+                physOp.setPlans(pExprPlans);
+            } catch (PlanException pe) {
+                int errCode = 2071;
+                String msg = "Problem with setting up local rearrange's plans.";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe);
+            }
+            try {
+                // The index tells the package which input a rearranged tuple came from.
+                physOp.setIndex(i);
+            } catch (ExecException e1) {
+                // Report as a translator bug and keep the original cause, like
+                // the other failure paths in this method.
+                int errCode = 2058;
+                String msg = "Unable to set index on newly create POLocalRearrange.";
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e1);
+            }
+            // Several key expressions form a tuple key; a single one keeps its own type.
+            if (exprPlans.size() > 1) {
+                type = DataType.TUPLE;
+                physOp.setKeyType(type);
+            } else {
+                type = pExprPlans.get(0).getLeaves().get(0).getResultType();
+                physOp.setKeyType(type);
+            }
+            physOp.setResultType(DataType.TUPLE);
+
+            currentPlan.add(physOp);
+
+            try {
+                currentPlan.connect(logToPhyMap.get(preds.get(i)), physOp);
+                currentPlan.connect(physOp, poGlobal);
+            } catch (PlanException e) {
+                int errCode = 2015;
+                String msg = "Invalid physical operators in the physical plan" ;
+                throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+            }
+        }
+        
+        poPackage.setKeyType(type);
+        poPackage.setResultType(DataType.TUPLE);
+        poPackage.setNumInps(preds.size());
+        poPackage.setInner(cg.getInner());
+        logToPhyMap.put(cg, poPackage);
+    }
+    
+    /**
+     * Translates a COLLECTED cogroup into a single POCollectedGroup that is
+     * connected directly to the physical translation of its input, and records
+     * it in logToPhyMap.
+     *
+     * @param cg the logical cogroup being translated
+     * @throws IOException if the key plans cannot be set or the operators
+     *         cannot be connected
+     */
+    private void translateCollectedCogroup(LOCogroup cg) throws IOException {
+        // Only the first predecessor and its key plans are used (single input).
+        LogicalRelationalOperator input =
+                (LogicalRelationalOperator) plan.getPredecessors(cg).get(0);
+        List<LogicalExpressionPlan> keyPlans =
+                (List<LogicalExpressionPlan>) cg.getExpressionPlans().get(0);
+
+        OperatorKey groupKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
+        POCollectedGroup group = new POCollectedGroup(groupKey);
+        group.setAlias(cg.getAlias());
+
+        List<PhysicalPlan> physicalKeyPlans = translateExpressionPlans(cg, keyPlans);
+        try {
+            group.setPlans(physicalKeyPlans);
+        } catch (PlanException pe) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Problem with setting up map group's plans.", 2071, PigException.BUG, pe);
+        }
+
+        // Several key expressions form a tuple key; a single one keeps its own type.
+        byte keyType = (keyPlans.size() > 1)
+                ? DataType.TUPLE
+                : physicalKeyPlans.get(0).getLeaves().get(0).getResultType();
+        group.setKeyType(keyType);
+        group.setResultType(DataType.TUPLE);
+
+        currentPlan.add(group);
+        try {
+            currentPlan.connect(logToPhyMap.get(input), group);
+        } catch (PlanException e) {
+            throw new LogicalToPhysicalTranslatorException(
+                    "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
+        }
+
+        logToPhyMap.put(cg, group);
+    }
+    
+    @Override
     public void visitLOJoin(LOJoin loj) throws IOException {
         String scope = DEFAULT_SCOPE;
 //        System.err.println("Entering Join");
@@ -448,23 +579,6 @@ public class LogToPhyTranslationVisitor 
             // Convert the expression plan into physical Plan
             List<PhysicalPlan> exprPlans = translateExpressionPlans(loj, plans);
 
-//            currentPlans.push(currentPlan);
-//            for (LogicalExpressionPlan lp : plans) {
-//                currentPlan = new PhysicalPlan();
-//                
-//                // We spawn a new Dependency Walker and use it 
-//                PlanWalker childWalker = currentWalker.spawnChildWalker(lp);
-//                pushWalker(childWalker);
-//                // We create a new ExpToPhyTranslationVisitor to walk the ExpressionPlan
-//                currentWalker.walk(
-//                        new ExpToPhyTranslationVisitor(currentWalker.getPlan(), 
-//                                childWalker) );
-//                
-//                exprPlans.add(currentPlan);
-//                popWalker();
-//            }
-//            currentPlan = currentPlans.pop();
-            
             ppLists.add(exprPlans);
             joinPlans.put(physOp, exprPlans);
             
@@ -745,6 +859,128 @@ public class LogToPhyTranslationVisitor 
 //        System.err.println("Exiting Join");
     }
     
+    /**
+     * Translates a logical union into one POUnion producing a bag, fed by the
+     * physical translations of all of the union's inputs.
+     *
+     * @param loUnion the logical union being translated
+     * @throws IOException if an input cannot be connected to the POUnion
+     */
+    @Override
+    public void visitLOUnion(LOUnion loUnion) throws IOException {
+        OperatorKey unionKey = new OperatorKey(DEFAULT_SCOPE, nodeGen.getNextNodeId(DEFAULT_SCOPE));
+        POUnion union = new POUnion(unionKey, loUnion.getRequestedParallelisam());
+        union.setAlias(loUnion.getAlias());
+        currentPlan.add(union);
+        union.setResultType(DataType.BAG);
+        logToPhyMap.put(loUnion, union);
+
+        // Wire every already-translated input into the new POUnion.
+        for (Operator input : plan.getPredecessors(loUnion)) {
+            PhysicalOperator translated = logToPhyMap.get(input);
+            try {
+                currentPlan.connect(translated, union);
+            } catch (PlanException e) {
+                throw new LogicalToPhysicalTranslatorException(
+                        "Invalid physical operators in the physical plan", 2015, PigException.BUG, e);
+            }
+        }
+    }
+    
+    @Override
+    public void visitLOSplit(LOSplit loSplit) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        POSplit physOp = new POSplit(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), loSplit.getRequestedParallelisam());
+        physOp.setAlias(loSplit.getAlias());
+        FileSpec splStrFile;
+        try {
+            splStrFile = new FileSpec(FileLocalizer.getTemporaryPath(null, pc).toString(),new FuncSpec(BinStorage.class.getName()));
+        } catch (IOException e1) {
+            byte errSrc = pc.getErrorSource();
+            int errCode = 0;
+            switch(errSrc) {
+            case PigException.BUG:
+                errCode = 2016;
+                break;
+            case PigException.REMOTE_ENVIRONMENT:
+                errCode = 6002;
+                break;
+            case PigException.USER_ENVIRONMENT:
+                errCode = 4003;
+                break;
+            }
+            String msg = "Unable to obtain a temporary path." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, errSrc, e1);
+
+        }
+        physOp.setSplitStore(splStrFile);
+        logToPhyMap.put(loSplit, physOp);
+
+        currentPlan.add(physOp);
+
+        List<Operator> op = plan.getPredecessors(loSplit); 
+        PhysicalOperator from;
+        
+        if(op != null) {
+            from = logToPhyMap.get(op.get(0));
+        } else {
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for Split." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);            
+        }        
+
+        try {
+            currentPlan.connect(from, physOp);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+    
+    /**
+     * Translates a split output into a POFilter. The filter condition is the
+     * physical translation of the split output's filter plan. The POFilter is
+     * connected to the physical translation of the split output's predecessor.
+     *
+     * @param loSplitOutput the logical split output being translated
+     * @throws IOException if the split output has no predecessor or the
+     *         operators cannot be connected
+     */
+    @Override
+    public void visitLOSplitOutput(LOSplitOutput loSplitOutput) throws IOException {
+        String scope = DEFAULT_SCOPE;
+        POFilter poFilter = new POFilter(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), loSplitOutput.getRequestedParallelisam());
+        poFilter.setAlias(loSplitOutput.getAlias());
+        poFilter.setResultType(DataType.BAG);
+        currentPlan.add(poFilter);
+        logToPhyMap.put(loSplitOutput, poFilter);
+
+        // Translate the filter condition into its own PhysicalPlan. The outer
+        // plan is saved on currentPlans and restored after the walk.
+        currentPlans.push(currentPlan);
+
+        currentPlan = new PhysicalPlan();
+
+        PlanWalker childWalker = new ReverseDependencyOrderWalker(loSplitOutput.getFilterPlan());
+        pushWalker(childWalker);
+        currentWalker.walk(
+                new ExpToPhyTranslationVisitor( currentWalker.getPlan(), 
+                        childWalker, loSplitOutput, currentPlan, logToPhyMap ) );
+        popWalker();
+
+        poFilter.setPlan(currentPlan);
+        currentPlan = currentPlans.pop();
+
+        List<Operator> op = loSplitOutput.getPlan().getPredecessors(loSplitOutput);
+
+        PhysicalOperator from;
+        // Treat an empty predecessor list like a missing one, so the caller
+        // gets error 2051 rather than an IndexOutOfBoundsException from get(0).
+        if(op != null && !op.isEmpty()) {
+            from = logToPhyMap.get(op.get(0));
+        } else {
+            int errCode = 2051;
+            String msg = "Did not find a predecessor for Filter." ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG);
+        }
+        
+        try {
+            currentPlan.connect(from, poFilter);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
     /**
      * updates plan with check for empty bag and if bag is empty to flatten a bag
      * with as many null's as dictated by the schema

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java Mon Mar 15 03:28:27 2010
@@ -18,6 +18,10 @@
 
 package org.apache.pig.experimental.logical.relational;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+
 import org.apache.pig.experimental.plan.BaseOperatorPlan;
 import org.apache.pig.experimental.plan.OperatorPlan;
 
@@ -50,5 +54,4 @@ public class LogicalPlan extends BaseOpe
         
         return super.isEqual(other);   
     }
-    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java Mon Mar 15 03:28:27 2010
@@ -62,4 +62,16 @@ public abstract class LogicalPlanVisitor
     
     public void visitLOInnerLoad(LOInnerLoad load) throws IOException {
     }
+
+    /**
+     * Visits an LOCogroup. The default implementation does nothing;
+     * subclasses override it to handle cogroup operators.
+     *
+     * @param loCogroup the cogroup operator being visited
+     * @throws IOException if a subclass fails while handling the operator
+     */
+    public void visitLOCogroup(LOCogroup loCogroup) throws IOException {
+    }
+    
+    /**
+     * Visits an LOSplit. The default implementation does nothing;
+     * subclasses override it to handle split operators.
+     *
+     * @param loSplit the split operator being visited
+     * @throws IOException if a subclass fails while handling the operator
+     */
+    public void visitLOSplit(LOSplit loSplit) throws IOException {
+    }
+    
+    /**
+     * Visits an LOSplitOutput. The default implementation does nothing;
+     * subclasses override it to handle split output operators.
+     *
+     * @param loSplitOutput the split output operator being visited
+     * @throws IOException if a subclass fails while handling the operator
+     */
+    public void visitLOSplitOutput(LOSplitOutput loSplitOutput) throws IOException {
+    }
+    
+    /**
+     * Visits an LOUnion. The default implementation does nothing;
+     * subclasses override it to handle union operators.
+     *
+     * @param loUnion the union operator being visited
+     * @throws IOException if a subclass fails while handling the operator
+     */
+    public void visitLOUnion(LOUnion loUnion) throws IOException {
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java Mon Mar 15 03:28:27 2010
@@ -124,7 +124,7 @@ abstract public class LogicalRelationalO
      * is defined here as having equal schemas and  predecessors that are equal.
      * This is intended to be used by operators' equals methods.
      * @param other LogicalRelationalOperator to compare predecessors against
-     * @return true if the equals() methods of this node's predecessor(s) returns
+     * @return true if the isEqual() methods of this node's predecessor(s) return
      * true when invoked with other's predecessor(s).
      */
     protected boolean checkEquality(LogicalRelationalOperator other) {

Modified: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java Mon Mar 15 03:28:27 2010
@@ -63,7 +63,7 @@ public class LogicalSchema {
                 return false;
             }
         }
-        
+               
         public String toString() {
             if( type == DataType.BAG ) {
                 if( schema == null ) {
@@ -182,6 +182,32 @@ public class LogicalSchema {
     }
     
     /**
+     * Look for the index of the field that contains the specified uid
+     * @param uid the uid to look for
+     * @return the index of the field, -1 if not found
+     */
+    public int findField(long uid) {            
+        
+        for(int i=0; i< size(); i++) {
+            LogicalFieldSchema f = getField(i);
+            // if this field has the same uid, then return this field
+            if (f.uid == uid) {
+                return i;
+            } 
+            
+            // if this field has a schema, check its schema
+            if (f.schema != null) {
+                if (f.schema.findField(uid) != -1) {
+                    return i;
+                }
+            }
+        }
+        
+        return -1;
+    }
+    
+    
+    /**
      * Merge two schemas.
      * @param s1
      * @param s2

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/SchemaNotDefinedException.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/SchemaNotDefinedException.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/SchemaNotDefinedException.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/relational/SchemaNotDefinedException.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+
+/**
+ * Unchecked exception indicating that a schema is not defined. Presumably it
+ * is thrown when code needs a schema that is unavailable; confirm at call sites.
+ */
+public class SchemaNotDefinedException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * Creates an exception carrying only a detail message.
+     * @param msg the detail message
+     */
+    public SchemaNotDefinedException(String msg) {
+        super(msg);
+    }
+
+    /**
+     * Creates an exception with a detail message and an underlying cause.
+     * @param msg the detail message
+     * @param cause the underlying cause
+     */
+    public SchemaNotDefinedException(String msg, Throwable cause) {
+        super(msg, cause);
+    }
+
+    /**
+     * Creates an exception wrapping {@code cause}; the detail message is
+     * taken from the cause's toString().
+     * @param cause the underlying cause
+     */
+    public SchemaNotDefinedException(Throwable cause) {
+        super(cause);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/AddForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/AddForEach.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/AddForEach.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/AddForEach.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.util.Pair;
+
+public class AddForEach extends WholePlanRule {
+    protected static final String REQUIREDCOLS = "AddForEach:RequiredColumns";
+    
+    public AddForEach(String n) {
+        super(n);		
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new AddForEachTransformer();
+    }
+    
+    public class AddForEachTransformer extends Transformer {
+        LogicalRelationalOperator opForAdd;
+        OperatorSubPlan subPlan;
+
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            Iterator<Operator> iter = matched.getOperators();
+            while(iter.hasNext()) {
+                LogicalRelationalOperator op = (LogicalRelationalOperator)iter.next();
+                if (shouldAdd(op)) {
+                    opForAdd = op;
+                    return true;
+                }
+            }
+            
+            return false;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {        	
+            return subPlan;
+        }
+
+        private void addSuccessors(Operator op) throws IOException {
+            subPlan.add(op);
+            List<Operator> ll = op.getPlan().getSuccessors(op);
+            if (ll != null) {
+                for(Operator suc: ll) {
+                    addSuccessors(suc);
+                }
+            }
+        }
+        
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {            
+            addForeach(opForAdd);
+            
+            subPlan = new OperatorSubPlan(currentPlan);
+            addSuccessors(opForAdd);
+        }
+        
+        @SuppressWarnings("unchecked")
+        // check if an LOForEach should be added after the logical operator
+        private boolean shouldAdd(LogicalRelationalOperator op) throws IOException {
+            if (op instanceof LOForEach) {
+                return false;
+            }
+            Set<Long> output = (Set<Long>)op.getAnnotation(ColumnPruneHelper.OUTPUTUIDS);
+            
+            if (output == null) {
+                return false;
+            }
+                            
+            LogicalSchema s = op.getSchema();
+            if (s == null) {
+                return false;
+            }
+                               
+            // check if there is already a foreach
+            List<Operator> ll = op.getPlan().getSuccessors(op);
+            if (ll != null && ll.get(0) instanceof LOForEach) {
+                return false;
+            }
+            
+            Set<Integer> cols = new HashSet<Integer>();
+            for(long uid: output) {
+                int col = s.findField(uid);
+                if (col < 0) {
+                    throw new RuntimeException("Uid " + uid + " is not in the schema of " + op.getName());
+                }
+                cols.add(col);
+            }
+            
+            if (cols.size()<s.size()) {
+                op.annotate(REQUIREDCOLS, cols);
+                return true;
+            }
+            
+            return false;
+        }
+        
+        @SuppressWarnings("unchecked")
+        private void addForeach(LogicalRelationalOperator op) throws IOException {            
+            LOForEach foreach = new LOForEach(op.getPlan());
+            
+            // add foreach to the base plan
+            LogicalPlan p = (LogicalPlan)op.getPlan();
+            p.add(foreach);
+            List<Operator> next = p.getSuccessors(op);           
+            if (next != null) {
+                Operator[] nextArray = next.toArray(new Operator[0]);
+                for(Operator n: nextArray) {                  
+                    Pair<Integer, Integer> pos = p.disconnect(op, n);           
+                    p.connect(foreach, pos.first, n, pos.second);
+                }
+            }
+            
+            p.connect(op, foreach);                        
+            
+            LogicalPlan innerPlan = new LogicalPlan();
+            foreach.setInnerPlan(innerPlan);
+                      
+            // get output columns
+            Set<Integer> cols = (Set<Integer>)op.getAnnotation(REQUIREDCOLS);            
+            
+            // build foreach inner plan
+            List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();            	
+            LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[cols.size()]);
+            innerPlan.add(gen);
+            
+            LogicalSchema schema = op.getSchema();
+            for (int i=0, j=0; i<schema.size(); i++) {   
+                if (!cols.contains(i)) {
+                    continue;
+                }
+                               
+                LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, i);
+                innerLoad.getProjection().setUid(foreach);                    
+                innerPlan.add(innerLoad);          
+                innerPlan.connect(innerLoad, gen);
+                
+                LogicalExpressionPlan exp = new LogicalExpressionPlan();
+                ProjectExpression prj = new ProjectExpression(exp, schema.getField(i).type, j++, 0);
+                prj.setUid(gen);
+                exp.add(prj);
+                exps.add(exp);                
+            }                
+           
+        }
+    }          
+}

Added: hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnMapKeyPrune.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnMapKeyPrune.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnMapKeyPrune.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/experimental/logical/rules/ColumnMapKeyPrune.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,440 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.rules;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadPushDown;
+import org.apache.pig.LoadPushDown.RequiredField;
+import org.apache.pig.LoadPushDown.RequiredFieldList;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOCogroup;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalPlanVisitor;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.OperatorSubPlan;
+import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.experimental.plan.optimizer.Transformer;
+import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * This Rule prunes columns and map keys and set to loader. This rule depends
+ * on MapKeysPruneHelper to calculate what keys are required for a loader,
+ * and ColumnPruneHelper to calculate the required columns for a loader. Then
+ * it combines the map keys and columns info to set into the loader.
+ */
+public class ColumnMapKeyPrune extends WholePlanRule {
+    private boolean hasRun;
+    
+    public ColumnMapKeyPrune(String n) {
+        super(n);
+        hasRun = false;
+    }
+
+    @Override
+    public Transformer getNewTransformer() {
+        return new ColumnMapKeyPruneTransformer();
+    }
+    
+    public class ColumnMapKeyPruneTransformer extends Transformer {
+        private MapKeysPruneHelper mapKeyHelper;
+        private ColumnPruneHelper columnHelper;
+        private boolean columnPrune;
+        private boolean mapKeyPrune;
+
+        /*
+         * This is a map of of required columns and map keys for each LOLoad        
+         * RequiredMapKeys --> Map<Integer, Set<String> >
+         * RequiredColumns --> Set<Integer>
+         * 
+         * The integer are column indexes.
+         */
+        private Map<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>> requiredItems = 
+            new HashMap<LOLoad,Pair<Map<Integer,Set<String>>,Set<Integer>>>();
+        
+        private OperatorSubPlan subPlan = null;
+        
+        @Override
+        public boolean check(OperatorPlan matched) throws IOException {
+            // only run this rule once
+            if (hasRun) {
+                return false;
+            }
+            
+            hasRun = true;
+            mapKeyHelper = new MapKeysPruneHelper(matched);
+            columnHelper = new ColumnPruneHelper(matched);
+            
+            // check if map keys can be pruned
+            mapKeyPrune = mapKeyHelper.check();
+            // check if columns can be pruned
+            columnPrune = columnHelper.check();
+            
+            return mapKeyPrune || columnPrune;
+        }
+
+        @Override
+        public OperatorPlan reportChanges() {
+            return subPlan;
+        }
+        
+        @SuppressWarnings("unchecked")
+        private void merge() {
+            // combine two subplans
+            subPlan = new OperatorSubPlan(currentPlan);
+            if (columnPrune) {
+                Iterator<Operator> iter = columnHelper.reportChanges().getOperators();
+                while(iter.hasNext()) {
+                    subPlan.add(iter.next());
+                }
+            }
+            
+            if (mapKeyPrune) {
+                Iterator<Operator> iter = mapKeyHelper.reportChanges().getOperators();
+                while(iter.hasNext()) {
+                    subPlan.add(iter.next());
+                }
+            }
+            
+            // combine annotations
+            for( Operator source : currentPlan.getSources() ) {
+                Map<Integer,Set<String>> mapKeys = 
+                    (Map<Integer, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+                Set<Integer> requiredColumns = null;
+                if (source.getAnnotation(ColumnPruneHelper.REQUIREDCOLS) != null) {
+                    requiredColumns = new HashSet<Integer>((Set<Integer>) source.getAnnotation(ColumnPruneHelper.REQUIREDCOLS));
+                }
+                
+                // We dont have any information so skip
+                if( requiredColumns == null && mapKeys == null ) {
+                    continue;
+                }
+                                
+                if( requiredColumns != null && mapKeys != null ) { 
+
+                    Set<Integer> duplicatedCols = new HashSet<Integer>();
+
+                    // Remove the columns already marked by MapKeys
+                    for( Integer col : requiredColumns ) {
+                        if( mapKeys.containsKey(col) ) {
+                            duplicatedCols.add(col);
+                        }
+                    }
+                    requiredColumns.removeAll(duplicatedCols);
+                } else if ( mapKeys != null && requiredColumns == null ) {
+                    // This is the case where only mapKeys can be pruned. And none
+                    // of the columns can be pruned. So we add all columns to the
+                    // requiredcolumns part
+                    requiredColumns = new HashSet<Integer>();
+                    for(int i = 0; i < ((LogicalRelationalOperator)source).getSchema().size(); i++ ) {
+                        if( !mapKeys.containsKey(i) ) {
+                            requiredColumns.add(i);
+                        }
+                    }
+                }
+
+                requiredItems.put((LOLoad) source, new Pair<Map<Integer,Set<String>>,Set<Integer>>(mapKeys, requiredColumns));
+            }         
+        }
+
+        @Override
+        public void transform(OperatorPlan matched) throws IOException {        	            
+            merge();
+            
+            ColumnPruneVisitor v = new ColumnPruneVisitor(subPlan);           
+            v.visit();                   
+        }        
+    
+        
+        // visitor to change the plan to remove unnecessary columns
+        private class ColumnPruneVisitor extends LogicalPlanVisitor {
+            public ColumnPruneVisitor(OperatorPlan plan) {
+                super(plan, new ReverseDependencyOrderWalker(plan));                        
+            }
+            
+            public void visitLOLoad(LOLoad load) throws IOException {
+                if(! requiredItems.containsKey( load ) ) {
+                    return;
+                }
+                
+                Pair<Map<Integer,Set<String>>,Set<Integer>> required = 
+                    requiredItems.get(load);
+                
+                RequiredFieldList requiredFields = new RequiredFieldList();
+
+                LogicalSchema s = load.getSchema();
+                for (int i=0;i<s.size();i++) {
+                    RequiredField requiredField = null;
+                    // As we have done processing ahead, we assume that 
+                    // a column is not present in both ColumnPruner and 
+                    // MapPruner
+                    if( required.first != null && required.first.containsKey(i) ) {
+                        requiredField = new RequiredField();
+                        requiredField.setIndex(i);
+                        requiredField.setType(s.getField(i).type);
+                        List<RequiredField> subFields = new ArrayList<RequiredField>();
+                        for( String key : required.first.get(i) ) {
+                            RequiredField subField = new RequiredField(key,-1,null,DataType.BYTEARRAY);
+                            subFields.add(subField);
+                        }
+                        requiredField.setSubFields(subFields);
+                        requiredFields.add(requiredField);
+                    }
+                    if( required.second != null && required.second.contains(i) ) {
+                        requiredField = new RequiredField();
+                        requiredField.setIndex(i);
+                        requiredField.setType(s.getField(i).type);      
+                        requiredFields.add(requiredField);
+                    }
+                }         
+                    
+                log.info("Loader for " + load.getAlias() + " is pruned. Load fields " + requiredFields); 
+                for(RequiredField rf: requiredFields.getFields()) {
+                    List<RequiredField> sub = rf.getSubFields();
+                    if (sub != null) {
+                        // log.info("For column " + rf.getIndex() + ", set map keys: " + sub.toString());
+                        log.info("Map key required for " + load.getAlias() + ": $" + rf.getIndex() + "->" + sub);
+                    }
+                }
+                
+                LoadPushDown.RequiredFieldResponse response = null;
+                try {
+                    LoadFunc loadFunc = load.getLoadFunc();
+                    if (loadFunc instanceof LoadPushDown) {
+                        response = ((LoadPushDown)loadFunc).pushProjection(requiredFields);
+                    }
+                                        
+                } catch (FrontendException e) {
+                    log.warn("pushProjection on "+load+" throw an exception, skip it");
+                }                      
+                
+                // Loader does not support column pruning, insert foreach      
+                if (columnPrune) {
+                    if (response==null || !response.getRequiredFieldResponse()) {
+                        LogicalPlan p = (LogicalPlan)load.getPlan();                        
+                        Operator next = p.getSuccessors(load).get(0); 
+                        // if there is already a LOForEach after load, we don't need to 
+                        // add another LOForEach
+                        if (next instanceof LOForEach) {
+                            return;
+                        }
+                        
+                        LOForEach foreach = new LOForEach(load.getPlan());
+                        
+                        // add foreach to the base plan                       
+                        p.add(foreach);
+                                       
+                        Pair<Integer,Integer> disconnectedPos = p.disconnect(load, next);
+                        p.connect(load, disconnectedPos.first.intValue(), foreach, 0 );
+                        p.connect(foreach, 0, next, disconnectedPos.second.intValue());
+                        
+                        // add foreach to the subplan
+                        subPlan.add(foreach);
+                        
+                        LogicalPlan innerPlan = new LogicalPlan();
+                        foreach.setInnerPlan(innerPlan);
+                        
+                        // build foreach inner plan
+                        List<LogicalExpressionPlan> exps = new ArrayList<LogicalExpressionPlan>();            	
+                        LOGenerate gen = new LOGenerate(innerPlan, exps, new boolean[requiredFields.getFields().size()]);
+                        innerPlan.add(gen);
+                        
+                        for (int i=0; i<requiredFields.getFields().size(); i++) {
+                            LoadPushDown.RequiredField rf = requiredFields.getFields().get(i);
+                            LOInnerLoad innerLoad = new LOInnerLoad(innerPlan, foreach, rf.getIndex());
+                            innerLoad.getProjection().setUid(foreach);                    
+                            innerPlan.add(innerLoad);          
+                            innerPlan.connect(innerLoad, gen);
+                            
+                            LogicalExpressionPlan exp = new LogicalExpressionPlan();
+                            ProjectExpression prj = new ProjectExpression(exp, rf.getType(), i, 0);
+                            prj.setUid(gen);
+                            exp.add(prj);
+                            exps.add(exp);
+                        }                
+                       
+                    } else {
+                        // columns are pruned, reset schema for LOLoader
+                        LogicalSchema newSchema = new LogicalSchema();
+                        List<LoadPushDown.RequiredField> fieldList = requiredFields.getFields();
+                        for (int i=0; i<fieldList.size(); i++) {            		
+                            newSchema.addField(s.getField(fieldList.get(i).getIndex()));
+                        }
+
+                        load.setScriptSchema(newSchema);
+                    }
+                }
+            }
+                    
+
+            public void visitLOFilter(LOFilter filter) throws IOException {
+            }
+            
+            public void visitLOStore(LOStore store) throws IOException {
+            }
+            
+            public void visitLOCogroup( LOCogroup cg ) throws IOException {
+            }
+            
+            public void visitLOJoin(LOJoin join) throws IOException {
+            }
+            
+            @SuppressWarnings("unchecked")
+            public void visitLOForEach(LOForEach foreach) throws IOException {
+                if (!columnPrune) {
+                    return;
+                }
+                
+                // get column numbers from input uids
+                Set<Long> input = (Set<Long>)foreach.getAnnotation(ColumnPruneHelper.INPUTUIDS);
+                LogicalRelationalOperator op = (LogicalRelationalOperator)foreach.getPlan().getPredecessors(foreach).get(0);
+                Set<Integer> cols = columnHelper.getColumns(op.getSchema(), input);
+                
+                LogicalPlan innerPlan = foreach.getInnerPlan();
+                LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
+                            
+                // clean up the predecessors of LOGenerate
+                List<Operator> ll = innerPlan.getPredecessors(gen);
+                List<Operator> toRemove = new ArrayList<Operator>();
+                for(int i=0; i<ll.size(); i++) {
+                    
+                    // if the LOInnerLoads for this subplan are all in the column set, 
+                    // no change required, keep going
+                    if (checkInnerLoads((LogicalRelationalOperator)ll.get(i), cols)) {
+                        continue;
+                    }         
+                                       
+                    // clean up and adjust expression plans
+                    Iterator<LogicalExpressionPlan> iter = gen.getOutputPlans().iterator();
+                    int j=-1;
+                    while(iter.hasNext()) {
+                        j++;
+                        LogicalExpressionPlan exp = iter.next();
+                        List<Operator> sinks = exp.getSinks();
+                        for(Operator s: sinks) {
+                            if (s instanceof ProjectExpression) {
+                                int inputNo = ((ProjectExpression)s).getInputNum();
+                                if (inputNo + toRemove.size() == i) {
+                                    // if this expression has this input that is to be removed,
+                                    // then remove this expression plan
+                                    iter.remove();
+                                    
+                                    // adjust flatten flags
+                                    boolean[] flatten = gen.getFlattenFlags();
+                                    for(int k=j; k<flatten.length-1; k++) {
+                                        flatten[k] = flatten[k+1]; 
+                                    }
+                                    break;
+                                } else if (inputNo + toRemove.size() > i) {
+                                    // adjust input number for all projections whose
+                                    // input number is after the one to be removed
+                                    ((ProjectExpression)s).setInputNum(inputNo-1);
+                                }
+                            }
+                        }
+                    }
+                    
+                    // this LOInnerLoad and its successors should be removed, add to the remove list
+                    toRemove.add(ll.get(i));
+                                        
+                }
+                
+                for(Operator pred: toRemove) {
+                    removeSubTree((LogicalRelationalOperator)pred);
+                }
+                
+                // trim the flatten flags in case some expressions are removed 
+                boolean[] flatten = new boolean[gen.getOutputPlans().size()];
+                System.arraycopy(gen.getFlattenFlags(), 0, flatten, 0, flatten.length);
+                gen.setFlattenFlags(flatten);
+            }
+            
+            public void visitLOGenerate(LOGenerate gen) throws IOException {
+            }
+            
+            public void visitLOInnerLoad(LOInnerLoad load) throws IOException {
+            }
+            
+            // check if the column number in LOInnerLoad is inside a given column index set
+            protected boolean checkInnerLoads(LogicalRelationalOperator op, Set<Integer> cols) throws IOException {
+                if (op instanceof LOInnerLoad) {
+                    int col = ((LOInnerLoad)op).getColNum();
+                    if (!cols.contains(col)) {
+                        return false;
+                    }
+                }
+                
+                List<Operator> preds = op.getPlan().getPredecessors(op);
+                if (preds != null) {
+                    for(Operator pred: preds ) {
+                        if (!checkInnerLoads((LogicalRelationalOperator)pred, cols)) {
+                            return false;
+                        }
+                    }
+                }
+                
+                return true;
+            }             
+            
+            // remove all the operators starting from an operator
+            protected void removeSubTree(LogicalRelationalOperator op) throws IOException {
+                LogicalPlan p = (LogicalPlan)op.getPlan();
+                List<Operator> ll = p.getPredecessors(op);
+                if (ll != null) {
+                    for(Operator pred: ll) {    			
+                        removeSubTree((LogicalRelationalOperator)pred);
+                    }
+                }
+                        
+                if (p.getSuccessors(op) != null) {
+                    Operator[] succs = p.getSuccessors(op).toArray(new Operator[0]);
+                    
+                    for(Operator s: succs) {
+                        p.disconnect(op, s);
+                    }
+                }
+                
+                p.remove(op);
+            }
+            
+        }
+    }
+
+}