You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by sm...@apache.org on 2009/05/22 22:50:33 UTC

svn commit: r777708 [1/3] - in /hadoop/pig/trunk: src/org/apache/pig/impl/logicalLayer/ src/org/apache/pig/impl/plan/ src/org/apache/pig/impl/util/ test/org/apache/pig/test/

Author: sms
Date: Fri May 22 20:50:32 2009
New Revision: 777708

URL: http://svn.apache.org/viewvc?rev=777708&view=rev
Log:
PIG-697: Proposed improvements to pig's optimizer

Added:
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/TopLevelProjectFinder.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/RequiredFields.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestRequiredFields.java
Modified:
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java
    hadoop/pig/trunk/src/org/apache/pig/impl/util/Pair.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestProjectionMap.java

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Fri May 22 20:50:32 2009
@@ -30,6 +30,7 @@
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
@@ -553,7 +554,6 @@
         for(int inputNum = 0; (inputNum < predecessors.size()) && (!groupByAdded); ++inputNum) {
             LogicalOperator predecessor = predecessors.get(inputNum);
 
-            
             List<LogicalPlan> predecessorPlans = (ArrayList<LogicalPlan>) groupByPlans.get(predecessor);
 
             int inputColumn = -1;
@@ -612,4 +612,54 @@
         return new ProjectionMap(mapFields, removedFields, addedFields);
     }
 
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+        
+        if(predecessors == null) {
+            return null;
+        }
+
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        
+        for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+            Set<Pair<Integer, Integer>> fields = new HashSet<Pair<Integer, Integer>>();
+            Set<LOProject> projectSet = new HashSet<LOProject>();
+            boolean groupByStar = false;
+
+            for (LogicalPlan plan : getGroupByPlans().get(predecessors.get(inputNum))) {
+                TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(plan);
+                try {
+                    projectFinder.visit();
+                } catch (VisitorException ve) {
+                    requiredFields.clear();
+                    requiredFields.add(null);
+                    return requiredFields;
+                }
+                projectSet.addAll(projectFinder.getProjectSet());
+                if(projectFinder.getProjectStarSet() != null) {
+                    groupByStar = true;
+                }
+            }
+
+            if(groupByStar) {
+                requiredFields.add(new RequiredFields(true));
+            } else {                
+                for (LOProject project : projectSet) {
+                    for (int inputColumn : project.getProjection()) {
+                        fields.add(new Pair<Integer, Integer>(inputNum, inputColumn));
+                    }
+                }
+        
+                if(fields.size() == 0) {
+                    requiredFields.add(new RequiredFields(false, true));
+                } else {                
+                    requiredFields.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(fields)));
+                }
+            }
+        }
+        
+        return (requiredFields.size() == 0? null: requiredFields);
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCross.java Fri May 22 20:50:32 2009
@@ -33,6 +33,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
@@ -255,4 +256,21 @@
         return new ProjectionMap(mapFields, null, addedFields);
     }
 
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+        
+        if(predecessors == null) {
+            return null;
+        }
+
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        
+        for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+            requiredFields.add(new RequiredFields(true));
+        }
+        
+        return (requiredFields.size() == 0? null: requiredFields);
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LODistinct.java Fri May 22 20:50:32 2009
@@ -27,7 +27,9 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -156,5 +158,12 @@
             return null;
         }
     }
+    
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        requiredFields.add(new RequiredFields(false, true));
+        return requiredFields;
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFRJoin.java Fri May 22 20:50:32 2009
@@ -19,8 +19,10 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Hashtable;
 import java.util.List;
+import java.util.Set;
 import java.util.Map.Entry;
 
 import org.apache.commons.logging.Log;
@@ -32,6 +34,7 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema.FieldSchema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
@@ -339,4 +342,54 @@
         return new ProjectionMap(mapFields, null, addedFields);
     }
 
+    @Override
+    public List<RequiredFields> getRequiredFields() {        
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+        
+        if(predecessors == null) {
+            return null;
+        }
+        
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        
+        for(int inputNum = 0; inputNum < predecessors.size(); ++inputNum) {
+            Set<Pair<Integer, Integer>> fields = new HashSet<Pair<Integer, Integer>>();
+            Set<LOProject> projectSet = new HashSet<LOProject>();
+            boolean groupByStar = false;
+
+            for (LogicalPlan plan : this.getJoinColPlans().get(predecessors.get(inputNum))) {
+                TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(plan);
+                try {
+                    projectFinder.visit();
+                } catch (VisitorException ve) {
+                    requiredFields.clear();
+                    requiredFields.add(null);
+                    return requiredFields;
+                }
+                projectSet.addAll(projectFinder.getProjectSet());
+                if(projectFinder.getProjectStarSet() != null) {
+                    groupByStar = true;
+                }
+            }
+
+            if(groupByStar) {
+                requiredFields.add(new RequiredFields(true));
+            } else {                
+                for (LOProject project : projectSet) {
+                    for (int inputColumn : project.getProjection()) {
+                        fields.add(new Pair<Integer, Integer>(inputNum, inputColumn));
+                    }
+                }
+        
+                if(fields.size() == 0) {
+                    requiredFields.add(new RequiredFields(false, true));
+                } else {                
+                    requiredFields.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(fields)));
+                }
+            }
+        }
+        
+        return (requiredFields.size() == 0? null: requiredFields);
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOFilter.java Fri May 22 20:50:32 2009
@@ -17,14 +17,18 @@
  */
 package org.apache.pig.impl.logicalLayer;
 
+import java.util.HashSet;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Set;
 
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 import org.apache.commons.logging.Log;
@@ -165,4 +169,38 @@
         }
     }
 
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        Set<Pair<Integer, Integer>> fields = new HashSet<Pair<Integer, Integer>>();
+        TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(
+                mComparisonPlan);
+        try {
+            projectFinder.visit();
+        } catch (VisitorException ve) {
+            requiredFields.clear();
+            requiredFields.add(null);
+            return requiredFields;
+        }
+        Set<LOProject> projectStarSet = projectFinder.getProjectStarSet();
+
+        if (projectStarSet != null) {
+            requiredFields.add(new RequiredFields(true));
+            return requiredFields;
+        } else {
+            for (LOProject project : projectFinder.getProjectSet()) {
+                for (int inputColumn : project.getProjection()) {
+                    fields.add(new Pair<Integer, Integer>(0,
+                            inputColumn));
+                }
+            }
+            if(fields.size() == 0) {
+                requiredFields.add(new RequiredFields(false, true));
+            } else {                
+                requiredFields.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(fields)));
+            }
+            return (requiredFields.size() == 0? null: requiredFields);
+        }
+    }
+    
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOForEach.java Fri May 22 20:50:32 2009
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -30,13 +31,11 @@
 import org.apache.pig.impl.logicalLayer.schema.SchemaMergeException;
 import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
-import org.apache.pig.impl.logicalLayer.parser.QueryParser ;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -644,4 +643,46 @@
         return new ProjectionMap(mapFields, removedFields, addedFields);
     }
 
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        Set<Pair<Integer, Integer>> fields = new HashSet<Pair<Integer, Integer>>();
+        Set<LOProject> projectSet = new HashSet<LOProject>();
+        boolean starRequired = false;
+
+        for (LogicalPlan plan : getForEachPlans()) {
+            TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(
+                    plan);
+            try {
+                projectFinder.visit();
+            } catch (VisitorException ve) {
+                requiredFields.clear();
+                requiredFields.add(null);
+                return requiredFields;
+            }
+            projectSet.addAll(projectFinder.getProjectSet());
+            if(projectFinder.getProjectStarSet() != null) {
+                starRequired = true;
+            }
+        }
+
+        if(starRequired) {
+            requiredFields.add(new RequiredFields(true));
+            return requiredFields;
+        } else {
+            for (LOProject project : projectSet) {
+                for (int inputColumn : project.getProjection()) {
+                    fields.add(new Pair<Integer, Integer>(0, inputColumn));
+                }
+            }
+    
+            if(fields.size() == 0) {
+                requiredFields.add(new RequiredFields(false, true));
+            } else {                
+                requiredFields.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(fields)));
+            }
+            return (requiredFields.size() == 0? null: requiredFields);
+        }
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java Fri May 22 20:50:32 2009
@@ -17,11 +17,16 @@
  */
 package org.apache.pig.impl.logicalLayer;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 
 public class LOLimit extends LogicalOperator {
@@ -109,4 +114,51 @@
         LOLimit limitClone = (LOLimit)super.clone();
         return limitClone;
     }
+    
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        Schema inputSchema = null;        
+        
+        List<LogicalOperator> predecessors = (ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors != null) {
+            try {
+                inputSchema = predecessors.get(0).getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+        } else {
+            return null;
+        }
+        
+        if(inputSchema == null) {
+            return null;
+        }
+        
+        if(Schema.equals(inputSchema, outputSchema, false, true)) {
+            //there is a one-to-one mapping between input and output schemas
+            return new ProjectionMap(false);
+        } else {
+            //problem - input and output schemas for a limit have to match!
+            return null;
+        }
+    }
+    
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        requiredFields.add(new RequiredFields(false, true));
+        return requiredFields;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLoad.java Fri May 22 20:50:32 2009
@@ -21,9 +21,10 @@
 import java.net.URL;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-
+import java.util.Set;
 import org.apache.pig.ExecType;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigException;
@@ -33,6 +34,7 @@
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
@@ -273,4 +275,11 @@
         }
     }
 
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        requiredFields.add(new RequiredFields(false, true));
+        return requiredFields;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSort.java Fri May 22 20:50:32 2009
@@ -19,17 +19,21 @@
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Iterator;
+import java.util.Set;
 
 import org.apache.pig.FuncSpec;
 import org.apache.pig.PigException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -242,5 +246,47 @@
             return null;
         }
     }
+    
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        Set<Pair<Integer, Integer>> fields = new HashSet<Pair<Integer, Integer>>();
+        Set<LOProject> projectSet = new HashSet<LOProject>();
+        boolean orderByStar = false;
+
+        for (LogicalPlan plan : getSortColPlans()) {
+            TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(
+                    plan);
+            try {
+                projectFinder.visit();
+            } catch (VisitorException ve) {
+                requiredFields.clear();
+                requiredFields.add(null);
+                return requiredFields;
+            }
+            projectSet.addAll(projectFinder.getProjectSet());
+            if(projectFinder.getProjectStarSet() != null) {
+                orderByStar = true;
+            }
+        }
+
+        if(orderByStar) {
+            requiredFields.add(new RequiredFields(true));
+            return requiredFields;
+        } else {
+            for (LOProject project : projectSet) {
+                for (int inputColumn : project.getProjection()) {
+                    fields.add(new Pair<Integer, Integer>(0, inputColumn));
+                }
+            }
+    
+            if(fields.size() == 0) {
+                requiredFields.add(new RequiredFields(false, true));
+            } else {                
+                requiredFields.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(fields)));
+            }
+            return (requiredFields.size() == 0? null: requiredFields);
+        }
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplit.java Fri May 22 20:50:32 2009
@@ -27,6 +27,8 @@
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.data.DataType;
 import org.apache.commons.logging.Log;
@@ -121,4 +123,50 @@
         LOSplit splitClone = (LOSplit)super.clone();
         return splitClone;
     }
+    
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        Schema inputSchema = null;        
+        
+        List<LogicalOperator> predecessors = (ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors != null) {
+            try {
+                inputSchema = predecessors.get(0).getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+        } else {
+            return null;
+        }
+        
+        if(inputSchema == null) {
+            return null;
+        }
+        
+        if(Schema.equals(inputSchema, outputSchema, false, true)) {
+            //there is a one-to-one mapping between input and output schemas
+            return new ProjectionMap(false);
+        } else {
+            //problem - input and output schemas for a split have to match!
+            return null;
+        }
+    }
+    
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        requiredFields.add(new RequiredFields(false, true));
+        return requiredFields;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOSplitOutput.java Fri May 22 20:50:32 2009
@@ -19,15 +19,21 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import org.apache.pig.PigException;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.optimizer.SchemaRemover;
 
@@ -125,4 +131,77 @@
         return splitOutputClone;
     }
 
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        
+        if(outputSchema == null) {
+            return null;
+        }
+        
+        Schema inputSchema = null;        
+        
+        List<LogicalOperator> predecessors = (ArrayList<LogicalOperator>)mPlan.getPredecessors(this);
+        if(predecessors != null) {
+            try {
+                inputSchema = predecessors.get(0).getSchema();
+            } catch (FrontendException fee) {
+                return null;
+            }
+        } else {
+            return null;
+        }
+        
+        if(inputSchema == null) {
+            return null;
+        }
+        
+        if(Schema.equals(inputSchema, outputSchema, false, true)) {
+            //there is a one-to-one mapping between input and output schemas
+            return new ProjectionMap(false);
+        } else {
+            //problem - input and output schemas for a split output have to match!
+            return null;
+        }
+    }
+
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        Set<Pair<Integer, Integer>> fields = new HashSet<Pair<Integer, Integer>>();
+        TopLevelProjectFinder projectFinder = new TopLevelProjectFinder(
+                mCondPlan);
+        try {
+            projectFinder.visit();
+        } catch (VisitorException ve) {
+            requiredFields.clear();
+            requiredFields.add(null);
+            return requiredFields;
+        }
+        Set<LOProject> projectStarSet = projectFinder.getProjectStarSet();
+
+        if (projectStarSet != null) {
+            requiredFields.add(new RequiredFields(true));
+            return requiredFields;
+        } else {
+            for (LOProject project : projectFinder.getProjectSet()) {
+                for (int inputColumn : project.getProjection()) {
+                    fields.add(new Pair<Integer, Integer>(0,
+                            inputColumn));
+                }
+            }
+            if(fields.size() == 0) {
+                requiredFields.add(new RequiredFields(false, true));
+            } else {                
+                requiredFields.add(new RequiredFields(new ArrayList<Pair<Integer, Integer>>(fields)));
+            }
+            return (requiredFields.size() == 0? null: requiredFields);
+        }
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStore.java Fri May 22 20:50:32 2009
@@ -29,6 +29,7 @@
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.PlanVisitor;
@@ -166,4 +167,11 @@
         }
     }
 
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<RequiredFields> requiredFields = new ArrayList<RequiredFields>();
+        requiredFields.add(new RequiredFields(false, true));
+        return requiredFields;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOStream.java Fri May 22 20:50:32 2009
@@ -20,13 +20,21 @@
  */
 package org.apache.pig.impl.logicalLayer;
 
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
 import org.apache.pig.impl.streaming.StreamingCommand.Handle;
 import org.apache.pig.impl.streaming.StreamingCommand.HandleSpec;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
 
 /**
  * {@link LOStream} represents the specification of an external
@@ -169,4 +177,55 @@
         return executableManager;
     }
 
+    /**
+     * Reports every output column as added and every column of the first input as removed.
+     * @return the projection map; null if the output schema is unavailable, fetching a
+     *         schema fails, or this operator has no predecessor
+     */
+    @Override
+    public ProjectionMap getProjectionMap() {
+        Schema outputSchema;
+        try {
+            outputSchema = getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        if(outputSchema == null) {
+            return null;
+        }
+
+        // use the List as returned (no ArrayList cast); no inputs means no projection map
+        List<LogicalOperator> predecessors = mPlan.getPredecessors(this);
+        if(predecessors == null || predecessors.isEmpty()) {
+            return null;
+        }
+        Schema inputSchema;
+        try {
+            inputSchema = predecessors.get(0).getSchema();
+        } catch (FrontendException fee) {
+            return null;
+        }
+        //add all the elements of the output schema to the added fields
+        List<Integer> addedFields = new ArrayList<Integer>();
+        for(int i = 0; i < outputSchema.size(); ++i) {
+            addedFields.add(i);
+        }
+
+        //add all the elements of the input schema to the removed fields
+        List<Pair<Integer, Integer>> removedFields = new ArrayList<Pair<Integer, Integer>>();
+        if(inputSchema != null) {
+            for(int i = 0; i < inputSchema.size(); ++i) {
+                removedFields.add(new Pair<Integer, Integer>(0, i));
+            }
+        }
+        return new ProjectionMap(null, (removedFields.size() == 0? null: removedFields), addedFields);
+    }
+
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<RequiredFields> perInput = new ArrayList<RequiredFields>();
+        perInput.add(new RequiredFields(true)); // same as (true, false): all fields needed
+        return perInput;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java Fri May 22 20:50:32 2009
@@ -29,6 +29,7 @@
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
@@ -178,4 +179,21 @@
         return new ProjectionMap(mapFields, null, null);
     }
 
+    /**
+     * Reports that all the fields are needed from every input of the union.
+     * @return one needs-all-fields entry per predecessor; null when there are none
+     */
+    @Override
+    public List<RequiredFields> getRequiredFields() {
+        List<LogicalOperator> inputs = mPlan.getPredecessors(this);
+        if(inputs == null || inputs.isEmpty()) {
+            return null;
+        }
+        List<RequiredFields> perInput = new ArrayList<RequiredFields>();
+        for(int pending = inputs.size(); pending > 0; --pending) {
+            perInput.add(new RequiredFields(true));
+        }
+        return perInput;
+    }
+
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Fri May 22 20:50:32 2009
@@ -29,6 +29,7 @@
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.ProjectionMap;
+import org.apache.pig.impl.plan.RequiredFields;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Pair;
 import org.apache.commons.logging.Log;
@@ -292,15 +293,16 @@
     };
 
     /**
-     * Get a list of fields that this operator requires.  This is not necessarily
-     * equivalent to the list of fields the operator projects.  For example,
-     * a filter will project anything passed to it, but requires only the fields
-     * explicitly referenced in its filter expression.
-     * @return list of fields, numbered from 0.
-     */
-    public List<Pair<Integer, Integer>> getRequiredFields()
-    {
-        return null;
-    }
+	 * Get a list of fields that this operator requires. This is not necessarily
+	 * equivalent to the list of fields the operator projects. For example, a
+	 * filter will project anything passed to it, but requires only the fields
+	 * explicitly referenced in its filter expression.
+	 * 
+	 * @return list of RequiredFields; null indicates that the operator does not
+	 *         need any fields from its input. This default implementation returns null.
+	 */
+	public List<RequiredFields> getRequiredFields() {
+		return null;
+	}
 
 }

Added: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/TopLevelProjectFinder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/TopLevelProjectFinder.java?rev=777708&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/TopLevelProjectFinder.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/TopLevelProjectFinder.java Fri May 22 20:50:32 2009
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.ArrayList;
+
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor to track the top-level projection operators in a plan, i.e. the
+ * LOProject operators that have no predecessor in the plan being visited.
+ * If there is a $1.$0 then only $1 is tracked.
+ * NOTE(review): the empty visit methods presumably stop the search from
+ * descending into the nested plans of those operators -- confirm in LOVisitor.
+ */
+public class TopLevelProjectFinder extends LOVisitor {
+
+    /** The projections collected so far, in the order they were visited. */
+    List<LOProject> mProjectList = new ArrayList<LOProject>();
+
+    /**
+     * @param plan the plan to search, walked depth first
+     */
+    public TopLevelProjectFinder(LogicalPlan plan) {
+        super(plan, new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
+    }
+
+    /**
+     * Intentionally empty: nothing is collected for a distinct operator.
+     */
+    @Override
+    protected void visit(LODistinct dt) throws VisitorException {
+    }
+
+    /**
+     * Intentionally empty: nothing is collected for a filter operator.
+     */
+    @Override
+    protected void visit(LOFilter filter) throws VisitorException {
+    }
+
+    /**
+     * Intentionally empty: nothing is collected for a foreach operator.
+     */
+    @Override
+    protected void visit(LOForEach forEach) throws VisitorException {
+    }
+
+    /**
+     * Intentionally empty: nothing is collected for a sort operator.
+     */
+    @Override
+    protected void visit(LOSort s) throws VisitorException {
+    }
+
+    /**
+     * Adds the projection to the list when it is a root of the visited plan,
+     * i.e. when the plan reports no predecessors for it.
+     */
+    @Override
+    protected void visit(LOProject project) throws VisitorException {
+        if(this.getPlan().getPredecessors(project) != null) {
+            return;
+        }
+        mProjectList.add(project);
+    }
+
+    /**
+     * @return the collected projections as the live internal list
+     */
+    public List<LOProject> getProjectList() {
+        return mProjectList;
+    }
+
+    /**
+     * @return a new set holding the collected projections
+     */
+    public Set<LOProject> getProjectSet() {
+        return new HashSet<LOProject>(mProjectList);
+    }
+
+    /**
+     * @return the collected projections that are star projections and have no
+     *         predecessor in the plan, or null when there are none
+     */
+    public Set<LOProject> getProjectStarSet() {
+        Set<LOProject> stars = new HashSet<LOProject>();
+        for(LOProject candidate: getProjectSet()) {
+            if(candidate.isStar() && (this.getPlan().getPredecessors(candidate) == null)) {
+                stars.add(candidate);
+            }
+        }
+        return stars.isEmpty() ? null : stars;
+    }
+}

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/ProjectionMap.java Fri May 22 20:50:32 2009
@@ -18,13 +18,9 @@
 
 package org.apache.pig.impl.plan;
 
-import java.io.Serializable;
 import java.lang.StringBuilder;
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
 
@@ -107,7 +103,7 @@
      * 
      * @return the mapping of input column to output column
      */
-    public MultiMap<Integer, Pair<Integer, Integer>> getMappedFileds() {
+    public MultiMap<Integer, Pair<Integer, Integer>> getMappedFields() {
         return mMappedFields;
     }
 
@@ -116,7 +112,7 @@
      * @param fields
      *            the mapping of input column to output column
      */
-    public void setMappedFileds(MultiMap<Integer, Pair<Integer, Integer>> fields) {
+    public void setMappedFields(MultiMap<Integer, Pair<Integer, Integer>> fields) {
         mMappedFields = fields;
     }
 
@@ -124,7 +120,7 @@
      * 
      * @return the list of input columns that are removed
      */
-    public List<Pair<Integer, Integer>> getRemovedFileds() {
+    public List<Pair<Integer, Integer>> getRemovedFields() {
         return mRemovedFields;
     }
 
@@ -133,7 +129,7 @@
      * @param fields
      *            the list of input columns that are removed
      */
-    public void setRemovedFileds(List<Pair<Integer, Integer>> fields) {
+    public void setRemovedFields(List<Pair<Integer, Integer>> fields) {
         mRemovedFields = fields;
     }
 
@@ -141,7 +137,7 @@
      * 
      * @return the list of columns that are added to the output
      */
-    public List<Integer> getAddedFileds() {
+    public List<Integer> getAddedFields() {
         return mAddedFields;
     }
 
@@ -150,7 +146,7 @@
      * @param fields
      *            the list of columns that are added to the output
      */
-    public void setAddedFileds(List<Integer> fields) {
+    public void setAddedFields(List<Integer> fields) {
         mAddedFields = fields;
     }
 

Added: hadoop/pig/trunk/src/org/apache/pig/impl/plan/RequiredFields.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/plan/RequiredFields.java?rev=777708&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/plan/RequiredFields.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/plan/RequiredFields.java Fri May 22 20:50:32 2009
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.impl.plan;
+
+import java.lang.StringBuilder;
+import java.util.List;
+
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * A struct detailing which fields of an input an operator requires: all of
+ * them, none of them, or an explicit list of fields.
+ */
+public class RequiredFields {
+    /**
+     * Quick way for an operator to note that all columns from an input are required.
+     */
+    private boolean mNeedAllFields = false;
+
+    /**
+     * Quick way for an operator to note that no columns from an input are required.
+     */
+    private boolean mNeedNoFields = false;
+
+    /**
+     * List of fields required from the input. This includes fields that are
+     * transformed, and thus are no longer the same fields. Using the example 'B
+     * = foreach A generate $0, $2, $3, udf($1)' would produce the list (0, 0),
+     * (0, 2), (0, 3), (0, 1). Note that the order is not guaranteed.
+     */
+    private List<Pair<Integer, Integer>> mFields;
+
+    /**
+     * Creates an instance without a field list and with needNoFields false.
+     * @param needAllFields
+     *            to indicate if this required fields needs all the fields from
+     *            its input
+     */
+    public RequiredFields(boolean needAllFields) {
+        this(null, needAllFields, false);
+    }
+
+    /**
+     * Creates an instance without a field list. If both flags are true they
+     * contradict each other and both are stored as false.
+     * @param needAllFields
+     *            to indicate if this required fields needs all the fields from
+     *            its input
+     * @param needNoFields
+     *            to indicate if this required fields needs no fields from
+     *            its input
+     */
+    public RequiredFields(boolean needAllFields, boolean needNoFields) {
+        this(null, needAllFields, needNoFields);
+    }
+
+    /**
+     * Creates an instance with an explicit field list; both flags are false.
+     * @param fields
+     *            the list of input columns that are required
+     */
+    public RequiredFields(List<Pair<Integer, Integer>> fields) {
+        this(fields, false, false);
+    }
+
+    /**
+     * Common constructor used by the public ones.
+     *
+     * @param fields
+     *            the list of input columns that are required
+     * @param needAllFields
+     *            to indicate if this required fields needs all the fields from
+     *            its input; cannot be true if needNoFields is true
+     * @param needNoFields
+     *            to indicate if this required fields needs no fields from
+     *            its input; cannot be true if needAllFields is true
+     */
+    private RequiredFields(List<Pair<Integer, Integer>> fields,
+            boolean needAllFields,
+            boolean needNoFields) {
+        mFields = fields;
+        // both flags cannot be true at once; a contradictory request clears both
+        boolean contradictory = needAllFields && needNoFields;
+        mNeedAllFields = needAllFields && !contradictory;
+        mNeedNoFields = needNoFields && !contradictory;
+    }
+
+    /**
+     * @return the list of input columns that are required
+     */
+    public List<Pair<Integer, Integer>> getFields() {
+        return mFields;
+    }
+
+    /**
+     * @param fields
+     *            the list of input columns that are required
+     */
+    public void setFields(List<Pair<Integer, Integer>> fields) {
+        mFields = fields;
+    }
+
+    /**
+     * @return if this required fields needs all the fields from its input(s)
+     */
+    public boolean needAllFields() {
+        return getNeedAllFields();
+    }
+
+    /**
+     * @return if this required fields needs all the fields from its input(s)
+     */
+    public boolean getNeedAllFields() {
+        return mNeedAllFields;
+    }
+
+    /**
+     * Sets the needAllFields flag. A request to set it to true is silently
+     * ignored while needNoFields() is true.
+     *
+     * @param needAllFields
+     *            to indicate if this required fields needs all the fields from
+     *            its input; cannot be true if needNoFields() is true
+     */
+    public void setNeedAllFields(boolean needAllFields) {
+        if(needAllFields && needNoFields()) {
+            return;
+        }
+        mNeedAllFields = needAllFields;
+    }
+
+    /**
+     * @return if this required fields needs no fields from its input(s)
+     */
+    public boolean needNoFields() {
+        return getNeedNoFields();
+    }
+
+    /**
+     * @return if this required fields needs no fields from its input(s)
+     */
+    public boolean getNeedNoFields() {
+        return mNeedNoFields;
+    }
+
+    /**
+     * Sets the needNoFields flag. A request to set it to true is silently
+     * ignored while needAllFields() is true.
+     *
+     * @param needNoFields
+     *            to indicate if this required fields needs no fields from
+     *            its input; cannot be true if needAllFields() is true
+     */
+    public void setNeedNoFields(boolean needNoFields) {
+        if(needNoFields && needAllFields()) {
+            return;
+        }
+        mNeedNoFields = needNoFields;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("needAllFields: ").append(mNeedAllFields);
+        sb.append(" needNoFields: ").append(mNeedNoFields);
+        sb.append(" fields: ").append(mFields);
+        return sb.toString();
+    }
+}
\ No newline at end of file

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/util/Pair.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/util/Pair.java?rev=777708&r1=777707&r2=777708&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/util/Pair.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/util/Pair.java Fri May 22 20:50:32 2009
@@ -42,4 +42,45 @@
     public String toString() {
         return "[" + first.toString() +"," + second.toString() + "]";
     }
+
+    /**
+     * Combines the hash codes of both members; a null member contributes 1.
+     * Pairs that are equal according to equals(Object) get the same hash code.
+     */
+    @Override
+    public int hashCode() {
+        int firstHash = (this.first == null) ? 1 : this.first.hashCode();
+        int secondHash = (this.second == null) ? 1 : this.second.hashCode();
+        return (firstHash * 17) + (secondHash * 19);
+    }
+
+    /**
+     * Two pairs are equal when their first members are equal and their second
+     * members are equal; a null member is only equal to another null member.
+     *
+     * @param other the object to compare with
+     * @return true if other is a Pair whose members equal this pair's members
+     */
+    @Override
+    public boolean equals(Object other) {
+        if(this == other) {
+            return true;
+        }
+        if(! (other instanceof Pair)) {
+            // instanceof is false for null as well
+            return false;
+        }
+
+        Pair<?, ?> otherPair = (Pair<?, ?>) other;
+
+        // A null member must not short-circuit the comparison of the other
+        // member: both members always take part in the result.
+        boolean sameFirst = (this.first == null)
+                ? (otherPair.first == null)
+                : this.first.equals(otherPair.first);
+        boolean sameSecond = (this.second == null)
+                ? (otherPair.second == null)
+                : this.second.equals(otherPair.second);
+        return sameFirst && sameSecond;
+    }
 }