Posted to commits@pig.apache.org by pr...@apache.org on 2010/01/26 01:02:25 UTC

svn commit: r903018 [3/5] - in /hadoop/pig/branches/load-store-redesign: ./ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pi...

Modified: hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/setup.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/setup.xml?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/setup.xml (original)
+++ hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/setup.xml Tue Jan 26 00:02:23 2010
@@ -71,7 +71,8 @@
 <section>
 <title>Grunt Shell</title>
 <p>Use Pig's interactive shell, Grunt, to enter pig commands manually. See the <a href="setup.html#Sample+Code">Sample Code</a> for instructions about the passwd file used in the example.</p>
-<p>You can also run or execute script files from the Grunt shell. See the RUN and EXEC commands in the <a href="piglatin_reference.html">Pig Latin Reference Manual</a>. </p>
+<p>You can also run or execute script files from the Grunt shell. 
+See the <a href="piglatin_ref2.html#run">run</a> and <a href="piglatin_ref2.html#exec">exec</a> commands. </p>
 <p><strong>Local Mode</strong></p>
 <source>
 $ pig -x local

Modified: hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/site.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/site.xml?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/site.xml (original)
+++ hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/site.xml Tue Jan 26 00:02:23 2010
@@ -45,8 +45,8 @@
     <tutorial label="Tutorial"				 	href="tutorial.html" />
     </docs>  
      <docs label="Guides"> 
-    <plusers label="Pig Latin Users "	href="piglatin_users.html" />
-    <plref label="Pig Latin Reference"	href="piglatin_reference.html" />
+    <plref1 label="Pig Latin 1"	href="piglatin_ref1.html" />
+    <plref2 label="Pig Latin 2"	href="piglatin_ref2.html" />
     <cookbook label="Cookbook" 		href="cookbook.html" />
     <udf label="UDFs" href="udf.html" />
     </docs>  

Modified: hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_pig.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_pig.xml?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_pig.xml (original)
+++ hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_pig.xml Tue Jan 26 00:02:23 2010
@@ -29,7 +29,7 @@
    <section>
    <title>Overview</title>
    <p>With Pig you can load and store data in Zebra format. You can also take advantage of sorted Zebra tables for map-side groups and merge joins. When working with Pig keep in mind that, unlike MapReduce, you do not need to declare Zebra schemas. Zebra automatically converts Zebra schemas to Pig schemas (and vice versa) for you.</p>
-
+   
  </section>
  <!-- END OVERVIEW-->
  
@@ -54,19 +54,19 @@
  <ol>
  <li>You need to register a Zebra jar file the same way you would do it for any other UDF.</li>
  <li>You need to place the jar on your classpath.</li>
- <li>When using Zebra with Pig, Zebra data is self-described and always contains a schema. This means that the AS clause is unnecessary as long as 
-  you know what the column names and types are. To determine the column names and types, you can run the DESCRIBE statement right after the load:
+  </ol>
+  
+ <p>Zebra data is self-described meaning that the name and type information is stored with the data; you don't need to provide an AS clause or perform type casting unless you actually need to change the data. To check column names and types, you can run the DESCRIBE statement right after the load:</p>
  <source>
 A = LOAD 'studenttab' USING org.apache.hadoop.zebra.pig.TableLoader();
 DESCRIBE A;
-a: {name: chararray,age: int,gpa: float}
+A: {name: chararray,age: int,gpa: float}
 </source>
- </li>
- </ol>
    
-<p>You can provide alternative names to the columns with the AS clause. You can also provide types as long as the 
- original type can be converted to the new type. <em>In general</em>, Zebra supports Pig type compatibilities 
- (see <a href="piglatin_reference.html#Arithmetic+Operators+and+More">Arithmetic Operators and More</a>).</p>
+<p>You can provide alternative names to the columns with the AS clause. You can also provide alternative types as long as the 
+ original type can be converted to the new type. (One exception to this rule are maps since you can't specify schema for a map. Zebra always creates map values as bytearrays which would require casting to real type in the script. Note that this is not different for treating maps in Pig for any other storage.) For more information see <a href="piglatin_ref2.html#Schemas">Schemas</a> and
+<a href="piglatin_ref2.html#Arithmetic+Operators+and+More">Arithmetic Operators and More</a>.
+ </p>
  
 <p>You can provide multiple, comma-separated files to the loader:</p>
 <source>
@@ -186,7 +186,8 @@
    <section>
     <title>HDFS File Globs</title>
         <p>Pig supports HDFS file globs 
-    (for more information about globs, see <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/fs/FileSystem.html">FileSystem</a> and GlobStatus).</p>
+    (for more information 
+    see <a href="http://hadoop.apache.org/common/docs/current/api/org/apache/hadoop/fs/FileSystem.html#globStatus(org.apache.hadoop.fs.Path)">GlobStatus</a>).</p>
     <p>In this example, all Zebra tables in the directory of /path/to/PIG/tables will be loaded as a union (table union). </p>
  <source>
  A = LOAD ‘/path/to/PIG/tables/*’ USING org.apache.hadoop.zebra.pig.TableLoader(‘’);

Modified: hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_users.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_users.xml?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_users.xml (original)
+++ hadoop/pig/branches/load-store-redesign/src/docs/src/documentation/content/xdocs/zebra_users.xml Tue Jan 26 00:02:23 2010
@@ -155,7 +155,7 @@
 <section>
 <title>MapReduce Jobs</title>
 <p>
-TableInputFormat has static method, requireSortedTable, that allows the caller to specify the behavior of a single sorted table or an order-preserving sorted table union as described above. The method ensures all tables in a union are sorted. For more information, see <a href="zebra_reference.html#TableInputFormat">TableInputFormat</a>.
+TableInputFormat has static method, requireSortedTable, that allows the caller to specify the behavior of a single sorted table or an order-preserving sorted table union as described above. The method ensures all tables in a union are sorted. For more information, see <a href="zebra_mapreduce.html#TableInputFormat">TableInputFormat</a>.
 </p>
 
 <p>One simple example: A order-preserving sorted union B. A and B are sorted tables. </p>

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Tue Jan 26 00:02:23 2010
@@ -350,7 +350,14 @@
                 hadoopProperties.put(key, val);
             }
             
+            //clear user defined properties and re-populate
+            properties.clear();
+            Enumeration<Object> hodPropertiesIter = hadoopProperties.keys();
+            while (hodPropertiesIter.hasMoreElements()) {
+                String key = (String) hodPropertiesIter.nextElement();
+                String val = hadoopProperties.getProperty(key);
+                properties.put(key, val);
+            }
         }
-    }
-    
+    }    
 }
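
Note on the HExecutionEngine.java hunk above: the added lines empty the existing properties object and refill it from hadoopProperties, rather than assigning a new object to the field. A minimal sketch of that in-place reset follows. The class and method names (PropertiesResetSketch, resetFrom) are hypothetical and are not part of Pig.

```java
import java.util.Enumeration;
import java.util.Properties;

final class PropertiesResetSketch {

    // Replaces the contents of target with the entries of source, in place.
    // As in the hunk above, keys() only enumerates entries stored directly in
    // source, not entries inherited from a defaults object.
    // Assumption: target and source are different objects; if they were the
    // same object, clear() would empty both.
    static void resetFrom(Properties target, Properties source) {
        target.clear();
        Enumeration<Object> keys = source.keys();
        while (keys.hasMoreElements()) {
            String key = (String) keys.nextElement();
            target.put(key, source.getProperty(key));
        }
    }

    public static void main(String[] args) {
        Properties user = new Properties();
        user.setProperty("user.only", "x");
        Properties hadoop = new Properties();
        hadoop.setProperty("fs.default.name", "hdfs://example:9000");
        resetFrom(user, hadoop);
        System.out.println(user);
    }
}
```

After the call, entries that existed only in the target are gone and the target holds a copy of every directly stored entry of the source.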

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java Tue Jan 26 00:02:23 2010
@@ -29,7 +29,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -43,7 +42,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -245,17 +244,22 @@
         for (PhysicalPlan innerPlan : foreach.getInputPlans()) {
             // visit inner plans to figure out the sort order for distinct /
             // sort
-            SecondaryKeyDiscoverVisitor innerPlanVisitor = new SecondaryKeyDiscoverVisitor(
+            SecondaryKeyDiscover innerPlanDiscover = new SecondaryKeyDiscover(
                     innerPlan, sortKeyInfos, secondarySortKeyInfo);
-            innerPlanVisitor.visit();
-            secondarySortKeyInfo = innerPlanVisitor.getSecondarySortKeyInfo();
-            if (innerPlanVisitor.getSortsToRemove() != null) {
-                for (POSort sort : innerPlanVisitor.getSortsToRemove()) {
+            try {
+                innerPlanDiscover.process();
+            } catch (FrontendException e) {
+                int errorCode = 2213;
+                throw new VisitorException("Error visiting inner plan for ForEach", errorCode, e);
+            }
+            secondarySortKeyInfo = innerPlanDiscover.getSecondarySortKeyInfo();
+            if (innerPlanDiscover.getSortsToRemove() != null) {
+                for (POSort sort : innerPlanDiscover.getSortsToRemove()) {
                     sortsToRemove.add(new POToChange(sort, innerPlan, foreach));
                 }
             }
-            if (innerPlanVisitor.getDistinctsToChange() != null) {
-                for (PODistinct distinct : innerPlanVisitor
+            if (innerPlanDiscover.getDistinctsToChange() != null) {
+                for (PODistinct distinct : innerPlanDiscover
                         .getDistinctsToChange()) {
                     distinctsToChange.add(new POToChange(distinct, innerPlan,
                             foreach));
@@ -289,10 +293,14 @@
                 String scope = oldSort.getOperatorKey().scope;
                 List<PhysicalOperator> preds = sortToRemove.plan
                         .getPredecessors(sortToRemove.oper);
+                List<PhysicalOperator> succs = sortToRemove.plan
+                .getSuccessors(sortToRemove.oper);
                 POProject project = null;
-                if (preds == null
+                if ((preds == null
                         || preds.get(0).getResultType() != DataType.BAG
-                        && oldSort.getResultType() == DataType.BAG) {
+                        && oldSort.getResultType() == DataType.BAG) // sort to remove do change the result type
+                        && (succs == null || !(succs.get(0) instanceof PORelationToExprProject))) // successor is not PORelationToExprProject
+                {
                     project = new PORelationToExprProject(new OperatorKey(
                             scope, NodeIdGenerator.getGenerator()
                                     .getNextNodeId(scope)), oldSort
@@ -351,7 +359,7 @@
                 }
                 if (!found)
                 {
-                    int errorCode = 2209;
+                    int errorCode = 2214;
                     new VisitorException("Cannot find POLocalRearrange to set secondary plan", errorCode);
                 }
             }
@@ -429,7 +437,9 @@
     // we cannot do any secondary key optimization because we only have 1
     // secondary
     // sort key.
-    private static class SecondaryKeyDiscoverVisitor extends PhyPlanVisitor {
+    private static class SecondaryKeyDiscover {
+        PhysicalPlan mPlan;
+        
         List<POSort> sortsToRemove = new ArrayList<POSort>();
 
         List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();
@@ -438,25 +448,60 @@
 
         SortKeyInfo secondarySortKeyInfo;
 
-        ColumnChainInfo columnChainInfo = new ColumnChainInfo();
-
-        boolean sawInvalidPhysicalOper = false;
+        ColumnChainInfo columnChainInfo = null;
 
         // PhysicalPlan here is foreach inner plan
-        SecondaryKeyDiscoverVisitor(PhysicalPlan plan,
+        SecondaryKeyDiscover(PhysicalPlan plan,
                 List<SortKeyInfo> sortKeyInfos, SortKeyInfo secondarySortKeyInfo) {
-            super(plan,
-                    new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(
-                            plan));
+            this.mPlan = plan;
             this.sortKeyInfos = sortKeyInfos;
             this.secondarySortKeyInfo = secondarySortKeyInfo;
         }
+        
+        public void process() throws FrontendException
+        {
+            List<PhysicalOperator> roots = mPlan.getRoots();
+            for (PhysicalOperator root : roots) {
+                columnChainInfo = new ColumnChainInfo();
+                processRoot(root);
+            }
+        }
+        
+        public void processRoot(PhysicalOperator root) throws FrontendException {
+            PhysicalOperator currentNode = root;
+            while (currentNode!=null) {
+                boolean sawInvalidPhysicalOper = false;
+                if (currentNode instanceof PODistinct)
+                    sawInvalidPhysicalOper = processDistinct((PODistinct)currentNode);
+                else if (currentNode instanceof POSort)
+                    sawInvalidPhysicalOper = processSort((POSort)currentNode);
+                else if (currentNode instanceof POProject)
+                    sawInvalidPhysicalOper = processProject((POProject)currentNode);
+                else if (currentNode instanceof POForEach)
+                    sawInvalidPhysicalOper = processForEach((POForEach)currentNode);
+                else if (currentNode instanceof POUserFunc ||
+                         currentNode instanceof POUnion)
+                    break;
+                
+                if (sawInvalidPhysicalOper)
+                    break;
+                
+                List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
+                if (succs==null)
+                    currentNode = null;
+                else {
+                    if (succs.size()>1) {
+                        int errorCode = 2215;
+                        throw new FrontendException("See more than 1 successors in the nested plan for "+currentNode,
+                                errorCode);
+                    }
+                    currentNode = succs.get(0);
+                }
+            }
+        }
 
         // We see PODistinct, check which key it is using
-        @Override
-        public void visitDistinct(PODistinct distinct) throws VisitorException {
-            if (sawInvalidPhysicalOper)
-                return;
+        public boolean processDistinct(PODistinct distinct) throws FrontendException {
             SortKeyInfo keyInfos = new SortKeyInfo();
             try {
                 keyInfos.insertColumnChainInfo(0,
@@ -469,7 +514,7 @@
             for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
                 if (sortKeyInfo.moreSpecificThan(keyInfos)) {
                     distinctsToChange.add(distinct);
-                    return;
+                    return false;
                 }
             }
 
@@ -477,7 +522,7 @@
             if (secondarySortKeyInfo != null
                     && secondarySortKeyInfo.moreSpecificThan(keyInfos)) {
                 distinctsToChange.add(distinct);
-                return;
+                return false;
             }
 
             // Now set the secondary key
@@ -485,40 +530,22 @@
                 distinctsToChange.add(distinct);
                 secondarySortKeyInfo = keyInfos;
             }
-        }
-
-        @Override
-        public void visitLimit(POLimit limit) throws VisitorException {
+            return false;
         }
 
         // Accumulate column info
-        @Override
-        public void visitProject(POProject project) throws VisitorException {
+        public boolean processProject(POProject project) throws FrontendException {
             columnChainInfo.insertInReduce(project.isStar(), project
                     .getColumns(), project.getResultType());
-        }
-
-        @Override
-        public void visitFilter(POFilter filter) throws VisitorException {
-        }
-
-        @Override
-        public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
-            sawInvalidPhysicalOper = true;
-        }
-        
-        @Override
-        public void visitUnion(POUnion union) throws VisitorException {
-            sawInvalidPhysicalOper = true;
+            return false;
         }
 
         // Accumulate column info from nested project
-        @Override
-        public void visitPOForEach(POForEach fe) throws VisitorException {
+        public boolean processForEach(POForEach fe) throws FrontendException {
             if (fe.getInputPlans().size() > 1) {
                 // I may be wrong, but for now let's assume foreach plan before
                 // sort/distinct only have one foreach plan
-                throw new VisitorException(
+                throw new FrontendException(
                         "POForEach has more than 1 input plans");
             }
             boolean r = false;
@@ -527,19 +554,15 @@
                         columnChainInfo);
             } catch (PlanException e) {
                 int errorCode = 2205;
-                throw new VisitorException("Error visiting POForEach inner plan",
+                throw new FrontendException("Error visiting POForEach inner plan",
                         errorCode, e);
             }
             // See something other than POProject in POForEach, set the flag to stop further processing
-            if (r)
-                sawInvalidPhysicalOper = true;
+            return r;
         }
 
         // We see POSort, check which key it is using
-        @Override
-        public void visitSort(POSort sort) throws VisitorException {
-            if (sawInvalidPhysicalOper)
-                return;
+        public boolean processSort(POSort sort) throws FrontendException{
             SortKeyInfo keyInfo = new SortKeyInfo();
             for (int i = 0; i < sort.getSortPlans().size(); i++) {
                 PhysicalPlan sortPlan = sort.getSortPlans().get(i);
@@ -555,13 +578,12 @@
                     r = collectColumnChain(sortPlan, sortChainInfo);
                 } catch (PlanException e) {
                     int errorCode = 2206;
-                    throw new VisitorException("Error visiting POSort inner plan",
+                    throw new FrontendException("Error visiting POSort inner plan",
                             errorCode, e);
                 }
-                if (r) // if we saw physical operator other than project in sort plan
+                if (r==true) // if we saw physical operator other than project in sort plan
                 {
-                    sawInvalidPhysicalOper = true;
-                    return;
+                    return true;
                 }
                 keyInfo.insertColumnChainInfo(i, sortChainInfo, sort
                         .getMAscCols().get(i));
@@ -570,14 +592,14 @@
             for (SortKeyInfo sortKeyInfo : sortKeyInfos) {
                 if (sortKeyInfo.moreSpecificThan(keyInfo)) {
                     sortsToRemove.add(sort);
-                    return;
+                    return false;
                 }
             }
             // if it is part of secondary key
             if (secondarySortKeyInfo != null
                     && secondarySortKeyInfo.moreSpecificThan(keyInfo)) {
                 sortsToRemove.add(sort);
-                return;
+                return false;
             }
 
             // Now set the secondary key
@@ -585,6 +607,7 @@
                 sortsToRemove.add(sort);
                 secondarySortKeyInfo = keyInfo;
             }
+            return false;
         }
 
         public List<POSort> getSortsToRemove() {
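
Note on the SecondaryKeyOptimizer.java hunks above: the PhyPlanVisitor subclass driven by a DependencyOrderWalker is replaced by SecondaryKeyDiscover, which starts at each root of the nested plan and follows the single successor of each operator until the chain ends or an operator stops the walk. A minimal sketch of that traversal pattern follows. It uses a hypothetical adjacency map and string node names in place of PhysicalPlan and PhysicalOperator, and an unchecked exception in place of FrontendException.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ChainWalkSketch {

    // Hypothetical stand-in for PhysicalPlan.getSuccessors(): node -> successors.
    private final Map<String, List<String>> successors = new HashMap<String, List<String>>();

    void connect(String from, String to) {
        List<String> succs = successors.get(from);
        if (succs == null) {
            succs = new ArrayList<String>();
            successors.put(from, succs);
        }
        succs.add(to);
    }

    // Follows the chain from root and returns the nodes processed, in order.
    // The walk ends at the end of the chain or at the first node in stopNodes
    // (which is not processed), and fails if a node has several successors.
    List<String> walk(String root, Set<String> stopNodes) {
        List<String> processed = new ArrayList<String>();
        String current = root;
        while (current != null) {
            if (stopNodes.contains(current)) {
                break;
            }
            processed.add(current);
            List<String> succs = successors.get(current);
            if (succs == null) {
                current = null;
            } else {
                if (succs.size() > 1) {
                    throw new IllegalStateException(
                            "More than one successor for " + current);
                }
                current = succs.get(0);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        ChainWalkSketch plan = new ChainWalkSketch();
        plan.connect("Project", "Sort");
        plan.connect("Sort", "UserFunc");
        plan.connect("UserFunc", "Distinct");
        System.out.println(plan.walk("Project", Collections.singleton("UserFunc")));
    }
}
```

In the sketch, as in processRoot, a stop operator ends the walk for that root only; the next root is still processed.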

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Tue Jan 26 00:02:23 2010
@@ -75,7 +75,7 @@
 	private List<Boolean> mAscCols;
 	private POUserComparisonFunc mSortFunc;
 	transient private final Log log = LogFactory.getLog(getClass());
-	transient private Comparator<Tuple> mComparator;
+	private Comparator<Tuple> mComparator;
 
 	private boolean inputsAccumulated = false;
 	private long limit;

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DefaultDataBag.java Tue Jan 26 00:02:23 2010
@@ -242,6 +242,11 @@
                 } catch (EOFException eof) {
                     // Fall through to the next case where we find the
                     // next file, or go to memory
+                    try {
+                        mIn.close();
+                    }catch(IOException e) {
+                        log.warn("Failed to close spill file.", e);
+                    }
                 } catch (IOException ioe) {
                     String msg = "Unable to read our spill file."; 
                     log.fatal(msg, ioe);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/DistinctDataBag.java Tue Jan 26 00:02:23 2010
@@ -434,6 +434,11 @@
                         // Out of tuples in this file.  Set our slot in the
                         // array to null so we don't keep trying to read from
                         // this file.
+                        try {
+                            in.close();
+                        }catch(IOException e) {
+                            log.warn("Failed to close spill file.", e);
+                        }
                         mStreams.set(fileNum, null);
                         return;
                     } catch (IOException ioe) {
@@ -522,6 +527,7 @@
                             t.write(out);
                         }
                         out.flush();
+                        out.close();
                     } catch (IOException ioe) {
                         String msg = "Unable to find our spill file.";
                         log.fatal(msg, ioe);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalDistinctBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalDistinctBag.java?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalDistinctBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalDistinctBag.java Tue Jan 26 00:02:23 2010
@@ -409,6 +409,11 @@
                         // Out of tuples in this file.  Set our slot in the
                         // array to null so we don't keep trying to read from
                         // this file.
+                        try {
+                            in.close();
+                        }catch(IOException e) {
+                            log.warn("Failed to close spill file.", e);
+                        }
                         mStreams.set(fileNum, null);
                         return;
                     } catch (IOException ioe) {
@@ -497,6 +502,7 @@
                             t.write(out);
                         }
                         out.flush();
+                        out.close();
                     } catch (IOException ioe) {
                         String msg = "Unable to find our spill file.";
                         log.fatal(msg, ioe);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/InternalSortedBag.java Tue Jan 26 00:02:23 2010
@@ -404,6 +404,11 @@
                     // Out of tuples in this file.  Set our slot in the
                     // array to null so we don't keep trying to read from
                     // this file.
+                    try {
+                        in.close();
+                    }catch(IOException e) {
+                        log.warn("Failed to close spill file.", e);
+                    }
                     mStreams.set(fileNum, null);
                 } catch (IOException ioe) {
                     String msg = "Unable to find our spill file.";
@@ -488,6 +493,7 @@
                             t.write(out);
                         }
                         out.flush();
+                        out.close();
                     } catch (IOException ioe) {
                         String msg = "Unable to find our spill file.";
                         log.fatal(msg, ioe);

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java?rev=903018&r1=903017&r2=903018&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/data/SortedDataBag.java Tue Jan 26 00:02:23 2010
@@ -396,6 +396,11 @@
                     // Out of tuples in this file.  Set our slot in the
                     // array to null so we don't keep trying to read from
                     // this file.
+                    try {
+                        in.close();
+                    }catch(IOException e) {
+                        log.warn("Failed to close spill file.", e);
+                    }                	
                     mStreams.set(fileNum, null);
                 } catch (IOException ioe) {
                     String msg = "Unable to find our spill file.";
@@ -480,6 +485,7 @@
                             t.write(out);
                         }
                         out.flush();
+                        out.close();
                     } catch (IOException ioe) {
                         String msg = "Unable to find our spill file.";
                         log.fatal(msg, ioe);
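
Note on the DataBag hunks above: the spill input streams are now closed when EOF is reached, and the spill output streams are closed after flush(). Because out.close() sits inside the try block, it is skipped if a write throws first. A minimal sketch of a try/finally variant that closes the stream on both paths follows. This is an alternative pattern, not what the commit does, and the names SpillCloseSketch, spill and FailingStream are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

final class SpillCloseSketch {

    // Writes the values to the stream and closes it, whether or not a write fails.
    static void spill(OutputStream raw, int[] values) throws IOException {
        DataOutputStream out = new DataOutputStream(raw);
        try {
            for (int v : values) {
                out.writeInt(v);
            }
            out.flush();
        } finally {
            out.close();
        }
    }

    // Test helper: every write fails, and close() is recorded.
    static final class FailingStream extends OutputStream {
        boolean closed = false;

        @Override
        public void write(int b) throws IOException {
            throw new IOException("simulated write failure");
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        spill(buf, new int[] {1, 2, 3});
        System.out.println(buf.size() + " bytes written");
    }
}
```

Closing in a finally block keeps the file handle from leaking when the spill itself fails, which is the same resource concern the hunks above address for the success path.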

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/AndExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/AndExpression.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/AndExpression.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/AndExpression.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Boolean AND expression.
+ */
+public class AndExpression extends BinaryExpression {
+
+    /**
+     * Will add this operator to the plan and connect it to the 
+     * left and right hand side operators.
+     * @param plan plan this operator is part of
+     * @param lhs expression on its left hand side
+     * @param rhs expression on its right hand side
+     */
+    public AndExpression(OperatorPlan plan,
+                         LogicalExpression lhs,
+                         LogicalExpression rhs) {
+        super("And", plan, DataType.BOOLEAN, lhs, rhs);
+    }
+
+    /**
+     * @see org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
+     */
+    @Override
+    public void accept(PlanVisitor v) {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new RuntimeException("Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visitAnd(this);
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/BinaryExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/BinaryExpression.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/BinaryExpression.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/BinaryExpression.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import java.io.IOException;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * Superclass for all binary expressions
+ *
+ */
+public abstract class BinaryExpression extends LogicalExpression {
+    
+    /**
+     * Will add this operator to the plan and connect it to the 
+     * left and right hand side operators.
+     * @param name of the operator
+     * @param plan plan this operator is part of
+     * @param b Datatype of this expression
+     * @param lhs expression on its left hand side
+     * @param rhs expression on its right hand side
+     */
+    public BinaryExpression(String name,
+                            OperatorPlan plan,
+                            byte b,
+                            LogicalExpression lhs,
+                            LogicalExpression rhs) {
+        super(name, plan, b);
+        plan.add(this);
+        plan.connect(this, lhs);
+        plan.connect(this, rhs);
+    }
+
+    /**
+     * Get the left hand side of this binary expression.
+     * @return expression on the left hand side
+     * @throws IOException 
+     */
+    public LogicalExpression getLhs() throws IOException {
+        return (LogicalExpression)plan.getSuccessors(this).get(0);
+        
+    }
+
+    /**
+     * Get the right hand side of this binary expression.
+     * @return expression on the right hand side
+     * @throws IOException 
+     */
+    public LogicalExpression getRhs() throws IOException {
+        return (LogicalExpression)plan.getSuccessors(this).get(1);
+        
+    }
+
+}
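The getLhs() and getRhs() accessors above rely on the plan returning successors in the order they were connected. A minimal sketch of that idea follows. TinyPlan and TinyBinary are hypothetical stand-ins that use operator names instead of Operator objects; they are not Pig's BaseOperatorPlan, whose ordering behavior is assumed here rather than shown in this commit.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal plan: remembers successors in connect() order.
class TinyPlan {
    private final Map<String, List<String>> successors =
        new HashMap<String, List<String>>();

    // Record an edge from 'from' to 'to', appended after earlier edges.
    void connect(String from, String to) {
        List<String> out = successors.get(from);
        if (out == null) {
            out = new ArrayList<String>();
            successors.put(from, out);
        }
        out.add(to);
    }

    // Successors of 'op' in connection order (empty list if none).
    List<String> getSuccessors(String op) {
        List<String> out = successors.get(op);
        return out == null ? new ArrayList<String>() : out;
    }
}

// Hypothetical binary node: connects lhs first, then rhs, so that
// index 0 is the left operand and index 1 is the right operand.
class TinyBinary {
    private final TinyPlan plan;
    private final String name;

    TinyBinary(String name, TinyPlan plan, String lhs, String rhs) {
        this.name = name;
        this.plan = plan;
        plan.connect(name, lhs);
        plan.connect(name, rhs);
    }

    String getLhs() { return plan.getSuccessors(name).get(0); }

    String getRhs() { return plan.getSuccessors(name).get(1); }
}
```

Swapping the two connect() calls in the constructor would silently swap the operands, which is why the connection order matters in the constructor above.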

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ColumnExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ColumnExpression.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ColumnExpression.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ColumnExpression.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * Superclass for all column expressions, including projections, constants, and dereferences.
+ *
+ */
+public abstract class ColumnExpression extends LogicalExpression {
+
+    /**
+     * 
+     * @param name of the operator
+     * @param plan LogicalExpressionPlan this column expression is part of
+     * @param type datatype of this column expression
+     */
+    public ColumnExpression(String name, OperatorPlan plan, byte type) {
+        super(name, plan, type);
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ConstantExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ConstantExpression.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ConstantExpression.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ConstantExpression.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * A constant
+ *
+ */
+public class ConstantExpression extends ColumnExpression {
+    
+    // Stupid Java needs a union
+    Object val;
+    
+    /**
+     * Adds expression to the plan 
+     * @param plan LogicalExpressionPlan this constant is a part of.
+     * @param type type of the constant.  This could be determined dynamically,
+     * but it would require a long chain of instanceofs, and the parser will 
+     * already know the type, so there's no reason to take the performance hit.
+     * @param val Value of this constant.
+     */
+    public ConstantExpression(OperatorPlan plan, byte type, Object val) {
+        super("Constant", plan, type);
+        this.val = val;
+        plan.add(this);
+    }
+
+    /**
+     * @see org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
+     */
+    @Override
+    public void accept(PlanVisitor v) {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new RuntimeException("Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visitConstant(this);
+
+    }
+
+    /**
+     * Get the value of this constant.
+     * @return value of the constant
+     */
+    public Object getValue() {
+        return val;
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/EqualExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/EqualExpression.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/EqualExpression.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/EqualExpression.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Equality test expression.
+ */
+public class EqualExpression extends BinaryExpression {
+
+    /**
+     * Will add this operator to the plan and connect it to the 
+     * left and right hand side operators.
+     * @param plan plan this operator is part of
+     * @param lhs expression on its left hand side
+     * @param rhs expression on its right hand side
+     */
+    public EqualExpression(OperatorPlan plan,
+                           LogicalExpression lhs,
+                           LogicalExpression rhs) {
+        super("Equal", plan, DataType.BOOLEAN, lhs, rhs);
+    }
+
+    /**
+     * @see org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
+     */
+    @Override
+    public void accept(PlanVisitor v) {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new RuntimeException(
+                "Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visitEqual(this);
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpression.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * Logical representation of expression operators.  Expression operators have
+ * a data type and a uid.  Uid is a unique id for each expression. 
+ *
+ */
+public abstract class LogicalExpression extends Operator {
+    
+    protected byte type;
+    protected long uid;
+
+    /**
+     * 
+     * @param name of the operator
+     * @param plan LogicalExpressionPlan this is part of
+     * @param b datatype of this expression
+     */
+    public LogicalExpression(String name, OperatorPlan plan, byte b) {
+        super(name, plan);
+        type = b;
+    }
+    
+    /**
+     * Get the data type for this expression.
+     * @return data type, one of the static bytes of DataType
+     */
+    public byte getType() {
+        return type;
+    }
+    
+    /**
+     * Get the unique identifier for this expression
+     * @return unique identifier
+     */
+    public long getUid() {
+        return uid;
+    }
+    
+    /**
+     * Set the unique identifier for this expression.
+     * @param uid unique identifier
+     */
+    public void setUid(long uid) {
+       this.uid = uid; 
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpressionPlan.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import org.apache.pig.experimental.plan.BaseOperatorPlan;
+
+/**
+ * A plan containing LogicalExpression operators.
+ */
+public class LogicalExpressionPlan extends BaseOperatorPlan {
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpressionVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpressionVisitor.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpressionVisitor.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/LogicalExpressionVisitor.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.experimental.plan.PlanWalker;
+
+/**
+ * A visitor for expression plans.
+ */
+public abstract class LogicalExpressionVisitor extends PlanVisitor {
+
+    protected LogicalExpressionVisitor(OperatorPlan p,
+                                       PlanWalker walker) {
+        super(p, walker);
+        
+        if (!(plan instanceof LogicalExpressionPlan)) {
+            throw new RuntimeException(
+                "LogicalExpressionVisitor expects to visit " +
+                "expression plans.");
+        }
+    }
+    
+    public void visitAnd(AndExpression andExpr) {
+    }
+
+    public void visitEqual(EqualExpression equal) {
+    }
+    
+    public void visitProject(ProjectExpression project) {
+    }
+    
+    public void visitConstant(ConstantExpression constant) {
+    }
+    
+    
+
+}
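The accept() methods in the expression classes and the empty visit callbacks above form a double-dispatch visitor. The sketch below shows how a subclass might override only the callbacks it needs. All class names in it (SketchVisitor, SketchExpression, SketchAnd, SketchConstant, NameCollector) are hypothetical simplifications, not the Pig classes. The visitor recurses into children by hand, standing in for the PlanWalker that the real visitor is constructed with.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for LogicalExpressionVisitor: callbacks default to no-ops.
abstract class SketchVisitor {
    void visitAnd(SketchAnd and) { }
    void visitConstant(SketchConstant constant) { }
}

// Hypothetical stand-in for LogicalExpression.
abstract class SketchExpression {
    abstract void accept(SketchVisitor v);
}

class SketchConstant extends SketchExpression {
    final Object val;

    SketchConstant(Object val) { this.val = val; }

    // Double dispatch: the node picks the callback matching its own type.
    @Override
    void accept(SketchVisitor v) { v.visitConstant(this); }
}

class SketchAnd extends SketchExpression {
    final SketchExpression lhs;
    final SketchExpression rhs;

    SketchAnd(SketchExpression lhs, SketchExpression rhs) {
        this.lhs = lhs;
        this.rhs = rhs;
    }

    @Override
    void accept(SketchVisitor v) { v.visitAnd(this); }
}

// Example visitor: records the nodes it sees, parent before children.
class NameCollector extends SketchVisitor {
    final List<String> seen = new ArrayList<String>();

    @Override
    void visitAnd(SketchAnd and) {
        seen.add("And");
        // Manual traversal; an assumption standing in for a plan walker.
        and.lhs.accept(this);
        and.rhs.accept(this);
    }

    @Override
    void visitConstant(SketchConstant constant) {
        seen.add("Constant(" + constant.val + ")");
    }
}
```

Because the base callbacks do nothing, a visitor that cares about only one node type can ignore the rest without extra code.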

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ProjectExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ProjectExpression.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ProjectExpression.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/expression/ProjectExpression.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.expression;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+/**
+ * Projection of columns in an expression.
+ *
+ */
+public class ProjectExpression extends ColumnExpression {
+    
+    private int input; // Which input of the relational operator this project
+                       // is projecting from.  Count is zero based.  So if this
+                       // project is in a filter the input number will always
+                       // be 0 (since filter has only one input).  If it is
+                       // in a join, cross, cogroup, or union it could be
+                       // greater than 0.
+    private int col; // The column in the input which the project references.
+                     // Count is zero based.
+                      
+    
+    /**
+     * Adds projection to the plan.
+     * @param plan LogicalExpressionPlan this projection will be a part of
+     * @param type type of this projection, can be unknown
+     * @param inputNum Input number this project references.
+     * @param colNum Column number this project references.
+     */
+    public ProjectExpression(OperatorPlan plan,
+                             byte type,
+                             int inputNum,
+                             int colNum) {
+        super("Project", plan, type);
+        input = inputNum;
+        col = colNum;
+        plan.add(this);
+    }
+
+    /**
+     * @see org.apache.pig.experimental.plan.Operator#accept(org.apache.pig.experimental.plan.PlanVisitor)
+     */
+    @Override
+    public void accept(PlanVisitor v) {
+        if (!(v instanceof LogicalExpressionVisitor)) {
+            throw new RuntimeException("Expected LogicalExpressionVisitor");
+        }
+        ((LogicalExpressionVisitor)v).visitProject(this);
+
+    }
+
+    /**
+     * Input number this project references.  This is the input number for the
+     * relational operator that contains this expression.  The count is zero
+     * based.
+     * @return input number
+     */
+    public int getInputNum() {
+        return input;
+    }
+    
+   
+    /**
+     * Column number this project references.  The column number is the column
+     * in the relational operator that contains this expression.  The count
+     * is zero based.
+     * @return column number
+     */
+    public int getColNum() {
+        return col;
+    }
+    
+    /**
+     * Set the column number for this project.  This should only be called by
+     * ProjectionPatcher.  Stupid Java needs friends.  
+     * @param colNum new column number for projection
+     */
+    public void setColNum(int colNum) {
+        col = colNum;
+    }
+    
+    /**
+     * Set the type of the projection.
+     * @param type to set this projection to
+     */
+    public void setType(byte type) {
+        this.type = type;
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LOLoad.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LOLoad.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LOLoad.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.experimental.plan.PlanVisitor;
+
+public class LOLoad extends LogicalRelationalOperator {
+    
+    private LogicalSchema scriptSchema;
+
+    /**
+     * 
+     * @param loader FuncSpec for load function to use for this load.
+     * @param schema schema user specified in script, or null if not
+     * specified.
+     * @param plan logical plan this load is part of.
+     */
+    public LOLoad(FuncSpec loader, LogicalSchema schema, LogicalPlan plan) {
+       super("LOLoad", plan);
+       scriptSchema = schema;
+    }
+    
+    /**
+     * Get the schema for this load.  The schema will be either what was
+     * given by the user in the script or what the load function's getSchema
+     * call returned, or a merge of the two if both are available.  Otherwise
+     * null will be returned, indicating that the schema is unknown.
+     * @return schema, or null if unknown
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null) return schema;
+        
+        // TODO get schema from LoaderMetadata interface.
+        LogicalSchema fromMetadata = getSchemaFromMetaData();
+        
+        if (scriptSchema != null && fromMetadata != null) {
+            schema = LogicalSchema.merge(scriptSchema, fromMetadata);
+            return schema;
+        }
+        
+        if (scriptSchema != null)  schema = scriptSchema;
+        else if (fromMetadata != null) schema = fromMetadata;
+        return schema;
+    }
+
+    private LogicalSchema getSchemaFromMetaData() {
+        return null;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) {
+        if (!(v instanceof LogicalPlanVisitor)) {
+            throw new RuntimeException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalPlanVisitor)v).visitLOLoad(this);
+
+    }
+
+}
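The order in which LOLoad.getSchema() resolves a schema (cached value, then a merge of both sources, then whichever single source exists, otherwise null) can be sketched in isolation. The class below is a hypothetical illustration that uses plain strings as schemas. Its merge() is a placeholder only and makes no claim about what LogicalSchema.merge actually does.

```java
// Hypothetical illustration of the lookup order in LOLoad.getSchema().
class SchemaResolutionSketch {
    private String cached;                 // result of an earlier call, if any
    private final String scriptSchema;     // schema from the script, or null
    private final String metadataSchema;   // schema from the loader, or null

    SchemaResolutionSketch(String scriptSchema, String metadataSchema) {
        this.scriptSchema = scriptSchema;
        this.metadataSchema = metadataSchema;
    }

    // Placeholder merge, for illustration only.
    static String merge(String a, String b) {
        return a + "+" + b;
    }

    String getSchema() {
        if (cached != null) {
            return cached;                 // already resolved once
        }
        if (scriptSchema != null && metadataSchema != null) {
            cached = merge(scriptSchema, metadataSchema);
        } else if (scriptSchema != null) {
            cached = scriptSchema;
        } else {
            cached = metadataSchema;       // may be null: schema unknown
        }
        return cached;
    }
}
```

In the committed code getSchemaFromMetaData() always returns null, so only the script-schema and unknown-schema branches can be reached for now.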

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalPlan.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.experimental.plan.BaseOperatorPlan;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * LogicalPlan is the logical view of relational operations Pig will execute 
+ * for a given script.  Note that it contains only relational operations.
+ * All expressions will be contained in LogicalExpressionPlans inside
+ * each relational operator.  LogicalPlan provides operations for
+ * removing and adding LogicalRelationalOperators.  These will handle doing
+ * all of the necessary add, remove, connect, and disconnect calls in
+ * OperatorPlan.  They will not handle patching up individual relational
+ * operators.  That will be handled by the various Patchers.
+ *
+ */
+public class LogicalPlan extends BaseOperatorPlan {
+    
+    /**
+     * Add a relational operation to the plan.
+     * @param before operator that will be before the new operator.  This
+     * operator should already be in the plan.  If before is null then
+     * the new operator will be a root.
+     * @param newOper new operator to add.  This operator should not already
+     * be in the plan.
+     * @param after operator that will be after the new operator.  This
+     * operator should already be in the plan.  If after is null, then the
+     * new operator will be a leaf.
+     * @throws IOException if newOper is already in the plan, or before or after
+     * are not in the plan.
+     */
+    public void add(LogicalRelationalOperator before,
+                    LogicalRelationalOperator newOper,
+                    LogicalRelationalOperator after) throws IOException {
+        doAdd(before, newOper, after);
+    }
+   
+    /**
+     * Add a relational operation with multiple inputs to the plan.
+     * @param before operators that will be before the new operator.  These
+     * operators should already be in the plan.
+     * @param newOper new operator to add.  This operator should not already
+     * be in the plan.
+     * @param after operator that will be after the new operator.  This
+     * operator should already be in the plan.  If after is null, then the
+     * new operator will be a leaf.
+     * @throws IOException if newOper is already in the plan, or before or after
+     * are not in the plan.
+     */
+    public void add(List<LogicalRelationalOperator> before,
+                    LogicalRelationalOperator newOper,
+                    LogicalRelationalOperator after) throws IOException {
+        doAdd(null, newOper, after);
+        
+        for (LogicalRelationalOperator op : before) {
+            checkIn(op);
+            connect(op, newOper);
+        }
+    }
+    
+    /**
+     * Add a relational operation with multiple outputs to the plan.
+     * @param before operator that will be before the new operator.  This
+     * operator should already be in the plan.  If before is null then
+     * the new operator will be a root.
+     * @param newOper new operator to add.  This operator should not already
+     * be in the plan.
+     * @param after operators that will be after the new operator.  These
+     * operators should already be in the plan.
+     * @throws IOException if newOper is already in the plan, or before or after
+     * are not in the plan.
+     */
+    public void add(LogicalRelationalOperator before,
+                    LogicalRelationalOperator newOper,
+                    List<LogicalRelationalOperator> after) throws IOException {
+        doAdd(before, newOper, null);
+        
+        for (LogicalRelationalOperator op : after) {
+            checkIn(op);
+            connect(newOper, op);
+        }
+    }
+    
+    /**
+     * Add a relational operation to the plan when the caller wants to control
+     * how the nodes are connected in the graph.
+     * @param before operator that will be before the new operator.  This
+     * operator should already be in the plan.  If before is null, then
+     * the new operator will be a root.
+     * @param beforeToPos Position in before's edges to connect newOper at.
+     * @param beforeFromPos Position in newOper's edges to connect before at.
+     * @param newOper new operator to add.  This operator should not already
+     * be in the plan.
+     * @param afterToPos Position in after's edges to connect newOper at.
+     * @param afterFromPos Position in newOper's edges to connect after at.
+     * @param after operator that will be after the new operator.  This
+     * operator should already be in the plan.  If after is null, then the
+     * new operator will be a leaf.
+     * @throws IOException if newOper is already in the plan, or before or after
+     * are not in the plan.
+     */
+    public void add(LogicalRelationalOperator before,
+                    int beforeToPos,
+                    int beforeFromPos,
+                    LogicalRelationalOperator newOper,
+                    int afterToPos,
+                    int afterFromPos,
+                    LogicalRelationalOperator after) throws IOException {
+        if (before != null) checkIn(before);
+        if (after != null) checkIn(after);
+        checkNotIn(newOper);
+        
+        add(newOper);
+        if (before != null) connect(before, beforeToPos, newOper, beforeFromPos);
+        if (after != null) connect(newOper, afterToPos, after, afterFromPos);
+        
+    }
+    
+    /**
+     * Remove an operator from the logical plan.  This call will take care
+     * of disconnecting the operator, connecting the predecessor(s) and 
+     * successor(s) and patching up the plan. 
+     * @param op operator to be removed.
+     * @throws IOException If the operator is not in the plan.
+     */
+    public void removeLogical(LogicalRelationalOperator op) throws IOException {
+        
+        checkIn(op);
+        List<Operator> pred = getPredecessors(op);
+        List<Operator> succ = getSuccessors(op);
+        int predSz = pred.size();
+        int succSz = succ.size();
+        if (predSz > 1 && succSz > 1) {
+            // Don't have a clue what to do here.  We shouldn't have any
+            // operators that have multiple inputs and multiple outputs.
+            throw new IOException("Attempt to remove a node with multiple "
+                + "inputs and outputs!");
+        }
+        
+        // Disconnect and remove the given node.
+        for (Operator p : pred) {
+            disconnect(p, op);
+        }
+        for (Operator s : succ) {
+            disconnect(op, s);
+        }
+        remove(op);
+        
+        // Now reconnect the before and after
+        if (predSz > 1 && succSz == 1) {
+            for (Operator p : pred) {
+                connect(p, succ.get(0));
+            }
+        } else if (predSz == 1 && succSz >= 1) {
+            for (Operator s : succ) {
+                connect(pred.get(0), s);
+            }
+        }
+        
+    }
+    
+    private void doAdd(LogicalRelationalOperator before,
+                       LogicalRelationalOperator newOper,
+                       LogicalRelationalOperator after) throws IOException {
+        if (before != null) checkIn(before);
+        if (after != null) checkIn(after);
+        checkNotIn(newOper);
+        
+        add(newOper);
+        if (before != null) connect(before, newOper);
+        if (after != null) connect(newOper, after);
+    }
+    
+    private void checkIn(LogicalRelationalOperator op) throws IOException {
+        if (!ops.contains(op)) {
+            throw new IOException("Attempt to use operator " + op.getName() + 
+                " which is not in the plan.");
+        }
+    }
+    
+    private void checkNotIn(LogicalRelationalOperator op) throws IOException {
+        if (ops.contains(op)) {
+            throw new IOException("Attempt to add operator " + op.getName() + 
+                " which is already in the plan.");
+        }
+    }
+        
+}
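
Illustrative note on removeLogical above: the reconnect step joins every former predecessor of the removed node to every former successor, and refuses the case where both sides have more than one entry. The following is a minimal, self-contained sketch of that idea only. It is not part of the commit: the class RemoveNodeSketch, its string-named nodes, and its map-based edge storage are hypothetical stand-ins for LogicalPlan, Operator, and PlanEdge. The sketch also copies the edge lists before disconnecting, which is an assumption about what a safe implementation would need rather than something the committed code does.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a plan; strings play the role of operators.
class RemoveNodeSketch {
    private final Map<String, List<String>> succ = new HashMap<>();
    private final Map<String, List<String>> pred = new HashMap<>();

    void connect(String from, String to) {
        succ.computeIfAbsent(from, k -> new ArrayList<>()).add(to);
        pred.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
    }

    void disconnect(String from, String to) {
        succ.get(from).remove(to);
        pred.get(to).remove(from);
    }

    List<String> successors(String n) {
        return succ.getOrDefault(n, Collections.<String>emptyList());
    }

    List<String> predecessors(String n) {
        return pred.getOrDefault(n, Collections.<String>emptyList());
    }

    // Remove a node and patch the graph around it.
    void remove(String node) {
        // Copy first: disconnect() mutates the lists we would otherwise iterate.
        List<String> p = new ArrayList<>(predecessors(node));
        List<String> s = new ArrayList<>(successors(node));
        if (p.size() > 1 && s.size() > 1) {
            throw new IllegalStateException(
                "Attempt to remove a node with multiple inputs and outputs");
        }
        for (String x : p) disconnect(x, node);
        for (String x : s) disconnect(node, x);
        // At most one side has more than one entry, so this covers both the
        // many-to-one and the one-to-many reconnect cases.
        for (String x : p) {
            for (String y : s) connect(x, y);
        }
    }

    public static void main(String[] args) {
        RemoveNodeSketch g = new RemoveNodeSketch();
        g.connect("load", "filter");
        g.connect("filter", "store1");
        g.connect("filter", "store2");
        g.remove("filter");
        System.out.println(g.successors("load")); // [store1, store2]
    }
}
```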

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalPlanVisitor.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.experimental.plan.PlanWalker;
+
+/**
+ * A visitor for logical plans.
+ */
+public abstract class LogicalPlanVisitor extends PlanVisitor {
+
+    protected LogicalPlanVisitor(OperatorPlan plan, PlanWalker walker) {
+        super(plan, walker);
+        
+        if (!(plan instanceof LogicalPlan)) {
+            throw new RuntimeException(
+                "LogicalPlanVisitor expects to visit logical plans");
+        }
+    }
+    
+    public void visitLOLoad(LOLoad load) {
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalRelationalOperator.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+
+/**
+ * Logical representation of relational operators.  Relational operators have
+ * a schema.
+ */
+abstract public class LogicalRelationalOperator extends Operator {
+    
+    protected LogicalSchema schema;
+    int requestedParallelism;
+
+    /**
+     * 
+     * @param name of this operator
+     * @param plan this operator is in
+     */
+    public LogicalRelationalOperator(String name, OperatorPlan plan) {
+        this(name, plan, -1);
+    }
+    
+    /**
+     * 
+     * @param name of this operator
+     * @param plan this operator is in
+     * @param rp requested parallelism
+     */
+    public LogicalRelationalOperator(String name,
+                                     OperatorPlan plan,
+                                     int rp) {
+        super(name, plan);
+        requestedParallelism = rp;
+    }
+    
+
+    public int getRequestedParallelisam() {
+        return requestedParallelism;
+    } 
+
+    /**
+     * Get the schema for the output of this relational operator.
+     * @return the schema
+     */
+    abstract public LogicalSchema getSchema();
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/logical/relational/LogicalSchema.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.logical.relational;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.pig.data.DataType;
+
+/**
+ * Schema, from a logical perspective.
+ */
+public class LogicalSchema {
+
+    public static class LogicalFieldSchema {
+        public String alias;
+        public DataType type;
+        public long uid;
+        public LogicalSchema schema;
+    }
+    
+    private List<LogicalFieldSchema> fields;
+    private Map<String, Integer> aliases;
+    
+    public LogicalSchema() {
+        fields = new ArrayList<LogicalFieldSchema>();
+        aliases = new HashMap<String, Integer>();
+    }
+    
+    /**
+     * Add a field to this schema.
+     * @param field to be added to the schema
+     */
+    public void addField(LogicalFieldSchema field) {
+        fields.add(field);
+        if (field.alias != null && !field.alias.equals("")) {
+            aliases.put(field.alias, fields.size() - 1);
+        }
+    }
+    
+    /**
+     * Fetch a field by alias
+     * @param alias
+     * @return field associated with alias, or null if no such field
+     */
+    public LogicalFieldSchema getField(String alias) {
+        Integer i = aliases.get(alias);
+        if (i == null) return null;
+        else return fields.get(i);
+    }
+
+    /**
+     * Fetch a field by field number
+     * @param fieldNum field number to fetch
+     * @return field
+     */
+    public LogicalFieldSchema getField(int fieldNum) {
+        return fields.get(fieldNum);
+    }
+    
+    /**
+     * Get all fields
+     * @return list of all fields
+     */
+    public List<LogicalFieldSchema> getFields() {
+        return fields;
+    }
+
+    /**
+     * Get the size of the schema.
+     * @return size
+     */
+    public Integer size() {
+        return fields.size();
+    }
+    
+    /**
+     * Merge two schemas.
+     * @param s1
+     * @param s2
+     * @return a merged schema, or null if the merge fails
+     */
+    public static LogicalSchema merge(LogicalSchema s1, LogicalSchema s2) {
+        // TODO
+        return null;
+    }
+    
+}
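
Illustrative note on LogicalSchema above: fields are stored by position, and a separate map gives lookup by alias. The sketch below shows that two-way lookup in isolation. It is not part of the commit: AliasIndexSketch and its Field class are hypothetical, the type is a plain string rather than a Pig DataType, and the sketch assumes the intent is to index only fields whose alias is non-null and non-empty.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a schema with positional and alias lookup.
class AliasIndexSketch {

    static final class Field {
        final String alias;
        final String type;

        Field(String alias, String type) {
            this.alias = alias;
            this.type = type;
        }
    }

    private final List<Field> fields = new ArrayList<>();
    private final Map<String, Integer> aliases = new HashMap<>();

    // Append a field; index it by alias only when it actually has one.
    void addField(Field f) {
        fields.add(f);
        if (f.alias != null && !f.alias.isEmpty()) {
            aliases.put(f.alias, fields.size() - 1);
        }
    }

    // Lookup by alias; returns null when no field carries that alias.
    Field getField(String alias) {
        Integer i = aliases.get(alias);
        return i == null ? null : fields.get(i);
    }

    // Lookup by position.
    Field getField(int fieldNum) {
        return fields.get(fieldNum);
    }

    int size() {
        return fields.size();
    }

    public static void main(String[] args) {
        AliasIndexSketch s = new AliasIndexSketch();
        s.addField(new Field("name", "chararray"));
        s.addField(new Field(null, "int")); // anonymous field, position only
        System.out.println(s.getField("name").type);
        System.out.println(s.getField(1).type);
        System.out.println(s.size());
    }
}
```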

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/BaseOperatorPlan.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.util.Pair;
+
+public abstract class BaseOperatorPlan implements OperatorPlan {
+
+    protected Set<Operator> ops;
+    protected PlanEdge fromEdges;
+    protected PlanEdge toEdges;
+
+    private List<Operator> roots;
+    private List<Operator> leaves;
+    protected static final Log log =
+        LogFactory.getLog(BaseOperatorPlan.class);
+ 
+    public BaseOperatorPlan() {
+        ops = new HashSet<Operator>();
+        roots = new ArrayList<Operator>();
+        leaves = new ArrayList<Operator>();
+        fromEdges = new PlanEdge();
+        toEdges = new PlanEdge();
+    }
+    
+    /**
+     * Get number of nodes in the plan.
+     */
+    public int size() {
+        return ops.size();
+    }
+
+    /**
+     * Get all operators in the plan that have no predecessors.
+     * @return all operators in the plan that have no predecessors, or
+     * an empty list if the plan is empty.
+     */
+    public List<Operator> getRoots() {
+        if (roots.size() == 0 && ops.size() > 0) {
+            for (Operator op : ops) {
+                if (toEdges.get(op) == null) {
+                    roots.add(op);
+                }
+            }
+        }
+        return roots;
+    }
+
+    /**
+     * Get all operators in the plan that have no successors.
+     * @return all operators in the plan that have no successors, or
+     * an empty list if the plan is empty.
+     */
+    public List<Operator> getLeaves() {
+        if (leaves.size() == 0 && ops.size() > 0) {
+            for (Operator op : ops) {
+                if (fromEdges.get(op) == null) {
+                    leaves.add(op);
+                }
+            }
+        }
+        return leaves;
+    }
+
+    /**
+     * For a given operator, get all operators immediately before it in the
+     * plan.
+     * @param op operator to fetch predecessors of
+     * @return list of all operators immediately before op, or an empty list
+     * if op is a root.
+     * @throws IOException if op is not in the plan.
+     */
+    public List<Operator> getPredecessors(Operator op) throws IOException {
+        return (List<Operator>)toEdges.get(op);
+    }
+    
+    /**
+     * For a given operator, get all operators immediately after it.
+     * @param op operator to fetch successors of
+     * @return list of all operators immediately after op, or an empty list
+     * if op is a leaf.
+     * @throws IOException if op is not in the plan.
+     */
+    public List<Operator> getSuccessors(Operator op) throws IOException {
+        return (List<Operator>)fromEdges.get(op);
+    }
+
+    /**
+     * Add a new operator to the plan.  It will not be connected to any
+     * existing operators.
+     * @param op operator to add
+     */
+    public void add(Operator op) {
+        markDirty();
+        ops.add(op);
+    }
+
+    /**
+     * Remove an operator from the plan.
+     * @param op Operator to be removed
+     * @throws IOException if the remove operation attempts to 
+     * remove an operator that is still connected to other operators.
+     */
+    public void remove(Operator op) throws IOException {
+        
+        if (fromEdges.containsKey(op) || toEdges.containsKey(op)) {
+            throw new IOException("Attempt to remove operator " + op.getName()
+                    + " that is still connected in the plan");
+        }
+        markDirty();
+        ops.remove(op);
+    }
+    
+    /**
+     * Connect two operators in the plan, controlling which position in the
+     * edge lists that the from and to edges are placed.
+     * @param from Operator edge will come from
+     * @param fromPos Position in the array for the from edge
+     * @param to Operator edge will go to
+     * @param toPos Position in the array for the to edge
+     */
+    public void connect(Operator from,
+                        int fromPos,
+                        Operator to,
+                        int toPos) {
+        markDirty();
+        fromEdges.put(from, to, fromPos);
+        toEdges.put(to, from, toPos);
+    }
+    
+    /**
+     * Connect two operators in the plan.
+     * @param from Operator edge will come from
+     * @param to Operator edge will go to
+     */
+    public void connect(Operator from, Operator to) {
+        markDirty();
+        fromEdges.put(from, to);
+        toEdges.put(to, from);
+    }
+    
+    /**
+     * Disconnect two operators in the plan.
+     * @param from Operator edge is coming from
+     * @param to Operator edge is going to
+     * @return pair of positions, indicating the position in the from and
+     * to arrays.
+     * @throws IOException if the two operators aren't connected.
+     */
+    public Pair<Integer, Integer> disconnect(Operator from,
+                                             Operator to) throws IOException {
+        Pair<Operator, Integer> f = fromEdges.removeWithPosition(from, to);
+        if (f == null) { 
+            throw new IOException("Attempt to disconnect operators " + 
+                from.getName() + " and " + to.getName() +
+                " which are not connected.");
+        }
+        
+        Pair<Operator, Integer> t = toEdges.removeWithPosition(to, from);
+        if (t == null) { 
+            throw new IOException("Plan in inconsistent state: " + 
+                from.getName() + " and " + to.getName() +
+                " connected in fromEdges but not toEdges.");
+        }
+        
+        markDirty();
+        return new Pair<Integer, Integer>(f.second, t.second);
+    }
+
+    private void markDirty() {
+        roots.clear();
+        leaves.clear();
+    }
+
+    public Iterator<Operator> getOperators() {
+        return ops.iterator();
+    }
+
+}
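
Illustrative note on BaseOperatorPlan above: getRoots and getLeaves compute their result lazily and cache it, and every mutating call goes through markDirty to clear the cache. The sketch below isolates that pattern for roots only. It is not part of the commit: RootCacheSketch is hypothetical, nodes are plain strings, and a LinkedHashSet is used (instead of the HashSet in the committed code) purely so the printed order is predictable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a plan that caches its list of roots.
class RootCacheSketch {
    private final Set<String> ops = new LinkedHashSet<>();
    // Maps a node to the nodes that feed into it.
    private final Map<String, List<String>> toEdges = new HashMap<>();
    private final List<String> roots = new ArrayList<>();

    void add(String op) {
        markDirty();
        ops.add(op);
    }

    void connect(String from, String to) {
        markDirty();
        toEdges.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
    }

    // Recompute only when the cache is empty and there is something to scan.
    List<String> getRoots() {
        if (roots.isEmpty() && !ops.isEmpty()) {
            for (String op : ops) {
                if (!toEdges.containsKey(op)) {
                    roots.add(op);
                }
            }
        }
        return roots;
    }

    // Any structural change invalidates the cached roots.
    private void markDirty() {
        roots.clear();
    }

    public static void main(String[] args) {
        RootCacheSketch plan = new RootCacheSketch();
        plan.add("a");
        plan.add("b");
        System.out.println(plan.getRoots()); // [a, b]
        plan.connect("a", "b");
        System.out.println(plan.getRoots()); // [a]
    }
}
```

One consequence of this design is that the returned list is the cache itself, so a caller that keeps the reference will see it emptied by the next mutation.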

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java?rev=903018&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/experimental/plan/DependencyOrderWalker.java Tue Jan 26 00:02:23 2010
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.experimental.plan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A walker to walk graphs in dependency order.  It is guaranteed that a node
+ * will not be visited until all of its predecessors have been visited.  This
+ * is equivalent to doing a topological sort on the graph and then visiting
+ * the nodes in order.
+ */
+public class DependencyOrderWalker extends PlanWalker {
+
+    /**
+     * @param plan for this walker to traverse.
+     */
+    public DependencyOrderWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new DependencyOrderWalker(plan);
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.
+     * @throws IOException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        // This is highly inefficient, but our graphs are small so it should be okay.
+        // The algorithm starts at each leaf of the graph, finds its
+        // predecessors, and calls itself for each of those predecessors.  When it
+        // finds a node that has no unvisited predecessors it puts that node in the
+        // list.  It then unwinds, putting each of the other nodes in the list.
+        // It keeps track of which nodes it has seen as it goes so it doesn't put any
+        // node in the list twice.
+
+        List<Operator> fifo = new ArrayList<Operator>();
+        Set<Operator> seen = new HashSet<Operator>();
+        List<Operator> leaves = plan.getLeaves();
+        if (leaves == null) return;
+        for (Operator op : leaves) {
+            doAllPredecessors(op, seen, fifo);
+        }
+
+        for (Operator op: fifo) {
+            op.accept(visitor);
+        }
+    }
+
+    protected void doAllPredecessors(Operator node,
+                                   Set<Operator> seen,
+                                   Collection<Operator> fifo) throws IOException {
+        if (!seen.contains(node)) {
+            // We haven't seen this one before.
+            Collection<Operator> preds = plan.getPredecessors(node);
+            if (preds != null && preds.size() > 0) {
+                // Do all our predecessors before ourself
+                for (Operator op : preds) {
+                    doAllPredecessors(op, seen, fifo);
+                }
+            }
+            // Now do ourself
+            seen.add(node);
+            fifo.add(node);
+        }
+    }
+}
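
Illustrative note on DependencyOrderWalker above: the walk is a depth-first traversal from each leaf that emits a node only after all of its predecessors have been emitted, which yields a topological order. The sketch below reproduces that recursion on a small graph. It is not part of the commit: DependencyOrderSketch is hypothetical, nodes are plain strings, and the predecessor map stands in for OperatorPlan.getPredecessors.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the walker; strings play the role of operators.
class DependencyOrderSketch {

    // Maps each node to its direct predecessors; roots have no entry.
    private final Map<String, List<String>> preds = new HashMap<>();

    void connect(String from, String to) {
        preds.computeIfAbsent(to, k -> new ArrayList<>()).add(from);
    }

    // Return all nodes reachable backwards from the leaves, predecessors first.
    List<String> order(List<String> leaves) {
        List<String> out = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (String leaf : leaves) {
            visit(leaf, seen, out);
        }
        return out;
    }

    private void visit(String node, Set<String> seen, List<String> out) {
        if (seen.contains(node)) {
            return; // already emitted
        }
        List<String> p = preds.get(node);
        if (p != null) {
            // Emit every predecessor before the node itself.
            for (String x : p) {
                visit(x, seen, out);
            }
        }
        seen.add(node);
        out.add(node);
    }

    public static void main(String[] args) {
        DependencyOrderSketch g = new DependencyOrderSketch();
        g.connect("load1", "join");
        g.connect("load2", "join");
        g.connect("join", "store");
        System.out.println(g.order(Arrays.asList("store")));
        // prints [load1, load2, join, store]
    }
}
```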