You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/28 09:56:34 UTC

svn commit: r1546314 [2/6] - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 28 08:56:33 2013
@@ -57,6 +57,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
@@ -65,7 +67,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -73,8 +74,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
@@ -83,6 +82,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -115,21 +116,21 @@ import org.apache.pig.newplan.logical.re
 
 /**
  * The compiler that compiles a given physical plan
- * into a DAG of MapReduce operators which can then 
+ * into a DAG of MapReduce operators which can then
  * be converted into the JobControl structure.
- * 
+ *
  * Is implemented as a visitor of the PhysicalPlan it
  * is compiling.
- * 
+ *
  * Currently supports all operators except the MR Sort
- * operator 
- * 
- * Uses a predecessor based depth first traversal. 
+ * operator
+ *
+ * Uses a predecessor based depth first traversal.
  * To compile an operator, first compiles
  * the predecessors into MapReduce Operators and tries to
  * merge the current operator into one of them. The goal
  * being to keep the number of MROpers to a minimum.
- * 
+ *
  * It also merges multiple Map jobs, created by compiling
  * the inputs individually, into a single job. Here a new
  * map job is created and then the contents of the previous
@@ -137,28 +138,28 @@ import org.apache.pig.newplan.logical.re
  * the previous map plans, should be manually moved over. So,
  * if you are adding something new take care about this.
  * Ex of this is in requestedParallelism
- * 
- * Only in case of blocking operators and splits, a new 
+ *
+ * Only in case of blocking operators and splits, a new
  * MapReduce operator is started using a store-load combination
  * to connect the two operators. Whenever this happens
  * care is taken to add the MROper into the MRPlan and connect it
  * appropriately.
- * 
+ *
  *
  */
 public class MRCompiler extends PhyPlanVisitor {
     PigContext pigContext;
-    
+
     //The plan that is being compiled
     PhysicalPlan plan;
 
     //The plan of MapReduce Operators
     MROperPlan MRPlan;
-    
+
     //The current MapReduce Operator
     //that is being compiled
     MapReduceOper curMROp;
-    
+
     //The output of compiling the inputs
     MapReduceOper[] compiledInputs = null;
 
@@ -166,38 +167,38 @@ public class MRCompiler extends PhyPlanV
     //maintained they will haunt you.
     //During the traversal a split is the only
     //operator that can be revisited from a different
-    //path. So this map stores the split job. So 
+    //path. So this map stores the split job. So
     //whenever we hit the split, we create a new MROper
     //and connect the split job using load-store and also
     //in the MRPlan
     Map<OperatorKey, MapReduceOper> splitsSeen;
-    
+
     NodeIdGenerator nig;
 
     private String scope;
-    
+
     private Random r;
-    
+
     private UDFFinder udfFinder;
-    
+
     private CompilationMessageCollector messageCollector = null;
-    
+
     private Map<PhysicalOperator,MapReduceOper> phyToMROpMap;
-        
+
     public static final String USER_COMPARATOR_MARKER = "user.comparator.func:";
-   
+
     private static final Log LOG = LogFactory.getLog(MRCompiler.class);
-    
+
     public static final String FILE_CONCATENATION_THRESHOLD = "pig.files.concatenation.threshold";
     public static final String OPTIMISTIC_FILE_CONCATENATION = "pig.optimistic.files.concatenation";
-    
+
     private int fileConcatenationThreshold = 100;
     private boolean optimisticFileConcatenation = false;
-    
+
     public MRCompiler(PhysicalPlan plan) throws MRCompilerException {
         this(plan,null);
     }
-    
+
     public MRCompiler(PhysicalPlan plan,
             PigContext pigContext) throws MRCompilerException {
         super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
@@ -211,14 +212,14 @@ public class MRCompiler extends PhyPlanV
         udfFinder = new UDFFinder();
         List<PhysicalOperator> roots = plan.getRoots();
         if((roots == null) || (roots.size() <= 0)) {
-        	int errCode = 2053;
-        	String msg = "Internal error. Did not find roots in the physical plan.";
-        	throw new MRCompilerException(msg, errCode, PigException.BUG);
+            int errCode = 2053;
+            String msg = "Internal error. Did not find roots in the physical plan.";
+            throw new MRCompilerException(msg, errCode, PigException.BUG);
         }
         scope = roots.get(0).getOperatorKey().getScope();
         messageCollector = new CompilationMessageCollector() ;
         phyToMROpMap = new HashMap<PhysicalOperator, MapReduceOper>();
-        
+
         fileConcatenationThreshold = Integer.parseInt(pigContext.getProperties()
                 .getProperty(FILE_CONCATENATION_THRESHOLD, "100"));
         optimisticFileConcatenation = pigContext.getProperties().getProperty(
@@ -226,23 +227,23 @@ public class MRCompiler extends PhyPlanV
         LOG.info("File concatenation threshold: " + fileConcatenationThreshold
                 + " optimistic? " + optimisticFileConcatenation);
     }
-    
+
     public void aggregateScalarsFiles() throws PlanException, IOException {
         List<MapReduceOper> mrOpList = new ArrayList<MapReduceOper>();
         for(MapReduceOper mrOp: MRPlan) {
             mrOpList.add(mrOp);
         }
-        
-        Configuration conf = 
+
+        Configuration conf =
             ConfigurationUtil.toConfiguration(pigContext.getProperties());
         boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
-        
+
         Map<FileSpec, MapReduceOper> seen = new HashMap<FileSpec, MapReduceOper>();
-        
+
         for(MapReduceOper mrOp: mrOpList) {
-            for(PhysicalOperator scalar: mrOp.scalars) {                
+            for(PhysicalOperator scalar: mrOp.scalars) {
                 MapReduceOper mro = phyToMROpMap.get(scalar);
-                if (scalar instanceof POStore) {                                     
+                if (scalar instanceof POStore) {
                     FileSpec oldSpec = ((POStore)scalar).getSFile();
                     MapReduceOper mro2 = seen.get(oldSpec);
                     boolean hasSeen = false;
@@ -256,17 +257,17 @@ public class MRCompiler extends PhyPlanV
                                     : (mro.requestedParallelism >= fileConcatenationThreshold))) {
                         PhysicalPlan pl = mro.reducePlan.isEmpty() ? mro.mapPlan : mro.reducePlan;
                         FileSpec newSpec = getTempFileSpec();
-                        
+
                         // replace oldSpec in mro with newSpec
                         new FindStoreNameVisitor(pl, newSpec, oldSpec).visit();
-                        
+
                         POStore newSto = getStore();
                         newSto.setSFile(oldSpec);
-                        if (MRPlan.getPredecessors(mrOp)!=null && 
+                        if (MRPlan.getPredecessors(mrOp)!=null &&
                                 MRPlan.getPredecessors(mrOp).contains(mro))
                             MRPlan.disconnect(mro, mrOp);
-                        MapReduceOper catMROp = getConcatenateJob(newSpec, mro, newSto); 
-                        MRPlan.connect(catMROp, mrOp);   
+                        MapReduceOper catMROp = getConcatenateJob(newSpec, mro, newSto);
+                        MRPlan.connect(catMROp, mrOp);
                         seen.put(oldSpec, catMROp);
                     } else {
                         if (!hasSeen) seen.put(oldSpec, mro);
@@ -275,11 +276,11 @@ public class MRCompiler extends PhyPlanV
             }
         }
     }
-    
+
     public void randomizeFileLocalizer(){
         FileLocalizer.setR(new Random());
     }
-    
+
     /**
      * Used to get the compiled plan
      * @return map reduce plan built by the compiler
@@ -287,7 +288,7 @@ public class MRCompiler extends PhyPlanV
     public MROperPlan getMRPlan() {
         return MRPlan;
     }
-    
+
     /**
      * Used to get the plan that was compiled
      * @return physical plan
@@ -296,11 +297,11 @@ public class MRCompiler extends PhyPlanV
     public PhysicalPlan getPlan() {
         return plan;
     }
-    
+
     public CompilationMessageCollector getMessageCollector() {
-    	return messageCollector;
+        return messageCollector;
     }
-    
+
     /**
      * The front-end method that the user calls to compile
      * the plan. Assumes that all submitted plans have a Store
@@ -337,16 +338,16 @@ public class MRCompiler extends PhyPlanV
         }
         ops.addAll(nativeMRs);
         Collections.sort(ops);
-        
+
         for (PhysicalOperator op : ops) {
             compile(op);
         }
-        
+
         connectSoftLink();
-        
+
         return MRPlan;
     }
-    
+
     public void connectSoftLink() throws PlanException, IOException {
         for (PhysicalOperator op : plan) {
             if (plan.getSoftLinkPredecessors(op)!=null) {
@@ -362,7 +363,7 @@ public class MRCompiler extends PhyPlanV
             }
         }
     }
-    
+
     /**
      * Compiles the plan below op into a MapReduce Operator
      * and stores it in curMROp.
@@ -376,8 +377,8 @@ public class MRCompiler extends PhyPlanV
         //An artifact of the Visitor. Need to save
         //this so that it is not overwritten.
         MapReduceOper[] prevCompInp = compiledInputs;
-        
-        //Compile each predecessor into the MROper and 
+
+        //Compile each predecessor into the MROper and
         //store them away so that we can use them for compiling
         //op.
         List<PhysicalOperator> predecessors = plan.getPredecessors(op);
@@ -405,7 +406,7 @@ public class MRCompiler extends PhyPlanV
                 PhysicalOperator p = predecessors.get(0);
                 MapReduceOper oper = null;
                 if(p instanceof POStore || p instanceof PONative){
-                    oper = phyToMROpMap.get(p); 
+                    oper = phyToMROpMap.get(p);
                 }else{
                     int errCode = 2126;
                     String msg = "Predecessor of load should be a store or mapreduce operator. Got "+p.getClass();
@@ -422,7 +423,7 @@ public class MRCompiler extends PhyPlanV
                 phyToMROpMap.put(op, curMROp);
                 return;
             }
-            
+
             Collections.sort(predecessors);
             compiledInputs = new MapReduceOper[predecessors.size()];
             int i = -1;
@@ -449,35 +450,35 @@ public class MRCompiler extends PhyPlanV
             phyToMROpMap.put(op, curMROp);
             return;
         }
-        
+
         //Now we have the inputs compiled. Do something
         //with the input oper op.
         op.visit(this);
         if(op.getRequestedParallelism() > curMROp.requestedParallelism ) {
-        	// we don't want to change prallelism for skewed join due to sampling
-        	// and pre-allocated reducers for skewed keys
-        	if (!curMROp.isSkewedJoin()) {
-        		curMROp.requestedParallelism = op.getRequestedParallelism();
-        	}
+            // we don't want to change parallelism for skewed join due to sampling
+            // and pre-allocated reducers for skewed keys
+            if (!curMROp.isSkewedJoin()) {
+                curMROp.requestedParallelism = op.getRequestedParallelism();
+            }
         }
         compiledInputs = prevCompInp;
     }
-    
+
     private MapReduceOper getMROp(){
         return new MapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)));
     }
-    
+
     private NativeMapReduceOper getNativeMROp(String mrJar, String[] parameters) {
         return new NativeMapReduceOper(new OperatorKey(scope,nig.getNextNodeId(scope)), mrJar, parameters);
     }
-    
+
     private POLoad getLoad(){
         POLoad ld = new POLoad(new OperatorKey(scope,nig.getNextNodeId(scope)));
         ld.setPc(pigContext);
         ld.setIsTmpLoad(true);
         return ld;
     }
-    
+
     private POStore getStore(){
         POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
         // mark store as tmp store. These could be removed by the
@@ -485,13 +486,13 @@ public class MRCompiler extends PhyPlanV
         st.setIsTmpStore(true);
         return st;
     }
-    
+
     /**
      * A map MROper is an MROper whose map plan is still open
      * for taking more non-blocking operators.
      * A reduce MROper is an MROper whose map plan is done but
      * the reduce plan is open for taking more non-blocking opers.
-     * 
+     *
      * Used for compiling non-blocking operators. The logic here
      * is simple. If there is a single input, just push the operator
      * into whichever phase is open. Otherwise, we merge the compiled
@@ -500,14 +501,14 @@ public class MRCompiler extends PhyPlanV
      * as reduce plans can't be merged.
      * Then we add the input oper op into the merged map MROper's map plan
      * as a leaf and connect the reduce MROpers using store-load combinations
-     * to the input operator which is the leaf. Also care is taken to 
+     * to the input operator which is the leaf. Also care is taken to
      * connect the MROpers according to the dependencies.
      * @param op
      * @throws PlanException
      * @throws IOException
      */
     private void nonBlocking(PhysicalOperator op) throws PlanException, IOException{
-        
+
         if (compiledInputs.length == 1) {
             //For speed
             MapReduceOper mro = compiledInputs[0];
@@ -517,29 +518,29 @@ public class MRCompiler extends PhyPlanV
                 mro.reducePlan.addAsLeaf(op);
             } else {
                 int errCode = 2022;
-                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";                
+                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
                 throw new PlanException(msg, errCode, PigException.BUG);
             }
             curMROp = mro;
         } else {
             List<MapReduceOper> mergedPlans = merge(compiledInputs);
-            
+
             //The first MROper is always the merged map MROper
             MapReduceOper mro = mergedPlans.remove(0);
             //Push the input operator into the merged map MROper
             mro.mapPlan.addAsLeaf(op);
-            
+
             //Connect all the reduce MROpers
             if(mergedPlans.size()>0)
                 connRedOper(mergedPlans, mro);
-            
+
             //return the compiled MROper
             curMROp = mro;
         }
     }
-    
+
     private void addToMap(PhysicalOperator op) throws PlanException, IOException{
-        
+
         if (compiledInputs.length == 1) {
             //For speed
             MapReduceOper mro = compiledInputs[0];
@@ -547,7 +548,7 @@ public class MRCompiler extends PhyPlanV
                 mro.mapPlan.addAsLeaf(op);
             } else if (mro.isMapDone() && !mro.isReduceDone()) {
                 FileSpec fSpec = getTempFileSpec();
-                
+
                 POStore st = getStore();
                 st.setSFile(fSpec);
                 mro.reducePlan.addAsLeaf(st);
@@ -557,35 +558,35 @@ public class MRCompiler extends PhyPlanV
                 compiledInputs[0] = mro;
             } else {
                 int errCode = 2022;
-                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";                
+                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
                 throw new PlanException(msg, errCode, PigException.BUG);
             }
             curMROp = mro;
         } else {
             List<MapReduceOper> mergedPlans = merge(compiledInputs);
-            
+
             //The first MROper is always the merged map MROper
             MapReduceOper mro = mergedPlans.remove(0);
             //Push the input operator into the merged map MROper
             mro.mapPlan.addAsLeaf(op);
-            
+
             //Connect all the reduce MROpers
             if(mergedPlans.size()>0)
                 connRedOper(mergedPlans, mro);
-            
+
             //return the compiled MROper
             curMROp = mro;
         }
     }
-    
+
     /**
      * Used for compiling blocking operators. If there is a single input
      * and its map phase is still open, then close it so that further
      * operators can be compiled into the reduce phase. If its reduce phase
      * is open, add a store and close it. Start a new map MROper into which
-     * further operators can be compiled into. 
-     * 
-     * If there are multiple inputs, the logic 
+     * further operators can be compiled into.
+     *
+     * If there are multiple inputs, the logic
      * is to merge all map MROpers into one map MROper and retain
      * the reduce MROpers. Since the operator is blocking, it has
      * to be a Global Rerrange at least now. This operator need not
@@ -593,15 +594,15 @@ public class MRCompiler extends PhyPlanV
      * But this creates the map-reduce boundary. So the merged map MROper
      * is closed and its reduce phase is started. Depending on the number
      * of reduce MROpers and the number of pipelines in the map MRoper
-     * a Union operator is inserted whenever necessary. This also leads to the 
+     * a Union operator is inserted whenever necessary. This also leads to the
      * possibility of empty map plans. So have to be careful while handling
      * it in the PigMapReduce class. If there are no map
      * plans, then a new one is created as a side effect of the merge
      * process. If there are no reduce MROpers, and only a single pipeline
-     * in the map, then no union oper is added. Otherwise a Union oper is 
-     * added to the merged map MROper to which all the reduce MROpers 
+     * in the map, then no union oper is added. Otherwise a Union oper is
+     * added to the merged map MROper to which all the reduce MROpers
      * are connected by store-load combinations. Care is taken
-     * to connect the MROpers in the MRPlan.  
+     * to connect the MROpers in the MRPlan.
      * @param op
      * @throws IOException
      * @throws PlanException
@@ -615,7 +616,7 @@ public class MRCompiler extends PhyPlanV
             }
             else if(mro.isMapDone() && !mro.isReduceDone()){
                 FileSpec fSpec = getTempFileSpec();
-                
+
                 POStore st = getStore();
                 st.setSFile(fSpec);
                 mro.reducePlan.addAsLeaf(st);
@@ -627,7 +628,7 @@ public class MRCompiler extends PhyPlanV
         else{
             List<MapReduceOper> mergedPlans = merge(compiledInputs);
             MapReduceOper mro = mergedPlans.remove(0);
-            
+
             if(mergedPlans.size()>0)
                 mro.setMapDoneMultiple(true);
             else
@@ -639,13 +640,13 @@ public class MRCompiler extends PhyPlanV
             curMROp = mro;
         }
     }
-    
+
     /**
      * Connect the reduce MROpers to the leaf node in the map MROper mro
      * by adding appropriate loads
      * @param mergedPlans - The list of reduce MROpers
      * @param mro - The map MROper
-     * @throws PlanException 
+     * @throws PlanException
      * @throws IOException
      */
     private void connRedOper(List<MapReduceOper> mergedPlans, MapReduceOper mro) throws PlanException, IOException{
@@ -668,8 +669,8 @@ public class MRCompiler extends PhyPlanV
             MRPlan.connect(mmro, mro);
         }
     }
-    
-    
+
+
     /**
      * Force an end to the current map reduce job with a store into a temporary
      * file.
@@ -699,16 +700,16 @@ public class MRCompiler extends PhyPlanV
         }
         return mro;
     }
-    
+
     /**
      * Starts a new MRoper and connects it to the old
-     * one by load-store. The assumption is that the 
+     * one by load-store. The assumption is that the
      * store is already inserted into the old MROper.
      * @param fSpec
      * @param old
      * @return
      * @throws IOException
-     * @throws PlanException 
+     * @throws PlanException
      */
     private MapReduceOper startNew(FileSpec fSpec, MapReduceOper old) throws PlanException{
         POLoad ld = getLoad();
@@ -719,7 +720,7 @@ public class MRCompiler extends PhyPlanV
         MRPlan.connect(old, ret);
         return ret;
     }
- 
+
     /**
      * Returns a temporary DFS Path
      * @return
@@ -729,18 +730,18 @@ public class MRCompiler extends PhyPlanV
         return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
                 new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
     }
-    
+
     /**
      * Merges the map MROpers in the compiledInputs into a single
      * merged map MRoper and returns a List with the merged map MROper
      * as the first oper and the rest being reduce MROpers.
-     * 
+     *
      * Care is taken to remove the map MROpers that are merged from the
      * MRPlan and their connections moved over to the merged map MROper.
-     * 
+     *
      * Merge is implemented as a sequence of binary merges.
-     * merge(PhyPlan finPlan, List<PhyPlan> lst) := finPlan,merge(p) foreach p in lst 
-     *   
+     * merge(PhyPlan finPlan, List<PhyPlan> lst) := finPlan,merge(p) foreach p in lst
+     *
      * @param compiledInputs
      * @return
      * @throws PlanException
@@ -749,11 +750,11 @@ public class MRCompiler extends PhyPlanV
     private List<MapReduceOper> merge(MapReduceOper[] compiledInputs)
             throws PlanException {
         List<MapReduceOper> ret = new ArrayList<MapReduceOper>();
-        
+
         MapReduceOper mergedMap = getMROp();
         ret.add(mergedMap);
         MRPlan.add(mergedMap);
-        
+
         Set<MapReduceOper> toBeConnected = new HashSet<MapReduceOper>();
         List<MapReduceOper> remLst = new ArrayList<MapReduceOper>();
 
@@ -772,12 +773,12 @@ public class MRCompiler extends PhyPlanV
                 ret.add(mro);
             } else {
                 int errCode = 2027;
-                String msg = "Both map and reduce phases have been done. This is unexpected for a merge."; 
+                String msg = "Both map and reduce phases have been done. This is unexpected for a merge.";
                 throw new PlanException(msg, errCode, PigException.BUG);
             }
         }
         merge(ret.get(0).mapPlan, mpLst);
-        
+
         Iterator<MapReduceOper> it = toBeConnected.iterator();
         while(it.hasNext())
             MRPlan.connect(it.next(), mergedMap);
@@ -804,12 +805,12 @@ public class MRCompiler extends PhyPlanV
             for (PhysicalOperator op : opsToChange) {
                 phyToMROpMap.put(op, mergedMap);
             }
-            
+
             MRPlan.remove(rmro);
         }
         return ret;
     }
-    
+
     /**
      * The merge of a list of map plans
      * @param <O>
@@ -831,24 +832,24 @@ public class MRCompiler extends PhyPlanV
             ScalarPhyFinder scalarPhyFinder = new ScalarPhyFinder(plan);
             scalarPhyFinder.visit();
             curMROp.scalars.addAll(scalarPhyFinder.getScalars());
-            
+
             //Process UDFs
             udfFinder.setPlan(plan);
             udfFinder.visit();
             curMROp.UDFs.addAll(udfFinder.getUDFs());
         }
     }
-    
-    
+
+
     /* The visitOp methods that decide what to do with the current operator */
-    
+
     /**
      * Compiles a split operator. The logic is to
      * close the split job by replacing the split oper by
      * a store and creating a new Map MRoper and return
      * that as the current MROper to which other operators
      * would be compiled into. The new MROper would be connected
-     * to the split job by load-store. Also add the split oper 
+     * to the split job by load-store. Also add the split oper
      * to the splitsSeen map.
      * @param op - The split operator
      * @throws VisitorException
@@ -868,7 +869,7 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visitLoad(POLoad op) throws VisitorException{
         try{
@@ -880,7 +881,7 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visitNative(PONative op) throws VisitorException{
         // We will explode the native operator here to add a new MROper for native Mapreduce job
@@ -897,7 +898,7 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visitStore(POStore op) throws VisitorException{
         try{
@@ -911,7 +912,7 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visitFilter(POFilter op) throws VisitorException{
         try{
@@ -924,7 +925,7 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visitCross(POCross op) throws VisitorException {
         try{
@@ -948,7 +949,7 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visitLimit(POLimit op) throws VisitorException{
         try{
@@ -959,14 +960,14 @@ public class MRCompiler extends PhyPlanV
                 mro.limitPlan = op.getLimitPlan();
             }
             if (!mro.isMapDone()) {
-            	// if map plan is open, add a limit for optimization, eventually we
-            	// will add another limit to reduce plan
+                // if map plan is open, add a limit for optimization, eventually we
+                // will add another limit to reduce plan
                 if (!pigContext.inIllustrator)
                 {
                     mro.mapPlan.addAsLeaf(op);
                     mro.setMapDone(true);
                 }
-                
+
                 if (mro.reducePlan.isEmpty())
                 {
                     MRUtil.simpleConnectMapToReduce(mro, scope, nig);
@@ -982,15 +983,15 @@ public class MRCompiler extends PhyPlanV
                 }
                 else
                 {
-                    messageCollector.collect("Something in the reduce plan while map plan is not done. Something wrong!", 
-                    		MessageType.Warning, PigWarning.REDUCE_PLAN_NOT_EMPTY_WHILE_MAP_PLAN_UNDER_PROCESS);
+                    messageCollector.collect("Something in the reduce plan while map plan is not done. Something wrong!",
+                            MessageType.Warning, PigWarning.REDUCE_PLAN_NOT_EMPTY_WHILE_MAP_PLAN_UNDER_PROCESS);
                 }
             } else if (mro.isMapDone() && !mro.isReduceDone()) {
-            	// limit should add into reduce plan
+                // limit should add into reduce plan
                 mro.reducePlan.addAsLeaf(op);
             } else {
-            	messageCollector.collect("Both map and reduce phases have been done. This is unexpected while compiling!",
-            			MessageType.Warning, PigWarning.UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED);
+                messageCollector.collect("Both map and reduce phases have been done. This is unexpected while compiling!",
+                        MessageType.Warning, PigWarning.UNREACHABLE_CODE_BOTH_MAP_AND_REDUCE_PLANS_PROCESSED);
             }
             phyToMROpMap.put(op, mro);
         }catch(Exception e){
@@ -1018,24 +1019,24 @@ public class MRCompiler extends PhyPlanV
 
     @Override
     public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
-        
+
         if(!curMROp.mapDone){
-            
+
             List<PhysicalOperator> roots = curMROp.mapPlan.getRoots();
             if(roots.size() != 1){
                 int errCode = 2171;
                 String errMsg = "Expected one but found more then one root physical operator in physical plan.";
                 throw new MRCompilerException(errMsg,errCode,PigException.BUG);
             }
-            
+
             PhysicalOperator phyOp = roots.get(0);
             if(! (phyOp instanceof POLoad)){
                 int errCode = 2172;
                 String errMsg = "Expected physical operator at root to be POLoad. Found : "+phyOp.getClass().getCanonicalName();
                 throw new MRCompilerException(errMsg,errCode,PigException.BUG);
             }
-            
-            
+
+
             LoadFunc loadFunc = ((POLoad)phyOp).getLoadFunc();
             try {
                 if(!(CollectableLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
@@ -1062,19 +1063,19 @@ public class MRCompiler extends PhyPlanV
                 int errCode = 2034;
                 String msg = "Error compiling operator " + op.getClass().getSimpleName();
                 throw new MRCompilerException(msg, errCode, PigException.BUG, e);
-            }    
+            }
         }
         else if(!curMROp.reduceDone){
-        	int errCode=2250;
+            int errCode=2250;
             String msg = "Blocking operators are not allowed before Collected Group. Consider dropping using 'collected'.";
-            throw new MRCompilerException(msg, errCode, PigException.BUG);   
+            throw new MRCompilerException(msg, errCode, PigException.BUG);
         }
         else{
             int errCode = 2022;
             String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
-            throw new MRCompilerException(msg, errCode, PigException.BUG);   
+            throw new MRCompilerException(msg, errCode, PigException.BUG);
         }
-        
+
     }
 
     @Override
@@ -1093,7 +1094,7 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException{
         try{
@@ -1106,15 +1107,15 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visitPackage(POPackage op) throws VisitorException{
         try{
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
-            if (op.getPackageType() == PackageType.JOIN) {
+            if (op.getPkgr().getPackageType() == PackageType.JOIN) {
                 curMROp.markRegularJoin();
-            } else if (op.getPackageType() == PackageType.GROUP) {
+            } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
                 if (op.getNumInps() == 1) {
                     curMROp.markGroupBy();
                 } else if (op.getNumInps() > 1) {
@@ -1127,7 +1128,7 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visitUnion(POUnion op) throws VisitorException{
         try{
@@ -1139,7 +1140,7 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-            
+
     /**
      * This is an operator which will have multiple inputs(= to number of join inputs)
      * But it prunes off all inputs but the fragment input and creates separate MR jobs
@@ -1156,7 +1157,7 @@ public class MRCompiler extends PhyPlanV
                 replFiles[i] = getTempFileSpec();
             }
             op.setReplFiles(replFiles);
-            
+
 
             curMROp = phyToMROpMap.get(op.getInputs().get(op.getFragment()));
             for(int i=0;i<compiledInputs.length;i++){
@@ -1165,33 +1166,33 @@ public class MRCompiler extends PhyPlanV
                     continue;
                 POStore str = getStore();
                 str.setSFile(replFiles[i]);
-                
-                Configuration conf = 
+
+                Configuration conf =
                     ConfigurationUtil.toConfiguration(pigContext.getProperties());
                 boolean combinable = !conf.getBoolean("pig.noSplitCombination", false);
-                
-                if (!mro.isMapDone()) {   
-                    if (combinable && hasTooManyInputFiles(mro, conf)) { 
+
+                if (!mro.isMapDone()) {
+                    if (combinable && hasTooManyInputFiles(mro, conf)) {
                         POStore tmpSto = getStore();
                         FileSpec fSpec = getTempFileSpec();
-                        tmpSto.setSFile(fSpec);                         
+                        tmpSto.setSFile(fSpec);
                         mro.mapPlan.addAsLeaf(tmpSto);
-                        mro.setMapDoneSingle(true);                    
-                        MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str); 
+                        mro.setMapDoneSingle(true);
+                        MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str);
                         MRPlan.connect(catMROp, curMROp);
                     } else {
                         mro.mapPlan.addAsLeaf(str);
-                        mro.setMapDoneSingle(true); 
+                        mro.setMapDoneSingle(true);
                         MRPlan.connect(mro, curMROp);
                     }
                 } else if (mro.isMapDone() && !mro.isReduceDone()) {
                     if (combinable && (mro.requestedParallelism >= fileConcatenationThreshold)) {
                         POStore tmpSto = getStore();
                         FileSpec fSpec = getTempFileSpec();
-                        tmpSto.setSFile(fSpec); 
+                        tmpSto.setSFile(fSpec);
                         mro.reducePlan.addAsLeaf(tmpSto);
                         mro.setReduceDone(true);
-                        MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str); 
+                        MapReduceOper catMROp = getConcatenateJob(fSpec, mro, str);
                         MRPlan.connect(catMROp, curMROp);
                     } else {
                         mro.reducePlan.addAsLeaf(str);
@@ -1202,15 +1203,15 @@ public class MRCompiler extends PhyPlanV
                     int errCode = 2022;
                     String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
                     throw new PlanException(msg, errCode, PigException.BUG);
-                }              
+                }
             }
-            
+
             if (!curMROp.isMapDone()) {
                 curMROp.mapPlan.addAsLeaf(op);
             } else if (curMROp.isMapDone() && !curMROp.isReduceDone()) {
                 curMROp.reducePlan.addAsLeaf(op);
             } else {
-            	int errCode = 2022;
+                int errCode = 2022;
                 String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
                 throw new PlanException(msg, errCode, PigException.BUG);
             }
@@ -1229,33 +1230,33 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-  
+
     @SuppressWarnings("unchecked")
     private boolean hasTooManyInputFiles(MapReduceOper mro, Configuration conf) {
         if (pigContext == null || pigContext.getExecType() == ExecType.LOCAL) {
             return false;
         }
-        
+
         if (mro instanceof NativeMapReduceOper) {
             return optimisticFileConcatenation ? false : true;
         }
-               
+
         PhysicalPlan mapPlan = mro.mapPlan;
-        
+
         List<PhysicalOperator> roots = mapPlan.getRoots();
         if (roots == null || roots.size() == 0) return false;
-        
+
         int numFiles = 0;
         boolean ret = false;
         try {
             for (PhysicalOperator root : roots) {
                 POLoad ld = (POLoad) root;
                 String fileName = ld.getLFile().getFileName();
-                
+
                 if(UriUtil.isHDFSFile(fileName)){
-                    // Only if the input is an hdfs file, this optimization is 
+                    // Only if the input is an hdfs file, this optimization is
                     // useful (to reduce load on namenode)
-                    
+
                     //separate out locations separated by comma
                     String [] locations = LoadFunc.getPathStrings(fileName);
                     for(String location : locations){
@@ -1281,14 +1282,14 @@ public class MRCompiler extends PhyPlanV
                             List<MapReduceOper> preds = MRPlan.getPredecessors(mro);
                             if (preds != null && preds.size() == 1) {
                                 MapReduceOper pred = preds.get(0);
-                                if (!pred.reducePlan.isEmpty()) { 
+                                if (!pred.reducePlan.isEmpty()) {
                                     numFiles += pred.requestedParallelism;
                                 } else { // map-only job
                                     ret = hasTooManyInputFiles(pred, conf);
                                     break;
                                 }
-                            } else if (!optimisticFileConcatenation) {                    
-                                // can't determine the number of input files. 
+                            } else if (!optimisticFileConcatenation) {
+                                // can't determine the number of input files.
                                 // Treat it as having too manyfiles
                                 numFiles = fileConcatenationThreshold;
                                 break;
@@ -1298,31 +1299,31 @@ public class MRCompiler extends PhyPlanV
                 }
             }
         } catch (IOException e) {
-            LOG.warn("failed to get number of input files", e); 
+            LOG.warn("failed to get number of input files", e);
         } catch (InterruptedException e) {
-            LOG.warn("failed to get number of input files", e); 
+            LOG.warn("failed to get number of input files", e);
         }
-                
+
         LOG.info("number of input files: " + numFiles);
         return ret ? true : (numFiles >= fileConcatenationThreshold);
     }
-    
+
     /*
      * Use Mult File Combiner to concatenate small input files
      */
     private MapReduceOper getConcatenateJob(FileSpec fSpec, MapReduceOper old, POStore str)
             throws PlanException, ExecException {
-        
+
         MapReduceOper mro = startNew(fSpec, old);
         mro.mapPlan.addAsLeaf(str);
         mro.setMapDone(true);
-        
+
         LOG.info("Insert a file-concatenation job");
-                
+
         return mro;
-    }    
-    
-    /** Leftmost relation is referred as base relation (this is the one fed into mappers.) 
+    }
+
+    /** Leftmost relation is referred as base relation (this is the one fed into mappers.)
      *  First, close all MROpers except for first one (referred as baseMROPer)
      *  Then, create a MROper which will do indexing job (idxMROper)
      *  Connect idxMROper before the mappedMROper in the MRPlan.
@@ -1334,20 +1335,20 @@ public class MRCompiler extends PhyPlanV
         if(compiledInputs.length < 2){
             int errCode=2251;
             String errMsg = "Merge Cogroup work on two or more relations." +
-            		"To use map-side group-by on single relation, use 'collected' qualifier.";
+                    "To use map-side group-by on single relation, use 'collected' qualifier.";
             throw new MRCompilerException(errMsg, errCode);
         }
-            
+
         List<FuncSpec> funcSpecs = new ArrayList<FuncSpec>(compiledInputs.length-1);
         List<String> fileSpecs = new ArrayList<String>(compiledInputs.length-1);
         List<String> loaderSigns = new ArrayList<String>(compiledInputs.length-1);
-        
+
         try{
-            // Iterate through all the MROpers, disconnect side MROPers from 
+            // Iterate through all the MROpers, disconnect side MROPers from
             // MROPerPlan and collect all the information needed in different lists.
-            
+
             for(int i=0 ; i < compiledInputs.length; i++){
-                
+
                 MapReduceOper mrOper = compiledInputs[i];
                 PhysicalPlan mapPlan = mrOper.mapPlan;
                 if(mapPlan.getRoots().size() != 1){
@@ -1362,18 +1363,18 @@ public class MRCompiler extends PhyPlanV
                     String errMsg = "Expected physical operator at root to be POLoad. Found : "+rootPOOp.getClass().getCanonicalName();
                     throw new MRCompilerException(errMsg,errCode);
                 }
-                
+
                 POLoad sideLoader = (POLoad)rootPOOp;
                 FileSpec loadFileSpec = sideLoader.getLFile();
                 FuncSpec funcSpec = loadFileSpec.getFuncSpec();
                 LoadFunc loadfunc = sideLoader.getLoadFunc();
                 if(i == 0){
-                    
+
                     if(!(CollectableLoadFunc.class.isAssignableFrom(loadfunc.getClass()))){
-                    	int errCode = 2252;
+                        int errCode = 2252;
                         throw new MRCompilerException("Base loader in Cogroup must implement CollectableLoadFunc.", errCode);
                     }
-                    
+
                     ((CollectableLoadFunc)loadfunc).ensureAllKeyInstancesInSameSplit();
                     continue;
                 }
@@ -1381,30 +1382,30 @@ public class MRCompiler extends PhyPlanV
                     int errCode = 2253;
                     throw new MRCompilerException("Side loaders in cogroup must implement IndexableLoadFunc.", errCode);
                 }
-                
+
                 funcSpecs.add(funcSpec);
                 fileSpecs.add(loadFileSpec.getFileName());
                 loaderSigns.add(sideLoader.getSignature());
                 MRPlan.remove(mrOper);
             }
-            
+
             poCoGrp.setSideLoadFuncs(funcSpecs);
             poCoGrp.setSideFileSpecs(fileSpecs);
             poCoGrp.setLoaderSignatures(loaderSigns);
-            
+
             // Use map-reduce operator of base relation for the cogroup operation.
             MapReduceOper baseMROp = phyToMROpMap.get(poCoGrp.getInputs().get(0));
             if(baseMROp.mapDone || !baseMROp.reducePlan.isEmpty()){
                 int errCode = 2254;
                 throw new MRCompilerException("Currently merged cogroup is not supported after blocking operators.", errCode);
             }
-            
+
             // Create new map-reduce operator for indexing job and then configure it.
             MapReduceOper indexerMROp = getMROp();
             FileSpec idxFileSpec = getIndexingJob(indexerMROp, baseMROp, poCoGrp.getLRInnerPlansOf(0));
             poCoGrp.setIdxFuncSpec(idxFileSpec.getFuncSpec());
             poCoGrp.setIndexFileName(idxFileSpec.getFileName());
-            
+
             baseMROp.mapPlan.addAsLeaf(poCoGrp);
             for (FuncSpec funcSpec : funcSpecs)
                 baseMROp.UDFs.add(funcSpec.toString());
@@ -1436,41 +1437,41 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(errMsg, errCode,e);
         }
     }
-    
+
     // Sets up the indexing job for map-side cogroups.
-    private FileSpec getIndexingJob(MapReduceOper indexerMROp, 
+    private FileSpec getIndexingJob(MapReduceOper indexerMROp,
             final MapReduceOper baseMROp, final List<PhysicalPlan> mapperLRInnerPlans)
         throws MRCompilerException, PlanException, ExecException, IOException, CloneNotSupportedException {
-        
+
         // First replace loader with  MergeJoinIndexer.
         PhysicalPlan baseMapPlan = baseMROp.mapPlan;
-        POLoad baseLoader = (POLoad)baseMapPlan.getRoots().get(0);                            
+        POLoad baseLoader = (POLoad)baseMapPlan.getRoots().get(0);
         FileSpec origLoaderFileSpec = baseLoader.getLFile();
         FuncSpec funcSpec = origLoaderFileSpec.getFuncSpec();
         LoadFunc loadFunc = baseLoader.getLoadFunc();
-        
+
         if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
             int errCode = 1104;
             String errMsg = "Base relation of merge-coGroup must implement " +
-            "OrderedLoadFunc interface. The specified loader " 
+            "OrderedLoadFunc interface. The specified loader "
             + funcSpec + " doesn't implement it";
             throw new MRCompilerException(errMsg,errCode);
         }
-        
+
         String[] indexerArgs = new String[6];
         indexerArgs[0] = funcSpec.toString();
         indexerArgs[1] = ObjectSerializer.serialize((Serializable)mapperLRInnerPlans);
         indexerArgs[3] = baseLoader.getSignature();
         indexerArgs[4] = baseLoader.getOperatorKey().scope;
-        indexerArgs[5] = Boolean.toString(false); // we care for nulls. 
-            
+        indexerArgs[5] = Boolean.toString(false); // we care for nulls.
+
         PhysicalPlan phyPlan;
-        if (baseMapPlan.getSuccessors(baseLoader) == null 
+        if (baseMapPlan.getSuccessors(baseLoader) == null
                 || baseMapPlan.getSuccessors(baseLoader).isEmpty()){
          // Load-Load-Cogroup case.
-            phyPlan = null; 
+            phyPlan = null;
         }
-            
+
         else{ // We got something. Yank it and set it as inner plan.
             phyPlan = baseMapPlan.clone();
             PhysicalOperator root = phyPlan.getRoots().get(0);
@@ -1485,20 +1486,20 @@ public class MRCompiler extends PhyPlanV
                 new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs)));
         indexerMROp.mapPlan.add(idxJobLoader);
         indexerMROp.UDFs.add(baseLoader.getLFile().getFuncSpec().toString());
-        
-        // Loader of mro will return a tuple of form - 
+
+        // Loader of mro will return a tuple of form -
         // (key1, key2, .. , WritableComparable, splitIndex). See MergeJoinIndexer for details.
-        
-        // After getting an index entry in each mapper, send all of them to one 
+
+        // After getting an index entry in each mapper, send all of them to one
         // reducer where they will be sorted on the way by Hadoop.
         MRUtil.simpleConnectMapToReduce(indexerMROp, scope, nig);
-        
+
         indexerMROp.requestedParallelism = 1; // we need exactly one reducer for indexing job.
-        
-        // We want to use typed tuple comparator for this job, instead of default 
-        // raw binary comparator used by Pig, to make sure index entries are 
+
+        // We want to use typed tuple comparator for this job, instead of default
+        // raw binary comparator used by Pig, to make sure index entries are
         // sorted correctly by Hadoop.
-        indexerMROp.useTypedComparator(true); 
+        indexerMROp.useTypedComparator(true);
 
         POStore st = getStore();
         FileSpec strFile = getTempFileSpec();
@@ -1508,7 +1509,7 @@ public class MRCompiler extends PhyPlanV
 
         return strFile;
     }
-    
+
     /** Since merge-join works on two inputs there are exactly two MROper predecessors identified  as left and right.
      *  Instead of merging two operators, both are used to generate a MR job each. First MR oper is run to generate on-the-fly index on right side.
      *  Second is used to actually do the join. First MR oper is identified as rightMROper and second as curMROper.
@@ -1530,13 +1531,13 @@ public class MRCompiler extends PhyPlanV
             }
 
             curMROp = phyToMROpMap.get(joinOp.getInputs().get(0));
-             
+
             MapReduceOper rightMROpr = null;
             if(curMROp.equals(compiledInputs[0]))
                 rightMROpr = compiledInputs[1];
             else
                 rightMROpr = compiledInputs[0];
-            
+
             // We will first operate on right side which is indexer job.
             // First yank plan of the compiled right input and set that as an inner plan of right operator.
             PhysicalPlan rightPipelinePlan;
@@ -1547,18 +1548,18 @@ public class MRCompiler extends PhyPlanV
                     String errMsg = "Expected one but found more then one root physical operator in physical plan.";
                     throw new MRCompilerException(errMsg,errCode,PigException.BUG);
                 }
-                
+
                 PhysicalOperator rightLoader = rightMapPlan.getRoots().get(0);
                 if(! (rightLoader instanceof POLoad)){
                     int errCode = 2172;
                     String errMsg = "Expected physical operator at root to be POLoad. Found : "+rightLoader.getClass().getCanonicalName();
                     throw new MRCompilerException(errMsg,errCode);
                 }
-                
+
                 if (rightMapPlan.getSuccessors(rightLoader) == null || rightMapPlan.getSuccessors(rightLoader).isEmpty())
                     // Load - Join case.
-                    rightPipelinePlan = null; 
-                
+                    rightPipelinePlan = null;
+
                 else{ // We got something on right side. Yank it and set it as inner plan of right input.
                     rightPipelinePlan = rightMapPlan.clone();
                     PhysicalOperator root = rightPipelinePlan.getRoots().get(0);
@@ -1567,8 +1568,8 @@ public class MRCompiler extends PhyPlanV
                     rightMapPlan.trimBelow(rightLoader);
                 }
             }
-            
-            else if(!rightMROpr.reduceDone){ 
+
+            else if(!rightMROpr.reduceDone){
                 // Indexer must run in map. If we are in reduce, close it and start new MROper.
                 // No need of yanking in this case. Since we are starting brand new MR Operator and it will contain nothing.
                 POStore rightStore = getStore();
@@ -1577,18 +1578,18 @@ public class MRCompiler extends PhyPlanV
                 rightMROpr.reducePlan.addAsLeaf(rightStore);
                 rightMROpr.setReduceDone(true);
                 rightMROpr = startNew(rightStrFile, rightMROpr);
-                rightPipelinePlan = null; 
+                rightPipelinePlan = null;
             }
-            
+
             else{
                 int errCode = 2022;
                 String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
                 throw new PlanException(msg, errCode, PigException.BUG);
             }
-            
+
             joinOp.setupRightPipeline(rightPipelinePlan);
-            rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.        
-            
+            rightMROpr.requestedParallelism = 1; // we need exactly one reducer for indexing job.
+
             // At this point, we must be operating on map plan of right input and it would contain nothing else other then a POLoad.
             POLoad rightLoader = (POLoad)rightMROpr.mapPlan.getRoots().get(0);
             joinOp.setSignature(rightLoader.getSignature());
@@ -1598,7 +1599,7 @@ public class MRCompiler extends PhyPlanV
                 joinOp.setRightLoaderFuncSpec(rightLoader.getLFile().getFuncSpec());
                 joinOp.setRightInputFileName(rightLoader.getLFile().getFileName());
                 udfs.add(rightLoader.getLFile().getFuncSpec().toString());
-                
+
                 // we don't need the right MROper since
                 // the right loader is an IndexableLoadFunc which can handle the index
                 // itself
@@ -1607,15 +1608,15 @@ public class MRCompiler extends PhyPlanV
                     compiledInputs[0] = null;
                 } else if(rightMROpr == compiledInputs[1]) {
                     compiledInputs[1] = null;
-                } 
+                }
                 rightMROpr = null;
-                
-                // validate that the join keys in merge join are only                                                                                                                                                                              
-                // simple column projections or '*' and not expression - expressions                                                                                                                                                               
-                // cannot be handled when the index is built by the storage layer on the sorted                                                                                                                                                    
-                // data when the sorted data (and corresponding index) is written.                                                                                                                                                                 
-                // So merge join will be restricted not have expressions as                                                                                                                                                                        
-                // join keys      
+
+                // validate that the join keys in merge join are only
+                // simple column projections or '*' and not expression - expressions
+                // cannot be handled when the index is built by the storage layer on the sorted
+                // data when the sorted data (and corresponding index) is written.
+                // So merge join will be restricted not have expressions as
+                // join keys
                 int numInputs = mPlan.getPredecessors(joinOp).size(); // should be 2
                 for(int i = 0; i < numInputs; i++) {
                     List<PhysicalPlan> keyPlans = joinOp.getInnerPlansOf(i);
@@ -1632,23 +1633,23 @@ public class MRCompiler extends PhyPlanV
                 }
             } else {
                 LoadFunc loadFunc = rightLoader.getLoadFunc();
-                //Replacing POLoad with indexer is disabled for 'merge-sparse' joins.  While 
+                //Replacing POLoad with indexer is disabled for 'merge-sparse' joins.  While
                 //this feature would be useful, the current implementation of DefaultIndexableLoader
                 //is not designed to handle multiple calls to seekNear.  Specifically, it rereads the entire index
                 //for each call.  Some refactoring of this class is required - and then the check below could be removed.
-		if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
+                if (joinOp.getJoinType() == LOJoin.JOINTYPE.MERGESPARSE) {
                     int errCode = 1104;
                     String errMsg = "Right input of merge-join must implement IndexableLoadFunc. " +
                     "The specified loader " + loadFunc + " doesn't implement it";
                     throw new MRCompilerException(errMsg,errCode);
-		}
-                
+                }
+
                 // Replace POLoad with  indexer.
 
                 if (! (OrderedLoadFunc.class.isAssignableFrom(loadFunc.getClass()))){
                     int errCode = 1104;
                     String errMsg = "Right input of merge-join must implement " +
-                    "OrderedLoadFunc interface. The specified loader " 
+                    "OrderedLoadFunc interface. The specified loader "
                     + loadFunc + " doesn't implement it";
                     throw new MRCompilerException(errMsg,errCode);
                 }
@@ -1663,22 +1664,22 @@ public class MRCompiler extends PhyPlanV
                 indexerArgs[3] = rightLoader.getSignature();
                 indexerArgs[4] = rightLoader.getOperatorKey().scope;
                 indexerArgs[5] = Boolean.toString(true);
-                
+
                 FileSpec lFile = new FileSpec(rightLoader.getLFile().getFileName(),new FuncSpec(MergeJoinIndexer.class.getName(), indexerArgs));
                 rightLoader.setLFile(lFile);
-    
-                // Loader of mro will return a tuple of form - 
+
+                // Loader of mro will return a tuple of form -
                 // (keyFirst1, keyFirst2, .. , position, splitIndex) See MergeJoinIndexer
 
                 MRUtil.simpleConnectMapToReduce(rightMROpr, scope, nig);
                 rightMROpr.useTypedComparator(true);
-                
+
                 POStore st = getStore();
                 FileSpec strFile = getTempFileSpec();
                 st.setSFile(strFile);
                 rightMROpr.reducePlan.addAsLeaf(st);
                 rightMROpr.setReduceDone(true);
-                
+
                 // set up the DefaultIndexableLoader for the join operator
                 String[] defaultIndexableLoaderArgs = new String[5];
                 defaultIndexableLoaderArgs[0] = origRightLoaderFileSpec.getFuncSpec().toString();
@@ -1687,17 +1688,17 @@ public class MRCompiler extends PhyPlanV
                 defaultIndexableLoaderArgs[3] = joinOp.getOperatorKey().scope;
                 defaultIndexableLoaderArgs[4] = origRightLoaderFileSpec.getFileName();
                 joinOp.setRightLoaderFuncSpec((new FuncSpec(DefaultIndexableLoader.class.getName(), defaultIndexableLoaderArgs)));
-                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());  
-                
+                joinOp.setRightInputFileName(origRightLoaderFileSpec.getFileName());
+
                 joinOp.setIndexFile(strFile.getFileName());
                 udfs.add(origRightLoaderFileSpec.getFuncSpec().toString());
             }
-            
+
             // We are done with right side. Lets work on left now.
             // Join will be materialized in leftMROper.
-            if(!curMROp.mapDone) // Life is easy 
+            if(!curMROp.mapDone) // Life is easy
                 curMROp.mapPlan.addAsLeaf(joinOp);
-            
+
             else if(!curMROp.reduceDone){  // This is a map-side join. Close this MROper and start afresh.
                 POStore leftStore = getStore();
                 FileSpec leftStrFile = getTempFileSpec();
@@ -1707,7 +1708,7 @@ public class MRCompiler extends PhyPlanV
                 curMROp = startNew(leftStrFile, curMROp);
                 curMROp.mapPlan.addAsLeaf(joinOp);
             }
-            
+
             else{
                 int errCode = 2022;
                 String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
@@ -1749,30 +1750,31 @@ public class MRCompiler extends PhyPlanV
             prjStar.setResultType(DataType.TUPLE);
             prjStar.setStar(true);
             ep.add(prjStar);
-            
+
             List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
             eps.add(ep);
-            
+
             POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
             lr.setIndex(0);
             lr.setKeyType(DataType.TUPLE);
             lr.setPlans(eps);
             lr.setResultType(DataType.TUPLE);
             lr.setDistinct(true);
-            
+
             addToMap(lr);
-            
+
             blocking(op);
             curMROp.customPartitioner = op.getCustomPartitioner();
-            
+
             POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            pkg.setKeyType(DataType.TUPLE);
-            pkg.setDistinct(true);
+            Packager pkgr = pkg.getPkgr();
+            pkgr.setKeyType(DataType.TUPLE);
+            pkgr.setDistinct(true);
             pkg.setNumInps(1);
-            boolean[] inner = {false}; 
-            pkg.setInner(inner);
+            boolean[] inner = {false};
+            pkgr.setInner(inner);
             curMROp.reducePlan.add(pkg);
-            
+
             List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
             List<Boolean> flat1 = new ArrayList<Boolean>();
             PhysicalPlan ep1 = new PhysicalPlan();
@@ -1798,160 +1800,161 @@ public class MRCompiler extends PhyPlanV
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
     @Override
     public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
-		try {
-			if (compiledInputs.length != 2) {
-				int errCode = 2255;
-				throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.", errCode);
-			}
-			
-			//change plan to store the first join input into a temp file
-			FileSpec fSpec = getTempFileSpec();
-			MapReduceOper mro = compiledInputs[0];
-			POStore str = getStore();
-			str.setSFile(fSpec);
-			if (!mro.isMapDone()) {
-				mro.mapPlan.addAsLeaf(str);
-				mro.setMapDoneSingle(true);
-			} else if (mro.isMapDone() && !mro.isReduceDone()) {
-				mro.reducePlan.addAsLeaf(str);
-				mro.setReduceDone(true);
-			} else {
-				int errCode = 2022;
-				String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
-				throw new PlanException(msg, errCode, PigException.BUG);
-			}
-			
-			FileSpec partitionFile = getTempFileSpec();
-			int rp = op.getRequestedParallelism();
-			
-			Pair<MapReduceOper, Integer> sampleJobPair = getSkewedJoinSampleJob(op, mro, fSpec, partitionFile, rp);            
-			rp = sampleJobPair.second;
-			
-			// set parallelism of SkewedJoin as the value calculated by sampling job
-			// if "parallel" is specified in join statement, "rp" is equal to that number
-			// if not specified, use the value that sampling process calculated
-			// based on default.
-			op.setRequestedParallelism(rp);
-						
-			// load the temp file for first table as input of join            
-			MapReduceOper[] joinInputs = new MapReduceOper[] {startNew(fSpec, sampleJobPair.first), compiledInputs[1]};            
-			MapReduceOper[] rearrangeOutputs = new MapReduceOper[2];                       
-			
-			compiledInputs = new MapReduceOper[] {joinInputs[0]};
-			// run POLocalRearrange for first join table
-			POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);            
-			try {
-				lr.setIndex(0);                
-			} catch (ExecException e) {
-				int errCode = 2058;
-				String msg = "Unable to set index on newly created POLocalRearrange.";
-				throw new PlanException(msg, errCode, PigException.BUG, e);
-			}
-			
-			List<PhysicalOperator> l = plan.getPredecessors(op);
-			MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
-			List<PhysicalPlan> groups = joinPlans.get(l.get(0));
-			// check the type of group keys, if there are more than one field, the key is TUPLE.
-			byte type = DataType.TUPLE;
-			if (groups.size() == 1) {
-				type = groups.get(0).getLeaves().get(0).getResultType();                
-			}               
-			
-			lr.setKeyType(type);            
-			lr.setPlans(groups);
-			lr.setResultType(DataType.TUPLE);
-			
-			lr.visit(this);
-			if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
-				curMROp.requestedParallelism = lr.getRequestedParallelism();
-			rearrangeOutputs[0] = curMROp;
-			
-			compiledInputs = new MapReduceOper[] {joinInputs[1]};       
-			// if the map for current input is already closed, then start a new job
-			if (compiledInputs[0].isMapDone() && !compiledInputs[0].isReduceDone()) {
-				FileSpec f = getTempFileSpec();
-				POStore s = getStore();
-				s.setSFile(f);
-				compiledInputs[0].reducePlan.addAsLeaf(s);
-				compiledInputs[0].setReduceDone(true);
-				compiledInputs[0] = startNew(f, compiledInputs[0]);
-			}     		      
-			
-			// run POPartitionRearrange for second join table
-			POPartitionRearrange pr = 
-			    new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
-			pr.setPigContext(pigContext);
-			lr = pr;
-			try {
-				lr.setIndex(1);
-			} catch (ExecException e) {
-				int errCode = 2058;
-				String msg = "Unable to set index on newly created POLocalRearrange.";
-				throw new PlanException(msg, errCode, PigException.BUG, e);
-			}               
-			
-			groups = joinPlans.get(l.get(1));
-			lr.setPlans(groups);
-			lr.setKeyType(type);            
-			lr.setResultType(DataType.BAG);
-			
-			lr.visit(this);
-			if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
-				curMROp.requestedParallelism = lr.getRequestedParallelism();
-			rearrangeOutputs[1] = curMROp;                     
-			compiledInputs = rearrangeOutputs;
-					   
-			
-			// create POGlobalRearrange
-			POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
-			// Skewed join has its own special partitioner 
-			gr.setResultType(DataType.TUPLE);
-			gr.visit(this);
-			if(gr.getRequestedParallelism() > curMROp.requestedParallelism)
-				curMROp.requestedParallelism = gr.getRequestedParallelism();
-			compiledInputs = new MapReduceOper[] {curMROp};
-			
-			// create POPakcage
-			POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
-			pkg.setKeyType(type);
-			pkg.setResultType(DataType.TUPLE);
-			pkg.setNumInps(2);
-			boolean [] inner = op.getInnerFlags();
-			pkg.setInner(inner);            
-			pkg.visit(this);       
-			compiledInputs = new MapReduceOper[] {curMROp};
-			
-			// create POForEach
-			List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
-			List<Boolean> flat = new ArrayList<Boolean>();
-			
-			PhysicalPlan ep;
-			// Add corresponding POProjects
-			for (int i=0; i < 2; i++ ) {
-			    ep = new PhysicalPlan();
-			    POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-			    prj.setColumn(i+1);
-			    prj.setOverloaded(false);
-			    prj.setResultType(DataType.BAG);
-			    ep.add(prj);
-			    eps.add(ep);
-			    if (!inner[i]) {
-			        // Add an empty bag for outer join
-			        CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
-			    }
-			    flat.add(true);
-			}
-
-			POForEach fe = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat);
-			fe.setResultType(DataType.TUPLE);
-			
-			fe.visit(this);
-			
-			curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName());
-			phyToMROpMap.put(op, curMROp);
+        try {
+            if (compiledInputs.length != 2) {
+                int errCode = 2255;
+                throw new VisitorException("POSkewedJoin operator has " + compiledInputs.length + " inputs. It should have 2.", errCode);
+            }
+
+            //change plan to store the first join input into a temp file
+            FileSpec fSpec = getTempFileSpec();
+            MapReduceOper mro = compiledInputs[0];
+            POStore str = getStore();
+            str.setSFile(fSpec);
+            if (!mro.isMapDone()) {
+                mro.mapPlan.addAsLeaf(str);
+                mro.setMapDoneSingle(true);
+            } else if (mro.isMapDone() && !mro.isReduceDone()) {
+                mro.reducePlan.addAsLeaf(str);
+                mro.setReduceDone(true);
+            } else {
+                int errCode = 2022;
+                String msg = "Both map and reduce phases have been done. This is unexpected while compiling.";
+                throw new PlanException(msg, errCode, PigException.BUG);
+            }
+
+            FileSpec partitionFile = getTempFileSpec();
+            int rp = op.getRequestedParallelism();
+
+            Pair<MapReduceOper, Integer> sampleJobPair = getSkewedJoinSampleJob(op, mro, fSpec, partitionFile, rp);
+            rp = sampleJobPair.second;
+
+            // set parallelism of SkewedJoin as the value calculated by sampling job
+            // if "parallel" is specified in join statement, "rp" is equal to that number
+            // if not specified, use the value that sampling process calculated
+            // based on default.
+            op.setRequestedParallelism(rp);
+
+            // load the temp file for first table as input of join
+            MapReduceOper[] joinInputs = new MapReduceOper[] {startNew(fSpec, sampleJobPair.first), compiledInputs[1]};
+            MapReduceOper[] rearrangeOutputs = new MapReduceOper[2];
+
+            compiledInputs = new MapReduceOper[] {joinInputs[0]};
+            // run POLocalRearrange for first join table
+            POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+            try {
+                lr.setIndex(0);
+            } catch (ExecException e) {
+                int errCode = 2058;
+                String msg = "Unable to set index on newly created POLocalRearrange.";
+                throw new PlanException(msg, errCode, PigException.BUG, e);
+            }
+
+            List<PhysicalOperator> l = plan.getPredecessors(op);
+            MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
+            List<PhysicalPlan> groups = joinPlans.get(l.get(0));
+            // check the type of the group keys; if there is more than one field, the key is TUPLE.
+            byte type = DataType.TUPLE;
+            if (groups.size() == 1) {
+                type = groups.get(0).getLeaves().get(0).getResultType();
+            }
+
+            lr.setKeyType(type);
+            lr.setPlans(groups);
+            lr.setResultType(DataType.TUPLE);
+
+            lr.visit(this);
+            if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
+                curMROp.requestedParallelism = lr.getRequestedParallelism();
+            rearrangeOutputs[0] = curMROp;
+
+            compiledInputs = new MapReduceOper[] {joinInputs[1]};
+            // if the map for current input is already closed, then start a new job
+            if (compiledInputs[0].isMapDone() && !compiledInputs[0].isReduceDone()) {
+                FileSpec f = getTempFileSpec();
+                POStore s = getStore();
+                s.setSFile(f);
+                compiledInputs[0].reducePlan.addAsLeaf(s);
+                compiledInputs[0].setReduceDone(true);
+                compiledInputs[0] = startNew(f, compiledInputs[0]);
+            }
+
+            // run POPartitionRearrange for second join table
+            POPartitionRearrange pr =
+                new POPartitionRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+            pr.setPigContext(pigContext);
+            lr = pr;
+            try {
+                lr.setIndex(1);
+            } catch (ExecException e) {
+                int errCode = 2058;
+                String msg = "Unable to set index on newly created POLocalRearrange.";
+                throw new PlanException(msg, errCode, PigException.BUG, e);
+            }
+
+            groups = joinPlans.get(l.get(1));
+            lr.setPlans(groups);
+            lr.setKeyType(type);
+            lr.setResultType(DataType.BAG);
+
+            lr.visit(this);
+            if(lr.getRequestedParallelism() > curMROp.requestedParallelism)
+                curMROp.requestedParallelism = lr.getRequestedParallelism();
+            rearrangeOutputs[1] = curMROp;
+            compiledInputs = rearrangeOutputs;
+
+
+            // create POGlobalRearrange
+            POGlobalRearrange gr = new POGlobalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+            // Skewed join has its own special partitioner
+            gr.setResultType(DataType.TUPLE);
+            gr.visit(this);
+            if(gr.getRequestedParallelism() > curMROp.requestedParallelism)
+                curMROp.requestedParallelism = gr.getRequestedParallelism();
+            compiledInputs = new MapReduceOper[] {curMROp};
+
+            // create POPackage
+            POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
+            Packager pkgr = pkg.getPkgr();
+            pkgr.setKeyType(type);
+            pkg.setResultType(DataType.TUPLE);
+            pkg.setNumInps(2);
+            boolean [] inner = op.getInnerFlags();
+            pkgr.setInner(inner);
+            pkg.visit(this);
+            compiledInputs = new MapReduceOper[] {curMROp};
+
+            // create POForEach
+            List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
+            List<Boolean> flat = new ArrayList<Boolean>();
+
+            PhysicalPlan ep;
+            // Add corresponding POProjects
+            for (int i=0; i < 2; i++ ) {
+                ep = new PhysicalPlan();
+                POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+                prj.setColumn(i+1);
+                prj.setOverloaded(false);
+                prj.setResultType(DataType.BAG);
+                ep.add(prj);
+                eps.add(ep);
+                if (!inner[i]) {
+                    // Add an empty bag for outer join
+                    CompilerUtils.addEmptyBagOuterJoin(ep, op.getSchema(i));
+                }
+                flat.add(true);
+            }
+
+            POForEach fe = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, eps, flat);
+            fe.setResultType(DataType.TUPLE);
+
+            fe.visit(this);
+
+            curMROp.setSkewedJoinPartitionFile(partitionFile.getFileName());
+            phyToMROpMap.put(op, curMROp);
         }catch(PlanException e) {
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();
@@ -1972,11 +1975,11 @@ public class MRCompiler extends PhyPlanV
             FileSpec quantFile = getTempFileSpec();
             int rp = op.getRequestedParallelism();
             Pair<POProject, Byte>[] fields = getSortCols(op.getSortPlans());
-            Pair<MapReduceOper, Integer> quantJobParallelismPair = 
+            Pair<MapReduceOper, Integer> quantJobParallelismPair =
                 getQuantileJob(op, mro, fSpec, quantFile, rp);
-            curMROp = getSortJob(op, quantJobParallelismPair.first, fSpec, quantFile, 
+            curMROp = getSortJob(op, quantJobParallelismPair.first, fSpec, quantFile,
                     quantJobParallelismPair.second, fields);
-            
+
             if(op.isUDFComparatorUsed){
                 curMROp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
                 curMROp.isUDFComparatorUsed = true;
@@ -2068,7 +2071,7 @@ public class MRCompiler extends PhyPlanV
         String msg = "No expression plan found in POSort.";
         throw new PlanException(msg, errCode, PigException.BUG);
     }
-    
+
     private MapReduceOper getSortJob(
             POSort sort,
             MapReduceOper quantJob,
@@ -2083,11 +2086,11 @@ public class MRCompiler extends PhyPlanV
 
         long limit = sort.getLimit();
         mro.limit = limit;
-        
+
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
 
         byte keyType = DataType.UNKNOWN;
-        
+
         boolean[] sortOrder;
 
         List<Boolean> sortOrderList = sort.getMAscCols();
@@ -2139,13 +2142,13 @@ public class MRCompiler extends PhyPlanV
                 throw new PlanException(msg, errCode, PigException.BUG, ve);
             }
         }
-        
+
         POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
         try {
             lr.setIndex(0);
         } catch (ExecException e) {
-        	int errCode = 2058;
-        	String msg = "Unable to set index on newly created POLocalRearrange.";
+            int errCode = 2058;
+            String msg = "Unable to set index on newly created POLocalRearrange.";
             throw new PlanException(msg, errCode, PigException.BUG, e);
         }
         lr.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
@@ -2154,16 +2157,18 @@ public class MRCompiler extends PhyPlanV
         lr.setResultType(DataType.TUPLE);
         lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
         mro.mapPlan.addAsLeaf(lr);
-        
+
         mro.setMapDone(true);
-        
+
         if (limit!=-1) {
-        	POPackageLite pkg_c = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        	pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
+            POPackage pkg_c = new POPackage(new OperatorKey(scope,
+                    nig.getNextNodeId(scope)));
+            LitePackager pkgr = new LitePackager();
+            pkgr.setKeyType((fields.length > 1) ? DataType.TUPLE : keyType);
+            pkg_c.setPkgr(pkgr);
             pkg_c.setNumInps(1);
-            //pkg.setResultType(DataType.TUPLE);            
             mro.combinePlan.add(pkg_c);
-        	
+
             List<PhysicalPlan> eps_c1 = new ArrayList<PhysicalPlan>();
             List<Boolean> flat_c1 = new ArrayList<Boolean>();
             PhysicalPlan ep_c1 = new PhysicalPlan();
@@ -2174,38 +2179,41 @@ public class MRCompiler extends PhyPlanV
             ep_c1.add(prj_c1);
             eps_c1.add(ep_c1);
             flat_c1.add(true);
-            POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), 
-            		-1, eps_c1, flat_c1);
+            POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
+                    -1, eps_c1, flat_c1);
             fe_c1.setResultType(DataType.TUPLE);
             mro.combinePlan.addAsLeaf(fe_c1);
-            
+
             POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        	pLimit.setLimit(limit);
-        	mro.combinePlan.addAsLeaf(pLimit);
-            
+            pLimit.setLimit(limit);
+            mro.combinePlan.addAsLeaf(pLimit);
+
             List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
             eps_c2.addAll(sort.getSortPlans());
-        
-	        POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
-	        try {
+
+            POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            try {
                 lr_c2.setIndex(0);
             } catch (ExecException e) {
-            	int errCode = 2058;
-            	String msg = "Unable to set index on newly created POLocalRearrange.";            	
+                int errCode = 2058;
+                String msg = "Unable to set index on newly created POLocalRearrange.";
                 throw new PlanException(msg, errCode, PigException.BUG, e);
             }
-	        lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
-	        lr_c2.setPlans(eps_c2);
-	        lr_c2.setResultType(DataType.TUPLE);
-	        mro.combinePlan.addAsLeaf(lr_c2);
-        }
-        
-        POPackageLite pkg = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
-            keyType);
-        pkg.setNumInps(1);       
+            lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
+            lr_c2.setPlans(eps_c2);
+            lr_c2.setResultType(DataType.TUPLE);
+            mro.combinePlan.addAsLeaf(lr_c2);
+        }
+
+        POPackage pkg = new POPackage(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+        LitePackager pkgr = new LitePackager();
+        pkgr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE
+                : keyType);
+        pkg.setPkgr(pkgr);
+        pkg.setNumInps(1);
         mro.reducePlan.add(pkg);
-        
+
         PhysicalPlan ep = new PhysicalPlan();
         POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         prj.setColumn(1);
@@ -2222,13 +2230,12 @@ public class MRCompiler extends PhyPlanV
         mro.phyToMRMap.put(sort, nfe1);
         if (limit!=-1)
         {
-	        POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
-	    	pLimit2.setLimit(limit);
-	    	mro.reducePlan.addAsLeaf(pLimit2);
-	    	mro.phyToMRMap.put(sort, pLimit2);
+            POLimit pLimit2 = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
+            pLimit2.setLimit(limit);
+            mro.reducePlan.addAsLeaf(pLimit2);
+            mro.phyToMRMap.put(sort, pLimit2);
         }
 
-//        ep1.add(innGen);
         return mro;
     }
 
@@ -2238,13 +2245,13 @@ public class MRCompiler extends PhyPlanV
             FileSpec lFile,
             FileSpec quantFile,
             int rp) throws PlanException, VisitorException {
-        
+
         POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
                 .getRequestedParallelism(), null, inpSort.getSortPlans(),
                 inpSort.getMAscCols(), inpSort.getMSortFunc());
         sort.addOriginalLocation(inpSort.getAlias(), inpSort.getOriginalLocations());
-    	
-    	// Turn the asc/desc array into an array of strings so that we can pass it
+
+        // Turn the asc/desc array into an array of strings so that we can pass it
         // to the FindQuantiles function.
         List<Boolean> ascCols = inpSort.getMAscCols();
         String[] ascs = new String[ascCols.size()];
@@ -2261,65 +2268,65 @@ public class MRCompiler extends PhyPlanV
                 ctorArgs[j+1] = ascs[j];
             }
         }
-        
+
         return getSamplingJob(sort, prevJob, null, lFile, quantFile, rp, null, FindQuantiles.class.getName(), ctorArgs, RandomSampleLoader.class.getName());
     }
-    
+
     /**
      * Create Sampling job for skewed join.
      */
-    private Pair<MapReduceOper, Integer> getSkewedJoinSampleJob(POSkewedJoin op, MapReduceOper prevJob, 
-    		FileSpec lFile, FileSpec sampleFile, int rp ) throws PlanException, VisitorException {
-    	    	
-    	MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
-    	
-    	List<PhysicalOperator> l = plan.getPredecessors(op);
-    	List<PhysicalPlan> groups = joinPlans.get(l.get(0));
-    	List<Boolean> ascCol = new ArrayList<Boolean>();
-    	for(int i=0; i<groups.size(); i++) {    		    		
-    		ascCol.add(false);
-    	}
-    	
-    	POSort sort = new POSort(op.getOperatorKey(), op.getRequestedParallelism(), null, groups, ascCol, null);
-    	
-    	// set up transform plan to get keys and memory size of input tuples
-    	// it first adds all the plans to get key columns,
-    	List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>(); 
-    	transformPlans.addAll(groups);
-        
-    	// then it adds a column for memory size
-    	POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
+    private Pair<MapReduceOper, Integer> getSkewedJoinSampleJob(POSkewedJoin op, MapReduceOper prevJob,
+            FileSpec lFile, FileSpec sampleFile, int rp ) throws PlanException, VisitorException {
+
+        MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = op.getJoinPlans();
+
+        List<PhysicalOperator> l = plan.getPredecessors(op);
+        List<PhysicalPlan> groups = joinPlans.get(l.get(0));
+        List<Boolean> ascCol = new ArrayList<Boolean>();
+        for(int i=0; i<groups.size(); i++) {
+            ascCol.add(false);
+        }
+
+        POSort sort = new POSort(op.getOperatorKey(), op.getRequestedParallelism(), null, groups, ascCol, null);
+
+        // set up transform plan to get keys and memory size of input tuples
+        // it first adds all the plans to get key columns,
+        List<PhysicalPlan> transformPlans = new ArrayList<PhysicalPlan>();
+        transformPlans.addAll(groups);
+
+        // then it adds a column for memory size
+        POProject prjStar = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         prjStar.setResultType(DataType.TUPLE);
-        prjStar.setStar(true);            
-        
+        prjStar.setStar(true);
+
         List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
         ufInps.add(prjStar);
-        
-    	PhysicalPlan ep = new PhysicalPlan();
-    	POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
-    	            new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
-    	uf.setResultType(DataType.TUPLE);
-    	ep.add(uf);     
-    	ep.add(prjStar);
-    	ep.connect(prjStar, uf);
-
-        transformPlans.add(ep);      
-        
-    	try{    		
-    		// pass configurations to the User Function
-    		String per = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage", 
+
+        PhysicalPlan ep = new PhysicalPlan();
+        POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
+                    new FuncSpec(GetMemNumRows.class.getName(), (String[])null));
+        uf.setResultType(DataType.TUPLE);
+        ep.add(uf);
+        ep.add(prjStar);
+        ep.connect(prjStar, uf);
+
+        transformPlans.add(ep);
+
+        try{
+            // pass configurations to the User Function
+            String per = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.memusage",
                                    String.valueOf(PartitionSkewedKeys.DEFAULT_PERCENT_MEMUSAGE));
-    		String mc = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
-    		String inputFile = lFile.getFileName();
+            String mc = pigContext.getProperties().getProperty("pig.skewedjoin.reduce.maxtuple", "0");
+            String inputFile = lFile.getFileName();
+
+            return getSamplingJob(sort, prevJob, transformPlans, lFile, sampleFile, rp, null,

[... 440 lines stripped ...]