Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/28 09:56:34 UTC

svn commit: r1546314 [4/6] - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...
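
The theme of this revision: the POPackage subclasses (POJoinPackage, POCombinerPackage, POPackageLite, POMultiQueryPackage) are replaced by a single POPackage operator that delegates to a pluggable Packager (JoinPackager, CombinerPackager, LitePackager), so callers test pack.getPkgr() instead of the operator's concrete class. A minimal standalone sketch of that shape (all types below are simplified stand-ins, not the real Pig classes; only getPkgr() and the packager class names come from this commit):

    // Stand-in model of the composition introduced by this commit.
    abstract class Packager {
        private boolean useSecondaryKey;
        void setUseSecondaryKey(boolean b) { useSecondaryKey = b; }
        boolean isUseSecondaryKey() { return useSecondaryKey; }
    }

    class JoinPackager extends Packager {}
    class CombinerPackager extends Packager {}
    class LitePackager extends Packager {}

    class POPackage {
        private final Packager pkgr;
        POPackage(Packager pkgr) { this.pkgr = pkgr; }
        Packager getPkgr() { return pkgr; }
    }

    public class PackagerModel {
        public static void main(String[] args) {
            POPackage pack = new POPackage(new JoinPackager());
            // Old style: pack instanceof POJoinPackage
            // New style: inspect the delegate instead.
            System.out.println(pack.getPkgr() instanceof JoinPackager); // true
        }
    }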

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigGenericMapReduce.java Thu Nov 28 08:56:33 2013
@@ -23,8 +23,6 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
-import org.joda.time.DateTimeZone;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -40,7 +38,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -60,24 +58,25 @@ import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.SpillableMemoryManager;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.PigStatusReporter;
+import org.joda.time.DateTimeZone;
 
 /**
  * This class is the static Mapper & Reducer classes that
  * are used by Pig to execute Pig Map Reduce jobs. Since
- * there is a reduce phase, the leaf is bound to be a 
+ * there is a reduce phase, the leaf is bound to be a
  * POLocalRearrange. So the map phase has to separate the
  * key and tuple and collect it into the output
  * collector.
- * 
+ *
  * The shuffle and sort phase sorts these keys & tuples
  * and creates key, List<Tuple> and passes the key and
  * iterator to the list. The deserialized POPackage operator
- * is used to package the key, List<Tuple> into pigKey, 
+ * is used to package the key, List<Tuple> into pigKey,
  * Bag<Tuple> where pigKey is of the appropriate pig type and
  * then the result of the package is attached to the reduce
- * plan which is executed if its not empty. Either the result 
+ * plan which is executed if its not empty. Either the result
  * of the reduce plan or the package res is collected into
- * the output collector. 
+ * the output collector.
  *
  * The index of the tuple (that is, which bag it should be placed in by the
  * package) is packed into the key.  This is done so that hadoop sorts the
@@ -90,28 +89,28 @@ import org.apache.pig.tools.pigstats.Pig
 public class PigGenericMapReduce {
 
     public static JobContext sJobContext = null;
-    
+
     /**
-     * @deprecated Use {@link UDFContext} instead in the following way to get 
+     * @deprecated Use {@link UDFContext} instead in the following way to get
      * the job's {@link Configuration}:
      * <pre>UdfContext.getUdfContext().getJobConf()</pre>
      */
     @Deprecated
     public static Configuration sJobConf = null;
-    
+
     public static final ThreadLocal<Configuration> sJobConfInternal = new ThreadLocal<Configuration>();
-    
+
     public static class Map extends PigMapBase {
 
         @Override
-        public void collect(Context oc, Tuple tuple) 
+        public void collect(Context oc, Tuple tuple)
                 throws InterruptedException, IOException {
-            
+
             Byte index = (Byte)tuple.get(0);
             PigNullableWritable key =
                 HDataType.getWritableComparableTypes(tuple.get(1), keyType);
             NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-            
+
             // Both the key and the value need the index.  The key needs it so
             // that it can be sorted on the index in addition to the key
             // value.  The value needs it so that POPackage can properly
@@ -122,7 +121,7 @@ public class PigGenericMapReduce {
             oc.write(key, val);
         }
     }
-    
+
     /**
      * This "specialized" map class is ONLY to be used in pig queries with
      * order by a udf. A UDF used for comparison in the order by expects
@@ -132,9 +131,9 @@ public class PigGenericMapReduce {
     public static class MapWithComparator extends PigMapBase {
 
         @Override
-        public void collect(Context oc, Tuple tuple) 
+        public void collect(Context oc, Tuple tuple)
                 throws InterruptedException, IOException {
-            
+
             Object keyTuple = null;
             if(keyType != DataType.TUPLE) {
                 Object k = tuple.get(1);
@@ -142,13 +141,13 @@ public class PigGenericMapReduce {
             } else {
                 keyTuple = tuple.get(1);
             }
-            
+
 
             Byte index = (Byte)tuple.get(0);
             PigNullableWritable key =
                 HDataType.getWritableComparableTypes(keyTuple, DataType.TUPLE);
             NullableTuple val = new NullableTuple((Tuple)tuple.get(2));
-            
+
             // Both the key and the value need the index.  The key needs it so
             // that it can be sorted on the index in addition to the key
             // value.  The value needs it so that POPackage can properly
@@ -166,23 +165,23 @@ public class PigGenericMapReduce {
     public static class MapWithPartitionIndex extends Map {
 
         @Override
-        public void collect(Context oc, Tuple tuple) 
+        public void collect(Context oc, Tuple tuple)
                 throws InterruptedException, IOException {
-            
+
             Byte tupleKeyIdx = 2;
             Byte tupleValIdx = 3;
 
             Byte index = (Byte)tuple.get(0);
-			Integer partitionIndex = -1;
-        	// for partitioning table, the partition index isn't present
-			if (tuple.size() == 3) {
-				//super.collect(oc, tuple);
-				//return;
-				tupleKeyIdx--;
-				tupleValIdx--;
-			} else {
-				partitionIndex = (Integer)tuple.get(1);
-			}
+            Integer partitionIndex = -1;
+            // for partitioning table, the partition index isn't present
+            if (tuple.size() == 3) {
+                //super.collect(oc, tuple);
+                //return;
+                tupleKeyIdx--;
+                tupleValIdx--;
+            } else {
+                partitionIndex = (Integer)tuple.get(1);
+            }
 
             PigNullableWritable key =
                 HDataType.getWritableComparableTypes(tuple.get(tupleKeyIdx), keyType);
@@ -190,13 +189,13 @@ public class PigGenericMapReduce {
             NullablePartitionWritable wrappedKey = new NullablePartitionWritable(key);
 
             NullableTuple val = new NullableTuple((Tuple)tuple.get(tupleValIdx));
-            
+
             // Both the key and the value need the index.  The key needs it so
             // that it can be sorted on the index in addition to the key
             // value.  The value needs it so that POPackage can properly
             // assign the tuple to its slot in the projection.
             wrappedKey.setIndex(index);
-            
+
             // set the partition
             wrappedKey.setPartition(partitionIndex);
             val.setIndex(index);
@@ -204,14 +203,14 @@ public class PigGenericMapReduce {
         }
 
         @Override
-        protected void runPipeline(PhysicalOperator leaf) 
+        protected void runPipeline(PhysicalOperator leaf)
                 throws IOException, InterruptedException {
-            
+
             while(true){
                 Result res = leaf.getNextTuple();
-                
+
                 if(res.returnStatus==POStatus.STATUS_OK){
-                    // For POPartitionRearrange, the result is a bag. 
+                    // For POPartitionRearrange, the result is a bag.
                     // This operator is used for skewed join
                     if (res.result instanceof DataBag) {
                         Iterator<Tuple> its = ((DataBag)res.result).iterator();
@@ -223,7 +222,7 @@ public class PigGenericMapReduce {
                     }
                     continue;
                 }
-                
+
                 if(res.returnStatus==POStatus.STATUS_EOP) {
                     return;
                 }
@@ -233,7 +232,7 @@ public class PigGenericMapReduce {
                 }
 
                 if(res.returnStatus==POStatus.STATUS_ERR){
-                    // remember that we had an issue so that in 
+                    // remember that we had an issue so that in
                     // close() we can do the right thing
                     errorInMap  = true;
                     // if there is an errmessage use it
@@ -253,39 +252,39 @@ public class PigGenericMapReduce {
         }
     }
 
-    abstract public static class Reduce 
+    abstract public static class Reduce
             extends Reducer <PigNullableWritable, NullableTuple, PigNullableWritable, Writable> {
-        
+
         protected final Log log = LogFactory.getLog(getClass());
-        
+
         //The reduce plan
         protected PhysicalPlan rp = null;
 
         // Store operators
         protected List<POStore> stores;
-        
+
         //The POPackage operator which is the
         //root of every Map Reduce plan is
         //obtained through the job conf. The portion
         //remaining after its removal is the reduce
         //plan
         protected POPackage pack;
-        
+
         ProgressableReporter pigReporter;
 
         protected Context outputCollector;
 
         protected boolean errorInReduce = false;
-        
+
         PhysicalOperator[] roots;
 
         private PhysicalOperator leaf;
-        
+
         PigContext pigContext = null;
         protected volatile boolean initialized = false;
-        
+
         private boolean inIllustrator = false;
-        
+
         /**
          * Set the reduce plan: to be used by local runner for illustrator
          * @param plan Reduce plan
@@ -313,7 +312,7 @@ public class PigGenericMapReduce {
             try {
                 PigContext.setPackageImportList((ArrayList<String>)ObjectSerializer.deserialize(jConf.get("udf.import.list")));
                 pigContext = (PigContext)ObjectSerializer.deserialize(jConf.get("pig.pigContext"));
-                
+
                 // This attempts to fetch all of the generated code from the distributed cache, and resolve it
                 SchemaTupleBackend.initialize(jConf, pigContext);
 
@@ -337,36 +336,36 @@ public class PigGenericMapReduce {
                     roots = rp.getRoots().toArray(new PhysicalOperator[1]);
                     leaf = rp.getLeaves().get(0);
                 }
-                
+
                 // Get the UDF specific context
-            	MapRedUtil.setupUDFContext(jConf);
-            
+                MapRedUtil.setupUDFContext(jConf);
+
             } catch (IOException ioe) {
                 String msg = "Problem while configuring reduce plan.";
                 throw new RuntimeException(msg, ioe);
             }
             log.info("Aliases being processed per job phase (AliasName[line,offset]): " + jConf.get("pig.alias.location"));
-            
+
             String dtzStr = PigMapReduce.sJobConfInternal.get().get("pig.datetime.default.tz");
             if (dtzStr != null && dtzStr.length() > 0) {
                 // ensure that the internal timezone is uniformly in UTC offset style
                 DateTimeZone.setDefault(DateTimeZone.forOffsetMillis(DateTimeZone.forID(dtzStr).getOffset(null)));
             }
         }
-        
+
         /**
          * The reduce function which packages the key and List&lt;Tuple&gt;
          * into key, Bag&lt;Tuple&gt; after converting Hadoop type key into Pig type.
          * The package result is either collected as is, if the reduce plan is
          * empty or after passing through the reduce plan.
-         */       
+         */
         @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
-                throws IOException, InterruptedException {            
-            
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
+                throws IOException, InterruptedException {
+
             if (!initialized) {
                 initialized = true;
-                
+
                 // cache the collector for use in runPipeline()
                 // which could additionally be called from close()
                 this.outputCollector = context;
@@ -379,24 +378,24 @@ public class PigGenericMapReduce {
                 pigHadoopLogger.setAggregate(aggregateWarning);
                 PigStatusReporter.setContext(context);
                 pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
-                
+
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
 
                 if (!inIllustrator)
                     for (POStore store: stores) {
-                        MapReducePOStoreImpl impl 
+                        MapReducePOStoreImpl impl
                             = new MapReducePOStoreImpl(context);
                         store.setStoreImpl(impl);
                         store.setUp();
                     }
             }
-          
+
             // In the case we optimize the join, we combine
             // POPackage and POForeach - so we could get many
             // tuples out of the getnext() call of POJoinPackage
-            // In this case, we process till we see EOP from 
+            // In this case, we process till we see EOP from
             // POJoinPackage.getNext()
-            if (pack instanceof POJoinPackage)
+            if (pack.getPkgr() instanceof JoinPackager)
             {
                 pack.attachInput(key, tupIter.iterator());
                 while (true)
@@ -410,18 +409,18 @@ public class PigGenericMapReduce {
                 // give only one tuple out for the key
                 pack.attachInput(key, tupIter.iterator());
                 processOnePackageOutput(context);
-            } 
+            }
         }
-        
+
         // return: false-more output
         //         true- end of processing
-        public boolean processOnePackageOutput(Context oc) 
+        public boolean processOnePackageOutput(Context oc)
                 throws IOException, InterruptedException {
 
             Result res = pack.getNextTuple();
             if(res.returnStatus==POStatus.STATUS_OK){
                 Tuple packRes = (Tuple)res.result;
-                
+
                 if(rp.isEmpty()){
                     oc.write(null, packRes);
                     return false;
@@ -430,35 +429,35 @@ public class PigGenericMapReduce {
                     roots[i].attachInput(packRes);
                 }
                 runPipeline(leaf);
-                
+
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_NULL) {
                 return false;
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_ERR){
                 int errCode = 2093;
                 String msg = "Encountered error in package operator while processing group.";
                 throw new ExecException(msg, errCode, PigException.BUG);
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_EOP) {
                 return true;
             }
-                
+
             return false;
-            
+
         }
-        
+
         /**
          * @param leaf
          * @throws InterruptedException
-         * @throws IOException 
+         * @throws IOException
          */
-        protected void runPipeline(PhysicalOperator leaf) 
+        protected void runPipeline(PhysicalOperator leaf)
                 throws InterruptedException, IOException {
-            
+
             while(true)
             {
                 Result redRes = leaf.getNextTuple();
@@ -470,17 +469,17 @@ public class PigGenericMapReduce {
                     }
                     continue;
                 }
-                
+
                 if(redRes.returnStatus==POStatus.STATUS_EOP) {
                     return;
                 }
-                
+
                 if(redRes.returnStatus==POStatus.STATUS_NULL) {
                     continue;
                 }
-                
+
                 if(redRes.returnStatus==POStatus.STATUS_ERR){
-                    // remember that we had an issue so that in 
+                    // remember that we had an issue so that in
                     // close() we can do the right thing
                     errorInReduce   = true;
                     // if there is an errmessage use it
@@ -497,22 +496,22 @@ public class PigGenericMapReduce {
                 }
             }
         }
-        
+
         /**
          * Will be called once all the intermediate keys and values are
          * processed. So right place to stop the reporter thread.
          */
-        @Override 
+        @Override
         protected void cleanup(Context context) throws IOException, InterruptedException {
             super.cleanup(context);
-            
+
             if(errorInReduce) {
                 // there was an error in reduce - just return
                 return;
             }
-            
+
             if(PigMapReduce.sJobConfInternal.get().get("pig.stream.in.reduce", "false").equals("true")) {
-                // If there is a stream in the pipeline we could 
+                // If there is a stream in the pipeline we could
                 // potentially have more to process - so lets
                 // set the flag stating that all map input has been sent
                 // already and then lets run the pipeline one more time
@@ -525,7 +524,7 @@ public class PigGenericMapReduce {
             if (!inIllustrator) {
                 for (POStore store: stores) {
                     if (!initialized) {
-                        MapReducePOStoreImpl impl 
+                        MapReducePOStoreImpl impl
                             = new MapReducePOStoreImpl(context);
                         store.setStoreImpl(impl);
                         store.setUp();
@@ -533,7 +532,7 @@ public class PigGenericMapReduce {
                     store.tearDown();
                 }
             }
-                        
+
             //Calling EvalFunc.finish()
             UDFFinishVisitor finisher = new UDFFinishVisitor(rp, new DependencyOrderWalker<PhysicalOperator, PhysicalPlan>(rp));
             try {
@@ -541,14 +540,14 @@ public class PigGenericMapReduce {
             } catch (VisitorException e) {
                 throw new IOException("Error trying to finish UDFs",e);
             }
-            
+
             PhysicalOperator.setReporter(null);
             initialized = false;
         }
-        
+
         /**
          * Get reducer's illustrator context
-         * 
+         *
          * @param input Input buffer as output by maps
          * @param pkg package
          * @return reducer's illustrator context
@@ -557,25 +556,25 @@ public class PigGenericMapReduce {
          */
         abstract public Context getIllustratorContext(Job job,
                List<Pair<PigNullableWritable, Writable>> input, POPackage pkg) throws IOException, InterruptedException;
-        
+
         abstract public boolean inIllustrator(Context context);
-        
+
         abstract public POPackage getPack(Context context);
     }
-    
+
     /**
      * This "specialized" reduce class is ONLY to be used in pig queries with
      * order by a udf. A UDF used for comparison in the order by expects
      * to be handed tuples. Hence a specialized map class (PigMapReduce.MapWithComparator)
-     * ensures that the "key" used in the order by is wrapped into a tuple (if it 
+     * ensures that the "key" used in the order by is wrapped into a tuple (if it
      * isn't already a tuple). This reduce class unwraps this tuple in the case where
     * the map had wrapped into a tuple and hands the "unwrapped" key to the POPackage
      * for processing
      */
     public static class ReduceWithComparator extends PigMapReduce.Reduce {
-        
+
         private byte keyType;
-        
+
         /**
          * Configures the Reduce plan, the POPackage operator
          * and the reporter thread
@@ -583,7 +582,7 @@ public class PigGenericMapReduce {
         @Override
         protected void setup(Context context) throws IOException, InterruptedException {
             super.setup(context);
-            keyType = pack.getKeyType();
+            keyType = pack.getPkgr().getKeyType();
         }
 
         /**
@@ -593,12 +592,12 @@ public class PigGenericMapReduce {
          * empty or after passing through the reduce plan.
          */
         @Override
-        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context) 
+        protected void reduce(PigNullableWritable key, Iterable<NullableTuple> tupIter, Context context)
                 throws IOException, InterruptedException {
-            
+
             if (!initialized) {
                 initialized = true;
-                
+
                 // cache the collector for use in runPipeline()
                 // which could additionally be called from close()
                 this.outputCollector = context;
@@ -606,24 +605,24 @@ public class PigGenericMapReduce {
                 PhysicalOperator.setReporter(pigReporter);
 
                 boolean aggregateWarning = "true".equalsIgnoreCase(pigContext.getProperties().getProperty("aggregate.warning"));
-                
+
                 PigHadoopLogger pigHadoopLogger = PigHadoopLogger.getInstance();
                 pigHadoopLogger.setAggregate(aggregateWarning);
                 PigStatusReporter.setContext(context);
                 pigHadoopLogger.setReporter(PigStatusReporter.getInstance());
 
                 PhysicalOperator.setPigLogger(pigHadoopLogger);
-                
+
                 for (POStore store: stores) {
-                    MapReducePOStoreImpl impl 
+                    MapReducePOStoreImpl impl
                         = new MapReducePOStoreImpl(context);
                     store.setStoreImpl(impl);
                     store.setUp();
                 }
             }
-            
+
             // If the keyType is not a tuple, the MapWithComparator.collect()
-            // would have wrapped the key into a tuple so that the 
+            // would have wrapped the key into a tuple so that the
             // comparison UDF used in the order by can process it.
             // We need to unwrap the key out of the tuple and hand it
             // to the POPackage for processing
@@ -635,31 +634,31 @@ public class PigGenericMapReduce {
                     throw e;
                 }
             }
-            
+
             pack.attachInput(key, tupIter.iterator());
-            
+
             Result res = pack.getNextTuple();
             if(res.returnStatus==POStatus.STATUS_OK){
                 Tuple packRes = (Tuple)res.result;
-                
+
                 if(rp.isEmpty()){
                     context.write(null, packRes);
                     return;
                 }
-                
+
                 rp.attachInput(packRes);
 
                 List<PhysicalOperator> leaves = rp.getLeaves();
-                
+
                 PhysicalOperator leaf = leaves.get(0);
                 runPipeline(leaf);
-                
+
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_NULL) {
                 return;
             }
-            
+
             if(res.returnStatus==POStatus.STATUS_ERR){
                 int errCode = 2093;
                 String msg = "Encountered error in package operator while processing group.";
@@ -669,5 +668,5 @@ public class PigGenericMapReduce {
         }
 
     }
-   
+
 }
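
In Reduce.reduce() above, only the type test changed (pack instanceof POJoinPackage becomes pack.getPkgr() instanceof JoinPackager); the drain-until-EOP control flow is untouched. A self-contained sketch of that loop shape, with stand-in types (getPkgr(), JoinPackager, and the EOP handling mirror the diff; everything else is invented for illustration):

    import java.util.Iterator;
    import java.util.List;

    // Stand-in sketch of the drain-until-EOP loop in Reduce.reduce():
    // a JoinPackager can emit many tuples per key, so the reducer keeps
    // calling getNextTuple() until it sees STATUS_EOP.
    public class ReduceLoopSketch {
        static final byte STATUS_OK = 0, STATUS_EOP = 3; // stand-in constants

        static class Result { byte returnStatus; Object result; }

        interface Packager {}
        static class JoinPackager implements Packager {}

        static class POPackage {
            private final Packager pkgr = new JoinPackager();
            private Iterator<String> input;
            Packager getPkgr() { return pkgr; }
            void attachInput(Iterator<String> it) { input = it; }
            Result getNextTuple() {
                Result r = new Result();
                if (input.hasNext()) { r.returnStatus = STATUS_OK; r.result = input.next(); }
                else { r.returnStatus = STATUS_EOP; }
                return r;
            }
        }

        public static void main(String[] args) {
            POPackage pack = new POPackage();
            pack.attachInput(List.of("tuple1", "tuple2").iterator());
            if (pack.getPkgr() instanceof JoinPackager) { // was: pack instanceof POJoinPackage
                while (true) {
                    Result res = pack.getNextTuple();
                    if (res.returnStatus == STATUS_EOP) break;
                    System.out.println(res.result); // would be oc.write(...) in the reducer
                }
            }
        }
    }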

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizer.java Thu Nov 28 08:56:33 2013
@@ -30,10 +30,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.JoinPackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -100,7 +100,7 @@ public class SecondaryKeyOptimizer exten
                 columns
                         .add(rearrange.getIndex()
                                 & PigNullableWritable.idxSpace);
-                
+
                 // The first item inside columnChainInfo is set to type Tuple.
                 // This value is not actually in use, but it intends to match
                 // the type of POProject in reduce side
@@ -111,13 +111,13 @@ public class SecondaryKeyOptimizer exten
                     if (node instanceof POProject) {
                         POProject project = (POProject) node;
                         if(project.isProjectToEnd()){
-                            columnChainInfo.insert(project.getStartCol(), 
+                            columnChainInfo.insert(project.getStartCol(),
                                     project.getResultType());
                         }else {
                             columnChainInfo.insert(
                                     project.getColumns(), project.getResultType());
                         }
-                        
+
                         if (plan.getSuccessors(node) == null)
                             node = null;
                         else if (plan.getSuccessors(node).size() != 1) {
@@ -154,8 +154,7 @@ public class SecondaryKeyOptimizer exten
 
         List<PhysicalOperator> mapLeaves = mr.mapPlan.getLeaves();
         if (mapLeaves == null || mapLeaves.size() != 1) {
-            log
-                    .debug("Expected map to have single leaf! Skip secondary key optimizing");
+            log.debug("Expected map to have single leaf! Skip secondary key optimizing");
             return;
         }
         PhysicalOperator mapLeaf = mapLeaves.get(0);
@@ -165,8 +164,7 @@ public class SecondaryKeyOptimizer exten
             if (mapLeaf instanceof POLocalRearrange) {
                 SortKeyInfo sortKeyInfo = getSortKeyInfo((POLocalRearrange) mapLeaf);
                 if (sortKeyInfo == null) {
-                    log
-                            .debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+                    log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
                     return;
                 }
                 sortKeyInfos.add(sortKeyInfo);
@@ -188,8 +186,7 @@ public class SecondaryKeyOptimizer exten
                 return;
             }
         } catch (ExecException e) {
-            log
-                    .debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
+            log.debug("Cannot get sortKeyInfo from POLocalRearrange, skip secondary key optimizing");
             return;
         }
 
@@ -200,15 +197,13 @@ public class SecondaryKeyOptimizer exten
 
         List<PhysicalOperator> reduceRoots = mr.reducePlan.getRoots();
         if (reduceRoots.size() != 1) {
-            log
-                    .debug("Expected reduce to have single root, skip secondary key optimizing");
+            log.debug("Expected reduce to have single root, skip secondary key optimizing");
             return;
         }
 
         PhysicalOperator root = reduceRoots.get(0);
         if (!(root instanceof POPackage)) {
-            log
-                    .debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
+            log.debug("Expected reduce root to be a POPackage, skip secondary key optimizing");
             return;
         }
 
@@ -217,7 +212,8 @@ public class SecondaryKeyOptimizer exten
         PhysicalOperator currentNode = root;
         POForEach foreach = null;
         while (currentNode != null) {
-            if (currentNode instanceof POPackage && !(currentNode instanceof POJoinPackage)
+            if (currentNode instanceof POPackage
+                    && !(((POPackage) currentNode).getPkgr() instanceof JoinPackager)
                     || currentNode instanceof POFilter
                     || currentNode instanceof POLimit) {
                 List<PhysicalOperator> succs = mr.reducePlan
@@ -237,7 +233,7 @@ public class SecondaryKeyOptimizer exten
                 return;
             }
         }
-        
+
         // We do not find a foreach (we shall not come here, a trick to fool findbugs)
         if (foreach==null)
             return;
@@ -355,7 +351,7 @@ public class SecondaryKeyOptimizer exten
                 for (PhysicalOperator pred : preds) {
                     POLocalRearrange rearrange = (POLocalRearrange) pred;
                     rearrange.setUseSecondaryKey(true);
-                    if (rearrange.getIndex() == indexOfRearrangeToChange) { 
+                    if (rearrange.getIndex() == indexOfRearrangeToChange) {
                         // Try to find the POLocalRearrange for the secondary key
                         found = true;
                         setSecondaryPlan(mr.mapPlan, rearrange, secondarySortKeyInfo);
@@ -368,7 +364,7 @@ public class SecondaryKeyOptimizer exten
                 }
             }
             POPackage pack = (POPackage) root;
-            pack.setUseSecondaryKey(true);
+            pack.getPkgr().setUseSecondaryKey(true);
         }
     }
 
@@ -443,7 +439,7 @@ public class SecondaryKeyOptimizer exten
     // sort key.
     private static class SecondaryKeyDiscover {
         PhysicalPlan mPlan;
-        
+
         List<POSort> sortsToRemove = new ArrayList<POSort>();
 
         List<PODistinct> distinctsToChange = new ArrayList<PODistinct>();
@@ -461,7 +457,7 @@ public class SecondaryKeyOptimizer exten
             this.sortKeyInfos = sortKeyInfos;
             this.secondarySortKeyInfo = secondarySortKeyInfo;
         }
-        
+
         public void process() throws FrontendException
         {
             List<PhysicalOperator> roots = mPlan.getRoots();
@@ -470,7 +466,7 @@ public class SecondaryKeyOptimizer exten
                 processRoot(root);
             }
         }
-        
+
         public void processRoot(PhysicalOperator root) throws FrontendException {
             PhysicalOperator currentNode = root;
             while (currentNode!=null) {
@@ -486,10 +482,10 @@ public class SecondaryKeyOptimizer exten
                          // We don't process foreach, since foreach is too complex to get right
                          currentNode instanceof POForEach)
                     break;
-                
+
                 if (sawInvalidPhysicalOper)
                     break;
-                
+
                 List<PhysicalOperator> succs = mPlan.getSuccessors(currentNode);
                 if (succs==null)
                     currentNode = null;
@@ -609,7 +605,7 @@ public class SecondaryKeyOptimizer exten
     static private boolean collectColumnChain(PhysicalPlan plan,
             ColumnChainInfo columnChainInfo) throws PlanException {
         if (plan.getRoots().size() != 1) {
-        	return true;
+            return true;
         }
 
         PhysicalOperator currentNode = plan.getRoots().get(0);
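
The optimizer's reduce-plan walk above keeps its structure; the guard that used to exclude POJoinPackage now excludes any POPackage whose packager is a JoinPackager, and the secondary-key flag is set through getPkgr(). A stand-in sketch of the rewritten guard (only the getPkgr() and JoinPackager calls come from this commit; the rest is simplified):

    // Stand-in sketch: a POPackage qualifies for secondary-key
    // optimization only when its packager is not a JoinPackager.
    public class PackageGuardSketch {
        interface PhysicalOperator {}
        interface Packager {}
        static class JoinPackager implements Packager {}
        static class LitePackager implements Packager {}
        static class POPackage implements PhysicalOperator {
            private final Packager pkgr;
            POPackage(Packager p) { pkgr = p; }
            Packager getPkgr() { return pkgr; }
        }

        static boolean qualifies(PhysicalOperator op) {
            return op instanceof POPackage
                    && !(((POPackage) op).getPkgr() instanceof JoinPackager);
        }

        public static void main(String[] args) {
            System.out.println(qualifies(new POPackage(new LitePackager())));  // true
            System.out.println(qualifies(new POPackage(new JoinPackager()))); // false
        }
    }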

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/POPackageAnnotator.java Thu Nov 28 08:56:33 2013
@@ -27,11 +27,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.CombinerPackager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
@@ -57,7 +56,7 @@ public class POPackageAnnotator extends 
 
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
-        
+
         // POPackage OR POJoinPackage could be present in the combine plan
         // OR in the reduce plan. POPostCombinerPackage could
         // be present only in the reduce plan. Search in these two
@@ -68,9 +67,9 @@ public class POPackageAnnotator extends 
             POPackage pkg = pkgDiscoverer.getPkg();
             if(pkg != null) {
                 handlePackage(mr, pkg);
-            }   
+            }
         }
-        
+
         if(!mr.reducePlan.isEmpty()) {
             PackageDiscoverer pkgDiscoverer = new PackageDiscoverer(mr.reducePlan);
             pkgDiscoverer.visit();
@@ -78,7 +77,7 @@ public class POPackageAnnotator extends 
             if(pkg != null) {
                 // if the POPackage is actually a POPostCombinerPackage, then we should
                 // just look for the corresponding LocalRearrange(s) in the combine plan
-                if(pkg instanceof POCombinerPackage) {
+                if (pkg.getPkgr() instanceof CombinerPackager) {
                     if(patchPackage(mr.combinePlan, pkg) != pkg.getNumInps()) {
                         int errCode = 2085;
                         String msg = "Unexpected problem during optimization." +
@@ -90,14 +89,14 @@ public class POPackageAnnotator extends 
                 }
             }
         }
-        
+
     }
-    
+
     private void handlePackage(MapReduceOper mr, POPackage pkg) throws VisitorException {
         // the LocalRearrange(s) could either be in the map of this MapReduceOper
         // OR in the reduce of predecessor MapReduceOpers
         int lrFound = 0;
-        
+
         lrFound = patchPackage(mr.mapPlan, pkg);
         if(lrFound != pkg.getNumInps()) {
             // we did not find the LocalRearrange(s) in the map plan
@@ -110,7 +109,7 @@ public class POPackageAnnotator extends 
                 lrFound += patchPackage(mrOper.reducePlan, pkg);
                 if(lrFound == pkg.getNumInps()) {
                     break;
-                }     
+                }
             }
         }
         if(lrFound != pkg.getNumInps()) {
@@ -127,7 +126,7 @@ public class POPackageAnnotator extends 
         // the package
         return lrDiscoverer.getLoRearrangeFound();
     }
-    
+
     /**
      * Simple visitor of the "Reduce" physical plan
     * which will get a reference to the POPackage
@@ -136,36 +135,15 @@ public class POPackageAnnotator extends 
     static class PackageDiscoverer extends PhyPlanVisitor {
 
         private POPackage pkg;
-        
+
         public PackageDiscoverer(PhysicalPlan plan) {
             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         }
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
-         */
+
         @Override
         public void visitPackage(POPackage pkg) throws VisitorException {
             this.pkg = pkg;
         };
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitJoinPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage)
-         */
-        @Override
-        public void visitJoinPackage(POJoinPackage joinPackage)
-                throws VisitorException {
-            this.pkg = joinPackage;
-        }
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
-         */
-        @Override
-        public void visitCombinerPackage(POCombinerPackage pkg)
-                throws VisitorException {
-            this.pkg = pkg;
-        }
 
         /**
          * @return the pkg
@@ -173,9 +151,9 @@ public class POPackageAnnotator extends 
         public POPackage getPkg() {
             return pkg;
         }
-        
+
     }
-    
+
     /**
      * Physical Plan visitor which tries to get the
      * LocalRearrange(s) present in the plan (if any) and
@@ -184,24 +162,21 @@ public class POPackageAnnotator extends 
      * present in the "key")
      */
     static class LoRearrangeDiscoverer extends PhyPlanVisitor {
-        
+
         private int loRearrangeFound = 0;
         private POPackage pkg;
-        
+
         public LoRearrangeDiscoverer(PhysicalPlan plan, POPackage pkg) {
             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
             this.pkg = pkg;
         }
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
-         */
+
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
             loRearrangeFound++;
             Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
 
-            if (pkg instanceof POPackageLite) {
+            if (pkg.getPkgr() instanceof LitePackager) {
                 if(lrearrange.getIndex() != 0) {
                     // Throw some exception here
                     throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
@@ -210,26 +185,26 @@ public class POPackageAnnotator extends 
 
             // annotate the package with information from the LORearrange
             // update the keyInfo information if already present in the POPackage
-            keyInfo = pkg.getKeyInfo();
+            keyInfo = pkg.getPkgr().getKeyInfo();
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
-            
+
             if(keyInfo.get(Integer.valueOf(lrearrange.getIndex())) != null) {
-                // something is wrong - we should not be getting key info 
+                // something is wrong - we should not be getting key info
                 // for the same index from two different Local Rearranges
                 int errCode = 2087;
                 String msg = "Unexpected problem during optimization." +
-                " Found index:" + lrearrange.getIndex() + 
+                " Found index:" + lrearrange.getIndex() +
                 " in multiple LocalRearrange operators.";
                 throw new OptimizerException(msg, errCode, PigException.BUG);
-                
+
             }
-            keyInfo.put(Integer.valueOf(lrearrange.getIndex()), 
+            keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
                 new Pair<Boolean, Map<Integer, Integer>>(
                         lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
-            pkg.setKeyInfo(keyInfo);
-            pkg.setKeyTuple(lrearrange.isKeyTuple());
-            pkg.setKeyCompound(lrearrange.isKeyCompound());
+            pkg.getPkgr().setKeyInfo(keyInfo);
+            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
         }
 
         /**
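
In LoRearrangeDiscoverer above, the keyInfo gathered from each POLocalRearrange is now read and written through the package's Packager. A stand-in sketch of that annotation step (the Map<Integer, Pair<Boolean, Map<Integer, Integer>>> shape and the duplicate-index check mirror the diff; Pair and Packager are modeled locally):

    import java.util.HashMap;
    import java.util.Map;

    // Stand-in sketch of the annotation step in LoRearrangeDiscoverer:
    // key information per LocalRearrange index now lives on the Packager.
    public class KeyInfoSketch {
        static class Pair<F, S> {
            final F first; final S second;
            Pair(F first, S second) { this.first = first; this.second = second; }
        }

        static class Packager {
            private Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() { return keyInfo; }
            void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> ki) { keyInfo = ki; }
        }

        public static void main(String[] args) {
            Packager pkgr = new Packager(); // in the real code: pkg.getPkgr()
            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo = pkgr.getKeyInfo();
            if (keyInfo == null)
                keyInfo = new HashMap<>();
            int index = 0; // the LocalRearrange's index
            if (keyInfo.get(index) != null) // the same index seen twice is a bug
                throw new IllegalStateException("duplicate key info for index " + index);
            Map<Integer, Integer> projectedCols = new HashMap<>();
            keyInfo.put(index, new Pair<>(false, projectedCols)); // (isProjectStar, colsMap)
            pkgr.setKeyInfo(keyInfo);
            System.out.println(pkgr.getKeyInfo().size()); // 1
        }
    }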

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Nov 28 08:56:33 2013
@@ -20,18 +20,67 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.List;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LTOrEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.NotEqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POIsNull;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POMapLookUp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONot;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POOr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORegexp;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserComparisonFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.PlanVisitor;
 import org.apache.pig.impl.plan.PlanWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
  * The visitor class for the Physical Plan. To use this,
- * create the visitor with the plan to be visited. Call 
+ * create the visitor with the plan to be visited. Call
  * the visit() method to traverse the plan in a depth first
  * fashion.
- * 
+ *
  * This class also visits the nested plans inside the operators.
  * One has to extend this class to modify the nature of each visit
  * and to maintain any relevant state information between the visits
@@ -47,15 +96,15 @@ public class PhyPlanVisitor extends Plan
     public void visitLoad(POLoad ld) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitStore(POStore st) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitNative(PONative nat) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitFilter(POFilter fl) throws VisitorException{
         pushWalker(mCurrentWalker.spawnChildWalker(fl.getPlan()));
         visit();
@@ -71,7 +120,7 @@ public class PhyPlanVisitor extends Plan
             popWalker();
         }
     }
-    
+
     public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
         List<PhysicalPlan> inpPlans = lr.getPlans();
         for (PhysicalPlan plan : inpPlans) {
@@ -84,19 +133,11 @@ public class PhyPlanVisitor extends Plan
     public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitPackage(POPackage pkg) throws VisitorException{
         //do nothing
     }
-    
-    public void visitCombinerPackage(POCombinerPackage pkg) throws VisitorException{
-        //do nothing
-    }
- 
-    public void visitMultiQueryPackage(POMultiQueryPackage pkg) throws VisitorException{
-        //do nothing
-    }
-    
+
     public void visitPOForEach(POForEach nfe) throws VisitorException {
         List<PhysicalPlan> inpPlans = nfe.getInputPlans();
         for (PhysicalPlan plan : inpPlans) {
@@ -105,11 +146,11 @@ public class PhyPlanVisitor extends Plan
             popWalker();
         }
     }
-    
+
     public void visitUnion(POUnion un) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitSplit(POSplit spl) throws VisitorException{
         List<PhysicalPlan> plans = spl.getPlans();
         for (PhysicalPlan plan : plans) {
@@ -136,78 +177,78 @@ public class PhyPlanVisitor extends Plan
         //do nothing
     }
 
-	public void visitDistinct(PODistinct distinct) throws VisitorException {
+    public void visitDistinct(PODistinct distinct) throws VisitorException {
         //do nothing
-	}
+    }
 
-	public void visitSort(POSort sort) throws VisitorException {
+    public void visitSort(POSort sort) throws VisitorException {
         List<PhysicalPlan> inpPlans = sort.getSortPlans();
         for (PhysicalPlan plan : inpPlans) {
             pushWalker(mCurrentWalker.spawnChildWalker(plan));
             visit();
             popWalker();
         }
-	}
-    
+    }
+
     public void visitConstant(ConstantExpression cnst) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitProject(POProject proj) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitGreaterThan(GreaterThanExpr grt) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitLessThan(LessThanExpr lt) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitGTOrEqual(GTOrEqualToExpr gte) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitLTOrEqual(LTOrEqualToExpr lte) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitEqualTo(EqualToExpr eq) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitNotEqualTo(NotEqualToExpr eq) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitRegexp(PORegexp re) throws VisitorException{
         //do nothing
     }
 
     public void visitIsNull(POIsNull isNull) throws VisitorException {
     }
-    
+
     public void visitAdd(Add add) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitSubtract(Subtract sub) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitMultiply(Multiply mul) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitDivide(Divide dv) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitMod(Mod mod) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitAnd(POAnd and) throws VisitorException {
         //do nothing
     }
@@ -222,83 +263,79 @@ public class PhyPlanVisitor extends Plan
 
     public void visitBinCond(POBinCond binCond) {
         // do nothing
-        
+
     }
 
     public void visitNegative(PONegative negative) {
         //do nothing
-        
+
     }
-    
+
     public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitComparisonFunc(POUserComparisonFunc compFunc) throws VisitorException {
         //do nothing
     }
 
     public void visitMapLookUp(POMapLookUp mapLookUp) {
         // TODO Auto-generated method stub
-        
-    }
-    
-    public void visitJoinPackage(POJoinPackage joinPackage) throws VisitorException{
-        //do nothing
+
     }
 
     public void visitCast(POCast cast) {
         // TODO Auto-generated method stub
-        
+
     }
-    
+
     public void visitLimit(POLimit lim) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitCross(POCross cross) throws VisitorException{
         //do nothing
     }
-    
+
     public void visitFRJoin(POFRJoin join) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitMergeJoin(POMergeJoin join) throws VisitorException {
         //do nothing
     }
-    
+
     public void visitMergeCoGroup(POMergeCogroup mergeCoGrp) throws VisitorException{
-        
+
     }
     /**
      * @param stream
-     * @throws VisitorException 
+     * @throws VisitorException
      */
     public void visitStream(POStream stream) throws VisitorException {
         // TODO Auto-generated method stub
-        
+
     }
 
-	public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
+    public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
 
-	}
+    }
 
-	public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
+    public void visitPartitionRearrange(POPartitionRearrange pr) throws VisitorException {
         List<PhysicalPlan> inpPlans = pr.getPlans();
         for (PhysicalPlan plan : inpPlans) {
             pushWalker(mCurrentWalker.spawnChildWalker(plan));
             visit();
             popWalker();
         }
-	}
+    }
 
     /**
      * @param optimizedForEach
      */
     public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
         // TODO Auto-generated method stub
-        
+
     }
 
     /**
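
With visitJoinPackage, visitCombinerPackage and visitMultiQueryPackage removed above, a visitor that used to override those methods now overrides the single visitPackage and branches on the packager. A stand-in sketch (the do-nothing visitPackage matches the diff; the subclass and types are illustrative):

    // Stand-in sketch: one visitPackage hook, with the packager
    // distinguishing the cases the removed visit methods used to cover.
    public class VisitorSketch {
        interface Packager {}
        static class JoinPackager implements Packager {}
        static class CombinerPackager implements Packager {}
        static class POPackage {
            private final Packager pkgr;
            POPackage(Packager p) { pkgr = p; }
            Packager getPkgr() { return pkgr; }
        }
        static class PhyPlanVisitor {
            public void visitPackage(POPackage pkg) { /* do nothing */ }
        }
        static class MyVisitor extends PhyPlanVisitor {
            @Override
            public void visitPackage(POPackage pkg) {
                if (pkg.getPkgr() instanceof CombinerPackager)
                    System.out.println("combine-plan package");
                else if (pkg.getPkgr() instanceof JoinPackager)
                    System.out.println("optimized join package");
                else
                    System.out.println("plain package");
            }
        }

        public static void main(String[] args) {
            new MyVisitor().visitPackage(new POPackage(new CombinerPackager()));
        }
    }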

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Thu Nov 28 08:56:33 2013
@@ -30,7 +30,23 @@ import java.util.Set;
 
 import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFRJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorPlan;
@@ -46,7 +62,7 @@ public class PlanPrinter<O extends Opera
     String TABMore = "|   ";
 
     String LSep = "|\n|---";
-    
+
     String USep = "|   |\n|   ";
 
     int levelCntr = -1;
@@ -58,7 +74,7 @@ public class PlanPrinter<O extends Opera
     public PlanPrinter(P plan) {
         super(plan, new DepthFirstWalker<O, P>(plan));
     }
-    
+
     public PlanPrinter(P plan, PrintStream stream) {
         super(plan, new DepthFirstWalker<O, P>(plan));
         this.stream = stream;
@@ -124,7 +140,7 @@ public class PlanPrinter<O extends Opera
         sb.delete(sb.length() - "\n".length(), sb.length());
         return sb.toString();
     }
-    
+
     private String planString(PhysicalPlan pp){
         StringBuilder sb = new StringBuilder();
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
@@ -136,7 +152,7 @@ public class PlanPrinter<O extends Opera
         sb.append(shiftStringByTabs(baos.toString(), 2));
         return sb.toString();
     }
-    
+
     private String planString(List<PhysicalPlan> lep){
         StringBuilder sb = new StringBuilder();
         if(lep!=null)
@@ -175,12 +191,6 @@ public class PlanPrinter<O extends Opera
           else if(node instanceof POForEach){
             sb.append(planString(((POForEach)node).getInputPlans()));
           }
-          else if (node instanceof POMultiQueryPackage) {
-              List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
-              for (POPackage pkg : pkgs) {
-                  sb.append(LSep + pkg.name() + "\n");
-              }
-          }
           else if(node instanceof POFRJoin){
             POFRJoin frj = (POFRJoin)node;
             List<List<PhysicalPlan>> joinPlans = frj.getJoinPlans();
@@ -193,13 +203,13 @@ public class PlanPrinter<O extends Opera
               POSkewedJoin skewed = (POSkewedJoin)node;
               MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = skewed.getJoinPlans();
               if(joinPlans!=null) {
-            	  List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
-            	  inner_plans.addAll(joinPlans.values());                
-                  sb.append(planString(inner_plans));                
+                  List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
+                  inner_plans.addAll(joinPlans.values());
+                  sb.append(planString(inner_plans));
               }
             }
         }
-        
+
         if (node instanceof POSplit) {
             sb.append(planString(((POSplit)node).getPlans()));
         }
@@ -210,13 +220,13 @@ public class PlanPrinter<O extends Opera
             plans.addAll(pl);
             sb.append(planString(plans));
         }
-        
+
         List<O> originalPredecessors = mPlan.getPredecessors(node);
         if (originalPredecessors == null)
             return sb.toString();
-        
+
         List<O> predecessors =  new ArrayList<O>(originalPredecessors);
-        
+
         Collections.sort(predecessors);
         int i = 0;
         for (O pred : predecessors) {
@@ -280,5 +290,5 @@ public class PlanPrinter<O extends Opera
     public void visitStartMap(POUnion op) {
         stream.print(op.name() + "   ");
     }
-    
+
 }
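Since PlanPrinter is itself a PhyPlanVisitor, printing a plan is just a matter of constructing it over the plan and calling visit(). A minimal usage sketch (illustrative only; DumpPlan is a hypothetical class, and `plan` is assumed to be an already-built PhysicalPlan):

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
    import org.apache.pig.impl.plan.VisitorException;

    public class DumpPlan {
        // Prints the operator tree of a physical plan to stdout.
        public static void dump(PhysicalPlan plan) throws VisitorException {
            new PlanPrinter<PhysicalOperator, PhysicalPlan>(plan, System.out).visit();
        }
    }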

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/XMLPhysicalPlanPrinter.java Thu Nov 28 08:56:33 2013
@@ -40,8 +40,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -56,10 +54,10 @@ import org.w3c.dom.Element;
 
 public class XMLPhysicalPlanPrinter<P extends OperatorPlan<PhysicalOperator>> extends
         PhyPlanVisitor {
-    
+
     private Document doc = null;
     private Element parent = null;
-    
+
     public XMLPhysicalPlanPrinter(PhysicalPlan plan, Document doc, Element parent) {
         super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         this.doc = doc;
@@ -84,7 +82,7 @@ public class XMLPhysicalPlanPrinter<P ex
             transformer.setOutputProperty(OutputKeys.OMIT_XML_DECLARATION, "yes");
             transformer.setOutputProperty(OutputKeys.INDENT, "yes");
             transformer.setOutputProperty("{http://xml.apache.org/xslt}indent-amount", "2");
-            
+
             StringWriter sw = new StringWriter();
             StreamResult result = new StreamResult(sw);
             DOMSource source = new DOMSource(doc);
@@ -94,7 +92,7 @@ public class XMLPhysicalPlanPrinter<P ex
             e.printStackTrace();
         }
     }
-    
+
     private Element createAlias(PhysicalOperator po) {
         Element aliasNode = null;
         String alias = po.getAlias();
@@ -112,8 +110,8 @@ public class XMLPhysicalPlanPrinter<P ex
             depthFirst(leaf, parentNode);
         }
     }
-    
-    
+
+
     private void visitPlan(PhysicalPlan pp, Element parentNode) throws VisitorException {
         if(pp!=null) {
             XMLPhysicalPlanPrinter<PhysicalPlan> ppp =
@@ -121,15 +119,15 @@ public class XMLPhysicalPlanPrinter<P ex
             ppp.visit();
         }
     }
-    
-    
+
+
     private void visitPlan(List<PhysicalPlan> lep, Element parentNode) throws VisitorException {
         if(lep!=null)
             for (PhysicalPlan ep : lep) {
                 visitPlan(ep, parentNode);
             }
     }
-    
+
     private Element createPONode(PhysicalOperator node) {
         Element PONode = doc.createElement(node.getClass().getSimpleName());
         PONode.setAttribute("scope", "" + node.getOperatorKey().id);
@@ -150,18 +148,18 @@ public class XMLPhysicalPlanPrinter<P ex
             Element loadFile = doc.createElement("loadFile");
             loadFile.setTextContent(((POLoad)node).getLFile().getFileName());
             PONode.appendChild(loadFile);
-            
+
             Element isTmpLoad = doc.createElement("isTmpLoad");
             isTmpLoad.setTextContent(Boolean.valueOf(((POLoad)node).isTmpLoad()).toString());
             PONode.appendChild(isTmpLoad);
         }
         return PONode;
     }
-    
+
 
     private void depthFirst(PhysicalOperator node, Element parentNode) throws VisitorException {
         Element childNode = null;
-        
+
         List<PhysicalPlan> subPlans = new ArrayList<PhysicalPlan>();
         if(node instanceof POFilter){
             subPlans.add(((POFilter) node).getPlan());
@@ -177,12 +175,6 @@ public class XMLPhysicalPlanPrinter<P ex
             subPlans = ((POSplit)node).getPlans();
         } else if (node instanceof PODemux) {
             subPlans = ((PODemux)node).getPlans();
-        } else if (node instanceof POMultiQueryPackage) {
-            childNode = createPONode(node);   
-            List<POPackage> pkgs = ((POMultiQueryPackage)node).getPackages();
-            for (POPackage pkg : pkgs) {
-                childNode.appendChild(createPONode(pkg));
-            }
         } else if(node instanceof POFRJoin){
             childNode = createPONode(node);
             POFRJoin frj = (POFRJoin)node;
@@ -198,11 +190,11 @@ public class XMLPhysicalPlanPrinter<P ex
             MultiMap<PhysicalOperator, PhysicalPlan> joinPlans = skewed.getJoinPlans();
             if(joinPlans!=null) {
                 List<PhysicalPlan> inner_plans = new ArrayList<PhysicalPlan>();
-            	inner_plans.addAll(joinPlans.values());   
-            	visitPlan(inner_plans, childNode);
+                inner_plans.addAll(joinPlans.values());
+                visitPlan(inner_plans, childNode);
             }
         }
-        
+
         if (childNode == null) {
             childNode = createPONode(node);
             if (subPlans.size() > 0) {
@@ -210,17 +202,17 @@ public class XMLPhysicalPlanPrinter<P ex
             }
         }
         parentNode.appendChild(childNode);
-        
+
         List<PhysicalOperator> originalPredecessors = mPlan.getPredecessors(node);
         if (originalPredecessors == null) {
             return;
         }
-        
+
         List<PhysicalOperator> predecessors =  new ArrayList<PhysicalOperator>(originalPredecessors);
-        
+
         Collections.sort(predecessors);
         for (PhysicalOperator pred : predecessors) {
             depthFirst(pred, childNode);
         }
     }
-}
\ No newline at end of file
+}
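The XML printer follows the same visitor pattern, but accumulates the plan into a DOM tree under a caller-supplied parent element, which the caller then serializes. A sketch of driving it end to end (illustrative only; DumpPlanAsXML and the element name "physicalPlan" are hypothetical, and the serialization mirrors the Transformer settings visible in the class above):

    import java.io.StringWriter;
    import javax.xml.parsers.DocumentBuilderFactory;
    import javax.xml.transform.OutputKeys;
    import javax.xml.transform.Transformer;
    import javax.xml.transform.TransformerFactory;
    import javax.xml.transform.dom.DOMSource;
    import javax.xml.transform.stream.StreamResult;
    import org.w3c.dom.Document;
    import org.w3c.dom.Element;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.XMLPhysicalPlanPrinter;

    public class DumpPlanAsXML {
        public static String toXML(PhysicalPlan plan) throws Exception {
            Document doc = DocumentBuilderFactory.newInstance()
                    .newDocumentBuilder().newDocument();
            Element root = doc.createElement("physicalPlan");
            doc.appendChild(root);

            // Walks the plan depth first, appending one element per operator.
            new XMLPhysicalPlanPrinter<PhysicalPlan>(plan, doc, root).visit();

            // Serialize with indentation, as the printer itself does.
            Transformer t = TransformerFactory.newInstance().newTransformer();
            t.setOutputProperty(OutputKeys.INDENT, "yes");
            StringWriter sw = new StringWriter();
            t.transform(new DOMSource(doc), new StreamResult(sw));
            return sw.toString();
        }
    }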

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1546314&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Thu Nov 28 08:56:33 2013
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Arrays;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.Pair;
+/**
+ * The packager that packages the globally rearranged tuples into
+ * output format after the combiner stage.  It differs from the base Packager in that
+ * it does not use the index in the NullableTuple to find the bag to put a
+ * tuple in.  Instead, the inputs are put in a bag corresponding to their
+ * offset in the tuple.
+ */
+public class CombinerPackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    private boolean[] mBags; // For each field, indicates whether or not it
+                             // needs to be put in a bag.
+
+    private Map<Integer, Integer> keyLookup;
+
+    private int numBags;
+
+    /**
+     * A new CombinerPackager will be constructed as a near clone of the
+     * provided Packager.
+     * @param pkgr Packager to clone.
+     * @param bags for each field, indicates whether it should be a bag (true)
+     * or a simple field (false).
+     */
+    public CombinerPackager(Packager pkgr, boolean[] bags) {
+        super();
+        keyType = pkgr.keyType;
+        numInputs = 1;
+        inner = new boolean[1];
+        for (int i = 0; i < pkgr.inner.length; i++) {
+            inner[i] = true;
+        }
+        if (bags != null) {
+            mBags = Arrays.copyOf(bags, bags.length);
+        }
+        numBags = 0;
+        for (int i = 0; i < mBags.length; i++) {
+            if (mBags[i]) numBags++;
+        }
+    }
+
+    /**
+     * @param keyInfo the keyInfo to set
+     */
+    public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+        this.keyInfo = keyInfo;
+        // TODO: IMPORTANT ASSUMPTION: Currently we only combine in the
+        // group case and not in cogroups. So there should only
+        // be one LocalRearrange from which we get the keyInfo for
+        // which field in the value is in the key. This LocalRearrange
+        // has an index of 0. When we do support combiner in Cogroups
+        // THIS WILL NEED TO BE REVISITED.
+        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
+            keyInfo.get(0); // assumption: only groups are "combinable", hence index 0
+        keyLookup = lrKeyInfo.second;
+    }
+
+    private DataBag createDataBag(int numBags) {
+        String bagType = null;
+        if (PigMapReduce.sJobConfInternal.get() != null) {
+            bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+        }
+
+        if (bagType != null && bagType.equalsIgnoreCase("default")) {
+            return new NonSpillableDataBag();
+        }
+        return new InternalCachedBag(numBags);
+    }
+
+    @Override
+    public Result getNext() throws ExecException {
+        //Create numInputs bags
+        Object[] fields = new Object[mBags.length];
+        for (int i = 0; i < mBags.length; i++) {
+            if (mBags[i]) fields[i] = createDataBag(numBags);
+        }
+
+        // For each tuple in the input, split up its fields and place them
+        // into the proper bags.  If the given field isn't a bag, just
+        // set the value as is.
+        for (Tuple tup : bags[0]) {
+            int tupIndex = 0; // an index for accessing elements from
+                              // the value (tup) that we have currently
+            for(int i = 0; i < mBags.length; i++) {
+                Integer keyIndex = keyLookup.get(i);
+                if(keyIndex == null && mBags[i]) {
+                    // the field for this index is not the
+                    // key - so just take it from the "value"
+                    // we were handed - Currently THIS HAS TO BE A BAG
+                    // In future if this changes, THIS WILL NEED TO BE
+                    // REVISITED.
+                    ((DataBag)fields[i]).add((Tuple)tup.get(tupIndex));
+                    tupIndex++;
+                } else {
+                    // the field for this index is in the key
+                    fields[i] = key;
+                }
+            }
+        }
+
+        // The successor of the CombinerPackager as of
+        // now SHOULD be a POForEach which has been adjusted
+        // to look for its inputs by projecting from the corresponding
+        // positions in the CombinerPackager output.
+        // So we will NOT be adding the key in the result here but merely
+        // putting all bags into a result tuple and returning it.
+        Tuple res;
+        res = mTupleFactory.newTuple(mBags.length);
+        for (int i = 0; i < mBags.length; i++) res.set(i, fields[i]);
+        Result r = new Result();
+        r.result = res;
+        r.returnStatus = POStatus.STATUS_OK;
+        return r;
+    }
+
+    @Override
+    protected Tuple getValueTuple(Object key, NullableTuple ntup, int index)
+            throws ExecException {
+        return (Tuple) ntup.getValueAsPigType();
+    }
+
+}
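The heart of the class is the offset-based repacking in getNext(): each post-combiner value tuple is walked field by field, bag fields are accumulated, and key fields are filled from the key itself. A simplified, self-contained model of that loop — plain Java collections standing in for Pig's Tuple and DataBag; CombinerRepackSketch and all names in it are hypothetical:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;

    public class CombinerRepackSketch {
        // values:    post-combiner value tuples for one key
        // isBag:     which output fields are bags
        // keyLookup: output field index -> position in the key (null if absent)
        @SuppressWarnings("unchecked")
        static Object[] repack(Object key, List<List<Object>> values,
                               boolean[] isBag, Map<Integer, Integer> keyLookup) {
            Object[] fields = new Object[isBag.length];
            for (int i = 0; i < isBag.length; i++) {
                if (isBag[i]) fields[i] = new ArrayList<Object>();
            }
            for (List<Object> tup : values) {
                int tupIndex = 0; // next unread field of the value tuple
                for (int i = 0; i < isBag.length; i++) {
                    if (keyLookup.get(i) == null && isBag[i]) {
                        // not part of the key: take it from the value
                        ((List<Object>) fields[i]).add(tup.get(tupIndex++));
                    } else {
                        // part of the key: fill from the key itself
                        fields[i] = key;
                    }
                }
            }
            return fields;
        }

        public static void main(String[] args) {
            // group key "x"; field 0 is the key, field 1 collects value tuples
            List<List<Object>> values = Arrays.asList(
                    Arrays.<Object>asList(Arrays.asList(1, 2)),
                    Arrays.<Object>asList(Arrays.asList(3)));
            Object[] out = repack("x", values, new boolean[] { false, true },
                    Collections.<Integer, Integer>singletonMap(0, 0));
            System.out.println(Arrays.toString(out)); // [x, [[1, 2], [3]]]
        }
    }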

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java?rev=1546314&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java Thu Nov 28 08:56:33 2013
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.NonSpillableDataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
+import org.apache.pig.impl.plan.OperatorKey;
+
+public class JoinPackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    private POOptimizedForEach forEach;
+    private boolean newKey = true;
+    private Tuple res = null;
+    private static final Result eopResult = new Result(POStatus.STATUS_EOP, null);
+    private boolean firstTime = true;
+    private boolean useDefaultBag = false;
+
+    public static final String DEFAULT_CHUNK_SIZE = "1000";
+
+    private long chunkSize = Long.parseLong(DEFAULT_CHUNK_SIZE);
+    private Result forEachResult;
+    private DataBag[] dbs = null;
+
+    private int lastBagIndex;
+
+    private Iterator<Tuple> lastBagIter;
+
+    public JoinPackager(Packager p, POForEach f) {
+        super();
+        String scope = f.getOperatorKey().getScope();
+        NodeIdGenerator nig = NodeIdGenerator.getGenerator();
+        forEach = new POOptimizedForEach(new OperatorKey(scope,nig.getNextNodeId(scope)));
+        if (p!=null)
+        {
+            setKeyType(p.getKeyType());
+            setNumInputs(p.getNumInputs());
+            lastBagIndex = numInputs - 1;
+            setInner(p.getInner());
+            setKeyInfo(p.getKeyInfo());
+            this.isKeyTuple = p.isKeyTuple;
+            this.isKeyCompound = p.isKeyCompound;
+        }
+        if (f!=null)
+        {
+            setInputPlans(f.getInputPlans());
+            setToBeFlattened(f.getToBeFlattened());
+        }
+    }
+
+    /**
+     * Calls getNext to get the next ForEach result. The input for a JoinPackager is
+     * a (key, NullableTuple) pair. We materialize the first n-1 inputs into bags and
+     * feed input#n one tuple at a time to the delegated ForEach operator, so the
+     * input to ForEach is
+     *
+     *     (input#1, input#2, input#3....input#n[i]), i=(1..k), where input#n consists
+     *
+     * of k tuples.
+     * For every ForEach input, pull all the results from ForEach.
+     * getNext will be called multiple times for a particular input;
+     * it returns one output tuple from ForEach every time we call it,
+     * so we maintain internal state to keep track of where we are.
+     */
+    @Override
+    public Result getNext() throws ExecException {
+
+        if(firstTime){
+            firstTime = false;
+            if (PigMapReduce.sJobConfInternal.get() != null) {
+                String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+                if (bagType != null && bagType.equalsIgnoreCase("default")) {
+                    useDefaultBag = true;
+                }
+            }
+        }
+
+        Tuple it = null;
+
+        // If we see a new key, materialize the first n-1 inputs and construct the ForEach
+        // input tuple res = (key, input#1, input#2....input#n); the only missing value is
+        // input#n, which we get one tuple at a time, fill into res, and feed to ForEach.
+        // After this block, lastBagIter is positioned at the first tuple of input#n.
+        if (newKey)
+        {
+            // Put n-1 inputs into bags
+            dbs = new DataBag[numInputs];
+            for (int i = 0; i < numInputs - 1; i++) {
+                if (!readOnce[i]) {
+                    dbs[i] = bags[i];
+                } else {
+                    // In the very rare case where a blocking POStream follows
+                    // this JoinPackager in the pipeline, the constructor
+                    // argument should be 2 * numInputs; but we don't want to
+                    // pay that penalty all the time for one obscure case.
+                    dbs[i] = useDefaultBag
+                            ? BagFactory.getInstance().newDefaultBag()
+                            : new InternalCachedBag(numInputs - 1);
+                    dbs[i].addAll(bags[i]);
+                }
+            }
+
+            // For last bag, we always use NonSpillableBag.
+            dbs[lastBagIndex] = new NonSpillableDataBag((int)chunkSize);
+
+            lastBagIter = bags[lastBagIndex].iterator();
+
+            // If we don't have any tuple for input#n,
+            // no further processing is needed; return EOP
+            if (!lastBagIter.hasNext()) {
+                // we will return at this point because we ought
+                // to be having a flatten on this last input
+                // and we have an empty bag which should result
+                // in this key being taken out of the output
+                newKey = true;
+                return eopResult;
+            }
+
+            res = mTupleFactory.newTuple(numInputs+1);
+            for (int i = 0; i < dbs.length; i++)
+                res.set(i+1,dbs[i]);
+
+            res.set(0,key);
+            // if we have an inner anywhere and the corresponding
+            // bag is empty, we can just return
+            for (int i = 0; i < dbs.length - 1; i++) {
+                if(inner[i]&&dbs[i].size()==0){
+                    detachInput();
+                    return eopResult;
+                }
+            }
+            newKey = false;
+        }
+
+        // Keep attaching input tuples to ForEach until input#n is exhausted
+        // and ForEach has drained all of its pending output.
+        while (lastBagIter.hasNext() || forEach.processingPlan) {
+            // if a previous call to foreach.getNext()
+            // has still not returned all output, process it
+            while (forEach.processingPlan) {
+                forEachResult = forEach.getNextTuple();
+                switch (forEachResult.returnStatus) {
+                case POStatus.STATUS_OK:
+                case POStatus.STATUS_ERR:
+                    return forEachResult;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                    break;
+                }
+            }
+
+            if (lastBagIter.hasNext()) {
+                // try setting up a bag of CHUNKSIZE OR
+                // the remainder of the bag of last input
+                // (if < CHUNKSIZE) to foreach
+                dbs[lastBagIndex].clear(); // clear last chunk
+                for (int i = 0; i < chunkSize && lastBagIter.hasNext(); i++) {
+                    it = lastBagIter.next();
+                    dbs[lastBagIndex].add(it);
+                }
+            } else {
+                detachInput();
+                return eopResult;
+            }
+
+            // Attach the input to forEach
+            forEach.attachInput(res);
+
+            // pull output tuple from ForEach
+            Result forEachResult = forEach.getNextTuple();
+            {
+                switch (forEachResult.returnStatus) {
+                case POStatus.STATUS_OK:
+                case POStatus.STATUS_ERR:
+                    return forEachResult;
+                case POStatus.STATUS_NULL:
+                    continue;
+                case POStatus.STATUS_EOP:
+                    break;
+                }
+            }
+        }
+        detachInput();
+        return eopResult;
+    }
+
+    @Override
+    void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        super.attachInput(key, bags, readOnce);
+        this.newKey = true;
+    }
+
+    public List<PhysicalPlan> getInputPlans() {
+        return forEach.getInputPlans();
+    }
+
+    public void setInputPlans(List<PhysicalPlan> plans) {
+        forEach.setInputPlans(plans);
+    }
+
+    public void setToBeFlattened(List<Boolean> flattens) {
+        forEach.setToBeFlattened(flattens);
+    }
+
+    /**
+     * @return the forEach
+     */
+    public POOptimizedForEach getForEach() {
+        return forEach;
+    }
+
+    /**
+     * @param chunkSize - the chunk size for the biggest input
+     */
+    public void setChunkSize(long chunkSize) {
+        this.chunkSize = chunkSize;
+    }
+}
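The key design point in getNext() is that only the first n-1 inputs are fully materialized; the last (typically largest) input is streamed through the delegated ForEach in chunks of at most chunkSize tuples, keeping the join's memory footprint bounded. A simplified, standalone model of that chunking loop — ChunkedJoinSketch and its names are hypothetical, with plain Java lists in place of Pig's bags and a callback in place of POForEach:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class ChunkedJoinSketch {
        interface ChunkConsumer<T> {
            // Stands in for attaching input to POForEach and draining it.
            void accept(List<List<T>> materialized, List<T> chunk);
        }

        // Feeds the last input to `emit` in chunks of at most chunkSize,
        // alongside the fully materialized first n-1 inputs.
        static <T> void joinByChunks(List<List<T>> materialized,
                                     Iterator<T> lastInput,
                                     int chunkSize,
                                     ChunkConsumer<T> emit) {
            List<T> chunk = new ArrayList<T>(chunkSize);
            while (lastInput.hasNext()) {
                chunk.clear(); // reuse the buffer, as JoinPackager reuses its bag
                for (int i = 0; i < chunkSize && lastInput.hasNext(); i++) {
                    chunk.add(lastInput.next());
                }
                emit.accept(materialized, chunk);
            }
        }

        public static void main(String[] args) {
            List<List<String>> left = Arrays.asList(Arrays.asList("a", "b"));
            Iterator<String> right = Arrays.asList("1", "2", "3").iterator();
            joinByChunks(left, right, 2, new ChunkConsumer<String>() {
                public void accept(List<List<String>> m, List<String> c) {
                    System.out.println(m + " x " + c); // two chunks: [1, 2], [3]
                }
            });
        }
    }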