You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/23 21:10:34 UTC

svn commit: r988256 [1/3] - in /hadoop/pig/trunk: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/lo...

Author: daijy
Date: Mon Aug 23 19:10:32 2010
New Revision: 988256

URL: http://svn.apache.org/viewvc?rev=988256&view=rev
Log:
PIG-1178: LogicalPlan and Optimizer are too complex and hard to work with (PIG-1178-7.patch)

Added:
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/LoadTypeCastInserter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/StreamTypeCastInserter.java
Removed:
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune2.java
Modified:
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
    hadoop/pig/trunk/test/newlogicalplan-tests
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogToPhyCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryCompiler.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQueryLocal.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestStore.java

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Mon Aug 23 19:10:32 2010
@@ -884,7 +884,7 @@ public class PigServer {
             }
             PhysicalPlan pp = compilePp(lp);
             lp.explain(lps, format, verbose);
-            if( pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("true") ) {
+            if( pigContext.getProperties().getProperty("pig.usenewlogicalplan", "true").equals("true") ) {
                 LogicalPlanMigrationVistor migrator = new LogicalPlanMigrationVistor(lp);
                 migrator.visit();
                 org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migrator.getNewLogicalPlan();
@@ -1269,7 +1269,7 @@ public class PigServer {
         validate(lp, collector, isBeforeOptimizer);
         
         // optimize
-        if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("false")) {
+        if (optimize && pigContext.getProperties().getProperty("pig.usenewlogicalplan", "true").equals("false")) {
             HashSet<String> optimizerRules = null;
             try {
                 optimizerRules = (HashSet<String>) ObjectSerializer

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Mon Aug 23 19:10:32 2010
@@ -234,12 +234,13 @@ public class HExecutionEngine {
         }
 
         try {
-            if (getConfiguration().getProperty("pig.usenewlogicalplan", "false").equals("true")) {
+            if (getConfiguration().getProperty("pig.usenewlogicalplan", "true").equals("true")) {
                 log.info("pig.usenewlogicalplan is set to true. New logical plan will be used.");
                 
                 // translate old logical plan to new plan
                 LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(plan);
                 visitor.visit();
+                visitor.finish();
                 org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
                 
                 SchemaResetter schemaResetter = new SchemaResetter(newPlan);
@@ -267,6 +268,7 @@ public class HExecutionEngine {
                 
                 translator.setPigContext(pigContext);
                 translator.visit();
+                translator.finish();
                 return translator.getPhysicalPlan();
                 
             }else{       
@@ -279,7 +281,7 @@ public class HExecutionEngine {
             }
         } catch (Exception ve) {
             int errCode = 2042;
-            String msg = "Internal error. Unable to translate logical plan to physical plan.";
+            String msg = "Error in new logical plan. Try -Dpig.usenewlogicalplan=false.";
             throw new ExecException(msg, errCode, PigException.BUG, ve);
         }
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Mon Aug 23 19:10:32 2010
@@ -2715,23 +2715,30 @@ public class MRCompiler extends PhyPlanV
         private void fixProjectionAfterLimit(MapReduceOper mro,
                 MapReduceOper sortMROp) throws PlanException, VisitorException {
                         
-            PhysicalOperator op = sortMROp.reducePlan.getLeaves().get(0);
+            PhysicalOperator op = sortMROp.reducePlan.getRoots().get(0);
+            assert(op instanceof POPackage);
+            
+            op = sortMROp.reducePlan.getSuccessors(op).get(0);
+            assert(op instanceof POForEach);
             
             while (true) {
-                List<PhysicalOperator> preds = sortMROp.reducePlan
-                        .getPredecessors(op);
-                op = preds.get(0); 
-                if (op instanceof POLimit) break;
+                List<PhysicalOperator> succs = sortMROp.reducePlan
+                        .getSuccessors(op);
+                if (succs==null) break;
+                op = succs.get(0);
+                if (op instanceof POForEach) break;
             }
             
             while (true) {
-                List<PhysicalOperator> succes = sortMROp.reducePlan
+                if (op instanceof POStore) break;
+                PhysicalOperator opToMove = op;
+                List<PhysicalOperator> succs = sortMROp.reducePlan
                         .getSuccessors(op);
-                PhysicalOperator succ = succes.get(0);               
-                if (succ instanceof POStore) break;
-            
-                sortMROp.reducePlan.removeAndReconnect(succ);
-                mro.reducePlan.addAsLeaf(succ);
+                op = succs.get(0);
+                
+                sortMROp.reducePlan.removeAndReconnect(opToMove);
+                mro.reducePlan.addAsLeaf(opToMove);
+                
             }
         }
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java Mon Aug 23 19:10:32 2010
@@ -20,6 +20,7 @@ package org.apache.pig.newplan.logical;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.pig.impl.logicalLayer.ExpressionOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -36,10 +37,12 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.expression.DereferenceExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOGenerate;
 import org.apache.pig.newplan.logical.relational.LOInnerLoad;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
 
@@ -53,10 +56,11 @@ public class ForeachInnerPlanVisitor ext
     private org.apache.pig.newplan.logical.relational.LogicalRelationalOperator gen;
     private int inputNo;
     private HashMap<LogicalOperator, LogicalRelationalOperator> innerOpsMap;
+    private Map<LogicalExpression, LogicalOperator> scalarAliasMap = new HashMap<LogicalExpression, LogicalOperator>();
 
     public ForeachInnerPlanVisitor(org.apache.pig.newplan.logical.relational.LOForEach foreach, LOForEach oldForeach, LogicalPlan innerPlan, 
-            LogicalPlan oldLogicalPlan) throws FrontendException {
-        super(innerPlan, foreach, oldLogicalPlan);
+            LogicalPlan oldLogicalPlan, Map<LogicalExpression, LogicalOperator> scalarMap) throws FrontendException {
+        super(innerPlan, oldForeach, foreach, oldLogicalPlan, scalarMap);
         newInnerPlan = foreach.getInnerPlan();
         
         // get next inputNo 
@@ -71,6 +75,7 @@ public class ForeachInnerPlanVisitor ext
         this.oldForeach = oldForeach;
                     
         innerOpsMap = new HashMap<LogicalOperator, LogicalRelationalOperator>();
+        scalarAliasMap = scalarMap;
     }
     
     private void translateInnerPlanConnection(LogicalOperator oldOp, org.apache.pig.newplan.Operator newOp) throws FrontendException {
@@ -92,11 +97,11 @@ public class ForeachInnerPlanVisitor ext
         }
     }
     
-    private LogicalExpressionPlan translateInnerExpressionPlan(LogicalPlan lp, LogicalRelationalOperator op, LogicalPlan outerPlan) throws VisitorException {
+    private LogicalExpressionPlan translateInnerExpressionPlan(LogicalPlan lp, LogicalOperator oldOp, LogicalRelationalOperator op, LogicalPlan outerPlan) throws VisitorException {
         PlanWalker<LogicalOperator, LogicalPlan> childWalker = 
             new DependencyOrderWalker<LogicalOperator, LogicalPlan>(lp);
         
-        LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp, op, outerPlan);
+        LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp, oldOp, op, outerPlan, scalarAliasMap);
         
         childWalker.walk(childPlanVisitor);
         return childPlanVisitor.exprPlan;
@@ -186,7 +191,7 @@ public class ForeachInnerPlanVisitor ext
         }
         
         for (LogicalPlan sortPlan : sortPlans) {
-            LogicalExpressionPlan newSortPlan = translateInnerExpressionPlan(sortPlan, newSort, mPlan);
+            LogicalExpressionPlan newSortPlan = translateInnerExpressionPlan(sortPlan, sort, newSort, mPlan);
             newSortPlans.add(newSortPlan);
         }
     }
@@ -228,7 +233,7 @@ public class ForeachInnerPlanVisitor ext
         
         newFilter.setAlias(filter.getAlias());
         newFilter.setRequestedParallelism(filter.getRequestedParallelism());
-        LogicalExpressionPlan newFilterPlan = translateInnerExpressionPlan(filter.getComparisonPlan(), newFilter, mPlan);
+        LogicalExpressionPlan newFilterPlan = translateInnerExpressionPlan(filter.getComparisonPlan(), filter, newFilter, mPlan);
         newFilter.setFilterPlan(newFilterPlan);
         newInnerPlan.add(newFilter);
         innerOpsMap.put(filter, newFilter);
@@ -238,4 +243,43 @@ public class ForeachInnerPlanVisitor ext
             throw new VisitorException(e);
         }
     }
+    
+    public void visit(LOForEach foreach) throws VisitorException {
+        org.apache.pig.newplan.logical.relational.LOForEach newForEach = 
+            new org.apache.pig.newplan.logical.relational.LOForEach(newInnerPlan);
+        
+        newForEach.setAlias(foreach.getAlias());
+        newForEach.setRequestedParallelism(foreach.getRequestedParallelism());
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newForEachInnerPlan 
+            = new org.apache.pig.newplan.logical.relational.LogicalPlan();        
+        newForEach.setInnerPlan(newForEachInnerPlan);
+        List<LogicalExpressionPlan> expPlans = new ArrayList<LogicalExpressionPlan>();
+        boolean[] flattens = new boolean[foreach.getForEachPlans().size()];
+        LOGenerate generate = new LOGenerate(newForEachInnerPlan, expPlans, flattens);
+        newForEachInnerPlan.add(generate);
+        
+        for (int i=0;i<foreach.getForEachPlans().size();i++) {
+            LogicalPlan innerPlan = foreach.getForEachPlans().get(i);
+            // Assume only one project is allowed in this level of foreach
+            LOProject project = (LOProject)innerPlan.iterator().next();
+
+            LOInnerLoad innerLoad = new LOInnerLoad(newForEachInnerPlan, 
+                    newForEach, project.isStar()?-1:project.getCol());
+            newForEachInnerPlan.add(innerLoad);
+            newForEachInnerPlan.connect(innerLoad, generate);
+            LogicalExpressionPlan expPlan = new LogicalExpressionPlan();
+            expPlans.add(expPlan);
+            ProjectExpression pe = new ProjectExpression(expPlan, i, -1, generate);
+            expPlan.add(pe);
+        }
+        
+        newInnerPlan.add(newForEach);
+        innerOpsMap.put(foreach, newForEach);
+        try {
+            translateInnerPlanConnection(foreach, newForEach);
+        } catch (FrontendException e) {
+            throw new VisitorException(e);
+        }
+    }
 }
\ No newline at end of file

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java Mon Aug 23 19:10:32 2010
@@ -20,6 +20,7 @@ package org.apache.pig.newplan.logical;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.pig.impl.logicalLayer.ExpressionOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -44,6 +45,8 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LOOr;
 import org.apache.pig.impl.logicalLayer.LOProject;
 import org.apache.pig.impl.logicalLayer.LORegexp;
+import org.apache.pig.impl.logicalLayer.LOSplit;
+import org.apache.pig.impl.logicalLayer.LOSplitOutput;
 import org.apache.pig.impl.logicalLayer.LOSubtract;
 import org.apache.pig.impl.logicalLayer.LOUserFunc;
 import org.apache.pig.impl.logicalLayer.LOVisitor;
@@ -86,14 +89,19 @@ public class LogicalExpPlanMigrationVist
     protected org.apache.pig.newplan.logical.expression.LogicalExpressionPlan exprPlan;
     protected HashMap<LogicalOperator, LogicalExpression> exprOpsMap;
     protected LogicalRelationalOperator attachedRelationalOp;
+    protected LogicalOperator oldAttachedRelationalOp;
     protected LogicalPlan outerPlan;
+    protected Map<LogicalExpression, LogicalOperator> scalarAliasMap = new HashMap<LogicalExpression, LogicalOperator>();
     
-    public LogicalExpPlanMigrationVistor(LogicalPlan expressionPlan, LogicalRelationalOperator attachedOperator, LogicalPlan outerPlan) {
+    public LogicalExpPlanMigrationVistor(LogicalPlan expressionPlan, LogicalOperator oldAttachedOperator,
+            LogicalRelationalOperator attachedOperator, LogicalPlan outerPlan, Map<LogicalExpression, LogicalOperator> scalarMap) {
         super(expressionPlan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(expressionPlan));
         exprPlan = new org.apache.pig.newplan.logical.expression.LogicalExpressionPlan();
         exprOpsMap = new HashMap<LogicalOperator, LogicalExpression>();
         attachedRelationalOp = attachedOperator;
+        oldAttachedRelationalOp = oldAttachedOperator;
         this.outerPlan = outerPlan;
+        scalarAliasMap = scalarMap;
     }    
 
     private void translateConnection(LogicalOperator oldOp, org.apache.pig.newplan.Operator newOp) {       
@@ -122,8 +130,14 @@ public class LogicalExpPlanMigrationVist
         }
         else {
             LogicalOperator lg = project.getExpression();
-            LogicalOperator succed = outerPlan.getSuccessors(lg).get(0);
-            int input = outerPlan.getPredecessors(succed).indexOf(lg);
+            int input;
+            if (oldAttachedRelationalOp instanceof LOSplitOutput) {
+                LOSplit split = (LOSplit)outerPlan.getPredecessors(oldAttachedRelationalOp).get(0);
+                input = outerPlan.getPredecessors(split).indexOf(lg);
+            }
+            else {
+                input = outerPlan.getPredecessors(oldAttachedRelationalOp).indexOf(lg);
+            }
             pe = new ProjectExpression(exprPlan, input, project.isStar()?-1:col, attachedRelationalOp);
         }
         
@@ -200,6 +214,11 @@ public class LogicalExpPlanMigrationVist
         }
         
         exprOpsMap.put(op, exp);
+        // We need to track all the scalars
+        if(op.getImplicitReferencedOperator() != null) {
+            scalarAliasMap.put(exp, op.getImplicitReferencedOperator());
+        }
+
     }
 
     public void visit(LOBinCond op) throws VisitorException {

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Mon Aug 23 19:10:32 2010
@@ -20,6 +20,7 @@ package org.apache.pig.newplan.logical;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
@@ -56,7 +57,9 @@ import org.apache.pig.newplan.logical.re
  */
 public class LogicalPlanMigrationVistor extends LOVisitor { 
     private org.apache.pig.newplan.logical.relational.LogicalPlan logicalPlan;
-    private HashMap<LogicalOperator, LogicalRelationalOperator> opsMap;
+    private Map<LogicalOperator, LogicalRelationalOperator> opsMap;
+    private Map<org.apache.pig.newplan.logical.expression.LogicalExpression, LogicalOperator> scalarAliasMap = 
+        new HashMap<org.apache.pig.newplan.logical.expression.LogicalExpression, LogicalOperator>();
    
     public LogicalPlanMigrationVistor(LogicalPlan plan) {
         super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
@@ -75,11 +78,11 @@ public class LogicalPlanMigrationVistor 
         }        
     }      
     
-    private LogicalExpressionPlan translateExpressionPlan(LogicalPlan lp, LogicalRelationalOperator op) throws VisitorException {
+    private LogicalExpressionPlan translateExpressionPlan(LogicalPlan lp, LogicalOperator oldOp, LogicalRelationalOperator op) throws VisitorException {
         PlanWalker<LogicalOperator, LogicalPlan> childWalker = 
             new DependencyOrderWalker<LogicalOperator, LogicalPlan>(lp);
         
-        LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp, op, mPlan);
+        LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp, oldOp, op, mPlan, scalarAliasMap);
         
         childWalker.walk(childPlanVisitor);
         return childPlanVisitor.exprPlan;
@@ -95,6 +98,8 @@ public class LogicalPlanMigrationVistor 
         org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE grouptype;
         if( cg.getGroupType() == GROUPTYPE.COLLECTED ) {
             grouptype = org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE.COLLECTED;
+        } else if (cg.getGroupType() == GROUPTYPE.MERGE ){
+            grouptype = org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE.MERGE;
         } else {
             grouptype = org.apache.pig.newplan.logical.relational.LOCogroup.GROUPTYPE.REGULAR;
         }
@@ -113,12 +118,14 @@ public class LogicalPlanMigrationVistor 
             ArrayList<LogicalPlan> plans = 
                 (ArrayList<LogicalPlan>) cg.getGroupByPlans().get(inputs.get(i));
             for( LogicalPlan plan : plans ) {
-                LogicalExpressionPlan expPlan = translateExpressionPlan(plan, newCogroup);
+                LogicalExpressionPlan expPlan = translateExpressionPlan(plan, cg, newCogroup);
                 newExpressionPlans.put(Integer.valueOf(i), expPlan);
             }
         }
         
         newCogroup.setAlias(cg.getAlias());
+        newCogroup.setRequestedParallelism(cg.getRequestedParallelism());
+        newCogroup.setCustomPartitioner(cg.getCustomPartitioner());
         
         logicalPlan.add(newCogroup);
         opsMap.put(cg, newCogroup);
@@ -154,12 +161,13 @@ public class LogicalPlanMigrationVistor 
         for (int i=0; i<inputs.size(); i++) {
             List<LogicalPlan> plans = (List<LogicalPlan>) loj.getJoinPlans().get(inputs.get(i));
             for (LogicalPlan lp : plans) {                               
-                joinPlans.put(i, translateExpressionPlan(lp, join));
+                joinPlans.put(i, translateExpressionPlan(lp, loj, join));
             }        
         }
         
         join.setAlias(loj.getAlias());
         join.setRequestedParallelism(loj.getRequestedParallelism());
+        join.setCustomPartitioner(loj.getCustomPartitioner());
         
         logicalPlan.add(join);
         opsMap.put(loj, join);       
@@ -173,6 +181,7 @@ public class LogicalPlanMigrationVistor 
      
         newCross.setAlias(cross.getAlias());
         newCross.setRequestedParallelism(cross.getRequestedParallelism());
+        newCross.setCustomPartitioner(cross.getCustomPartitioner());
         
         logicalPlan.add(newCross);
         opsMap.put(cross, newCross);       
@@ -205,7 +214,7 @@ public class LogicalPlanMigrationVistor 
         try {
             for(int i=0; i<ll.size(); i++) {
                 LogicalPlan lp = ll.get(i);
-                ForeachInnerPlanVisitor v = new ForeachInnerPlanVisitor(newForeach, forEach, lp, mPlan);
+                ForeachInnerPlanVisitor v = new ForeachInnerPlanVisitor(newForeach, forEach, lp, mPlan, scalarAliasMap);
                 v.visit();
                 
                 expPlans.add(v.exprPlan);
@@ -231,7 +240,7 @@ public class LogicalPlanMigrationVistor 
                     newSortPlans, sort.getAscendingCols(), sort.getUserFunc());
         
         for (LogicalPlan sortPlan : sortPlans) {
-            LogicalExpressionPlan newSortPlan = translateExpressionPlan(sortPlan, newSort);
+            LogicalExpressionPlan newSortPlan = translateExpressionPlan(sortPlan, sort, newSort);
             newSortPlans.add(newSortPlan);
         }
         
@@ -256,9 +265,17 @@ public class LogicalPlanMigrationVistor 
     }
     
     public void visit(LOStream stream) throws VisitorException {
+        
+        LogicalSchema s;
+        try {
+            s = Util.translateSchema(stream.getSchema());
+        }catch(Exception e) {
+            throw new VisitorException("Failed to translate schema.", e);
+        }
+        
         org.apache.pig.newplan.logical.relational.LOStream newStream = 
             new org.apache.pig.newplan.logical.relational.LOStream(logicalPlan,
-                    stream.getExecutableManager(), stream.getStreamingCommand());
+                    stream.getExecutableManager(), stream.getStreamingCommand(), s);
         
         newStream.setAlias(stream.getAlias());
         newStream.setRequestedParallelism(stream.getRequestedParallelism());
@@ -272,7 +289,7 @@ public class LogicalPlanMigrationVistor 
         org.apache.pig.newplan.logical.relational.LOFilter newFilter = new org.apache.pig.newplan.logical.relational.LOFilter(logicalPlan);
         
         LogicalPlan filterPlan = filter.getComparisonPlan();
-        LogicalExpressionPlan newFilterPlan = translateExpressionPlan(filterPlan, newFilter);
+        LogicalExpressionPlan newFilterPlan = translateExpressionPlan(filterPlan, filter, newFilter);
       
         newFilter.setFilterPlan(newFilterPlan);
         newFilter.setAlias(filter.getAlias());
@@ -327,6 +344,10 @@ public class LogicalPlanMigrationVistor 
        
         newStore.setAlias(store.getAlias());
         newStore.setRequestedParallelism(store.getRequestedParallelism());
+        newStore.setSignature(store.getSignature());
+        newStore.setInputSpec(store.getInputSpec());
+        newStore.setSortInfo(store.getSortInfo());
+        newStore.setTmpStore(store.isTmpStore());
         
         logicalPlan.add(newStore);
         opsMap.put(store, newStore);       
@@ -349,7 +370,7 @@ public class LogicalPlanMigrationVistor 
             new org.apache.pig.newplan.logical.relational.LOSplitOutput(logicalPlan);
         
         LogicalPlan filterPlan = splitOutput.getConditionPlan();
-        LogicalExpressionPlan newFilterPlan = translateExpressionPlan(filterPlan, newSplitOutput);
+        LogicalExpressionPlan newFilterPlan = translateExpressionPlan(filterPlan, splitOutput, newSplitOutput);
       
         newSplitOutput.setFilterPlan(newFilterPlan);
         newSplitOutput.setAlias(splitOutput.getAlias());
@@ -366,11 +387,18 @@ public class LogicalPlanMigrationVistor 
         
         newDistinct.setAlias(distinct.getAlias());
         newDistinct.setRequestedParallelism(distinct.getRequestedParallelism());
+        newDistinct.setCustomPartitioner(distinct.getCustomPartitioner());
         
         logicalPlan.add(newDistinct);
         opsMap.put(distinct, newDistinct);
         translateConnection(distinct, newDistinct);
     }
     
+    public void finish() {
+        for(org.apache.pig.newplan.logical.expression.LogicalExpression exp: scalarAliasMap.keySet()) {
+            ((org.apache.pig.newplan.logical.expression.UserFuncExpression)exp).setImplicitReferencedOperator(
+                    opsMap.get(scalarAliasMap.get(exp)));
+        }
+    }
     
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java Mon Aug 23 19:10:32 2010
@@ -64,6 +64,13 @@ public class Util {
         return newFs;
     }
     
+    /**
+     * Translates the new LogicalSchema into the old Schema format required by
+     * PhysicalOperators. Each field's uid is carried over as its canonicalName;
+     * fields whose FieldSchema construction throws FrontendException are dropped.
+     * @param schema LogicalSchema to be converted to Schema
+     * @return Schema converted from LogicalSchema, or null if schema is null
+     */
     public static Schema translateSchema(LogicalSchema schema) {       
         if (schema == null) {
             return null;
@@ -75,6 +82,7 @@ public class Util {
             Schema.FieldSchema f2 = null;
             try {
                 f2 = new Schema.FieldSchema(f.alias, translateSchema(f.schema), f.type);
+                f2.canonicalName = ((Long)f.uid).toString();
                 s2.add(f2);
             } catch (FrontendException e) {
             }
@@ -99,14 +107,14 @@ public class Util {
         return newFs;
     }
     
-    public static LOForEach addForEachAfter(LogicalPlan plan, LogicalRelationalOperator op,
+    public static LOForEach addForEachAfter(LogicalPlan plan, LogicalRelationalOperator op, int branch,
             Set<Integer> columnsToDrop) throws FrontendException {
         LOForEach foreach = new LOForEach(plan);
         
         plan.add(foreach);
         List<Operator> next = plan.getSuccessors(op);
         if (next != null) {
-            LogicalRelationalOperator nextOp = (LogicalRelationalOperator)next.get(0);
+            LogicalRelationalOperator nextOp = (LogicalRelationalOperator)next.get(branch);
             Pair<Integer, Integer> pos = plan.disconnect(op, nextOp);
             plan.connect(foreach, pos.first, nextOp, pos.second);
         }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java Mon Aug 23 19:10:32 2010
@@ -76,7 +76,10 @@ public class CastExpression extends Unar
         uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
         // Bring back the top level uid, this is not changed
         LogicalExpression exp = (LogicalExpression)plan.getSuccessors(this).get(0);
-        fieldSchema.uid = exp.getFieldSchema().uid;
+        if (exp.getFieldSchema()!=null) {
+            fieldSchema.uid = exp.getFieldSchema().uid;
+            fieldSchema.alias = exp.getFieldSchema().alias;
+        }
         return fieldSchema;
     }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java Mon Aug 23 19:10:32 2010
@@ -127,16 +127,29 @@ public class DereferenceExpression exten
         LogicalExpression successor = (LogicalExpression)plan.getSuccessors(this).get(0);
         LogicalFieldSchema predFS = successor.getFieldSchema();
         if (predFS!=null) {
-            LogicalSchema innerSchema = new LogicalSchema();
             if (columns.size()>1 || predFS.type==DataType.BAG) {
-                for (int column:columns) {
-                    innerSchema.addField(predFS.schema.getField(column));
+                LogicalSchema innerSchema = null;
+                if (predFS.schema!=null) {
+                    innerSchema = new LogicalSchema();
+                    LogicalSchema realSchema;
+                    if (predFS.schema.isTwoLevelAccessRequired()) {
+                        realSchema = predFS.schema.getField(0).schema;
+                    }
+                    else {
+                        realSchema = predFS.schema;
+                    }
+                    if (realSchema!=null) {
+                        for (int column:columns) {
+                            innerSchema.addField(realSchema.getField(column));
+                        }
+                    }
                 }
                 fieldSchema = new LogicalSchema.LogicalFieldSchema(null, innerSchema, predFS.type, 
                         LogicalExpression.getNextUid());
             }
             else { // Dereference a field out of a tuple
-                fieldSchema = predFS.schema.getField(columns.get(0));
+                if (predFS.schema!=null)
+                    fieldSchema = predFS.schema.getField(columns.get(0));
             }
         }
         return fieldSchema;

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java Mon Aug 23 19:10:32 2010
@@ -78,21 +78,21 @@ public class ExpToPhyTranslationVisitor 
 
     // This value points to the current LogicalRelationalOperator we are working on
     protected LogicalRelationalOperator currentOp;
+    protected Map<PhysicalOperator, LogicalRelationalOperator> scalarAliasMap;
     
-    public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map) throws FrontendException {
-        super(plan, new DependencyOrderWalker(plan));
-        currentOp = op;
-        logToPhyMap = map;
-        currentPlan = phyPlan;
-        currentPlans = new Stack<PhysicalPlan>();
+    public ExpToPhyTranslationVisitor(OperatorPlan plan, LogicalRelationalOperator op, PhysicalPlan phyPlan, 
+            Map<Operator, PhysicalOperator> map, Map<PhysicalOperator, LogicalRelationalOperator> scalarMap) throws FrontendException {
+        this(plan, new DependencyOrderWalker(plan), op, phyPlan, map, scalarMap);
     }
     
-    public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map) throws FrontendException {
+    public ExpToPhyTranslationVisitor(OperatorPlan plan, PlanWalker walker, LogicalRelationalOperator op, PhysicalPlan phyPlan, Map<Operator, PhysicalOperator> map, 
+            Map<PhysicalOperator, LogicalRelationalOperator> scalarMap) throws FrontendException {
         super(plan, walker);
         currentOp = op;
         logToPhyMap = map;
         currentPlan = phyPlan;
         currentPlans = new Stack<PhysicalPlan>();
+        scalarAliasMap = scalarMap;
     }
     
     protected Map<Operator, PhysicalOperator> logToPhyMap;
@@ -374,7 +374,7 @@ public class ExpToPhyTranslationVisitor 
                 .getExpression());
         pIsNull.setExpr(from);
         pIsNull.setResultType(op.getType());
-        pIsNull.setOperandType(op.getType());
+        pIsNull.setOperandType(op.getExpression().getType());
         try {
             currentPlan.connect(from, pIsNull);
         } catch (PlanException e) {
@@ -394,6 +394,7 @@ public class ExpToPhyTranslationVisitor 
         logToPhyMap.put(op, pNegative);
         ExpressionOperator from = (ExpressionOperator) logToPhyMap.get(op
                 .getExpression());
+        pNegative.setExpr(from);
         pNegative.setResultType(op.getType());        
         try {
             currentPlan.connect(from, pNegative);
@@ -507,6 +508,11 @@ public class ExpToPhyTranslationVisitor 
             }
         }
         logToPhyMap.put(op, p);
+        
+        // Track scalars: remember the logical operator this UDF implicitly references
+        if(op.getImplicitReferencedOperator() != null) {
+            scalarAliasMap.put(p, (LogicalRelationalOperator)op.getImplicitReferencedOperator());
+        }
     }
     
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java Mon Aug 23 19:10:32 2010
@@ -123,39 +123,42 @@ public class ProjectExpression extends C
         
         LogicalSchema schema = referent.getSchema();
         
-        if (schema == null) {
-            fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);
-            uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
-        } 
-        else {
-            if (attachedRelationalOp instanceof LOGenerate && plan.getSuccessors(this)==null) {
-                if (!(findReferent() instanceof LOInnerLoad)||
-                        ((LOInnerLoad)findReferent()).sourceIsBag()) {
-                    String alias = findReferent().getAlias();
-                    List<LOInnerLoad> innerLoads = LOForEach.findReacheableInnerLoadFromBoundaryProject(this);
-                    
-                    // pull tuple information from innerload
-                    if (innerLoads.get(0).getProjection().getFieldSchema().schema.isTwoLevelAccessRequired()) {
-                        LogicalFieldSchema originalTupleFieldSchema = innerLoads.get(0).getProjection().getFieldSchema().schema.getField(0);
-                        LogicalFieldSchema newTupleFieldSchema = new LogicalFieldSchema(originalTupleFieldSchema.alias,
-                                schema, DataType.TUPLE);
-                        newTupleFieldSchema.uid = originalTupleFieldSchema.uid;
-                        LogicalSchema newTupleSchema = new LogicalSchema();
-                        newTupleSchema.setTwoLevelAccessRequired(true);
-                        newTupleSchema.addField(newTupleFieldSchema);
-                        fieldSchema = new LogicalSchema.LogicalFieldSchema(alias, newTupleSchema, DataType.BAG);
-                    }
-                    else {
-                        fieldSchema = new LogicalSchema.LogicalFieldSchema(alias, schema, DataType.BAG);
-                    }
-                    fieldSchema.uid = innerLoads.get(0).getProjection().getFieldSchema().uid;
+        if (attachedRelationalOp instanceof LOGenerate && plan.getSuccessors(this)==null) {
+            if (!(findReferent() instanceof LOInnerLoad)||
+                    ((LOInnerLoad)findReferent()).sourceIsBag()) {
+                String alias = findReferent().getAlias();
+                List<LOInnerLoad> innerLoads = LOForEach.findReacheableInnerLoadFromBoundaryProject(this);
+                
+                // pull tuple information from innerload
+                if (innerLoads.get(0).getProjection().getFieldSchema().schema!=null &&
+                        innerLoads.get(0).getProjection().getFieldSchema().schema.isTwoLevelAccessRequired()) {
+                    LogicalFieldSchema originalTupleFieldSchema = innerLoads.get(0).getProjection().getFieldSchema().schema.getField(0);
+                    LogicalFieldSchema newTupleFieldSchema = new LogicalFieldSchema(originalTupleFieldSchema.alias,
+                            schema, DataType.TUPLE);
+                    newTupleFieldSchema.uid = originalTupleFieldSchema.uid;
+                    LogicalSchema newTupleSchema = new LogicalSchema();
+                    newTupleSchema.setTwoLevelAccessRequired(true);
+                    newTupleSchema.addField(newTupleFieldSchema);
+                    fieldSchema = new LogicalSchema.LogicalFieldSchema(alias, newTupleSchema, DataType.BAG);
                 }
                 else {
-                    fieldSchema = findReferent().getSchema().getField(0);
+                    fieldSchema = new LogicalSchema.LogicalFieldSchema(alias, schema, DataType.BAG);
                 }
-                uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
+                fieldSchema.uid = innerLoads.get(0).getProjection().getFieldSchema().uid;
             }
             else {
+                if (findReferent().getSchema()!=null)
+                    fieldSchema = findReferent().getSchema().getField(0);
+            }
+            if (fieldSchema!=null)
+                uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
+        }
+        else {
+            if (schema == null) {
+                fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);
+                uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
+            } 
+            else {
                 int index = -1;
                 if (!isProjectStar() && uidOnlyFieldSchema!=null) {
                     long uid = uidOnlyFieldSchema.uid;
@@ -170,11 +173,17 @@ public class ProjectExpression extends C
                     index = col;
                 
                 if (!isProjectStar()) {
-                    fieldSchema = schema.getField(index);
+                    if (schema!=null && schema.size()>index)
+                        fieldSchema = schema.getField(index);
+                    else
+                        fieldSchema = new LogicalSchema.LogicalFieldSchema(null, null, DataType.BYTEARRAY);
                     uidOnlyFieldSchema = fieldSchema.cloneUid();
                 }
                 else {
-                    fieldSchema = new LogicalSchema.LogicalFieldSchema(null, schema.deepCopy(), DataType.TUPLE);
+                    LogicalSchema newTupleSchema = null;
+                    if (schema!=null)
+                        newTupleSchema = schema.deepCopy();
+                    fieldSchema = new LogicalSchema.LogicalFieldSchema(null, newTupleSchema, DataType.TUPLE);
                     uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
                 }
             }
@@ -194,7 +203,7 @@ public class ProjectExpression extends C
         if (preds == null || input >= preds.size()) {
             throw new FrontendException("Projection with nothing to reference!", 2225);
         }
-            
+        
         LogicalRelationalOperator pred =
             (LogicalRelationalOperator)preds.get(input);
         if (pred == null) {
@@ -215,6 +224,8 @@ public class ProjectExpression extends C
     
     public String toString() {
         StringBuilder msg = new StringBuilder();
+        if (fieldSchema!=null && fieldSchema.alias!=null)
+            msg.append(fieldSchema.alias+":");
         msg.append("(Name: " + name + " Type: ");
         if (fieldSchema!=null)
             msg.append(DataType.findTypeName(fieldSchema.type));
@@ -242,4 +253,18 @@ public class ProjectExpression extends C
     public void setAttachedRelationalOp(LogicalRelationalOperator attachedRelationalOp) {
         this.attachedRelationalOp = attachedRelationalOp;
     }
+    
+    @Override
+    public byte getType() throws FrontendException {
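+        // If the field schema is unknown: a boundary project in LOGenerate over an LOInnerLoad with column -1 is a TUPLE, otherwise BYTEARRAY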
+        if (getFieldSchema()==null) {
+            if (attachedRelationalOp instanceof LOGenerate && findReferent() instanceof
+                    LOInnerLoad) {
+                if (((LOInnerLoad)findReferent()).getProjection().getColNum()==-1)
+                    return DataType.TUPLE;
+            }
+            return DataType.BYTEARRAY;
+        }
+        return super.getType();
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java Mon Aug 23 19:10:32 2010
@@ -36,6 +36,7 @@ import org.apache.pig.newplan.logical.re
 public class UserFuncExpression extends LogicalExpression {
 
     private FuncSpec mFuncSpec;
+    private Operator implicitReferencedOperator = null; 
     
     public UserFuncExpression(OperatorPlan plan, FuncSpec funcSpec) {
         super("UserFunc", plan);
@@ -97,8 +98,14 @@ public class UserFuncExpression extends 
         LogicalSchema inputSchema = new LogicalSchema();
         List<Operator> succs = plan.getSuccessors(this);
 
-        for(Operator lo : succs){
-            inputSchema.addField(((LogicalExpression)lo).getFieldSchema());
+        if (succs!=null) {
+            for(Operator lo : succs){
+                if (((LogicalExpression)lo).getFieldSchema()==null) {
+                    inputSchema = null;
+                    break;
+                }
+                inputSchema.addField(((LogicalExpression)lo).getFieldSchema());
+            }
         }
 
         EvalFunc<?> ef = (EvalFunc<?>) PigContext.instantiateFuncFromSpec(mFuncSpec);
@@ -120,4 +127,12 @@ public class UserFuncExpression extends 
         uidOnlyFieldSchema = fieldSchema.mergeUid(uidOnlyFieldSchema);
         return fieldSchema;
     }
+    
+    public Operator getImplicitReferencedOperator() {
+        return implicitReferencedOperator;
+    }
+    
+    public void setImplicitReferencedOperator(Operator implicitReferencedOperator) {
+        this.implicitReferencedOperator = implicitReferencedOperator;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java Mon Aug 23 19:10:32 2010
@@ -74,7 +74,7 @@ public abstract class AllExpressionVisit
     @Override
     public void visit(LOJoin join) throws FrontendException {
         currentOp = join;
-        Collection<LogicalExpressionPlan> c = join.getExpressionPlans();
+        Collection<LogicalExpressionPlan> c = join.getExpressionPlanValues();
         for (LogicalExpressionPlan plan : c) {
             LogicalExpressionVisitor v = getVisitor(plan);
             v.visit();

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java Mon Aug 23 19:10:32 2010
@@ -22,15 +22,18 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LOStream;
 import org.apache.pig.newplan.logical.rules.AddForEach;
 import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
 import org.apache.pig.newplan.logical.rules.FilterAboveForeach;
 import org.apache.pig.newplan.logical.rules.ImplicitSplitInserter;
+import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.MergeFilter;
 import org.apache.pig.newplan.logical.rules.PushUpFilter;
 import org.apache.pig.newplan.logical.rules.SplitFilter;
+import org.apache.pig.newplan.logical.rules.StreamTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.TypeCastInserter;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
 import org.apache.pig.newplan.optimizer.Rule;
@@ -61,7 +64,9 @@ public class LogicalPlanOptimizer extend
         // This set of rules Insert Foreach dedicated for casting after load
         s = new HashSet<Rule>();
         // add split filter rule
-        r = new TypeCastInserter("TypeCastInserter", LOLoad.class.getName());
+        r = new LoadTypeCastInserter("LoadTypeCastInserter");
+        checkAndAddRule(s, r);
+        r = new StreamTypeCastInserter("StreamTypeCastInserter");
         checkAndAddRule(s, r);
         if (!s.isEmpty())
             ls.add(s);

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java Mon Aug 23 19:10:32 2010
@@ -42,6 +42,7 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOSplit;
 import org.apache.pig.newplan.logical.relational.LOSplitOutput;
 import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.relational.LOStream;
 import org.apache.pig.newplan.logical.relational.LOUnion;
 import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
 
@@ -107,7 +108,7 @@ public class LogicalPlanPrinter extends 
         
         LogicalExpressionVisitor v = null;
         level++;
-        for (LogicalExpressionPlan plan : op.getExpressionPlans()) {
+        for (LogicalExpressionPlan plan : op.getExpressionPlanValues()) {
             v = getVisitor(plan);
             v.visit();
         }
@@ -208,6 +209,12 @@ public class LogicalPlanPrinter extends 
         stream.println( op.toString() );
     }
 
+    @Override
+    public void visit(LOStream op) throws FrontendException {
+        printLevel();
+        stream.println( op.toString() );
+    }
+    
     public String toString() {
         return stream.toString();
     }   

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java Mon Aug 23 19:10:32 2010
@@ -8,6 +8,7 @@ import org.apache.pig.impl.util.MultiMap
 import org.apache.pig.newplan.DependencyOrderWalker;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
 import org.apache.pig.newplan.logical.expression.AllSameExpressionVisitor;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
@@ -58,7 +59,7 @@ public class SchemaResetter extends Logi
     @Override
     public void visit(LOJoin join) throws FrontendException {
         join.resetSchema();
-        Collection<LogicalExpressionPlan> joinPlans = join.getExpressionPlans();
+        Collection<LogicalExpressionPlan> joinPlans = join.getExpressionPlanValues();
         for (LogicalExpressionPlan joinPlan : joinPlans) {
             FieldSchemaResetter fsResetter = new FieldSchemaResetter(joinPlan);
             fsResetter.visit();
@@ -165,7 +166,7 @@ public class SchemaResetter extends Logi
 class FieldSchemaResetter extends AllSameExpressionVisitor {
 
     protected FieldSchemaResetter(OperatorPlan p) throws FrontendException {
-        super(p, new DependencyOrderWalker(p));
+        super(p, new ReverseDependencyOrderWalker(p));
     }
 
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Mon Aug 23 19:10:32 2010
@@ -49,7 +49,8 @@ public class LOCogroup extends LogicalRe
      */
     public static enum GROUPTYPE {
         REGULAR,    // Regular (co)group
-        COLLECTED   // Collected group
+        COLLECTED,  // Collected group
+        MERGE       // Map-side CoGroup on sorted data
     };
     
     private GROUPTYPE mGroupType;
@@ -175,11 +176,6 @@ public class LOCogroup extends LogicalRe
         int counter = 0;
         for (Operator op : inputs) {
             LogicalSchema inputSchema = ((LogicalRelationalOperator)op).getSchema();
-            // the schema of one input is unknown, so the join schema is unknown, just return 
-            if (inputSchema == null) {
-                schema = null;
-                return schema;
-            }
            
             // Check if we already have calculated Uid for this bag for given 
             // input operator

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java Mon Aug 23 19:10:32 2010
@@ -33,10 +33,14 @@ public class LODistinct extends LogicalR
 
     @Override
     public LogicalSchema getSchema() throws FrontendException {      
+        if (schema!=null)
+            return schema;
+        
         LogicalRelationalOperator input = null;
         input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
         
-        return input.getSchema();
+        schema = input.getSchema();
+        return schema;
     }   
     
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java Mon Aug 23 19:10:32 2010
@@ -46,9 +46,14 @@ public class LOFilter extends LogicalRel
     
     @Override
     public LogicalSchema getSchema() throws FrontendException {
+        if (schema!=null)
+            return schema;
+        
         LogicalRelationalOperator input = null;
         input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
-        return input.getSchema();
+        
+        schema = input.getSchema();
+        return schema;
     }   
     
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java Mon Aug 23 19:10:32 2010
@@ -51,6 +51,10 @@ public class LOGenerate extends LogicalR
             LogicalExpression exp = (LogicalExpression)outputPlans.get(i).getSources().get(0);
             
             LogicalFieldSchema fieldSchema = null;
+            if (exp.getFieldSchema()==null) {
+                schema = null;
+                break;
+            }
             fieldSchema = exp.getFieldSchema().deepCopy();
             
             if (fieldSchema.type != DataType.TUPLE && fieldSchema.type != DataType.BAG) {
@@ -58,19 +62,26 @@ public class LOGenerate extends LogicalR
                 schema.addField(fieldSchema);
                 continue;
             } else {
+                // if the bag/tuple has no inner schema, the schema of the entire operator is unknown (null)
+                if (fieldSchema.schema==null) {
+                    schema=null;
+                    break;
+                }
                 // if flatten is set, set schema of tuple field to this schema
                 List<LogicalSchema.LogicalFieldSchema> innerFieldSchemas = new ArrayList<LogicalSchema.LogicalFieldSchema>();
                 if (flattenFlags[i]) {
                     if (fieldSchema.type == DataType.BAG) {
                         // if it is bag of tuples, get the schema of tuples
-                        if (fieldSchema.schema.isTwoLevelAccessRequired()) {
-                            //  assert(fieldSchema.schema.size() == 1 && fieldSchema.schema.getField(0).type == DataType.TUPLE)
-                            innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields();
-                        } else {
-                            innerFieldSchemas = fieldSchema.schema.getFields();
-                        }
-                        for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
-                            fs.alias = fieldSchema.alias + "::" + fs.alias;
+                        if (fieldSchema.schema!=null) {
+                            if (fieldSchema.schema.isTwoLevelAccessRequired()) {
+                                //  assert(fieldSchema.schema.size() == 1 && fieldSchema.schema.getField(0).type == DataType.TUPLE)
+                                innerFieldSchemas = fieldSchema.schema.getField(0).schema.getFields();
+                            } else {
+                                innerFieldSchemas = fieldSchema.schema.getFields();
+                            }
+                            for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas) {
+                                fs.alias = fieldSchema.alias + "::" + fs.alias;
+                            }
                         }
                     } else { // DataType.TUPLE
                         innerFieldSchemas = fieldSchema.schema.getFields();
@@ -79,7 +90,6 @@ public class LOGenerate extends LogicalR
                         }
                     }
                     
-                    
                     for (LogicalSchema.LogicalFieldSchema fs : innerFieldSchemas)
                         schema.addField(fs);
                 }
@@ -87,6 +97,8 @@ public class LOGenerate extends LogicalR
                     schema.addField(fieldSchema);
             }
         }
+        if (schema!=null && schema.size()==0)
+            schema = null;
         return schema;
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java Mon Aug 23 19:10:32 2010
@@ -17,6 +17,8 @@
  */
 package org.apache.pig.newplan.logical.relational;
 
+import java.util.Map;
+
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.Operator;
@@ -53,23 +55,31 @@ public class LOInnerLoad extends Logical
         if (schema!=null)
             return schema;
         
-        if (prj.getFieldSchema()!=null) {
-            schema = new LogicalSchema();
-            if (prj.getFieldSchema().type==DataType.BAG && prj.getFieldSchema().schema.isTwoLevelAccessRequired()) {
-                LogicalFieldSchema tupleSchema = prj.getFieldSchema().schema.getField(0);
-                for (int i=0;i<tupleSchema.schema.size();i++)
-                    schema.addField(tupleSchema.schema.getField(i));
-                sourceIsBag = true;
-                alias = prj.getFieldSchema().alias;
-            }
-            else if (prj.getFieldSchema().type==DataType.BAG){
-                for (int i=0;i<prj.getFieldSchema().schema.size();i++)
-                    schema.addField(prj.getFieldSchema().schema.getField(i));
-                sourceIsBag = true;
-                alias = prj.getFieldSchema().alias;
-            }
-            else {
-                schema.addField(prj.getFieldSchema());
+        if (prj.findReferent().getSchema()!=null) {
+            if (prj.getFieldSchema()!=null) {
+                if (prj.getFieldSchema().type==DataType.BAG && prj.getFieldSchema().schema!=null &&
+                        prj.getFieldSchema().schema.isTwoLevelAccessRequired()) {
+                    schema = new LogicalSchema();
+                    LogicalFieldSchema tupleSchema = prj.getFieldSchema().schema.getField(0);
+                    for (int i=0;i<tupleSchema.schema.size();i++)
+                        schema.addField(tupleSchema.schema.getField(i));
+                    sourceIsBag = true;
+                    alias = prj.getFieldSchema().alias;
+                }
+                else if (prj.getFieldSchema().type==DataType.BAG){
+                    sourceIsBag = true;
+                    alias = prj.getFieldSchema().alias;
+                    if (prj.getFieldSchema().schema!=null) {
+                        schema = new LogicalSchema();
+                        for (int i=0;i<prj.getFieldSchema().schema.size();i++)
+                            schema.addField(prj.getFieldSchema().schema.getField(i));
+                    }
+                    
+                }
+                else {
+                    schema = new LogicalSchema();
+                    schema.addField(prj.getFieldSchema());
+                }
             }
         }
         return schema;
@@ -111,4 +121,34 @@ public class LOInnerLoad extends Logical
     public boolean sourceIsBag() {
         return sourceIsBag;
     }
+    
+    /**
+     * Debug representation: optional alias, operator name, projected column
+     * ("*" when getColNum() is -1), the stored schema field (not recomputed)
+     * and any annotations.
+     */
+    @Override
+    public String toString() {
+        StringBuilder msg = new StringBuilder();
+
+        if (alias!=null) {
+            msg.append(alias).append(": ");
+        }
+        msg.append("(Name: ").append(name);
+        msg.append("[");
+        if (getProjection().getColNum()==-1)
+            msg.append("*");
+        else
+            msg.append(getProjection().getColNum());
+        msg.append("]");
+        // append(Object) already renders a null schema as "null".
+        msg.append(" Schema: ").append(schema);
+        msg.append(")");
+        if (annotations!=null) {
+            for (Map.Entry<String, Object> entry : annotations.entrySet()) {
+                msg.append(entry);
+            }
+        }
+        return msg.toString();
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Mon Aug 23 19:10:32 2010
@@ -90,7 +90,16 @@ public class LOJoin extends LogicalRelat
-     * Get all of the expressions plans that are in this join.
-     * @return collection of all expression plans.
+     * Get all of the expression plans in this join, grouped by Integer key
+     * (presumably the index of the join input each plan belongs to).
+     * @return the internal multimap itself, not a copy.
      */
-    public Collection<LogicalExpressionPlan> getExpressionPlans() {
+    public MultiMap<Integer,LogicalExpressionPlan> getExpressionPlans() {
+        return mJoinPlans;
+    }
+    
+    /**
+     * Get all of the expression plans in this join as a single collection.
+     * @return the values of the internal multimap.
+     */
+    public Collection<LogicalExpressionPlan> getExpressionPlanValues() {
         return mJoinPlans.values();
     }
     

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java Mon Aug 23 19:10:32 2010
@@ -44,10 +44,14 @@ public class LOLimit extends LogicalRela
     
     @Override
     public LogicalSchema getSchema() throws FrontendException {
+        if (schema != null) {
+            return schema;   // already computed by an earlier call
+        }
         LogicalRelationalOperator input = null;
         input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
         
-        return input.getSchema();
+        schema = input.getSchema();   // LIMIT keeps its input's schema; remember it
+        return schema;
     }   
     
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Mon Aug 23 19:10:32 2010
@@ -18,16 +18,22 @@
 
 package org.apache.pig.newplan.logical.relational;
 
+import java.io.IOException;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.LoadMetadata;
+import org.apache.pig.ResourceSchema;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.logical.Util;
 
 public class LOLoad extends LogicalRelationalOperator {
     
@@ -56,7 +62,7 @@ public class LOLoad extends LogicalRelat
     
     public LoadFunc getLoadFunc() throws FrontendException {
         try { 
-            if (loadFunc == null) {
+            if (loadFunc == null && fs!=null) {
                 loadFunc = (LoadFunc)PigContext.instantiateFuncFromSpec(fs.getFuncSpec());
                 loadFunc.setUDFContextSignature(getAlias());               
             }
@@ -88,8 +94,8 @@ public class LOLoad extends LogicalRelat
             return schema;
         
         LogicalSchema originalSchema = null;
-        // TODO get schema from LoaderMetadata interface.
-        if (determinedSchema!=null) {
+        // Ask the load function for a schema if none has been determined yet.
+        if (determinedSchema==null) {
             determinedSchema = getSchemaFromMetaData();
         }
         
@@ -128,7 +134,29 @@ public class LOLoad extends LogicalRelat
         return schema;
     }
 
-    private LogicalSchema getSchemaFromMetaData() {
+    /**
+     * Ask the load function for the schema of the input, if it implements
+     * {@link LoadMetadata}.
+     * @return the translated schema, or null when no schema is available
+     *         (e.g. the loader does not implement LoadMetadata or reports none)
+     * @throws FrontendException if the load function cannot be obtained or
+     *         its schema cannot be read
+     */
+    private LogicalSchema getSchemaFromMetaData() throws FrontendException {
+        LoadFunc func = getLoadFunc();
+        // instanceof is false for null, so no separate null check is needed.
+        if (func instanceof LoadMetadata) {
+            try {
+                ResourceSchema resourceSchema = ((LoadMetadata)func).getSchema(getFileSpec().getFileName(), new Job(conf));
+                // LoadMetadata.getSchema returns null when the schema is unknown.
+                if (resourceSchema == null)
+                    return null;
+                Schema oldSchema = Schema.getPigSchema(resourceSchema);
+                return Util.translateSchema(oldSchema);
+            } catch (IOException e) {
+                throw new FrontendException("Cannot get schema from loadFunc " + func.getClass().getName(), 9999, e);
+            }
+        }
         return null;
     }
 

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java Mon Aug 23 19:10:32 2010
@@ -97,9 +97,14 @@ public class LOSort extends LogicalRelat
 
     @Override
     public LogicalSchema getSchema() throws FrontendException {
+        if (schema != null) {
+            return schema;   // already computed by an earlier call
+        }
         LogicalRelationalOperator input = null;
         input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
-        return input.getSchema();
+        // Sorting leaves the schema unchanged; remember the input's schema.
+        schema = input.getSchema();
+        return schema;
     }
 
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java Mon Aug 23 19:10:32 2010
@@ -31,10 +31,14 @@ public class LOSplit extends LogicalRela
 
     @Override
     public LogicalSchema getSchema() throws FrontendException {
+        if (schema != null) {
+            return schema;   // already computed by an earlier call
+        }
         LogicalRelationalOperator input = null;
         input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
         
-        return input.getSchema();
+        schema = input.getSchema();   // SPLIT keeps its input's schema; remember it
+        return schema;
     }
 
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java Mon Aug 23 19:10:32 2010
@@ -44,10 +44,14 @@ public class LOSplitOutput extends Logic
     
     @Override
     public LogicalSchema getSchema() throws FrontendException {
+        if (schema != null) {
+            return schema;   // already computed by an earlier call
+        }
         LogicalRelationalOperator input = null;
         input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
         
-        return input.getSchema();
+        schema = input.getSchema();   // same schema as the input; remember it
+        return schema;
     }   
     
     @Override

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java Mon Aug 23 19:10:32 2010
@@ -19,6 +19,7 @@ package org.apache.pig.newplan.logical.r
 
 //import org.apache.commons.logging.Log;
 //import org.apache.commons.logging.LogFactory;
+import org.apache.pig.SortInfo;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
@@ -29,7 +30,15 @@ import org.apache.pig.newplan.PlanVisito
 public class LOStore extends LogicalRelationalOperator {
     private static final long serialVersionUID = 2L;
 
-    private FileSpec output;  
+    private FileSpec output;
+    
+    // If we know how to reload the store, here's how. The mInputSpec
+    // FileSpec is set in PigServer.postProcess. It can be used to
+    // reload this store, if the optimizer has the need.
+    private FileSpec mInputSpec;
+    private String signature;
+    private boolean isTmpStore;
+    private SortInfo sortInfo;
     transient private StoreFuncInterface storeFunc;
     
     //private static Log log = LogFactory.getLog(LOStore.class);
@@ -85,4 +94,36 @@ public class LOStore extends LogicalRela
             return false;
         }
     }
+    
+    public SortInfo getSortInfo() {
+        return sortInfo;
+    }
+
+    public void setSortInfo(SortInfo sortInfo) {
+        this.sortInfo = sortInfo;
+    }
+    
+    public boolean isTmpStore() {
+        return isTmpStore;
+    }
+
+    public void setTmpStore(boolean isTmpStore) {
+        this.isTmpStore = isTmpStore;
+    }
+    
+    public void setInputSpec(FileSpec in) {
+        mInputSpec = in;
+    }
+
+    public FileSpec getInputSpec() {
+        return mInputSpec;
+    }
+    
+    public String getSignature() {
+        return signature;
+    }
+    
+    public void setSignature(String sig) {
+        signature = sig;
+    }
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java Mon Aug 23 19:10:32 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.newplan.logical.relational;
 
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
@@ -25,6 +26,7 @@ import org.apache.pig.newplan.PlanVisito
 
 public class LOStream extends LogicalRelationalOperator {
 
+    private LogicalSchema scriptSchema;
     private static final long serialVersionUID = 2L;
     //private static Log log = LogFactory.getLog(LOFilter.class);
     
@@ -32,11 +34,14 @@ public class LOStream extends LogicalRel
     // Stream Operator this operator represents
     private StreamingCommand command;
     transient private ExecutableManager executableManager;
+    private LogicalSchema uidOnlySchema;
+    private boolean castInserted = false;
         
-    public LOStream(LogicalPlan plan, ExecutableManager exeManager, StreamingCommand cmd) {
-        super("LODistinct", plan);
+    public LOStream(LogicalPlan plan, ExecutableManager exeManager, StreamingCommand cmd, LogicalSchema schema) {
+        super("LOStream", plan);
         command = cmd;
         executableManager = exeManager;
+        scriptSchema = schema;
     }
     
     /**
@@ -60,10 +65,25 @@ public class LOStream extends LogicalRel
     public LogicalSchema getSchema() throws FrontendException {
         if (schema!=null)
             return schema;
-        LogicalRelationalOperator input = null;
-        input = (LogicalRelationalOperator)plan.getPredecessors(this).get(0);
         
-        schema = input.getSchema();
+        // Without a declared output schema, the stream's schema stays null.
+        if (scriptSchema!=null) {
+            if (isCastInserted()) {
+                // After setCastInserted(true), the stream itself is reported as
+                // producing bytearray for every declared field.
+                schema = new LogicalSchema();
+                for (int i=0;i<scriptSchema.size();i++) {
+                    LogicalSchema.LogicalFieldSchema fs = scriptSchema.getField(i).deepCopy();
+                    fs.type = DataType.BYTEARRAY;
+                    schema.addField(fs);
+                }
+            } else {
+                schema = scriptSchema.deepCopy();
+            }
+            // NOTE(review): presumably keeps field uids stable across recomputation.
+            uidOnlySchema = schema.mergeUid(uidOnlySchema);
+        }
+
         return schema;
     }   
     
@@ -83,5 +103,15 @@ public class LOStream extends LogicalRel
             return false;
         }
     }
+    
+    /** Sets whether casts have been inserted for this stream; consulted by getSchema(). */
+    public void setCastInserted(boolean flag) {
+        castInserted = flag;
+    }
+    
+    /** @return the value last passed to setCastInserted (false by default). */
+    public boolean isCastInserted() {
+        return castInserted;
+    }
 
 }

Modified: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=988256&r1=988255&r2=988256&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Mon Aug 23 19:10:32 2010
@@ -46,38 +46,34 @@ public class LOUnion extends LogicalRela
         List<Operator> inputs = null;
         inputs = plan.getPredecessors(this);
         
-        // If any predecessor's schema is null, or length of predecessor's schema does not match,
-        // then the schema for union is null
-        int length = -1;
+        // If any predecessor's schema is null, then the schema for union is null
         for (Operator input : inputs) {
             LogicalRelationalOperator op = (LogicalRelationalOperator)input;
             if (op.getSchema()==null)
                 return null;
-            if (length==-1)
-                length = op.getSchema().size();
-            else {
-                if (op.getSchema().size()!=length)
-                    return null;
-            }
         }
         
-        // Check if all predecessor's schema are compatible.
-        // TODO: Migrate all existing schema merging rules
-        LogicalSchema schema0 = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
-        for (int i=1;i<inputs.size();i++) {
+        LogicalSchema s0 = ((LogicalRelationalOperator)inputs.get(0)).getSchema();
+        if (inputs.size()==1)
+            return s0;
+        LogicalSchema s1 = ((LogicalRelationalOperator)inputs.get(1)).getSchema();
+        LogicalSchema mergedSchema = LogicalSchema.merge(s0, s1);
+        if (mergedSchema == null)   // first two schemas could not be merged
+            return null;
+        for (int i=2;i<inputs.size();i++) {   // merge in the remaining inputs
             LogicalSchema otherSchema = ((LogicalRelationalOperator)inputs.get(i)).getSchema();
-            if (!schema0.isEqual(otherSchema))
+            mergedSchema = LogicalSchema.merge(mergedSchema, otherSchema);
+            if (mergedSchema == null)
                 return null;
         }
         
-        // Generate merged schema based on schema of first input
-        schema = new LogicalSchema();
-        for (int i=0;i<schema0.size();i++)
+        // Bring back cached uid if any; otherwise, cache uid generated
+        for (int i=0;i<s0.size();i++)
         {
-            LogicalSchema.LogicalFieldSchema fs = new LogicalSchema.LogicalFieldSchema(schema0.getField(i));
+            LogicalSchema.LogicalFieldSchema fs = mergedSchema.getField(i);
             long uid = -1;
             for (Pair<Long, Long> pair : uidMapping) {
-                if (pair.second==schema0.getField(i).uid) {
+                if (pair.second==s0.getField(i).uid) {
                     uid = pair.first;
                     break;
                 }
@@ -91,8 +87,8 @@ public class LOUnion extends LogicalRela
             }
 
             fs.uid = uid;
-            schema.addField(fs);
         }
+        schema = mergedSchema;
         return schema;
     }