You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/04 19:46:48 UTC

svn commit: r982345 [5/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log...

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+
+/**
+ * Operator to map the data into the inner plan of LOForEach
+ * It can only be used in the inner plan of LOForEach
+ *
+ */
+public class LOInnerLoad extends LogicalRelationalOperator {
+    // Projection of the loaded column; kept as an expression so the column
+    // number can be adjusted later (see constructor comment).
+    private ProjectExpression prj;
+    // The LOForEach whose inner plan contains this operator.
+    private LOForEach foreach;
+    // Set by getSchema() when the projected column is of type bag.
+    private boolean sourceIsBag = false;
+
+    /**
+     * @param plan    inner plan this operator is added to
+     * @param foreach the LOForEach that owns the inner plan
+     * @param colNum  column number to project
+     */
+    public LOInnerLoad(OperatorPlan plan, LOForEach foreach, int colNum) {
+        super("LOInnerLoad", plan);
+
+        // store column number as a ProjectExpression in a plan
+        // to be able to dynamically adjust column number during optimization
+        LogicalExpressionPlan exp = new LogicalExpressionPlan();
+
+        // NOTE(review): the 0 is presumably the input index within the
+        // foreach; confirm against the ProjectExpression constructor.
+        prj = new ProjectExpression(exp, 0, colNum, foreach);
+        this.foreach = foreach;
+    }
+
+    /**
+     * Computes (and caches) the schema of the projected column.
+     * <p>
+     * If the column is a bag, the result holds the bag's inner columns
+     * (unwrapping the tuple level when two-level access is required), the
+     * operator's alias becomes the bag's alias and {@link #sourceIsBag()}
+     * becomes true. Otherwise the result is a single-field schema.
+     *
+     * @return the schema, or null when it cannot be determined
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null)
+            return schema;
+
+        try {
+            LogicalFieldSchema fieldSchema = prj.getFieldSchema();
+            if (fieldSchema == null) {
+                return null;
+            }
+            LogicalSchema result = new LogicalSchema();
+            if (fieldSchema.type == DataType.BAG) {
+                sourceIsBag = true;
+                alias = fieldSchema.alias;
+                LogicalSchema columns = bagColumns(fieldSchema);
+                if (columns == null) {
+                    // the bag's inner schema is unknown
+                    return null;
+                }
+                for (int i = 0; i < columns.size(); i++)
+                    result.addField(columns.getField(i));
+            } else {
+                result.addField(fieldSchema);
+            }
+            // only cache a completely built schema
+            schema = result;
+        } catch (IOException e) {
+            // NOTE(review): the failure is not propagated; schema stays null
+            // so a later call will retry. Consider surfacing this error.
+        }
+        return schema;
+    }
+
+    /**
+     * Returns the column schema held inside a bag field, unwrapping the tuple
+     * level when two-level access is required.
+     *
+     * @return the inner column schema, or null if any level is unknown
+     */
+    private static LogicalSchema bagColumns(LogicalFieldSchema bagField) throws IOException {
+        LogicalSchema bagSchema = bagField.schema;
+        if (bagSchema == null) {
+            return null;
+        }
+        if (!bagSchema.isTwoLevelAccessRequired()) {
+            return bagSchema;
+        }
+        LogicalFieldSchema tupleSchema = bagSchema.getField(0);
+        return tupleSchema == null ? null : tupleSchema.schema;
+    }
+
+    /** @return the projection expression that selects the loaded column */
+    public ProjectExpression getProjection() {
+        return prj;
+    }
+
+    /**
+     * Two inner loads are considered equal when they load the same column number.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOInnerLoad)) {
+            return false;
+        }
+
+        return (getColNum() == ((LOInnerLoad)other).getColNum());
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+         if (!(v instanceof LogicalRelationalNodesVisitor)) {
+             throw new IOException("Expected LogicalPlanVisitor");
+         }
+         ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+
+    /** @return the column number currently held by the projection */
+    public int getColNum() {
+        return prj.getColNum();
+    }
+
+    /**
+     * Get the LOForEach operator that contains this operator as part of inner plan
+     * @return the LOForEach operator
+     */
+    public LOForEach getLOForEach() {
+        return foreach;
+    }
+
+    /**
+     * @return true if a previous getSchema() call found the projected column
+     *         to be a bag; false otherwise (including before getSchema() runs)
+     */
+    public boolean sourceIsBag() {
+        return sourceIsBag;
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+
+
+public class LOJoin extends LogicalRelationalOperator {
+    private static final long serialVersionUID = 2L;
+
+    /**
+     * Enum for the type of join
+     */
+    public static enum JOINTYPE {
+        HASH,    // Hash Join
+        REPLICATED, // Fragment Replicated join
+        SKEWED, // Skewed Join
+        MERGE   // Sort Merge Join
+    }
+
+    // Join-key expression plans, keyed by input index; a key may map to
+    // several plans (one per join column), compared in iteration order.
+    private MultiMap<Integer, LogicalExpressionPlan> mJoinPlans;
+    // indicator for each input whether it is inner
+    private boolean[] mInnerFlags;
+    private JOINTYPE mJoinType; // Retains the type of the join
+
+    /**
+     * Creates a join with no join plans, flags or type set.
+     */
+    public LOJoin(LogicalPlan plan) {
+        super("LOJoin", plan);
+    }
+
+    /**
+     * @param plan      logical plan this join belongs to
+     * @param joinPlans join-key expression plans keyed by input index
+     * @param jt        type of join
+     * @param isInner   per-input flag telling whether that input is inner
+     */
+    public LOJoin(LogicalPlan plan,
+                MultiMap<Integer, LogicalExpressionPlan> joinPlans,
+                JOINTYPE jt,
+                boolean[] isInner) {
+        super("LOJoin", plan);
+        mJoinPlans = joinPlans;
+        mJoinType = jt;
+        mInnerFlags = isInner;
+    }
+
+    public boolean isInner(int inputIndex) {
+        return mInnerFlags[inputIndex];
+    }
+
+    public boolean[] getInnerFlags() {
+        return mInnerFlags;
+    }
+
+    public JOINTYPE getJoinType() {
+        return mJoinType;
+    }
+
+    /**
+     * @return the join-key plans of the given input, or null if none are set
+     */
+    public Collection<LogicalExpressionPlan> getJoinPlan(int inputIndex) {
+        if (mJoinPlans == null) {
+            return null;
+        }
+        return mJoinPlans.get(inputIndex);
+    }
+
+    /**
+     * Get all of the expressions plans that are in this join.
+     * @return collection of all expression plans (empty if none are set).
+     */
+    public Collection<LogicalExpressionPlan> getExpressionPlans() {
+        if (mJoinPlans == null) {
+            return new ArrayList<LogicalExpressionPlan>();
+        }
+        return mJoinPlans.values();
+    }
+
+    /**
+     * The join schema is the concatenation of all input schemas, in
+     * predecessor order. Named fields are prefixed with "inputAlias::".
+     *
+     * @return the schema, or null if any input schema is unknown
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        // if schema is calculated before, just return
+        if (schema != null) {
+            return schema;
+        }
+
+        List<Operator> inputs = null;
+        try {
+            inputs = plan.getPredecessors(this);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessors of LOJoin operator. ", e);
+        }
+        if (inputs == null) {
+            return null;
+        }
+
+        LogicalSchema result = new LogicalSchema();
+        for (Operator op : inputs) {
+            LogicalRelationalOperator input = (LogicalRelationalOperator)op;
+            LogicalSchema inputSchema = input.getSchema();
+            // the schema of one input is unknown, so the join schema is unknown
+            if (inputSchema == null) {
+                return null;
+            }
+            for (int i = 0; i < inputSchema.size(); i++) {
+                LogicalSchema.LogicalFieldSchema fs = inputSchema.getField(i);
+                String newAlias = (fs.alias == null) ? null : input.getAlias() + "::" + fs.alias;
+                result.addField(new LogicalSchema.LogicalFieldSchema(newAlias, fs.schema, fs.type, fs.uid));
+            }
+        }
+
+        // cache only once every input contributed its fields
+        schema = result;
+        return schema;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+
+    }
+
+    /**
+     * Two joins are equal when they have the same join type, the same inner
+     * flags, pass checkEquality, and project pairwise-equal join-key plans
+     * for the same set of inputs.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOJoin)) {
+            return false;
+        }
+        LOJoin oj = (LOJoin)other;
+        if (mJoinType != oj.mJoinType) return false;
+        // null-safe, length-checked element comparison
+        if (!java.util.Arrays.equals(mInnerFlags, oj.mInnerFlags)) return false;
+        if (!checkEquality(oj)) return false;
+        return joinPlansEqual(mJoinPlans, oj.mJoinPlans);
+    }
+
+    /**
+     * Checks that both maps cover the same inputs and that, for each input,
+     * the expression plans are pairwise equal in iteration order.
+     */
+    private static boolean joinPlansEqual(MultiMap<Integer, LogicalExpressionPlan> mine,
+                                          MultiMap<Integer, LogicalExpressionPlan> theirs) {
+        if (mine == null || theirs == null) {
+            return mine == theirs;
+        }
+        if (mine.size() != theirs.size()) return false;
+
+        for (Integer input : mine.keySet()) {
+            // look the input up directly; MultiMap key order is unspecified
+            if (!theirs.keySet().contains(input)) return false;
+            Collection<LogicalExpressionPlan> c = mine.get(input);
+            Collection<LogicalExpressionPlan> oc = theirs.get(input);
+            if (c.size() != oc.size()) return false;
+
+            Iterator<LogicalExpressionPlan> it = c.iterator();
+            Iterator<LogicalExpressionPlan> oit = oc.iterator();
+            while (it.hasNext()) {
+                if (!it.next().isEqual(oit.next())) return false;
+            }
+        }
+        return true;
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+
+public class LOLimit extends LogicalRelationalOperator {
+
+    private static final long serialVersionUID = 2L;
+
+    // The limit value held by this operator.
+    private long mLimit;
+
+    /**
+     * @param plan  logical plan this operator belongs to
+     * @param limit the limit value
+     */
+    public LOLimit(LogicalPlan plan, long limit) {
+        super("LOLimit", plan);
+        this.mLimit = limit;
+    }
+
+    /** @return the current limit value */
+    public long getLimit() {
+        return this.mLimit;
+    }
+
+    /** @param limit the new limit value */
+    public void setLimit(long limit) {
+        this.mLimit = limit;
+    }
+
+    /**
+     * Returns the schema of this operator's first predecessor.
+     *
+     * @throws RuntimeException if the predecessor cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        final LogicalRelationalOperator predecessor;
+        try {
+            predecessor = (LogicalRelationalOperator) plan.getPredecessors(this).get(0);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOLimit.", e);
+        }
+        return predecessor.getSchema();
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalRelationalNodesVisitor) {
+            ((LogicalRelationalNodesVisitor) v).visit(this);
+            return;
+        }
+        throw new IOException("Expected LogicalPlanVisitor");
+    }
+
+    /**
+     * Equal when the other operator is an LOLimit with the same limit value
+     * and checkEquality agrees.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOLimit)) {
+            return false;
+        }
+        LOLimit that = (LOLimit) other;
+        return that.getLimit() == mLimit && checkEquality(that);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.LoadFunc;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+
+public class LOLoad extends LogicalRelationalOperator {
+    
+    private LogicalSchema scriptSchema;
+    private FileSpec fs;
+    private transient LoadFunc loadFunc;
+    transient private Configuration conf;
+    private LogicalSchema determinedSchema;
+    private List<Integer> requiredFields = null;
+    private boolean castInserted = false;
+    private LogicalSchema uidOnlySchema;
+
+    /**
+     * 
+     * @param loader FuncSpec for load function to use for this load.
+     * @param schema schema user specified in script, or null if not
+     * specified.
+     * @param plan logical plan this load is part of.
+     */
+    public LOLoad(FileSpec loader, LogicalSchema schema, LogicalPlan plan, Configuration conf) {
+       super("LOLoad", plan);
+       scriptSchema = schema;
+       fs = loader;
+       this.conf = conf;
+    }
+    
+    public LoadFunc getLoadFunc() {
+        try { 
+            if (loadFunc == null) {
+                loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
+                loadFunc.setUDFContextSignature(getAlias());               
+            }
+            
+            return loadFunc;
+        }catch (ClassCastException cce) {
+            throw new RuntimeException(fs.getFuncSpec() + " should implement the LoadFunc interface.");    		
+        }
+    }
+    
+    public void setScriptSchema(LogicalSchema schema) {
+        scriptSchema = schema;
+    }
+    
+    public void setRequiredFields(List<Integer> requiredFields) {
+        this.requiredFields = requiredFields;
+    }
+    
+    /**
+     * Get the schema for this load.  The schema will be either be what was
+     * given by the user in the script or what the load functions getSchema
+     * call returned.  Otherwise null will be returned, indicating that the
+     * schema is unknown.
+     * @return schema, or null if unknown
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null)
+            return schema;
+        
+        LogicalSchema originalSchema = null;
+        // TODO get schema from LoaderMetadata interface.
+        if (determinedSchema!=null) {
+            determinedSchema = getSchemaFromMetaData();
+        }
+        
+        if (scriptSchema != null && determinedSchema != null) {
+            originalSchema = LogicalSchema.merge(scriptSchema, determinedSchema);
+        } else if (scriptSchema != null)  originalSchema = scriptSchema;
+        else if (determinedSchema != null) originalSchema = determinedSchema;
+        
+        if (isCastInserted()) {
+            for (int i=0;i<originalSchema.size();i++) {
+                LogicalSchema.LogicalFieldSchema fs = originalSchema.getField(i);
+                if(determinedSchema == null) {
+                    // Reset the loads field schema to byte array so that it
+                    // will reflect reality.
+                    fs.type = DataType.BYTEARRAY;
+                } else {
+                    // Reset the type to what determinedSchema says it is
+                    fs.type = determinedSchema.getField(i).type;
+                }
+            }
+        }
+        
+        if (originalSchema!=null) {
+            try {
+                uidOnlySchema = originalSchema.mergeUid(uidOnlySchema);
+            } catch (IOException e) {
+                //TODO Exception
+            }
+        }
+        
+        if (requiredFields!=null) {
+            schema = new LogicalSchema();
+            for (int i=0;i<originalSchema.size();i++) {
+                if (requiredFields.contains(i))
+                    schema.addField(originalSchema.getField(i));
+            }
+        } else
+            schema = originalSchema;
+        
+        return schema;
+    }
+
+    private LogicalSchema getSchemaFromMetaData() {
+        return null;
+    }
+
+    public FileSpec getFileSpec() {
+        return fs;
+    }
+    
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+
+    }
+    
+    public LogicalSchema getDeterminedSchema() {
+        return determinedSchema;
+    }
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof LOLoad) {
+            LOLoad ol = (LOLoad)other;
+            if (!checkEquality(ol)) return false;
+            if (fs == null) {
+                if (ol.fs == null) {
+                    return true;
+                }else{
+                    return false;
+                }
+            }
+            
+            return fs.equals(ol.fs);
+        } else {
+            return false;
+        }
+    }
+    
+    public void setCastInserted(boolean flag) {
+        castInserted = flag;
+    }
+    
+    public boolean isCastInserted() {
+        return castInserted;
+    }
+    
+    public Configuration getConfiguration() {
+        return conf;
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.SortColInfo;
+import org.apache.pig.SortInfo;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+
+public class LOSort extends LogicalRelationalOperator{
+    // per sort column: true = ascending, false = descending (see getSortInfo)
+    private List<Boolean> mAscCols;
+    // user-supplied sort function; treated as possibly null
+    private FuncSpec mSortFunc;
+    private boolean mIsStar = false;
+    // -1 means "no limit" (see isLimited)
+    private long limit;
+    // one expression plan per sort column
+    private List<LogicalExpressionPlan> mSortColPlans;
+
+    /**
+     * @param plan         plan this sort belongs to
+     * @param sortColPlans one expression plan per sort column
+     * @param ascCols      per sort column, true for ascending order
+     * @param sortFunc     user sort function spec
+     */
+    public LOSort(OperatorPlan plan, List<LogicalExpressionPlan> sortColPlans,
+            List<Boolean> ascCols,
+            FuncSpec sortFunc ) {
+        super("LOSort", plan);
+        mSortColPlans = sortColPlans;
+        mAscCols = ascCols;
+        mSortFunc = sortFunc;
+        limit = -1;
+    }
+
+    public List<LogicalExpressionPlan> getSortColPlans() {
+        return mSortColPlans;
+    }
+
+    public void setSortColPlans(List<LogicalExpressionPlan> sortPlans) {
+        mSortColPlans = sortPlans;
+    }
+
+    public List<Boolean> getAscendingCols() {
+        return mAscCols;
+    }
+
+    public void setAscendingCols(List<Boolean> ascCols) {
+        mAscCols = ascCols;
+    }
+
+    public FuncSpec getUserFunc() {
+        return mSortFunc;
+    }
+
+    public void setUserFunc(FuncSpec func) {
+        mSortFunc = func;
+    }
+
+    public boolean isStar() {
+        return mIsStar;
+    }
+
+    public void setStar(boolean b) {
+        mIsStar = b;
+    }
+
+    public void setLimit(long l)
+    {
+        limit = l;
+    }
+
+    public long getLimit()
+    {
+        return limit;
+    }
+
+    /** @return true unless the limit is the sentinel -1 */
+    public boolean isLimited()
+    {
+        return (limit!=-1);
+    }
+
+    /**
+     * Returns the schema of this operator's first predecessor.
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator input = null;
+        try {
+            input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
+        } catch (IOException e) {
+            throw new RuntimeException("Unable to get predecessor of LOSort.", e);
+        }
+        return input.getSchema();
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+
+    }
+
+    /**
+     * Builds a SortInfo describing each sort column's index, name (when the
+     * schema is known) and order.
+     *
+     * @throws IOException if a sort column plan is not a single ProjectExpression
+     */
+    public SortInfo getSortInfo() throws IOException {
+        LogicalSchema sortSchema = this.getSchema();
+        List<SortColInfo> sortColInfoList = new ArrayList<SortColInfo>();
+        for (int i = 0; i < mSortColPlans.size(); i++) {
+            LogicalExpressionPlan lp = mSortColPlans.get(i);
+            Iterator<Operator> opsIterator = lp.getOperators();
+            List<Operator> opsList = new ArrayList<Operator>();
+            while(opsIterator.hasNext()) {
+                opsList.add(opsIterator.next());
+            }
+            // report the operator count separately so an empty plan does not
+            // fail with IndexOutOfBounds while building the message
+            if (opsList.size() != 1) {
+                throw new IOException("Expected exactly one operator in sort column plan, found "
+                        + opsList.size());
+            }
+            if (!(opsList.get(0) instanceof ProjectExpression)) {
+                throw new IOException("Unsupported operator in inner plan: " + opsList.get(0));
+            }
+            ProjectExpression project = (ProjectExpression) opsList.get(0);
+            int sortColIndex = project.getColNum();
+            String sortColName = (sortSchema == null) ? null :
+                sortSchema.getField(sortColIndex).alias;
+            sortColInfoList.add(new SortColInfo(sortColName, sortColIndex,
+                    mAscCols.get(i)? SortColInfo.Order.ASCENDING :
+                        SortColInfo.Order.DESCENDING));
+        }
+        return new SortInfo(sortColInfoList);
+    }
+
+    /**
+     * Equal when the other operator is an LOSort with the same sort orders,
+     * user function, star flag, limit and pairwise-equal sort column plans,
+     * and checkEquality agrees.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOSort)) {
+            return false;
+        }
+        LOSort otherSort = (LOSort)other;
+        if (!nullSafeEquals(mAscCols, otherSort.getAscendingCols()))
+            return false;
+        if (!nullSafeEquals(mSortFunc, otherSort.getUserFunc()))
+            return false;
+        if (mIsStar != otherSort.isStar())
+            return false;
+        if (limit != otherSort.getLimit())
+            return false;
+        if (!sortPlansEqual(mSortColPlans, otherSort.getSortColPlans()))
+            return false;
+        return checkEquality(otherSort);
+    }
+
+    // equals() that treats two nulls as equal
+    private static boolean nullSafeEquals(Object a, Object b) {
+        return a == null ? b == null : a.equals(b);
+    }
+
+    // structural comparison of plan lists via LogicalExpressionPlan.isEqual
+    private static boolean sortPlansEqual(List<LogicalExpressionPlan> a,
+                                          List<LogicalExpressionPlan> b) {
+        if (a == b) return true;
+        if (a == null || b == null || a.size() != b.size()) return false;
+        for (int i = 0; i < a.size(); i++) {
+            if (!a.get(i).isEqual(b.get(i))) return false;
+        }
+        return true;
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+
+public class LOSplit extends LogicalRelationalOperator {
+
+    /**
+     * @param plan the plan this split operator belongs to
+     */
+    public LOSplit(OperatorPlan plan) {
+        super("LOSplit", plan);
+    }
+
+    /**
+     * Returns the schema of this operator's first predecessor.
+     *
+     * @throws RuntimeException if the predecessor cannot be obtained
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        final LogicalRelationalOperator source;
+        try {
+            source = (LogicalRelationalOperator) plan.getPredecessors(this).get(0);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOSplit.", e);
+        }
+        return source.getSchema();
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalRelationalNodesVisitor) {
+            ((LogicalRelationalNodesVisitor) v).visit(this);
+            return;
+        }
+        throw new IOException("Expected LogicalPlanVisitor");
+    }
+
+    /**
+     * Equal when the other operator is an LOSplit and checkEquality agrees.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        return (other instanceof LOSplit) && checkEquality((LOSplit) other);
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+
+public class LOSplitOutput extends LogicalRelationalOperator {
+    private LogicalExpressionPlan filterPlan;
+    public LOSplitOutput(LogicalPlan plan) {
+        super("LOSplitOutput", plan);       
+    }
+    
+    public LOSplitOutput(LogicalPlan plan, LogicalExpressionPlan filterPlan) {
+        super("LOSplitOutput", plan);
+        this.filterPlan = filterPlan;
+    }
+    
+    public LogicalExpressionPlan getFilterPlan() {
+        return filterPlan;
+    }
+    
+    public void setFilterPlan(LogicalExpressionPlan filterPlan) {
+        this.filterPlan = filterPlan;
+    }
+    
+    @Override
+    public LogicalSchema getSchema() {
+        LogicalRelationalOperator input = null;
+        try {
+            input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
+        }catch(Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOSplit.", e);
+        }
+        
+        return input.getSchema();
+    }   
+    
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof LOSplitOutput) { 
+            LOSplitOutput os = (LOSplitOutput)other;
+            return filterPlan.isEqual(os.filterPlan) && checkEquality(os);
+        } else {
+            return false;
+        }
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+import org.apache.pig.StoreFuncInterface;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+
+public class LOStore extends LogicalRelationalOperator {
+    private static final long serialVersionUID = 2L;
+
+    /** Output location and store-function spec; null for the one-argument constructor. */
+    private FileSpec output;
+
+    /** Store function instantiated from {@link #output}; excluded from Java serialization. */
+    private transient StoreFuncInterface storeFunc;
+
+    /**
+     * Creates a store operator without an output spec; both
+     * {@link #getOutputSpec()} and {@link #getStoreFunc()} return null for it.
+     */
+    public LOStore(LogicalPlan plan) {
+        super("LOStore", plan);
+    }
+
+    /**
+     * Creates a store operator for {@code outputFileSpec} and eagerly
+     * instantiates the store function named by its FuncSpec.
+     *
+     * @throws RuntimeException if the store function cannot be instantiated
+     */
+    public LOStore(LogicalPlan plan, FileSpec outputFileSpec) {
+        super("LOStore", plan);
+        output = outputFileSpec;
+        storeFunc = instantiateStoreFunc(outputFileSpec);
+    }
+
+    /** Instantiates the StoreFuncInterface described by {@code spec}'s FuncSpec. */
+    private static StoreFuncInterface instantiateStoreFunc(FileSpec spec) {
+        try {
+            Object func = PigContext.instantiateFuncFromSpec(spec.getFuncSpec());
+            return (StoreFuncInterface) func;
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to instantiate StoreFunc.", e);
+        }
+    }
+
+    /** @return the output spec, or null if none was supplied */
+    public FileSpec getOutputSpec() {
+        return output;
+    }
+
+    /** @return the instantiated store function, or null if none was created */
+    public StoreFuncInterface getStoreFunc() {
+        return storeFunc;
+    }
+
+    /**
+     * Recomputes the schema from the first predecessor on every call, stores
+     * it in the inherited {@code schema} field, and returns it.
+     *
+     * @throws RuntimeException wrapping any failure to look up the predecessor
+     *         or to obtain its schema
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        try {
+            Operator upstream = plan.getPredecessors(this).get(0);
+            schema = ((LogicalRelationalOperator) upstream).getSchema();
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOStore.", e);
+        }
+        return schema;
+    }
+
+    /**
+     * Dispatches to the visitor's LOStore overload.
+     *
+     * @throws IOException if the visitor is not a LogicalRelationalNodesVisitor
+     */
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (v instanceof LogicalRelationalNodesVisitor) {
+            ((LogicalRelationalNodesVisitor) v).visit(this);
+        } else {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+    }
+
+    /**
+     * Two stores are equal when checkEquality() succeeds and their output specs
+     * are equal (two null specs count as equal). storeFunc is not compared
+     * because it is derived from the output spec.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (!(other instanceof LOStore)) {
+            return false;
+        }
+        LOStore that = (LOStore) other;
+        if (!checkEquality(that)) {
+            return false;
+        }
+        if (output == null) {
+            return that.output == null;
+        }
+        return that.output != null && output.equals(that.output);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.PlanVisitor;
+
+public class LOStream extends LogicalRelationalOperator {
+
+    private static final long serialVersionUID = 2L;
+    //private static Log log = LogFactory.getLog(LOFilter.class);
+    
+    // the StreamingCommand object for the
+    // Stream Operator this operator represents
+    private StreamingCommand command;
+    transient private ExecutableManager executableManager;
+        
+    public LOStream(LogicalPlan plan, ExecutableManager exeManager, StreamingCommand cmd) {
+        super("LODistinct", plan);
+        command = cmd;
+        executableManager = exeManager;
+    }
+    
+    /**
+     * Get the StreamingCommand object associated
+     * with this operator
+     * 
+     * @return the StreamingCommand object
+     */
+    public StreamingCommand getStreamingCommand() {
+        return command;
+    }
+    
+    /**
+     * @return the ExecutableManager
+     */
+    public ExecutableManager getExecutableManager() {
+        return executableManager;
+    }
+
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema!=null)
+            return schema;
+        LogicalRelationalOperator input = null;
+        try {
+            input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
+        }catch(Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOStream.", e);
+        }
+        
+        schema = input.getSchema();
+        return schema;
+    }   
+    
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+    
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other != null && other instanceof LOStream) { 
+            return checkEquality((LogicalRelationalOperator)other);
+        } else {
+            return false;
+        }
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan.logical.relational;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+
+public class LOUnion extends LogicalRelationalOperator {
+
+    // uid mapping from output uid to input uid: each pair is
+    // (output uid, input uid), with one pair per input for every output field.
+    private final List<Pair<Long, Long>> uidMapping = new ArrayList<Pair<Long, Long>>();
+
+    public LOUnion(OperatorPlan plan) {
+        super("LOUnion", plan);
+    }
+
+    /**
+     * Computes (and caches) the union's schema.
+     *
+     * Returns null if any input schema is null, if the inputs' schemas differ
+     * in size, or if any input schema is not isEqual() to the first one.
+     * Otherwise each field of the first input's schema is copied, and the copy
+     * gets an output uid. That uid is reused from {@link #uidMapping} when the
+     * first input's field uid is already mapped. Otherwise a fresh uid is
+     * allocated and mapped to the corresponding field uid of every input.
+     *
+     * @throws RuntimeException if the predecessors cannot be obtained or there
+     *         are none
+     */
+    @Override
+    public LogicalSchema getSchema() {
+        if (schema != null) {
+            return schema;
+        }
+        List<Operator> inputs = null;
+        try {
+            inputs = plan.getPredecessors(this);
+        } catch (Exception e) {
+            throw new RuntimeException("Unable to get predecessor of LOUnion.", e);
+        }
+        // Without this guard a missing predecessor list surfaced as a bare NPE
+        // (null) or IndexOutOfBoundsException (empty) outside the try above.
+        if (inputs == null || inputs.isEmpty()) {
+            throw new RuntimeException("Unable to get predecessor of LOUnion.");
+        }
+
+        // Fetch each input schema once. If any is null, or the lengths do not
+        // match, the schema for the union is null.
+        List<LogicalSchema> inputSchemas = new ArrayList<LogicalSchema>(inputs.size());
+        for (Operator input : inputs) {
+            LogicalSchema inputSchema = ((LogicalRelationalOperator)input).getSchema();
+            if (inputSchema == null) {
+                return null;
+            }
+            if (!inputSchemas.isEmpty() && inputSchema.size() != inputSchemas.get(0).size()) {
+                return null;
+            }
+            inputSchemas.add(inputSchema);
+        }
+
+        // Check if all predecessor's schema are compatible.
+        // TODO: Migrate all existing schema merging rules
+        LogicalSchema schema0 = inputSchemas.get(0);
+        for (int i = 1; i < inputSchemas.size(); i++) {
+            if (!schema0.isEqual(inputSchemas.get(i))) {
+                return null;
+            }
+        }
+
+        // Generate merged schema based on schema of first input. Build it in a
+        // local and publish it only once complete.
+        LogicalSchema merged = new LogicalSchema();
+        for (int i = 0; i < schema0.size(); i++) {
+            LogicalSchema.LogicalFieldSchema firstField = schema0.getField(i);
+            LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(firstField);
+            long uid = findOutputUid(firstField.uid);
+            if (uid == -1) {
+                uid = LogicalExpression.getNextUid();
+                for (LogicalSchema inputSchema : inputSchemas) {
+                    uidMapping.add(new Pair<Long, Long>(uid, inputSchema.getField(i).uid));
+                }
+            }
+            fs.uid = uid;
+            merged.addField(fs);
+        }
+        schema = merged;
+        return schema;
+    }
+
+    /** Returns the output uid already mapped to {@code inputUid}, or -1 if none. */
+    private long findOutputUid(long inputUid) {
+        for (Pair<Long, Long> pair : uidMapping) {
+            if (pair.second.longValue() == inputUid) {
+                return pair.first.longValue();
+            }
+        }
+        return -1;
+    }
+
+    @Override
+    public void accept(PlanVisitor v) throws IOException {
+        if (!(v instanceof LogicalRelationalNodesVisitor)) {
+            throw new IOException("Expected LogicalPlanVisitor");
+        }
+        ((LogicalRelationalNodesVisitor)v).visit(this);
+    }
+
+    /**
+     * Two unions are equal when the other is an LOUnion and the inherited
+     * checkEquality() comparison succeeds.
+     */
+    @Override
+    public boolean isEqual(Operator other) {
+        if (other instanceof LOUnion) {
+            return checkEquality((LOUnion)other);
+        } else {
+            return false;
+        }
+    }
+
+    /**
+     * Get input uids mapping to the output uid.
+     *
+     * @param uid an output uid assigned by {@link #getSchema()}
+     * @return every input uid recorded for {@code uid}; empty if none
+     */
+    public Set<Long> getInputUids(long uid) {
+        Set<Long> result = new HashSet<Long>();
+        for (Pair<Long, Long> pair : uidMapping) {
+            if (pair.first.longValue() == uid) {
+                result.add(pair.second);
+            }
+        }
+        return result;
+    }
+}