You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2008/03/13 00:57:09 UTC

svn commit: r636581 - in /incubator/pig/branches/types: ./ src/org/apache/pig/ src/org/apache/pig/backend/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/local/executionengine/ src/org/apache/pig/impl/logi...

Author: gates
Date: Wed Mar 12 16:57:05 2008
New Revision: 636581

URL: http://svn.apache.org/viewvc?rev=636581&view=rev
Log:
Beginnings of changes for logical plans.  Modified build.xml so that almost no files actually are built, except the changed logical operators.  That way
developers can work with them without worrying about all the parts that don't compile yet.


Added:
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
Removed:
    incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecLogicalPlan.java
Modified:
    incubator/pig/branches/types/build.xml
    incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java
    incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
    incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
    incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
    incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
    incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java

Modified: incubator/pig/branches/types/build.xml
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/build.xml?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/build.xml (original)
+++ incubator/pig/branches/types/build.xml Wed Mar 12 16:57:05 2008
@@ -132,9 +132,28 @@
     </target>
 
     <target name="compile-sources">
-        <javac encoding="${build.encoding}" srcdir="${sources}" includes="**/*.java" destdir="${dist}" debug="${javac.debug}" optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" deprecation="${javac.deprecation}">
+        <javac encoding="${build.encoding}" srcdir="${sources}" includes="**/plan/*.java, **/logicalLayer/LogicalPlan.java, **/logicalLayer/LOEval.java, **/logicalLayer/LOSort.java, **/logicalLayer/LOGenerate.java, **/logicalLayer/LOVisitor.java  " destdir="${dist}" debug="${javac.debug}" optimize="${javac.optimize}" target="${javac.version}" source="${javac.version}" deprecation="${javac.deprecation}">
             <compilerarg line="${javac.args} ${javac.args.warnings}" />
             <classpath refid="${cp}" />
+            <!--
+            <exclude name="**/StandAloneParser.java"/>
+            <exclude name="**/hadoop/executionengine/*.java"/>
+            <exclude name="**/local/executionengine/*.java"/>
+            <exclude name="**/physicalLayer/*.java"/>
+            <exclude name="**/builtin/*.java"/>
+            <exclude name="**/LOSplit.java"/>
+            <exclude name="**/LOCogroup.java"/>
+            <exclude name="**/LOLoad.java"/>
+            <exclude name="**/LOPrinter.java"/>
+            <exclude name="**/LOUserFunc.java"/>
+            <exclude name="**/LOSplitOutput.java"/>
+            <exclude name="**/LOStore.java"/>
+            <exclude name="**/LOUnion.java"/>
+            <exclude name="**/impl/parser/*.java"/>
+            <exclude name="**/tools/grunt/*.java"/>
+            <exclude name="**/Main.java"/>
+            <exclude name="**/PigServer.java"/>
+            -->
         </javac>
     </target>
 

Modified: incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/LoadFunc.java Wed Mar 12 16:57:05 2008
@@ -15,51 +15,155 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.pig;
-
-import java.io.IOException;
+package org.apache.pig;
 
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.BufferedPositionedInputStream;
-
-
-public interface LoadFunc {
-    /**
-     * This interface is used to implement functions to parse records
-     * from a dataset.
-     * 
-     * @author database-systems@research.yahoo
-     *
-     */
-    /**
-     * Specifies a portion of an InputStream to read tuples. Because the
-     * starting and ending offsets may not be on record boundaries it is up to
-     * the implementor to deal with figuring out the actual starting and ending
-     * offsets in such a way that an arbitrarily sliced up file will be processed
-     * in its entirety.
-     * <p>
-     * A common way of handling slices in the middle of records is to start at
-     * the given offset and, if the offset is not zero, skip to the end of the
-     * first record (which may be a partial record) before reading tuples.
-     * Reading continues until a tuple has been read that ends at an offset past
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+
+/**
+ * This interface is used to implement functions to parse records
+ * from a dataset.  This also includes functions to cast raw byte data into various
+ * datatypes.  These are external functions because we want loaders, whenever
+ * possible, to delay casting of datatypes until the last possible moment (i.e.
+ * don't do it on load).  This means we need to expose the functionality so that
+ * other sections of the code can call back to the loader to do the cast.
+ */
+public interface LoadFunc {
+    /**
+     * Specifies a portion of an InputStream to read tuples. Because the
+     * starting and ending offsets may not be on record boundaries it is up to
+     * the implementor to deal with figuring out the actual starting and ending
+     * offsets in such a way that an arbitrarily sliced up file will be processed
+     * in its entirety.
+     * <p>
+     * A common way of handling slices in the middle of records is to start at
+     * the given offset and, if the offset is not zero, skip to the end of the
+     * first record (which may be a partial record) before reading tuples.
+     * Reading continues until a tuple has been read that ends at an offset past
      * the ending offset.
      * <p>
      * <b>The load function should not do any buffering on the input stream</b>. Buffering will
-     * cause the offsets returned by is.getPos() to be unreliable.
-     *  
-     * @param fileName the name of the file to be read
-     * @param is the stream representing the file to be processed, and which can also provide its position.
-     * @param offset the offset to start reading tuples.
-     * @param end the ending offset for reading.
-     * @throws IOException
-     */
-    public abstract void bindTo(String fileName, BufferedPositionedInputStream is, long offset, long end) throws IOException;
-    /**
-     * Retrieves the next tuple to be processed.
-     * @return the next tuple to be processed or null if there are no more tuples
-     * to be processed.
-     * @throws IOException
-     */
-    public abstract Tuple getNext() throws IOException;
-    
-}
+     * cause the offsets returned by is.getPos() to be unreliable.
+     *  
+     * @param fileName the name of the file to be read
+     * @param is the stream representing the file to be processed, and which can also provide its position.
+     * @param offset the offset to start reading tuples.
+     * @param end the ending offset for reading.
+     * @throws IOException
+     */
+    public void bindTo(String fileName,
+                       BufferedPositionedInputStream is,
+                       long offset,
+                       long end) throws IOException;
+
+    /**
+     * Retrieves the next tuple to be processed.
+     * @return the next tuple to be processed or null if there are no more tuples
+     * to be processed.
+     * @throws IOException
+     */
+    public Tuple getNext() throws IOException;
+    
+    /**
+     * Cast data from bytes to boolean value.  
+     * @param b byte array to be cast.
+     * @return Boolean value.
+     * @throws IOException if the value cannot be cast.
+     */
+    public Boolean bytesToBoolean(byte[] b) throws IOException;
+    
+    /**
+     * Cast data from bytes to integer value.  
+     * @param b byte array to be cast.
+     * @return Integer value.
+     * @throws IOException if the value cannot be cast.
+     */
+    public Integer bytesToInteger(byte[] b) throws IOException;
+
+    /**
+     * Cast data from bytes to long value.  
+     * @param b byte array to be cast.
+     * @return Long value.
+     * @throws IOException if the value cannot be cast.
+     */
+    public Long bytesToLong(byte[] b) throws IOException;
+
+    /**
+     * Cast data from bytes to float value.  
+     * @param b byte array to be cast.
+     * @return Float value.
+     * @throws IOException if the value cannot be cast.
+     */
+    public Float bytesToFloat(byte[] b) throws IOException;
+
+    /**
+     * Cast data from bytes to double value.  
+     * @param b byte array to be cast.
+     * @return Double value.
+     * @throws IOException if the value cannot be cast.
+     */
+    public Double bytesToDouble(byte[] b) throws IOException;
+
+    /**
+     * Cast data from bytes to chararray value.  
+     * @param b byte array to be cast.
+     * @return String value.
+     * @throws IOException if the value cannot be cast.
+     */
+    public String bytesToCharArray(byte[] b) throws IOException;
+
+    /**
+     * Cast data from bytes to map value.  
+     * @param b byte array to be cast.
+     * @return Map value.
+     * @throws IOException if the value cannot be cast.
+     */
+    public Map<Object, Object> bytesToMap(byte[] b) throws IOException;
+
+    /**
+     * Cast data from bytes to tuple value.  
+     * @param b byte array to be cast.
+     * @return Tuple value.
+     * @throws IOException if the value cannot be cast.
+     */
+    public Tuple bytesToTuple(byte[] b) throws IOException;
+
+    /**
+     * Cast data from bytes to bag value.  
+     * @param b byte array to be cast.
+     * @return Bag value.
+     * @throws IOException if the value cannot be cast.
+     */
+    public DataBag bytesToBag(byte[] b) throws IOException;
+
+    /**
+     * Indicate to the loader fields that will be needed.  This can be useful for
+     * loaders that access data that is stored in a columnar format where indicating
+     * columns to be accessed ahead of time will save scans.  If the loader
+     * function cannot make use of this information, it is free to ignore it.
+     * @param schema Schema indicating which columns will be needed.
+     */
+    public void fieldsToRead(Schema schema);
+
+    /**
+     * Find the schema from the loader.  This function will be called at parse time
+     * (not run time) to see if the loader can provide a schema for the data.  The
+     * loader may be able to do this if the data is self describing (e.g. JSON).  If
+     * the loader cannot determine the schema, it can return a null.
+     * @param fileName Name of the file to be read.
+     * @param in input stream, so that the function can read enough of the
+     * data to determine the schema.
+     * @param end Function should not read past this position in the stream.
+     * @return a Schema describing the data if possible, or null otherwise.
+     * @throws IOException
+     */
+    public Schema determineSchema(String fileName,
+                                  BufferedPositionedInputStream in,
+                                  long end) throws IOException;
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecutionEngine.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/executionengine/ExecutionEngine.java Wed Mar 12 16:57:05 2008
@@ -23,6 +23,7 @@
 import java.util.Map;
 
 import org.apache.pig.backend.datastorage.DataStorage;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
 
 /**
  * This is the main interface that various execution engines
@@ -87,11 +88,11 @@
      * @param properties
      * @return physical plan
      */
-    public ExecPhysicalPlan compile(ExecLogicalPlan plan,
+    public ExecPhysicalPlan compile(LogicalPlan plan,
                                     Properties properties)
         throws ExecException;
 
-    public ExecPhysicalPlan compile(ExecLogicalPlan[] plans,
+    public ExecPhysicalPlan compile(LogicalPlan[] plans,
                                     Properties properties)
         throws ExecException;
 

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Mar 12 16:57:05 2008
@@ -33,15 +33,16 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobSubmissionProtocol;
 import org.apache.hadoop.mapred.JobTracker;
+
 import org.apache.pig.backend.datastorage.DataStorage;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.executionengine.ExecLogicalPlan;
 import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
 import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
 import org.apache.pig.backend.executionengine.ExecutionEngine;
@@ -54,10 +55,12 @@
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.OperatorKey;
 import org.apache.pig.shock.SSHSocketImplFactory;
 
 
+/*
 public class HExecutionEngine implements ExecutionEngine {
     
     private final Log log = LogFactory.getLog(getClass());
@@ -208,15 +211,13 @@
         throw new UnsupportedOperationException();
     }
 
-    public ExecPhysicalPlan compile(ExecLogicalPlan plan,
-                                               Properties properties)
-            throws ExecException {
-        return compile(new ExecLogicalPlan[] { plan },
-                       properties);
+    public ExecPhysicalPlan compile(LogicalPlan plan,
+                                    Properties properties) throws ExecException {
+        return compile(new LogicalPlan[] { plan }, properties);
     }
 
-    public ExecPhysicalPlan compile(ExecLogicalPlan[] plans,
-                                               Properties properties)
+    public ExecPhysicalPlan compile(LogicalPlan[] plans,
+                                    Properties properties)
             throws ExecException {
         if (plans == null) {
             throw new ExecException("No Plans to compile");
@@ -224,7 +225,7 @@
 
         OperatorKey physicalKey = null;
         for (int i = 0; i < plans.length; ++i) {
-            ExecLogicalPlan curPlan = null;
+            LogicalPlan curPlan = null;
 
             curPlan = plans[ i ];
      
@@ -476,6 +477,7 @@
     }
     
 }
+*/
 
 
 

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/MapreducePlanCompiler.java Wed Mar 12 16:57:05 2008
@@ -152,7 +152,7 @@
         else if (lo instanceof LOSplitOutput){
             POMapreduce child = (POMapreduce)execEngine.getPhysicalOpTable().get(compiledInputs[0]);
             String fileName = child.toSplit.tempFiles.get(((LOSplitOutput)lo).getReadFrom());
-            POMapreduce pom = new POMapreduce(lo.getScope(),
+            POMapreduce pom = new POMapreduce(lo.getOperatorKey().scope,
                                               nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
                                               execEngine.getPhysicalOpTable(),
                                               logicalKey,
@@ -181,7 +181,7 @@
             return pom.getOperatorKey();
         }
         else if (lo instanceof LOLoad) {
-            POMapreduce pom = new POMapreduce(lo.getScope(),
+            POMapreduce pom = new POMapreduce(lo.getOperatorKey().scope,
                                               nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
                                               execEngine.getPhysicalOpTable(),
                                               logicalKey,
@@ -204,7 +204,7 @@
             return compiledInputs[0];
         } 
         else if (lo instanceof LOUnion) {
-            POMapreduce pom = new POMapreduce(lo.getScope(), 
+            POMapreduce pom = new POMapreduce(lo.getOperatorKey().scope, 
                                               nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
                                               execEngine.getPhysicalOpTable(),
                                               logicalKey,
@@ -217,14 +217,14 @@
         else if (lo instanceof LOSort) {
             LOSort loSort = (LOSort) lo;
             //must break up into 2 map reduce jobs, one for gathering quantiles, another for sorting
-            POMapreduce quantileJob = getQuantileJob(lo.getScope(), 
+            POMapreduce quantileJob = getQuantileJob(lo.getOperatorKey().scope, 
                                                      nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
                                                      execEngine.getPhysicalOpTable(),
                                                      logicalKey,
                                                      (POMapreduce) (execEngine.getPhysicalOpTable().get(compiledInputs[0])), 
                                                      loSort);
             
-            return getSortJob(lo.getScope(), 
+            return getSortJob(lo.getOperatorKey().scope, 
                               nodeIdGenerator.getNextNodeId(logicalKey.getScope()),
                               execEngine.getPhysicalOpTable(),
                               logicalKey,

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/POMapreduce.java Wed Mar 12 16:57:05 2008
@@ -105,7 +105,7 @@
                        OperatorKey sourceLogicalKey,
                        PigContext pigContext, 
                        OperatorKey[] inputsIn) {
-        super(scope, id, opTable, LogicalOperator.FIXED);
+        super(scope, id, opTable, 0);
         this.sourceLogicalKey = sourceLogicalKey;
         this.pigContext = pigContext;
         inputs = inputsIn;
@@ -117,7 +117,7 @@
                        OperatorKey sourceLogicalKey,
                        PigContext pigContext, 
                        OperatorKey inputIn) {
-        super(scope, id, opTable, LogicalOperator.FIXED);
+        super(scope, id, opTable, 0);
         this.sourceLogicalKey = sourceLogicalKey;
         this.pigContext = pigContext;
         inputs = new OperatorKey[1];
@@ -129,7 +129,7 @@
                        Map<OperatorKey, ExecPhysicalOperator> opTable,
                        OperatorKey sourceLogicalKey,
                        PigContext pigContext) {
-        super(scope, id, opTable, LogicalOperator.FIXED);
+        super(scope, id, opTable, 0);
         this.sourceLogicalKey = sourceLogicalKey;
         this.pigContext = pigContext;
         inputs = new OperatorKey[0];

Modified: incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Wed Mar 12 16:57:05 2008
@@ -35,12 +35,12 @@
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
-import org.apache.pig.backend.executionengine.ExecLogicalPlan;
 import org.apache.pig.backend.executionengine.ExecPhysicalOperator;
 import org.apache.pig.backend.executionengine.ExecScopedLogicalOperator;
 import org.apache.pig.backend.executionengine.ExecPhysicalPlan;
 import org.apache.pig.impl.logicalLayer.*;
 import org.apache.pig.impl.physicalLayer.PhysicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.parser.NodeIdGenerator;
 import org.apache.pig.impl.eval.cond.Cond;
 import org.apache.pig.impl.io.FileSpec;
@@ -98,9 +98,8 @@
     }
 
     
-    public LocalPhysicalPlan compile(ExecLogicalPlan plan,
-                                     Properties properties)
-            throws ExecException {
+    public LocalPhysicalPlan compile(LogicalPlan plan,
+                                     Properties properties) throws ExecException {
         if (plan == null) {
             throw new ExecException("No Plan to compile");
         }
@@ -108,16 +107,15 @@
         return compile(new ExecLogicalPlan[]{ plan } , properties);
     }
 
-    public LocalPhysicalPlan compile(ExecLogicalPlan[] plans,
-                                     Properties properties)
-            throws ExecException {
+    public LocalPhysicalPlan compile(LogicalPlan[] plans,
+                                     Properties properties) throws ExecException {
         if (plans == null) {
             throw new ExecException("No Plans to compile");
         }
 
         OperatorKey physicalKey = null;
         for (int i = 0; i < plans.length; ++i) {
-            ExecLogicalPlan curPlan = null;
+            LogicalPlan curPlan = null;
 
             curPlan = plans[ i ];
      

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java?rev=636581&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/ExpressionOperator.java Wed Mar 12 16:57:05 2008
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+
+public abstract class ExpressionOperator extends LogicalOperator {
+
+    /**
+     * @param plan LogicalPlan this operator will be a part of.
+     * @param k OperatorKey of this operator
+     * @param rp requested level of parallelism, -1 for default.
+     */
+    public ExpressionOperator(LogicalPlan plan, OperatorKey k, int rp) {
+        super(plan, k, rp);
+    }
+
+}
+

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Wed Mar 12 16:57:05 2008
@@ -22,166 +22,51 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pig.data.Tuple;
-import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.logicalLayer.schema.AtomSchema;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
+import org.apache.pig.impl.plan.PlanVisitor;
 
 
 public class LOCogroup extends LogicalOperator {
-    private static final long serialVersionUID = 1L;
-
-    protected ArrayList<EvalSpec> specs;
+    private static final long serialVersionUID = 2L;
 
-    public LOCogroup(Map<OperatorKey, LogicalOperator> opTable,
-                     String scope,
-                     long id,
-                     List<OperatorKey> inputs,
-                     ArrayList<EvalSpec> specs) {
-        super(opTable, scope, id, inputs);
-        this.specs = specs;
-        getOutputType();
+    public LOCogroup(LogicalPlan plan, OperatorKey k) { 
+        super(plan, k, -1);
     }
 
     @Override
     public String name() {
-        if (inputs.size() == 1) return "Group " + scope + "-" + id;
-        else return "CoGroup " + scope + "-" + id;
+        return "CoGroup " + mKey.scope + "-" + mKey.id;
     }
-    @Override
-    public String arguments() {
-        StringBuffer sb = new StringBuffer();
-
-        for (int i = 0; i < specs.size(); i++) {
-            sb.append(specs.get(i));
-            if (i + 1 < specs.size())
-                sb.append(", ");
-        }
 
-        return sb.toString();
+    @Override
+    public String typeName() {
+        return "LOCogroup";
     }
 
-    public static Object[] getGroupAndTuple(Object d) {
-        if (!(d instanceof Tuple)) {
-            throw new RuntimeException
-                ("Internal Error: Evaluation of group expression did not return a tuple");
-        }
-        Tuple output = (Tuple) d;
-        if (output.size() < 2) {
-            throw new RuntimeException
-                ("Internal Error: Evaluation of group expression returned a tuple with <2 fields");
-        }
-
-        Object[] groupAndTuple = new Object[2];
-        try {
-            if (output.size() == 2) {
-                groupAndTuple[0] = output.get(0);
-                groupAndTuple[1] = output.get(1);
-            } else {
-                Tuple group = TupleFactory.getInstance().newTuple(output.size() - 1);
-                for (int j = 0; j < output.size() - 1; j++) {
-                    group.set(j, output.get(j));
-                }
-                groupAndTuple[0] = group;
-                groupAndTuple[1] = output.get(output.size() - 1);
-            }
-        } catch(IOException e) {
-            throw new RuntimeException(e);
-        }
-        return groupAndTuple;
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
     }
 
     @Override
-    public TupleSchema outputSchema() {
-        if (schema == null) {
-            schema = new TupleSchema();
-
-
-            Schema groupElementSchema =
-                specs.get(0).getOutputSchemaForPipe(opTable.get(getInputs().get(0)).
-                                                    outputSchema());
-            if (groupElementSchema == null) {
-                groupElementSchema = new TupleSchema();
-                groupElementSchema.setAlias("group");
-            } else {
-
-                if (!(groupElementSchema instanceof TupleSchema))
-                    throw new RuntimeException
-                        ("Internal Error: Schema of group expression was atomic");
-                List<Schema> fields =
-                    ((TupleSchema) groupElementSchema).getFields();
-
-                if (fields.size() < 2)
-                    throw new RuntimeException
-                        ("Internal Error: Schema of group expression retured <2 fields");
-
-                if (fields.size() == 2) {
-                    groupElementSchema = fields.get(0);
-                    groupElementSchema.removeAllAliases();
-                    groupElementSchema.setAlias("group");
-                } else {
-                    groupElementSchema = new TupleSchema();
-                    groupElementSchema.setAlias("group");
-
-                    for (int i = 0; i < fields.size() - 1; i++) {
-                        ((TupleSchema) groupElementSchema).add(fields.get(i));
-                    }
-                }
-
-            }
-
-            schema.add(groupElementSchema);
-
-          for (OperatorKey key: getInputs()) {
-              LogicalOperator lo = opTable.get(key);
-                TupleSchema inputSchema = lo.outputSchema();
-                if (inputSchema == null)
-                    inputSchema = new TupleSchema();
-                schema.add(inputSchema);
-            }
-        }
-
-        schema.setAlias(alias);
-        return schema;
+    public boolean supportsMultipleOutputs() {
+        return false;
     }
 
     @Override
-    public int getOutputType() {
-        int outputType = FIXED;
-        for (int i = 0; i < getInputs().size(); i++) {
-            switch (opTable.get(getInputs().get(i)).getOutputType()) {
-            case FIXED:
-                continue;
-            case MONOTONE:
-                outputType = AMENDABLE;
-                break;
-            case AMENDABLE:
-            default:
-                throw new RuntimeException
-                    ("Can't feed a cogroup into another in the streaming case");
-            }
-        }
-        return outputType;
+    public Schema getSchema() {
+        // TODO create schema
+        return null;
     }
 
     @Override
-    public List<String> getFuncs() {
-        List<String> funcs = super.getFuncs();
-      for (EvalSpec spec:specs) {
-            funcs.addAll(spec.getFuncs());
+    public void visit(PlanVisitor v) throws ParseException {
+        if (!(v instanceof LOVisitor)) {
+            throw new RuntimeException("You can only visit LogicalOperators "
+                + "with an LOVisitor!");
         }
-        return funcs;
-    }
-
-    public ArrayList<EvalSpec> getSpecs() {
-        return specs;
-    }
-
-    public void visit(LOVisitor v) {
-        v.visitCogroup(this);
+        ((LOVisitor)v).visitCogroup(this);
     }
 
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOEval.java Wed Mar 12 16:57:05 2008
@@ -18,78 +18,61 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.List;
-import java.util.Map;
-
-import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
 
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.PlanVisitor;
 
 public class LOEval extends LogicalOperator {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
 
-    protected EvalSpec spec;
+    private LOGenerate mGen = null;
 
-    public LOEval(Map<OperatorKey, LogicalOperator> opTable,
-                  String scope, 
-                  long id, 
-                  OperatorKey input, 
-                  EvalSpec specIn) {
-        super(opTable, scope, id, input);
-        spec = specIn;
-        getOutputType();
+    public LOEval(LogicalPlan plan, OperatorKey key) {
+        super(plan, key);
+    }
+
+    public void setGenerate(LOGenerate gen) {
+        mGen = gen;
     }
 
     @Override
     public String name() {
-        return "Eval " + scope + "-" + id;
+        return "Eval " + mKey.scope + "-" + mKey.id;
     }
 
     @Override
-    public String arguments() {
-        return spec.toString();
+    public String typeName() {
+        return "LOEval";
     }
 
     @Override
-    public TupleSchema outputSchema() {
-        if (schema == null) {
-            //log.info("LOEval input: " + inputs[0].outputSchema());
-            //log.info("LOEval spec: " + spec);
-            schema =
-                (TupleSchema) spec.getOutputSchemaForPipe(opTable.get(getInputs().get(0)).
-                                                          outputSchema());
-
-            //log.info("LOEval output: " + schema);
-        }
-        schema.setAlias(alias);
-        return schema;
+    public boolean supportsMultipleInputs() {
+        return true;
     }
 
     @Override
-    public int getOutputType() {
-        switch (opTable.get(getInputs().get(0)).getOutputType()) {
-        case FIXED:
-            return FIXED;
-        case MONOTONE:
-        case AMENDABLE:
-            return MONOTONE;
-        default:
-            throw new RuntimeException("Wrong type of input to EVAL");
-        }
+    public boolean supportsMultipleOutputs() {
+        return false;
     }
 
     @Override
-    public List<String> getFuncs() {
-        List<String> funcs = super.getFuncs();
-        funcs.addAll(spec.getFuncs());
-        return funcs;
+    public Schema getSchema() {
+        if (mSchema == null) {
+            // Ask the generate at the end of the plan for its schema,
+            // and return that.
+            mSchema = mGen.getSchema();
+            }
+        return mSchema;
     }
 
-    public EvalSpec getSpec() {
-        return spec;
+    @Override
+    public void visit(PlanVisitor v) throws ParseException {
+        if (!(v instanceof LOVisitor)) {
+            throw new RuntimeException("You can only visit LogicalOperators "
+                + "with an LOVisitor!");
+        }
+        ((LOVisitor)v).visitEval(this);
     }
 
-    public void visit(LOVisitor v) {
-        v.visitEval(this);
-    }
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java?rev=636581&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOGenerate.java Wed Mar 12 16:57:05 2008
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.PlanVisitor;
+
+public class LOGenerate extends LogicalOperator {
+    private static final long serialVersionUID = 2L;
+
+    /**
+     * The projection list of this generate.
+     */
+    private ArrayList<LogicalOperator> mProjections;
+
+    public LOGenerate(LogicalPlan plan, OperatorKey key) {
+        super(plan, key);
+    }
+
+    @Override
+    public String name() {
+        return "Generate " + mKey.scope + "-" + mKey.id;
+    }
+
+    @Override
+    public String typeName() {
+        return "LOGenerate";
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    @Override
+    public Schema getSchema() {
+        if (mSchema == null) {
+            List<Schema.FieldSchema> fss =
+                new ArrayList<Schema.FieldSchema>(mProjections.size());
+            for (LogicalOperator op : mProjections) {
+                if (op.getType() == DataType.TUPLE) {
+                    fss.add(new Schema.FieldSchema(null, op.getSchema()));
+                } else {
+                    fss.add(new Schema.FieldSchema(null, op.getType()));
+                }
+            }
+            mSchema = new Schema(fss);
+        }
+        return mSchema;
+    }
+
+    @Override
+    public void visit(PlanVisitor v) throws ParseException {
+        if (!(v instanceof LOVisitor)) {
+            throw new RuntimeException("You can only visit LogicalOperators "
+                + "with an LOVisitor!");
+        }
+        ((LOVisitor)v).visitGenerate(this);
+    }
+
+    public List<LogicalOperator> getProjections() {
+        return mProjections;
+    }
+
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOLoad.java Wed Mar 12 16:57:05 2008
@@ -21,8 +21,6 @@
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -32,28 +30,23 @@
 
 
 public class LOLoad extends LogicalOperator {
-    private static final long serialVersionUID = 1L;
+    private static final long serialVersionUID = 2L;
     
-    private final Log log = LogFactory.getLog(getClass());
+    private FileSpec mInput;
+    private LoadFunc mLoadFunc;
 
-    protected FileSpec inputFileSpec;
-
-    protected int outputType = FIXED;
-
-
-    public LOLoad(Map<OperatorKey, LogicalOperator> opTable, 
-                  String scope, 
-                  long id, 
-                  FileSpec inputFileSpec) throws IOException, ParseException {
-        super(opTable, scope, id);
-        this.inputFileSpec = inputFileSpec;
+    public LOLoad(OperatorKey k,
+                  FileSpec inputFileSpec,
+                  String loader) throws IOException, ParseException {
+        super(k);
+        this.mInput = inputFileSpec;
         
         // check if we can instantiate load func
-        LoadFunc storageFunc = (LoadFunc) PigContext
-                .instantiateFuncFromSpec(inputFileSpec.getFuncSpec());
+        LoadFunc storageFunc =
+            (LoadFunc)PigContext.instantiateFuncFromSpec(loader);
 
         // TODO: Handle Schemas defined by Load Functions
-        schema = new TupleSchema();
+        //schema = new TupleSchema();
     }
 
     @Override
@@ -62,34 +55,12 @@
     }
 
     public FileSpec getInputFileSpec() {
-        return inputFileSpec;
-    }
-
-    public void setInputFileSpec(FileSpec spec) {
-        inputFileSpec = spec;
+        return mInput;
     }
 
     @Override
     public String arguments() {
-        return inputFileSpec.toString();
-    }
-
-    @Override
-    public TupleSchema outputSchema() {
-        schema.setAlias(alias);
-        return this.schema;
-    }
-
-    @Override
-    public int getOutputType() {
-        return outputType;
-    }
-
-    public void setOutputType(int type) {
-        if (type < FIXED || type > AMENDABLE) {
-            throw new RuntimeException("Illegal output type");
-        }
-        outputType = type;
+        return mInput.toString();
     }
 
     @Override
@@ -104,7 +75,7 @@
     @Override
     public List<String> getFuncs() {
         List<String> funcs = super.getFuncs();
-        funcs.add(inputFileSpec.getFuncName());
+        funcs.add(mInput.getFuncName());
         return funcs;
     }
 

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOSort.java Wed Mar 12 16:57:05 2008
@@ -17,70 +17,91 @@
  */
 package org.apache.pig.impl.logicalLayer;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
-import org.apache.pig.impl.eval.EvalSpec;
-import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
-
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.PlanVisitor;
 
 public class LOSort extends LogicalOperator {
-    private static final long serialVersionUID = 1L;
-    private EvalSpec sortSpec;
+    private static final long serialVersionUID = 2L;
 
+    private List<Integer> mSortCols;
+    private List<Boolean> mAscCols;
+    private LOUserFunc mSortFunc;
+
+    /**
+     * @param plan LogicalPlan this operator is a part of.
+     * @param key OperatorKey for this operator
+     * @param sortCols List of column numbers that will be used for sorting data.
+     * @param ascCols List of booleans.  Should be same size as sortCols.  True
+     * indicates sort ascending (default), false sort descending.  If this list
+     * is null, then all columns will be sorted ascending.
+     * @param sortFunc User function associated with this sort; see getUserFunc().
+     * @param rp Requested level of parallelism to be used in the sort.
+     */
+    public LOSort(LogicalPlan plan,
+                  OperatorKey key,
+                  List<Integer> sortCols,
+                  List<Boolean> ascCols,
+                  LOUserFunc sortFunc,
+                  int rp) {
+        super(plan, key, rp);
+        mSortCols = sortCols;
+        mAscCols = ascCols;
+        mSortFunc = sortFunc;
+    }
+
+    public List<Integer> getSortCols() {
+        return mSortCols;
+    }
 
-    protected EvalSpec spec;
+    public List<Boolean> getAscendingCols() {
+        return mAscCols;
+    }
 
-    public EvalSpec getSpec() {
-        return spec;
+    public LOUserFunc getUserFunc() {
+        return mSortFunc;
     }
     
-    public LOSort(Map<OperatorKey, LogicalOperator> opTable,
-                  String scope, 
-                  long id, 
-                  OperatorKey input, 
-                  EvalSpec sortSpec) {
-        super(opTable, scope, id, input);
-        this.sortSpec = sortSpec;
-        getOutputType();
-    }
-
     @Override
     public String name() {
-        return "SORT " + scope + "-" + id;
+        return "SORT " + mKey.scope + "-" + mKey.id;
     }
 
     @Override
-    public String arguments() {
-        return sortSpec.toString();
+    public String typeName() {
+        return "LOSort";
     }
 
     @Override
-    public int getOutputType() {
-        switch (opTable.get(getInputs().get(0)).getOutputType()) {
-        case FIXED:
-            return FIXED;
-        default:
-            throw new RuntimeException
-                ("Blocking operator such as sort cannot handle streaming input");
+    public Schema getSchema() {
+        if (mSchema == null) {
+            // get our parent's schema
+            Collection<LogicalOperator> s = mPlan.getSuccessors(this);
+            mSchema = s.iterator().next().getSchema();
         }
+        return mSchema;
     }
 
     @Override
-    public TupleSchema outputSchema() {
-        if (schema == null)
-            schema = opTable.get(getInputs().get(0)).outputSchema().copy();
-
-        schema.setAlias(alias);
-        return schema;
-
+    public boolean supportsMultipleInputs() {
+        return false;
     }
 
-    public EvalSpec getSortSpec() {
-        return sortSpec;
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
     }
 
-    public void visit(LOVisitor v) {
-        v.visitSort(this);
+    public void visit(PlanVisitor v) throws ParseException {
+        if (!(v instanceof LOVisitor)) {
+            throw new RuntimeException("You can only visit LogicalOperators "
+                + "with an LOVisitor!");
+        }
+        ((LOVisitor)v).visitSort(this);
     }
-
 }

Added: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java?rev=636581&view=auto
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java (added)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOUserFunc.java Wed Mar 12 16:57:05 2008
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.impl.logicalLayer;
+
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.PlanVisitor;
+
+public class LOUserFunc extends ExpressionOperator {
+    private static final long serialVersionUID = 2L;
+
+    private List<ExpressionOperator> mArgs;
+
+    /**
+     * @param plan LogicalPlan this operator is a part of.
+     * @param k OperatorKey for this operator.
+     * @param args List of expressions that form the arguments for this
+     * function.
+     * @param returnType return type of this function.
+     */
+    public LOUserFunc(LogicalPlan plan,
+                      OperatorKey k,
+                      List<ExpressionOperator> args,
+                      byte returnType) {
+        super(plan, k, -1);
+        mArgs = args;
+        mType = returnType;
+    }
+
+    public List<ExpressionOperator> getArguments() {
+        return mArgs;
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+    
+    @Override
+    public String name() {
+        return "UserFunc " + mKey.scope + "-" + mKey.id;
+    }
+
+    @Override
+    public String typeName() {
+        return "LOUserFunc";
+    }
+
+    @Override
+    public Schema getSchema() {
+        if (mSchema == null) {
+            mSchema = new Schema(new Schema.FieldSchema(null, mType));
+        }
+        return mSchema;
+    }
+
+    @Override
+    public void visit(PlanVisitor v) throws ParseException {
+        if (!(v instanceof LOVisitor)) {
+            throw new RuntimeException("You can only visit LogicalOperators "
+                + "with an LOVisitor!");
+        }
+        ((LOVisitor)v).visitUserFunc(this);
+    }
+}

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LOVisitor.java Wed Mar 12 16:57:05 2008
@@ -20,6 +20,9 @@
 import java.util.List;
 import java.util.Iterator;
 
+import org.apache.pig.impl.plan.PlanVisitor;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+
 
 /**
  * A visitor mechanism for navigating and operating on a tree of Logical
@@ -42,77 +45,36 @@
  * WRONG:  LOEval myEval; MyVisitor v; v.visitEval(myEval);
  * These methods are only public to make them accessible to the LO* objects.
  */
-abstract public class LOVisitor {
+abstract public class LOVisitor extends PlanVisitor {
 
-    /**
-     * Only LOCogroup.visit() and subclass implementations of this function
-     * should ever call this method.
-     */
-    public void visitCogroup(LOCogroup g) {
-        basicVisit(g);
-    }
-        
-    /**
-     * Only LOEval.visit() and subclass implementations of this function
-     * should ever call this method.
-     */
-    public void visitEval(LOEval e) {
-        basicVisit(e);
-    }
-        
-    /**
-     * Only LOUnion.visit() and subclass implementations of this function
-     * should ever call this method.
-     */
-    public void visitUnion(LOUnion u) {
-        basicVisit(u);
-    }
-        
-        
-    /**
-     * Only LOLoad.visit() and subclass implementations of this function
-     * should ever call this method.
-     */
-    public void visitLoad(LOLoad load) {
-        basicVisit(load);
+    public LOVisitor(LogicalPlan plan) {
+        super(plan);
     }
-        
-    /**
-     * Only LOSort.visit() and subclass implementations of this function
-     * should ever call this method.
-     */
-    public void visitSort(LOSort s) {
-        basicVisit(s);
+
+    void visitCogroup(LOCogroup cg) throws ParseException {
     }
-        
-    /**
-     * Only LOSplit.visit() and subclass implementations of this function
-     * should ever call this method.
-     */
-    public void visitSplit(LOSplit s) {
-        basicVisit(s);
+
+    void visitEval(LOEval e) throws ParseException {
+        // Don't worry about visiting the contained logical operators, as the logical
+        // operators in it are already contained in the outer plan.
     }
-    
-    public void visitSplitOutput(LOSplitOutput s) {
-        basicVisit(s);
+
+    void visitGenerate(LOGenerate g) throws ParseException {
+        // Visit each of the generate's projection elements.
+        Iterator<LogicalOperator> i = g.getProjections().iterator();
+        while (i.hasNext()) {
+            i.next().visit(this);
+        }
     }
-    
         
-    /**
-     * Only LOStore.visit() and subclass implementations of this function
-     * should ever call this method.
-     */
-    public void visitStore(LOStore s) {
-        basicVisit(s);
+    void visitSort(LOSort s) throws ParseException {
     }
 
-    private void basicVisit(LogicalOperator lo) {
-        List<OperatorKey> inputs = lo.getInputs();
-        Iterator<OperatorKey> i = inputs.iterator();
-        
+    void visitUserFunc(LOUserFunc func) throws ParseException {
+        // Visit each of the arguments
+        Iterator<ExpressionOperator> i = func.getArguments().iterator();
         while (i.hasNext()) {
-            LogicalOperator input = lo.getOpTable().get(i.next());
-            input.visit(this);
+            i.next().visit(this);
         }
     }
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalOperator.java Wed Mar 12 16:57:05 2008
@@ -15,146 +15,151 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.pig.impl.logicalLayer;
 
-import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.pig.impl.logicalLayer.schema.TupleSchema;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.plan.Operator;
 
-import org.apache.pig.backend.executionengine.ExecScopedLogicalOperator;
 
+/**
+ * Parent for all Logical operators.
+ */
+abstract public class LogicalOperator extends Operator {
+    private static final long serialVersionUID = 2L;
 
-abstract public class LogicalOperator implements Serializable, ExecScopedLogicalOperator {
-    public String alias = null;
+    /**
+     * Schema that defines the output of this operator.
+     */
+    protected Schema mSchema = null;
 
-    public static final int FIXED = 1;
-    public static final int MONOTONE = 2;
-    public static final int UPDATABLE = 3;      // Reserved for future use
-    public static final int AMENDABLE = 4;
+    /**
+     * Datatype of the output of this operator.  Operators start out with data type
+     * set to UNKNOWN, and have it set for them by the type checker.
+     */
+    protected byte mType = DataType.UNKNOWN;
 
-    protected int requestedParallelism = -1;
-    protected TupleSchema schema = null;
-    protected List<OperatorKey> inputs;
+    /**
+     * Requested level of parallelism for this operation.
+     */
+    protected int mRequestedParallelism;
 
-    protected Map<OperatorKey, LogicalOperator> opTable;
+    /**
+     * Name of the record set that results from this operator.
+     */
+    protected String mAlias; 
 
-    protected String scope;
-    protected long id;
-    
-    protected LogicalOperator(Map<OperatorKey, LogicalOperator> opTable,
-                              String scope, 
-                              long id) {
-        this.inputs = new ArrayList<OperatorKey> ();
-        this.opTable = opTable;
-        this.scope = scope;
-        this.id = id;
+    /**
+     * Logical plan that this operator is a part of.
+     */
+    protected LogicalPlan mPlan;
 
-        opTable.put(getOperatorKey(), this);
-    } 
-    
-    protected LogicalOperator(Map<OperatorKey, LogicalOperator> opTable, 
-                              String scope, 
-                              long id, 
-                              List<OperatorKey> inputs) {
-        this.opTable = opTable;
-        this.inputs = inputs;
-        this.scope = scope;
-        this.id = id;
-        
-        opTable.put(getOperatorKey(), this);
+    /**
+     * Equivalent to LogicalOperator(plan, k, -1).
+     * @param plan Logical plan this operator is a part of.
+     * @param k Operator key to assign to this node.
+     */
+    public LogicalOperator(LogicalPlan plan, OperatorKey k) {
+        this(plan, k, -1);
     }
 
-    protected LogicalOperator(Map<OperatorKey, LogicalOperator> opTable,
-                              String scope, 
-                              long id, 
-                              OperatorKey input) {
-        this.opTable = opTable;
-        this.inputs = new ArrayList<OperatorKey> ();
-        inputs.add(input);
-        this.scope = scope;
-        this.id = id;
-
-        opTable.put(getOperatorKey(), this);
+    /**
+     * @param plan Logical plan this operator is a part of.
+     * @param k Operator key to assign to this node.
+     * @param rp degree of requested parallelism with which to execute this node.
+     */
+    public LogicalOperator(LogicalPlan plan, OperatorKey k, int rp) {
+        super(k);
+        mPlan = plan;
+        mRequestedParallelism = rp;
     }
     
+    /**
+     * Get the operator key for this operator.
+     */
     public OperatorKey getOperatorKey() {
-        return new OperatorKey(scope, id);
+        return mKey;
     }
-    
-    public String getScope() {
-        return this.scope;
+
+    /**
+     * Set the output schema for this operator.  If a schema already
+     * exists, an attempt will be made to reconcile it with this new
+     * schema.
+     * @param schema Schema to set.
+     * @throws ParseException if there is already a schema and the existing
+     * schema cannot be reconciled with this new schema.
+     */
+    public final void setSchema(Schema schema) throws ParseException {
+        // In general, operators don't generate their schema until they're
+        // asked, so ask them to do it.
+        getSchema();
+        if (mSchema == null) mSchema = schema;
+        else mSchema.reconcile(schema);
     }
-    
-    public long getId() {
-        return this.id;
+
+    /**
+     * Get the schema for the output of this operator.
+     */
+    public abstract Schema getSchema();
+
+    /**
+     * Set the type of this operator.  This should only be called by the type
+     * checking routines.
+     * @param t Type to set this operator to.
+     */
+    final public void setType(byte t) {
+        mType = t;
+    }
+
+    /**
+     * Get the type of this operator.
+     */
+    public byte getType() {
+        return mType;
     }
     
     public String getAlias() {
-        return alias;
+        return mAlias;
     }
 
     public void setAlias(String newAlias) {
-        alias = newAlias;
+        mAlias = newAlias;
     }
 
     public int getRequestedParallelism() {
-        return requestedParallelism;
+        return mRequestedParallelism;
     }
 
     public void setRequestedParallelism(int newRequestedParallelism) {
-        requestedParallelism = newRequestedParallelism;
+        mRequestedParallelism = newRequestedParallelism;
     }
 
-    @Override public String toString() {
-        StringBuffer result = new StringBuffer(super.toString());
-        result.append(" (alias: ");
-        result.append(alias);
-        result.append(", requestedParallelism: ");
-        result.append(requestedParallelism);
-        result.append(')');
-        return result.toString();
-    }
-
-    public abstract TupleSchema outputSchema();
-
-    public String name() {
-        return "ROOT " + scope + "-" + id;
-    }
-
-    public List<OperatorKey> getInputs() {
-        return inputs;
-    }
+    @Override
+    public String toString() {
+        StringBuffer msg = new StringBuffer();
+        
+        msg.append("(Name: " + name() + " Operator Key: " + mKey + ")");
 
-    public Map<OperatorKey, LogicalOperator> getOpTable() {
-        return opTable;
-    }
-    
-    public String arguments() {
-        return "";
+        return msg.toString();
     }
 
-    public List<String> getFuncs() {
-        List<String> funcs = new LinkedList<String>();
-        for (int i = 0; i < inputs.size(); i++) {
-            funcs.addAll(opTable.get(inputs.get(i)).getFuncs());
+    /**
+     * Given a schema, reconcile it with our existing schema.
+     * @param schema Schema to reconcile with the existing.
+     * @throws ParseException if the reconciliation is not possible.
+     */
+    protected void reconcileSchema(Schema schema) throws ParseException {
+        if (mSchema == null) {
+            mSchema = schema;
+            return;
         }
-        return funcs;
-    }
 
-    public abstract int getOutputType();
-
-    public void setSchema(TupleSchema schema) {
-        this.schema = schema;
+        // TODO
     }
 
-    /**
-     * Visit all of the logical operators in a tree, starting with this
-     * one.  
-     * @param v LOVisitor to visit this logical plan with.
-     */
-    public abstract void visit(LOVisitor v);
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/LogicalPlan.java Wed Mar 12 16:57:05 2008
@@ -17,94 +17,31 @@
  */
 package org.apache.pig.impl.logicalLayer;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.io.Serializable;
-import java.util.LinkedList;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
-import java.io.OutputStream;
-import java.io.PrintStream;
 import java.util.Map;
 
-import org.apache.pig.impl.PigContext;
-
-import org.apache.pig.backend.executionengine.ExecLogicalPlan;
-
-public class LogicalPlan implements Serializable, ExecLogicalPlan{
-    private static final long serialVersionUID = 1L;
-
-    protected OperatorKey root;
-    protected Map<OperatorKey, LogicalOperator> opTable;
-    protected PigContext pigContext = null;
-    
-    protected String alias = null;
-    
-    public OperatorKey getRoot() {
-        return root;
-    }
-
-    public LogicalPlan(OperatorKey rootIn,
-                       Map<OperatorKey, LogicalOperator> opTable ,
-                       PigContext pigContext) {
-        this.pigContext = pigContext;
-        this.root = rootIn;
-        this.opTable = opTable;
-        alias = opTable.get(root).alias;
-    }
-    
-    public Map<OperatorKey, LogicalOperator> getOpTable() {
-        return this.opTable;
-    }
-    
-    public void setRoot(OperatorKey newRoot) {
-        root = newRoot;
-    }
-
-    public PigContext getPigContext() {
-        return pigContext;
-    }
+import org.apache.pig.impl.plan.OperatorPlan;
 
-    public String getAlias() {
-        return alias;
-    }
+public class LogicalPlan extends OperatorPlan<LogicalOperator> {
+    private static final long serialVersionUID = 2L;
 
-    public void setAlias(String newAlias) {
-        alias = newAlias;
+    public LogicalPlan() {
+        super();
     }
 
-    public List<String> getFuncs() {
-        if (root == null) return new LinkedList<String>();
-        else return opTable.get(root).getFuncs();
-    }
-    
-    // indentation for root is 0
-    @Override
-    public String toString() {        
-        StringBuffer sb = new StringBuffer();
-        sb.append(opTable.get(root).name() +"(" + opTable.get(root).arguments() +")\n");
-        sb.append(appendChildren(root, 1));
-        return sb.toString();
-    }
-    public String appendChildren(OperatorKey parent, int indentation) {
-        StringBuffer sb = new StringBuffer();
-        List<OperatorKey> children = opTable.get(parent).getInputs();
-        for(int i=0; i != children.size(); i++) {
-            for(int j=0; j != indentation; j++) {
-                sb.append("\t");
-            }
-            
-            sb.append(opTable.get(children.get(i)).name() + 
-                      "(" + opTable.get(children.get(i)).arguments()+ ")\n");
-            sb.append(appendChildren(children.get(i), indentation+1));
-        }
-        return sb.toString();
-    }
-    
-    public int getOutputType(){
-        return opTable.get(root).getOutputType();
-    }
-    
     public void explain(OutputStream out) {
+        // TODO
+        /*
         LOVisitor lprinter = new LOPrinter(new PrintStream(out));
         
         opTable.get(root).visit(lprinter);
+        */
     }
+
 }

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/logicalLayer/schema/Schema.java Wed Mar 12 16:57:05 2008
@@ -17,59 +17,144 @@
  */
 package org.apache.pig.impl.logicalLayer.schema;
 
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.HashSet;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
+import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.impl.util.ObjectSerializer;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.logicalLayer.parser.ParseException;
 
+public class Schema {
 
+    public static class FieldSchema {
+        /**
+         * Alias for this field.
+         */
+        public String alias;
+
+        /**
+         * Datatype, using codes from {@link org.apache.pig.data.DataType}.
+         */
+        public byte type;
+
+        /**
+         * If this is a tuple itself, it can have a schema.  Otherwise this
+         * field must be null.
+         */
+        public Schema schema;
+
+        /**
+         * Constructor for any type.
+         * @param a Alias, if known.  If unknown leave null.
+         * @param t Type, using codes from {@link org.apache.pig.data.DataType}.
+         */
+        public FieldSchema(String a, byte t) {
+            alias = a;
+            type = t ;
+            schema = null;
+        }
 
-public abstract class Schema implements Serializable {
+        /**
+         * Constructor for tuple fields.
+         * @param a Alias, if known.  If unknown leave null.
+         * @param s Schema of this tuple.
+         */
+        public FieldSchema(String a, Schema s) {
+            alias = a;
+            type = DataType.TUPLE;
+            schema = s;
+        }
+    }
 
-    private final Log log = LogFactory.getLog(getClass());
-    
-    protected Set<String> aliases = new HashSet<String>();    
+    private List<FieldSchema> mFields;
+    private Map<String, FieldSchema> mAliases;
 
-    public Schema copy(){
-        try{
-            return (Schema)ObjectSerializer.deserialize(ObjectSerializer.serialize(this));
-        }catch (IOException e){
-            log.error(e);
-            throw new RuntimeException(e);
+    /**
+     * @param fields List of field schemas that describes the fields.
+     */
+    public Schema(List<FieldSchema> fields) {
+        mFields = fields;
+        mAliases = new HashMap<String, FieldSchema>(fields.size());
+        for (FieldSchema fs : fields) {
+            if (fs.alias != null) mAliases.put(fs.alias, fs);
         }
     }
-    public abstract int colFor(String alias);
-    public abstract Schema schemaFor(int col);
-    
-   
-    public abstract List<Schema> flatten();
 
-    public void setAlias(String alias) {
-        if (alias!=null)
-            aliases.add(alias);
-    }
-    
-    public void removeAlias(String alias){
-        aliases.remove(alias);
-    }
-    
-    public void removeAllAliases(){
-        aliases.clear();
+    /**
+     * Create a schema with only one field.
+     * @param fieldSchema field to put in this schema.
+     */
+    public Schema(FieldSchema fieldSchema) {
+        mFields = new ArrayList<FieldSchema>(1);
+        mFields.add(fieldSchema);
+        // The alias map must exist even when the field has no alias.
+        mAliases = new HashMap<String, FieldSchema>(1);
+        if (fieldSchema.alias != null) mAliases.put(fieldSchema.alias, fieldSchema);
     }
-    public Set<String> getAliases() {
-        return aliases;
+
+    /**
+     * Given an alias name, find the associated FieldSchema.
+     * @param alias Alias to look up.
+     * @return FieldSchema, or null if no such alias is in this tuple.
+     */
+    public FieldSchema getField(String alias) {
+        return mAliases.get(alias);
+    }
+
+    /**
+     * Given a field number, find the associated FieldSchema.
+     * @param fieldNum Field number to look up.
+     * @return FieldSchema for this field.
+     * @throws ParseException if the field number is negative or is not
+     * less than the number of fields in the tuple.
+     */
+    public FieldSchema getField(int fieldNum) throws ParseException {
+        if (fieldNum < 0 || fieldNum >= mFields.size()) {
+            throw new ParseException("Attempt to fetch field " + fieldNum +
+                " from tuple of size " + mFields.size());
+        }
+
+        return mFields.get(fieldNum);
     }
-    public String getAlias() {
-        Iterator<String> iter = aliases.iterator();
-        if (iter.hasNext())
-            return iter.next();
-        else
-            return null;
+
+    /**
+     * Find the number of fields in the schema.
+     * @return number of fields.
+     */
+    public int size() {
+        return mFields.size();
+    }
+
+    /**
+     * Reconcile this schema with another schema of the same size.  The use
+     * case is where a schema already exists but may not have alias and or
+     * type information.  If an alias exists in this schema and a new one is
+     * given, then the new one will be used.  Similarly with types, though this
+     * needs to be used carefully, as types should not be lightly changed.  The
+     * alias lookup is rebuilt so getField(String) sees the reconciled aliases.
+     * @param other Schema to reconcile with.
+     * @throws ParseException if the two schemas differ in size.
+     */
+    public void reconcile(Schema other) throws ParseException {
+        if (other.size() != size()) {
+            throw new ParseException("Cannot reconcile schemas with different "
+                + "sizes.  This schema has size " + size() + " other has size " 
+                + "of " + other.size());
+        }
+
+        mAliases = new HashMap<String, FieldSchema>(mFields.size());
+        int j = 0;
+        for (FieldSchema otherFs : other.mFields) {
+            FieldSchema ourFs = mFields.get(j++);
+            if (otherFs.alias != null) ourFs.alias = otherFs.alias;
+            if (otherFs.type != DataType.UNKNOWN) ourFs.type = otherFs.type;
+            if (otherFs.schema != null) ourFs.schema = otherFs.schema;
+            if (ourFs.alias != null) mAliases.put(ourFs.alias, ourFs);
+        }
     }
 }
+
+
+

Modified: incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=636581&r1=636580&r2=636581&view=diff
==============================================================================
--- incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ incubator/pig/branches/types/src/org/apache/pig/impl/plan/OperatorPlan.java Wed Mar 12 16:57:05 2008
@@ -33,21 +33,21 @@
 /**
  * A generic graphing class for use by LogicalPlan, PhysicalPlan, etc.
  */
-public abstract class OperatorPlan implements Iterable {
-    protected Map<Operator, OperatorKey> mOps;
-    protected Map<OperatorKey, Operator> mKeys;
+public abstract class OperatorPlan<E extends Operator> implements Iterable {
+    protected Map<E, OperatorKey> mOps;
+    protected Map<OperatorKey, E> mKeys;
     protected MultiValueMap mFromEdges;
     protected MultiValueMap mToEdges;
 
-    private List<Operator> mRoots;
-    private List<Operator> mLeaves;
+    private List<E> mRoots;
+    private List<E> mLeaves;
 
     
     public OperatorPlan() {
-        mRoots = new ArrayList<Operator>();
-        mLeaves = new ArrayList<Operator>();
-        mOps = new HashMap<Operator, OperatorKey>();
-        mKeys = new HashMap<OperatorKey, Operator>();
+        mRoots = new ArrayList<E>();
+        mLeaves = new ArrayList<E>();
+        mOps = new HashMap<E, OperatorKey>();
+        mKeys = new HashMap<OperatorKey, E>();
         mFromEdges = new MultiValueMap();
         mToEdges = new MultiValueMap();
     }
@@ -56,9 +56,9 @@
      * Get a list of all nodes in the graph that are roots.  A root is defined to
      * be a node that has no input.
      */
-    public List<Operator> getRoots() {
+    public List<E> getRoots() {
         if (mRoots.size() == 0 && mOps.size() > 0) {
-            for (Operator op : mOps.keySet()) {
+            for (E op : mOps.keySet()) {
                 if (mToEdges.getCollection(op) == null) {
                     mRoots.add(op);
                 }
@@ -71,9 +71,9 @@
      * Get a list of all nodes in the graph that are leaves.  A leaf is defined to
      * be a node that has no output.
      */
-    public List<Operator> getLeaves() {
+    public List<E> getLeaves() {
         if (mLeaves.size() == 0 && mOps.size() > 0) {
-            for (Operator op : mOps.keySet()) {
+            for (E op : mOps.keySet()) {
                 if (mFromEdges.getCollection(op) == null) {
                     mLeaves.add(op);
                 }
@@ -87,7 +87,7 @@
      * @param op Logical operator.
      * @return associated OperatorKey
      */
-    public OperatorKey getOperatorKey(Operator op) {
+    public OperatorKey getOperatorKey(E op) {
         return mOps.get(op);
     }
 
@@ -96,7 +96,7 @@
      * @param opKey OperatorKey
      * @return associated operator.
      */
-    public Operator getOperator(OperatorKey opKey) {
+    public E getOperator(OperatorKey opKey) {
         return mKeys.get(opKey);
     }
 
@@ -106,7 +106,7 @@
      * be done as a separate step using connect.
      * @param op Operator to add to the plan.
      */
-    public void add(Operator op) {
+    public void add(E op) {
         markDirty();
         mOps.put(op, op.getOperatorKey());
         mKeys.put(op.getOperatorKey(), op);
@@ -121,7 +121,7 @@
      * operator that does not support multiple inputs or create multiple outputs
      * for an operator that does not support multiple outputs.
      */
-    public void connect(Operator from, Operator to) throws IOException {
+    public void connect(E from, E to) throws IOException {
         markDirty();
 
         // Check that both nodes are in the plan.
@@ -157,7 +157,7 @@
      * @return true if the nodes were connected according to the specified data
      * flow, false otherwise.
      */
-    public boolean disconnect(Operator from, Operator to) {
+    public boolean disconnect(E from, E to) {
         markDirty();
 
         boolean sawNull = false;
@@ -172,7 +172,7 @@
      * be removed as well.
      * @param op Operator to remove.
      */
-    public void remove(Operator op) {
+    public void remove(E op) {
         markDirty();
 
         removeEdges(op, mFromEdges, mToEdges);
@@ -189,8 +189,8 @@
      * @param op Node to look to
      * @return Collection of nodes.
      */
-    public Collection<Operator> getPredecessors(Operator op) {
-        return (Collection<Operator>)mToEdges.getCollection(op);
+    public List<E> getPredecessors(E op) {
+        return (List<E>)mToEdges.getCollection(op);
     }
 
 
@@ -200,11 +200,11 @@
      * @param op Node to look from
      * @return Collection of nodes.
      */
-    public Collection<Operator> getSuccessors(Operator op) {
-        return (Collection<Operator>)mFromEdges.getCollection(op);
+    public List<E> getSuccessors(E op) {
+        return (List<E>)mFromEdges.getCollection(op);
     }
 
-    public Iterator<Operator> iterator() { 
+    public Iterator<E> iterator() { 
         return mOps.keySet().iterator();
     }
 
@@ -213,7 +213,7 @@
         mLeaves.clear();
     }
 
-    private void removeEdges(Operator op,
+    private void removeEdges(E op,
                              MultiValueMap fromMap,
                              MultiValueMap toMap) {
         // Find all of the from edges, as I have to remove all the associated to
@@ -225,13 +225,13 @@
         ArrayList al = new ArrayList(c);
         Iterator i = al.iterator();
         while (i.hasNext()) {
-            Operator to = (Operator)i.next();
+            E to = (E)i.next();
             toMap.remove(to, op);
             fromMap.remove(op, to);
         }
     }
 
-    private void checkInPlan(Operator op) throws IOException {
+    private void checkInPlan(E op) throws IOException {
         if (mOps.get(op) == null) {
             throw new IOException("Attempt to connect operator " +
                 op.name() + " which is not in the plan.");