You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/04/06 12:54:14 UTC

svn commit: r1585283 [1/3] - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/fetch/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/ap...

Author: cheolsoo
Date: Sun Apr  6 10:54:13 2014
New Revision: 1585283

URL: http://svn.apache.org/r1585283
Log:
PIG-3591: Refactor POPackage to separate MR specific code from packaging (mwagner via cheolsoo)

Added:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
Removed:
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCombinerPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POJoinPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPartialAgg.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    pig/trunk/src/org/apache/pig/data/ReadOnceBag.java
    pig/trunk/src/org/apache/pig/data/SelfSpillBag.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
    pig/trunk/src/org/apache/pig/pen/PhysicalPlanResetter.java
    pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java
    pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
    pig/trunk/test/org/apache/pig/test/TestPackage.java
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/Cogroup.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC10.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC11.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC12.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC13.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC14.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC15.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC16.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC17.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC19.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC3.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC4.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC6.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC7.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC8.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC9.gld

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Apr  6 10:54:13 2014
@@ -30,6 +30,8 @@ PIG-2207: Support custom counters for ag
 
 IMPROVEMENTS
 
+PIG-3591: Refactor POPackage to separate MR specific code from packaging (mwagner via cheolsoo)
+
 PIG-3449: Move JobCreationException to org.apache.pig.backend.hadoop.executionengine (cheolsoo)
 
 PIG-3765: Ability to disable Pig commands and operators (prkommireddi)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/fetch/FetchOptimizer.java Sun Apr  6 10:54:13 2014
@@ -32,19 +32,16 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -231,16 +228,6 @@ public class FetchOptimizer {
         }
 
         @Override
-        public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException {
-            planFetchable = false;
-        }
-
-        @Override
-        public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException {
-            planFetchable = false;
-        }
-
-        @Override
         public void visitSplit(POSplit spl) throws VisitorException {
             planFetchable = false;
         }
@@ -271,11 +258,6 @@ public class FetchOptimizer {
         }
 
         @Override
-        public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException {
-            planFetchable = false;
-        }
-
-        @Override
         public void visitCross(POCross cross) throws VisitorException {
             planFetchable = false;
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/AccumulatorOptimizer.java Sun Apr  6 10:54:13 2014
@@ -37,10 +37,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -71,13 +71,19 @@ public class AccumulatorOptimizer extend
             return; 
         }
         
+        Packager pkgr = ((POPackage) po_package).getPkgr();
+        // Check that this is a standard package, not a subclass
+        if (!pkgr.getClass().equals(Packager.class)) {
+            return;
+        }
+
         // if POPackage is for distinct, just return
-        if (((POPackage)po_package).isDistinct()) {
+        if (pkgr.isDistinct()) {
             return;
         }
         
         // if any input to POPackage is inner, just return
-        boolean[] isInner = ((POPackage)po_package).getInner();
+        boolean[] isInner = pkgr.getInner();
         for(boolean b: isInner) {
             if (b) {
                 return;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/CombinerOptimizer.java Sun Apr  6 10:54:13 2014
@@ -24,41 +24,38 @@ import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-
-import org.apache.pig.PigException;
 import org.apache.pig.FuncSpec;
+import org.apache.pig.PigException;
 import org.apache.pig.PigWarning;
-import org.apache.pig.data.DataType;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
 import org.apache.pig.impl.util.Pair;
 
@@ -265,8 +262,10 @@ public class CombinerOptimizer extends M
                 // as it needs to act differently than the regular
                 // package operator.
                 mr.combinePlan = new PhysicalPlan();
-                POCombinerPackage combinePack =
-                    new POCombinerPackage(pack, bags);
+                CombinerPackager pkgr = new CombinerPackager(pack.getPkgr(),
+                        bags);
+                POPackage combinePack = pack.clone();
+                combinePack.setPkgr(pkgr);
                 mr.combinePlan.add(combinePack);
                 mr.combinePlan.add(cfe);
                 mr.combinePlan.connect(combinePack, cfe);
@@ -304,20 +303,7 @@ public class CombinerOptimizer extends M
                 // Change the package operator in the reduce plan to
                 // be the POCombiner package, as it needs to act
                 // differently than the regular package operator.
-                POCombinerPackage newReducePack =
-                    new POCombinerPackage(pack, bags);
-                mr.reducePlan.replace(pack, newReducePack);
-
-                // the replace() above only changes
-                // the plan and does not change "inputs" to 
-                // operators
-                // set up "inputs" for the operator after
-                // package correctly
-                List<PhysicalOperator> packList = new ArrayList<PhysicalOperator>();
-                packList.add(newReducePack);
-                List<PhysicalOperator> sucs = mr.reducePlan.getSuccessors(newReducePack);
-                // there should be only one successor to package
-                sucs.get(0).setInputs(packList);
+                pack.setPkgr(pkgr.clone());
             } catch (Exception e) {
                 int errCode = 2018;
                 String msg = "Internal error. Unable to introduce the combiner for optimization.";

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Sun Apr  6 10:54:13 2014
@@ -827,7 +827,8 @@ public class JobControlCompiler{
                 }
                 if (!pigContext.inIllustrator)
                     conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
-                conf.set("pig.reduce.key.type", Byte.toString(pack.getKeyType()));
+                conf.set("pig.reduce.key.type",
+                        Byte.toString(pack.getPkgr().getKeyType()));
 
                 if (mro.getUseSecondaryKey()) {
                     nwJob.setGroupingComparatorClass(PigSecondaryKeyGroupComparator.class);
@@ -840,9 +841,11 @@ public class JobControlCompiler{
                 }
                 else
                 {
-                    Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(pack.getKeyType()).getClass();
+                    Class<? extends WritableComparable> keyClass = HDataType
+                            .getWritableComparableTypes(
+                                    pack.getPkgr().getKeyType()).getClass();
                     nwJob.setOutputKeyClass(keyClass);
-                    selectComparator(mro, pack.getKeyType(), nwJob);
+                    selectComparator(mro, pack.getPkgr().getKeyType(), nwJob);
                 }
                 nwJob.setOutputValueClass(NullableTuple.class);
             }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Sun Apr  6 10:54:13 2014
@@ -57,6 +57,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
@@ -65,7 +67,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
@@ -73,8 +74,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
@@ -83,6 +82,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
@@ -1104,9 +1105,9 @@ public class MRCompiler extends PhyPlanV
         try{
             nonBlocking(op);
             phyToMROpMap.put(op, curMROp);
-            if (op.getPackageType() == PackageType.JOIN) {
+            if (op.getPkgr().getPackageType() == PackageType.JOIN) {
                 curMROp.markRegularJoin();
-            } else if (op.getPackageType() == PackageType.GROUP) {
+            } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
                 if (op.getNumInps() == 1) {
                     curMROp.markGroupBy();
                 } else if (op.getNumInps() > 1) {
@@ -1758,11 +1759,12 @@ public class MRCompiler extends PhyPlanV
             curMROp.customPartitioner = op.getCustomPartitioner();
             
             POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            pkg.setKeyType(DataType.TUPLE);
-            pkg.setDistinct(true);
+            Packager pkgr = pkg.getPkgr();
+            pkgr.setKeyType(DataType.TUPLE);
+            pkgr.setDistinct(true);
             pkg.setNumInps(1);
             boolean[] inner = {false}; 
-            pkg.setInner(inner);
+            pkgr.setInner(inner);
             curMROp.reducePlan.add(pkg);
             
             List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
@@ -1908,11 +1910,12 @@ public class MRCompiler extends PhyPlanV
 			
 			// create POPakcage
 			POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)), rp);
-			pkg.setKeyType(type);
+            Packager pkgr = pkg.getPkgr();
+            pkgr.setKeyType(type);
 			pkg.setResultType(DataType.TUPLE);
 			pkg.setNumInps(2);
 			boolean [] inner = op.getInnerFlags();
-			pkg.setInner(inner);            
+            pkgr.setInner(inner);
 			pkg.visit(this);       
 			compiledInputs = new MapReduceOper[] {curMROp};
 			
@@ -2150,8 +2153,11 @@ public class MRCompiler extends PhyPlanV
         mro.setMapDone(true);
         
         if (limit!=-1) {
-        	POPackageLite pkg_c = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        	pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
+            POPackage pkg_c = new POPackage(new OperatorKey(scope,
+                    nig.getNextNodeId(scope)));
+            LitePackager pkgr = new LitePackager();
+            pkgr.setKeyType((fields.length > 1) ? DataType.TUPLE : keyType);
+            pkg_c.setPkgr(pkgr);
             pkg_c.setNumInps(1);
             //pkg.setResultType(DataType.TUPLE);            
             mro.combinePlan.add(pkg_c);
@@ -2191,11 +2197,14 @@ public class MRCompiler extends PhyPlanV
 	        lr_c2.setResultType(DataType.TUPLE);
 	        mro.combinePlan.addAsLeaf(lr_c2);
         }
-        
-        POPackageLite pkg = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
-            keyType);
-        pkg.setNumInps(1);       
+
+        POPackage pkg = new POPackage(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+        LitePackager pkgr = new LitePackager();
+        pkgr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE
+                : keyType);
+        pkg.setPkgr(pkgr);
+        pkg.setNumInps(1);
         mro.reducePlan.add(pkg);
         
         PhysicalPlan ep = new PhysicalPlan();
@@ -2447,10 +2456,12 @@ public class MRCompiler extends PhyPlanV
         mro.setMapDone(true);
         
         POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        pkg.setKeyType(DataType.CHARARRAY);
+        Packager pkgr = new Packager();
+        pkg.setPkgr(pkgr);
+        pkgr.setKeyType(DataType.CHARARRAY);
         pkg.setNumInps(1);
         boolean[] inner = {false}; 
-        pkg.setInner(inner);
+        pkgr.setInner(inner);
         mro.reducePlan.add(pkg);
         
         // Lets start building the plan which will have the sort
@@ -2738,35 +2749,30 @@ public class MRCompiler extends PhyPlanV
 
         public static void replaceWithPOJoinPackage(PhysicalPlan plan, MapReduceOper mr,
                 POPackage pack, POForEach forEach, String chunkSize) throws VisitorException {
-            String scope = pack.getOperatorKey().scope;
-            NodeIdGenerator nig = NodeIdGenerator.getGenerator();
-            POJoinPackage joinPackage;
-            joinPackage = new POJoinPackage(
-                        new OperatorKey(scope, nig.getNextNodeId(scope)), 
-                        -1, pack, forEach);
-            joinPackage.setChunkSize(Long.parseLong(chunkSize));
+            JoinPackager pkgr = new JoinPackager(pack.getPkgr(), forEach);
+            pkgr.setChunkSize(Long.parseLong(chunkSize));
+            pack.setPkgr(pkgr);
             List<PhysicalOperator> succs = plan.getSuccessors(forEach);
-            if (succs!=null)
-            {
-                if (succs.size()!=1)
-                {
+            if (succs != null) {
+                if (succs.size() != 1) {
                     int errCode = 2028;
-                    String msg = "ForEach can only have one successor. Found " + succs.size() + " successors.";
-                    throw new MRCompilerException(msg, errCode, PigException.BUG);
+                    String msg = "ForEach can only have one successor. Found "
+                            + succs.size() + " successors.";
+                    throw new MRCompilerException(msg, errCode,
+                            PigException.BUG);
                 }
             }
             plan.remove(pack);
-            
             try {
-                plan.replace(forEach, joinPackage);
+                plan.replace(forEach, pack);
             } catch (PlanException e) {
                 int errCode = 2029;
-                String msg = "Error rewriting POJoinPackage.";
+                String msg = "Error rewriting join package.";
                 throw new MRCompilerException(msg, errCode, PigException.BUG, e);
             }
-            mr.phyToMRMap.put(forEach, joinPackage);
-            LogFactory.
-            getLog(LastInputStreamingOptimizer.class).info("Rewrite: POPackage->POForEach to POJoinPackage");
+            mr.phyToMRMap.put(forEach, pack);
+            LogFactory.getLog(LastInputStreamingOptimizer.class).info(
+                    "Rewrite: POPackage->POForEach to POPackage(JoinPackager)");
         }
 
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRUtil.java Sun Apr  6 10:54:13 2014
@@ -65,10 +65,10 @@ public class MRUtil {
         mro.mapPlan.addAsLeaf(lr);
         
         POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        pkg.setKeyType(DataType.TUPLE);
+        pkg.getPkgr().setKeyType(DataType.TUPLE);
         pkg.setNumInps(1);
         boolean[] inner = {false};
-        pkg.setInner(inner);
+        pkg.getPkgr().setInner(inner);
         mro.reducePlan.add(pkg);
         
         mro.reducePlan.addAsLeaf(getPlainForEachOP(scope, nig));

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Sun Apr  6 10:54:13 2014
@@ -60,7 +60,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.POPackageAnnotator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.XMLMRPrinter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.impl.PigContext;
@@ -637,8 +637,8 @@ public class MapReduceLauncher extends L
         comp.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
 
         String lastInputChunkSize =
-                pc.getProperties().getProperty(
-                        "last.input.chunksize", POJoinPackage.DEFAULT_CHUNK_SIZE);
+            pc.getProperties().getProperty(
+                "last.input.chunksize", JoinPackager.DEFAULT_CHUNK_SIZE);
 
         String prop = pc.getProperties().getProperty(PigConfiguration.PROP_NO_COMBINER);
         if (!pc.inIllustrator && !("true".equals(prop)))  {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Sun Apr  6 10:54:13 2014
@@ -18,10 +18,10 @@
 package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
 
 import java.util.ArrayList;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,14 +31,15 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.MultiQueryPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -686,28 +687,33 @@ class MultiQueryOptimizer extends MROpPl
             PhysicalPlan to, int initial, int current, byte mapKeyType) throws VisitorException {                    
         POPackage pk = (POPackage)from.getRoots().get(0);
         from.remove(pk);
+        Packager fromPkgr = pk.getPkgr();
  
-        if(!(pk instanceof POMultiQueryPackage)){
+        if (!(fromPkgr instanceof MultiQueryPackager)) {
             // XXX the index of the original keyInfo map is always 0,
             // we need to shift the index so that the lookups work
             // with the new indexed key
-            addShiftedKeyInfoIndex(initial, pk); 
+            addShiftedKeyInfoIndex(initial, fromPkgr);
         }
          
         int total = current - initial;
         
-        POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);        
+        MultiQueryPackager toPkgr = (MultiQueryPackager) ((POPackage) to
+                .getRoots().get(0)).getPkgr();
         int pkCount = 0;
-        if (pk instanceof POMultiQueryPackage) {
-            List<POPackage> pkgs = ((POMultiQueryPackage)pk).getPackages();
-            for (POPackage p : pkgs) {
-                pkg.addPackage(p);
+        if (fromPkgr instanceof MultiQueryPackager) {
+            List<Packager> pkgs = ((MultiQueryPackager) fromPkgr)
+                    .getPackagers();
+            for (Packager p : pkgs) {
+                ((MultiQueryPackager) toPkgr).addPackager(p);
                 pkCount++;
             }
-            pkg.addIsKeyWrappedList(((POMultiQueryPackage)pk).getIsKeyWrappedList());
-            addShiftedKeyInfoIndex(initial, current, (POMultiQueryPackage)pk);
+            toPkgr.addIsKeyWrappedList(((MultiQueryPackager) fromPkgr)
+                            .getIsKeyWrappedList());
+            addShiftedKeyInfoIndex(initial, current,
+                    (MultiQueryPackager) fromPkgr);
         } else {
-            pkg.addPackage(pk, mapKeyType);
+            toPkgr.addPackager(fromPkgr, mapKeyType);
             pkCount = 1;
         }
         
@@ -740,14 +746,15 @@ class MultiQueryOptimizer extends MROpPl
             throw new OptimizerException(msg, errCode, PigException.BUG);
         }
 
-        if (pkg.isSameMapKeyType()) {
-            pkg.setKeyType(pk.getKeyType());
+        if (toPkgr.isSameMapKeyType()) {
+            toPkgr.setKeyType(fromPkgr.getKeyType());
         } else {
-            pkg.setKeyType(DataType.TUPLE);
+            toPkgr.setKeyType(DataType.TUPLE);
         }            
     }
     
-    private void addShiftedKeyInfoIndex(int index, POPackage pkg) throws OptimizerException {
+    private void addShiftedKeyInfoIndex(int index, Packager pkg)
+            throws OptimizerException {
         /**
          * we only do multi query optimization for single input MROpers
          * Hence originally the keyInfo would have had only index 0. As
@@ -761,7 +768,8 @@ class MultiQueryOptimizer extends MROpPl
          * addition should be the same as the "value" in the existing Entry. After
          * addition, we should remove the older entry
          */
-        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkg.getKeyInfo();
+        Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkg
+                .getKeyInfo();
         byte newIndex = (byte)(index | PigNullableWritable.mqFlag);
         
         Set<Integer> existingIndices = keyInfo.keySet();
@@ -792,9 +800,9 @@ class MultiQueryOptimizer extends MROpPl
      * @throws OptimizerException 
      */
     private int addShiftedKeyInfoIndex(int initialIndex, int onePastEndIndex,
-            POMultiQueryPackage mpkg) throws OptimizerException {
+            MultiQueryPackager mpkgr) throws OptimizerException {
         
-        List<POPackage> pkgs = mpkg.getPackages();
+        List<Packager> pkgs = mpkgr.getPackagers();
         // if we have fewer pkgs than (onePastEndIndex - initialIndex)
         // it's because one or more of the pkgs is a POMultiQueryPackage which
         // internally has packages.
@@ -810,7 +818,7 @@ class MultiQueryOptimizer extends MROpPl
         int i = 0;
         int curIndex = initialIndex;
         while (i < end) {
-            POPackage pkg = pkgs.get(i);
+            Packager pkg = pkgs.get(i);
             addShiftedKeyInfoIndex(curIndex, pkg);
             curIndex++;
             i++;
@@ -823,12 +831,14 @@ class MultiQueryOptimizer extends MROpPl
             PhysicalPlan to, int initial, int current, byte mapKeyType) throws VisitorException {
         POPackage cpk = (POPackage)from.getRoots().get(0);
         from.remove(cpk);
+        Packager cpkgr = cpk.getPkgr();
         
         PODemux demux = (PODemux)to.getLeaves().get(0);
                 
-        POMultiQueryPackage pkg = (POMultiQueryPackage)to.getRoots().get(0);
+        MultiQueryPackager toPkgr = (MultiQueryPackager) ((POPackage) to
+                .getRoots().get(0)).getPkgr();
         
-        boolean isSameKeyType = pkg.isSameMapKeyType();
+        boolean isSameKeyType = toPkgr.isSameMapKeyType();
         
         // if current > initial + 1, it means we had
         // a split in the map of the MROper we are trying to
@@ -844,21 +854,21 @@ class MultiQueryOptimizer extends MROpPl
         // POLocalRearranges.
         int total = current - initial;
         int pkCount = 0;
-        if (cpk instanceof POMultiQueryPackage) {
-            List<POPackage> pkgs = ((POMultiQueryPackage)cpk).getPackages();
-            for (POPackage p : pkgs) {
-                pkg.addPackage(p);
+        if (cpkgr instanceof MultiQueryPackager) {
+            List<Packager> pkgrs = ((MultiQueryPackager) cpkgr).getPackagers();
+            for (Packager p : pkgrs) {
+                toPkgr.addPackager(p);
                 if (!isSameKeyType) {
                     p.setKeyType(DataType.TUPLE);
                 }
                 pkCount++;
             }
         } else {
-            pkg.addPackage(cpk);
+            toPkgr.addPackager(cpkgr);
             pkCount = 1;
         }
 
-        pkg.setSameMapKeyType(isSameKeyType);
+        toPkgr.setSameMapKeyType(isSameKeyType);
         
         if (pkCount != total) {
             int errCode = 2146;
@@ -868,10 +878,10 @@ class MultiQueryOptimizer extends MROpPl
 
         // all packages should have the same key type
         if (!isSameKeyType) {
-            cpk.setKeyType(DataType.TUPLE);          
+            cpk.getPkgr().setKeyType(DataType.TUPLE);
         } 
         
-        pkg.setKeyType(cpk.getKeyType());
+        toPkgr.setKeyType(cpk.getPkgr().getKeyType());
         
         // See comment above for why we flatten the Packages
         // in the from plan - for the same reason, we flatten
@@ -936,7 +946,7 @@ class MultiQueryOptimizer extends MROpPl
     private PhysicalPlan createDemuxPlan(boolean sameKeyType, boolean isCombiner) 
         throws VisitorException {
         PODemux demux = getDemux(isCombiner);
-        POMultiQueryPackage pkg= getMultiQueryPackage(sameKeyType, isCombiner);
+        POPackage pkg = getMultiQueryPackage(sameKeyType, isCombiner);
         
         PhysicalPlan pl = new PhysicalPlan();
         pl.add(pkg);
@@ -1186,11 +1196,14 @@ class MultiQueryOptimizer extends MROpPl
         return demux;
     } 
     
-    private POMultiQueryPackage getMultiQueryPackage(boolean sameMapKeyType, boolean inCombiner){
-        POMultiQueryPackage pkg =  
-            new POMultiQueryPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
-        pkg.setInCombiner(inCombiner);
-        pkg.setSameMapKeyType(sameMapKeyType);
+    private POPackage getMultiQueryPackage(boolean sameMapKeyType,
+            boolean inCombiner) {
+        POPackage pkg = new POPackage(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+        MultiQueryPackager pkgr = new MultiQueryPackager();
+        pkgr.setInCombiner(inCombiner);
+        pkgr.setSameMapKeyType(sameMapKeyType);
+        pkg.setPkgr(pkgr);
         return pkg;
     }   
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Sun Apr  6 10:54:13 2014
@@ -20,10 +20,56 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.List;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -84,11 +130,6 @@ public class PhyPlanSetter extends PhyPl
     }
 
     @Override
-    public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
-        pkg.setParentPlan(parent);
-    }
-
-    @Override
     public void visitPOForEach(POForEach nfe) throws VisitorException {
         super.visitPOForEach(nfe);
         nfe.setParentPlan(parent);
@@ -251,11 +292,6 @@ public class PhyPlanSetter extends PhyPl
     }
 
     @Override
-    public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
-        joinPackage.setParentPlan(parent);
-    }
-
-    @Override
     public void visitCast(POCast cast) {
         cast.setParentPlan(parent);
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigCombiner.java Sun Apr  6 10:54:13 2014
@@ -35,7 +35,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -153,7 +153,7 @@ public class PigCombiner {
             // tuples out of the getnext() call of POJoinPackage
             // In this case, we process till we see EOP from
             // POJoinPackage.getNext()
-            if (pack instanceof POJoinPackage)
+            if (pack.getPkgr() instanceof JoinPackager)
             {
                 pack.attachInput(key, tupIter.iterator());
                 while (true)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Sun Apr  6 10:54:13 2014
@@ -23,8 +23,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.joda.time.DateTimeZone;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +38,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -60,6 +58,7 @@ import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
+import org.joda.time.DateTimeZone;
 
 /**
  * This class is the static Mapper &amp; Reducer classes that
@@ -396,7 +395,7 @@ public class PigGenericMapReduce {
             // tuples out of the getnext() call of POJoinPackage
             // In this case, we process till we see EOP from 
             // POJoinPackage.getNext()
-            if (pack instanceof POJoinPackage)
+            if (pack.getPkgr() instanceof JoinPackager)
             {
                 pack.attachInput(key, tupIter.iterator());
                 while (true)
@@ -583,7 +582,7 @@ public class PigGenericMapReduce {
         @Override
         protected void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
-            keyType = pack.getKeyType();
+            keyType = pack.getPkgr().getKeyType();
         }
 
         /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java Sun Apr  6 10:54:13 2014
@@ -30,10 +30,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -221,7 +221,8 @@ public class SecondaryKeyOptimizer exten
         PhysicalOperator currentNode = root;
         POForEach foreach = null;
         while (currentNode != null) {
-            if (currentNode instanceof POPackage && !(currentNode instanceof POJoinPackage)
+            if (currentNode instanceof POPackage
+                    && !(((POPackage) currentNode).getPkgr() instanceof JoinPackager)
                     || currentNode instanceof POFilter
                     || currentNode instanceof POLimit) {
                 List<PhysicalOperator> succs = mr.reducePlan
@@ -372,7 +373,7 @@ public class SecondaryKeyOptimizer exten
                 }
             }
             POPackage pack = (POPackage) root;
-            pack.setUseSecondaryKey(true);
+            pack.getPkgr().setUseSecondaryKey(true);
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Sun Apr  6 10:54:13 2014
@@ -27,11 +27,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
@@ -78,7 +77,7 @@ public class POPackageAnnotator extends 
             if(pkg != null) {
                 // if the POPackage is actually a POPostCombinerPackage, then we should
                 // just look for the corresponding LocalRearrange(s) in the combine plan
-                if(pkg instanceof POCombinerPackage) {
+                if (pkg.getPkgr() instanceof CombinerPackager) {
                     if(patchPackage(mr.combinePlan, pkg) != pkg.getNumInps()) {
                         int errCode = 2085;
                         String msg = "Unexpected problem during optimization." +
@@ -148,24 +147,6 @@ public class POPackageAnnotator extends 
         public void visitPackage(POPackage pkg) throws VisitorException {
             this.pkg = pkg;
         };
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitJoinPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage)
-         */
-        @Override
-        public void visitJoinPackage(POJoinPackage joinPackage)
-                throws VisitorException {
-            this.pkg = joinPackage;
-        }
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
-         */
-        @Override
-        public void visitCombinerPackage(POCombinerPackage pkg)
-                throws VisitorException {
-            this.pkg = pkg;
-        }
 
         /**
          * @return the pkg
@@ -201,7 +182,7 @@ public class POPackageAnnotator extends 
             loRearrangeFound++;
             Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
 
-            if (pkg instanceof POPackageLite) {
+            if (pkg.getPkgr() instanceof LitePackager) {
                 if(lrearrange.getIndex() != 0) {
                     // Throw some exception here
                     throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
@@ -210,7 +191,7 @@ public class POPackageAnnotator extends 
 
             // annotate the package with information from the LORearrange
             // update the keyInfo information if already present in the POPackage
-            keyInfo = pkg.getKeyInfo();
+            keyInfo = pkg.getPkgr().getKeyInfo();
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
             
@@ -227,9 +208,9 @@ public class POPackageAnnotator extends 
             keyInfo.put(Integer.valueOf(lrearrange.getIndex()), 
                 new Pair<Boolean, Map<Integer, Integer>>(
                         lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
-            pkg.setKeyInfo(keyInfo);
-            pkg.setKeyTuple(lrearrange.isKeyTuple());
-            pkg.setKeyCompound(lrearrange.isKeyCompound());
+            pkg.getPkgr().setKeyInfo(keyInfo);
+            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
         }
 
         /**

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Sun Apr  6 10:54:13 2014
@@ -20,8 +20,57 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.List;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
@@ -89,14 +138,6 @@ public class PhyPlanVisitor extends Plan
         //do nothing
     }
     
-    public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
-        //do nothing
-    }
- 
-    public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
-        //do nothing
-    }
-    
     public void visitPOForEach(POForEach nfe) throws VisitorException {
         List<PhysicalPlan> inpPlans = nfe.getInputPlans();
         for (PhysicalPlan plan : inpPlans) {
@@ -242,10 +283,6 @@ public class PhyPlanVisitor extends Plan
         // TODO Auto-generated method stub
         
     }
-    
-    public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
-        //do nothing
-    }
 
     public void visitCast(POCast cast) {
         // TODO Auto-generated method stub

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Sun Apr  6 10:54:13 2014
@@ -30,7 +30,25 @@ import java.util.Set;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.MultiQueryPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorPlan;
@@ -175,11 +193,14 @@ public class PlanPrinter<O extends Opera
           else if(node instanceof POForEach){
             sb.append(planString(((POForEach)node).getInputPlans()));
           }
-          else if (node instanceof POMultiQueryPackage) {
-              List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
-              for (POPackage pkg : pkgs) {
-                  sb.append(LSep + pkg.name() + "\n");
+          else if(node instanceof POPackage){
+            Packager pkgr = ((POPackage) node).getPkgr();
+            if(pkgr instanceof MultiQueryPackager){
+              List<Packager> pkgrs = ((MultiQueryPackager) pkgr).getPackagers();
+              for (Packager child : pkgrs){
+                  sb.append(LSep + child.name() + "\n");
               }
+            }
           }
           else if(node instanceof POFRJoin){
             POFRJoin frj = (POFRJoin)node;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java?rev=1585283&r1=1585282&r2=1585283&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java Sun Apr  6 10:54:13 2014
@@ -33,6 +33,7 @@ import javax.xml.transform.stream.Stream
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.MultiQueryPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
@@ -40,26 +41,27 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.MultiMap;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
+import org.w3c.dom.Node;
 
 
 public class XMLPhysicalPlanPrinter<P extends OperatorPlan<PhysicalOperator>> extends
-        PhyPlanVisitor {
-    
+PhyPlanVisitor {
+
     private Document doc = null;
     private Element parent = null;
-    
+
     public XMLPhysicalPlanPrinter(PhysicalPlan plan, Document doc, Element parent) {
         super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         this.doc = doc;
@@ -84,7 +86,7 @@ public class XMLPhysicalPlanPrinter<P ex
             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
-            
+
             StringWriter sw = new StringWriter();
             StreamResult result = new StreamResult(sw);
             DOMSource source = new DOMSource(doc);
@@ -94,7 +96,7 @@ public class XMLPhysicalPlanPrinter<P ex
             e.printStackTrace();
         }
     }
-    
+
     private Element createAlias(PhysicalOperator po) {
         Element aliasNode = null;
         String alias = po.getAlias();
@@ -112,8 +114,8 @@ public class XMLPhysicalPlanPrinter<P ex
             depthFirst(leaf, parentNode);
         }
     }
-    
-    
+
+
     private void visitPlan(PhysicalPlan pp, Element parentNode) throws VisitorException {
         if(pp!=null) {
             XMLPhysicalPlanPrinter<PhysicalPlan> ppp =
@@ -121,15 +123,15 @@ public class XMLPhysicalPlanPrinter<P ex
             ppp.visit();
         }
     }
-    
-    
+
+
     private void visitPlan(List<PhysicalPlan> lep, Element parentNode) throws VisitorException {
         if(lep!=null)
             for (PhysicalPlan ep : lep) {
                 visitPlan(ep, parentNode);
             }
     }
-    
+
     private Element createPONode(PhysicalOperator node) {
         Element PONode = doc.createElement(node.getClass().getSimpleName());
         PONode.setAttribute("scope", "" + node.getOperatorKey().id);
@@ -150,18 +152,18 @@ public class XMLPhysicalPlanPrinter<P ex
             Element loadFile = doc.createElement("loadFile");
             loadFile.setTextContent(((POLoad)node).getLFile().getFileName());
             PONode.appendChild(loadFile);
-            
+
             Element isTmpLoad = doc.createElement("isTmpLoad");
             isTmpLoad.setTextContent(Boolean.valueOf(((POLoad)node).isTmpLoad()).toString());
             PONode.appendChild(isTmpLoad);
         }
         return PONode;
     }
-    
+
 
     private void depthFirst(PhysicalOperator node, Element parentNode) throws VisitorException {
         Element childNode = null;
-        
+
         List<PhysicalPlan> subPlans = new ArrayList<PhysicalPlan>();
         if(node instanceof POFilter){
             subPlans.add(((POFilter) node).getPlan());
@@ -177,12 +179,11 @@ public class XMLPhysicalPlanPrinter<P ex
             subPlans = ((POSplit)node).getPlans();
         } else if (node instanceof PODemux) {
             subPlans = ((PODemux)node).getPlans();
-        } else if (node instanceof POMultiQueryPackage) {
-            childNode = createPONode(node);   
-            List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
-            for (POPackage pkg : pkgs) {
-                childNode.appendChild(createPONode(pkg));
-            }
+        } else if(node instanceof POPackage){
+            childNode = createPONode(node);
+            Packager pkgr = ((POPackage) node).getPkgr();
+            Node pkgrNode = createPackagerNode(pkgr);
+            childNode.appendChild(pkgrNode);
         } else if(node instanceof POFRJoin){
             childNode = createPONode(node);
             POFRJoin frj = (POFRJoin)node;
@@ -198,11 +199,11 @@ public class XMLPhysicalPlanPrinter<P ex
             MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = skewed.getJoinPlans();
             if(joinPlans!=null) {
                 List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
-            	inner_plans.addAll(joinPlans.values());   
-            	visitPlan(inner_plans, childNode);
+                inner_plans.addAll(joinPlans.values());
+                visitPlan(inner_plans, childNode);
             }
         }
-        
+
         if (childNode == null) {
             childNode = createPONode(node);
             if (subPlans.size() > 0) {
@@ -210,17 +211,29 @@ public class XMLPhysicalPlanPrinter<P ex
             }
         }
         parentNode.appendChild(childNode);
-        
+
         List<PhysicalOperator> originalPredecessors = mPlan.getPredecessors(node);
         if (originalPredecessors == null) {
             return;
         }
-        
+
         List<PhysicalOperator> predecessors =  new ArrayList<PhysicalOperator>(originalPredecessors);
-        
+
         Collections.sort(predecessors);
         for (PhysicalOperator pred : predecessors) {
             depthFirst(pred, childNode);
         }
     }
+
+    /** Builds an element named after the packager's class; a MultiQueryPackager gets one nested element per child. */
+    private Node createPackagerNode(Packager pkgr) {
+        final Element element = doc.createElement(pkgr.getClass().getSimpleName());
+        if (!(pkgr instanceof MultiQueryPackager)) {
+            return element;
+        }
+        // Recurse so that nested multi-query packagers are rendered as nested elements.
+        for (Packager nested : ((MultiQueryPackager) pkgr).getPackagers()) {
+            element.appendChild(createPackagerNode(nested));
+        }
+        return element;
+    }
 }
\ No newline at end of file

Added: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1585283&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (added)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Sun Apr  6 10:54:13 2014
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.util.Pair;
+/**
+ * The package operator that packages the globally rearranged tuples into
+ * output format after the combiner stage.  It differs from POPackage in that
+ * it does not use the index in the NullableTuple to find the bag to put a
+ * tuple in.  Instead, the inputs are put in a bag corresponding to their
+ * offset in the tuple.
+ */
+public class CombinerPackager extends Packager {
+    // For each output field, true when the field must be collected into a bag.
+    private boolean[] mBags;
+
+    // Keyed by field position; a position with no entry is not part of the key.
+    private Map<Integer, Integer> keyLookup;
+
+    // Number of true entries in mBags.
+    private int numBags;
+
+    /**
+     * Builds a combiner packager that takes its key type from the given
+     * packager and always works on a single input.
+     *
+     * @param pkg packager whose key type and inner-flag count are reused.
+     * @param bags for each field, indicates whether it should be a bag (true)
+     * or a simple field (false); null is treated as having no fields.
+     */
+    public CombinerPackager(Packager pkg, boolean[] bags) {
+        super();
+        keyType = pkg.keyType;
+        numInputs = 1;
+        // Size the flags from pkg.inner: a fixed one-element array overflowed
+        // in the fill loop whenever pkg.inner held more than one entry.
+        inner = new boolean[pkg.inner.length];
+        Arrays.fill(inner, true);
+        // Never leave mBags null; it is dereferenced below and in getNext().
+        mBags = (bags == null) ? new boolean[0] : Arrays.copyOf(bags, bags.length);
+        numBags = 0;
+        for (boolean isBag : mBags) {
+            if (isBag) numBags++;
+        }
+    }
+
+    /**
+     * Stores the key info and caches the key lookup map of input 0.
+     *
+     * @param keyInfo the keyInfo to set
+     */
+    @Override
+    public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+        this.keyInfo = keyInfo;
+        // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
+        // group case and not in cogroups. So there should only
+        // be one LocalRearrange from which we get the keyInfo for
+        // which field in the value is in the key. This LocalRearrange
+        // has an index of 0. When we do support combiner in Cogroups
+        // THIS WILL NEED TO BE REVISITED.
+        keyLookup = keyInfo.get(0).second;
+    }
+
+    /** Returns a NonSpillableDataBag when "pig.cachedbag.type" is "default", else an InternalCachedBag. */
+    private DataBag createDataBag(int numBags) {
+        String bagType = null;
+        if (PigMapReduce.sJobConfInternal.get() != null) {
+            bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+        }
+        if ("default".equalsIgnoreCase(bagType)) {
+            return new NonSpillableDataBag();
+        }
+        return new InternalCachedBag(numBags);
+    }
+
+    /**
+     * Packages the first input bag into one tuple with a slot per mBags entry.
+     * @return STATUS_EOP when bags is null, otherwise STATUS_OK with the tuple.
+     */
+    @Override
+    public Result getNext() throws ExecException {
+        if (bags == null) {
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        // Create one bag per field flagged in mBags
+        Object[] fields = new Object[mBags.length];
+        for (int i = 0; i < mBags.length; i++) {
+            if (mBags[i]) fields[i] = createDataBag(numBags);
+        }
+
+        // For each indexed tup in the inp, split them up and place their
+        // fields into the proper bags.  Every other field is set to the key.
+        for (Tuple tup : bags[0]) {
+            int tupIndex = 0; // next element to read from the current value (tup)
+            for (int i = 0; i < mBags.length; i++) {
+                Integer keyIndex = keyLookup.get(i);
+                if (keyIndex == null && mBags[i]) {
+                    // the field for this index is not the
+                    // key - so just take it from the "value"
+                    // we were handed - Currently THIS HAS TO BE A BAG
+                    // In future if this changes, THIS WILL NEED TO BE
+                    // REVISITED.
+                    ((DataBag) fields[i]).add((Tuple) tup.get(tupIndex));
+                    tupIndex++;
+                } else {
+                    // the field for this index is in the key
+                    fields[i] = key;
+                }
+            }
+        }
+
+        detachInput();
+
+        // The successor of the POPackage(Combiner) as of
+        // now SHOULD be a POForeach which has been adjusted
+        // to look for its inputs by projecting from the corresponding
+        // positions in the POPackage(Combiner) output.
+        // So we will NOT be adding the key in the result here but merely
+        // putting all bags into a result tuple and returning it.
+        Tuple res = mTupleFactory.newTuple(mBags.length);
+        for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]);
+        return new Result(POStatus.STATUS_OK, res);
+    }
+
+    /** Returns the value carried by ntup as a tuple; keyWritable and index are not used. */
+    @Override
+    public Tuple getValueTuple(PigNullableWritable keyWritable,
+            NullableTuple ntup, int index) throws ExecException {
+        return (Tuple) ntup.getValueAsPigType();
+    }
+
+}