You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/28 09:56:34 UTC

svn commit: r1546314 [5/6] - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer...

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1546314&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Thu Nov 28 08:56:33 2013
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ *
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * This package operator is a specialization
+ * of POPackage operator used for the specific
+ * case of the order by query. See JIRA 802
+ * for more details.
+ */
+public class LitePackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    public boolean[] getInner() {
+        return null;
+    }
+
+    public void setInner(boolean[] inner) {
+    }
+
+    /**
+     * Make a copy of this operator; keyInfo gets a new map whose values are shared.
+     * @throws CloneNotSupportedException
+     */
+    @Override
+    public LitePackager clone() throws CloneNotSupportedException {
+        LitePackager clone = (LitePackager) super.clone();
+        clone.inner = null;
+        if (keyInfo != null) {
+            clone.keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+
+            for (Entry<Integer, Pair<Boolean, Map<Integer, Integer>>> entry : keyInfo
+                    .entrySet()) {
+                clone.keyInfo.put(entry.getKey(), entry.getValue());
+            }
+        }
+        return clone;
+    }
+
+    /**
+     * @return the distinct
+     */
+    @Override
+    public boolean isDistinct() {
+        return false;
+    }
+
+    /**
+     * @param distinct the distinct to set
+     */
+    @Override
+    public void setDistinct(boolean distinct) {
+    }
+
+    /**
+     * @return the isKeyTuple
+     */
+    public boolean getKeyTuple() {
+        return isKeyTuple;
+    }
+
+    /**
+     * @return the keyAsTuple
+     */
+    public Tuple getKeyAsTuple() {
+        return isKeyTuple ? (Tuple) key : null;
+    }
+
+    /**
+     * @return the key
+     */
+    public Object getKey() {
+        return key;
+    }
+
+    /**
+     * Similar to POPackage.getNext except that
+     * only one input is expected with index 0
+     * and ReadOnceBag is used instead of
+     * DefaultDataBag.
+     */
+    @Override
+    public Result getNext() throws ExecException {
+        Tuple res;
+
+        //Construct the output tuple by appending
+        //the key and all the above constructed bags
+        //and return it.
+        res = mTupleFactory.newTuple(numInputs+1);
+        res.set(0,key);
+        res.set(1, bags[0]);
+        detachInput();
+        Result r = new Result();
+        r.returnStatus = POStatus.STATUS_OK;
+        r.result = res;
+        return r;
+    }
+
+    /**
+     * Makes use of the superclass method, but this requires
+     * an additional parameter key passed by ReadOnceBag.
+     * key of this instance will be set to null in detachInput
+     * call, but an instance of ReadOnceBag may have the original
+     * key that it uses. Therefore this extra argument is taken
+     * to temporarily set it before the call to the superclass method
+     * and then restore it.
+     */
+    @Override
+    public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
+            throws ExecException {
+        Object origKey = this.key;
+        this.key = key;
+        Tuple retTuple = super.getValueTuple(key, ntup, index);
+        this.key = origKey;
+        return retTuple;
+    }
+}
+

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1546314&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Thu Nov 28 08:56:33 2013
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.NullableUnknownWritable;
+import org.apache.pig.impl.io.PigNullableWritable;
+
+/**
+ * The package operator that packages the globally rearranged tuples
+ * into output format as required by multi-query de-multiplexer.
+ * <p>
+ * This operator is used when merging multiple Map-Reduce splittees
+ * into a Map-only splitter during multi-query optimization.
+ * The package operators of the reduce plans of the splittees form an
+ * indexed package list inside this operator. When this operator
+ * receives an input, it extracts the index from the key and calls the
+ * corresponding package to get the output data.
+ * <p>
+ * Due to the recursive nature of multi-query optimization, this operator
+ * may be contained in another multi-query packager.
+ * <p>
+ * The successor of this operator must be a PODemux operator which
+ * knows how to consume the output of this operator.
+ */
+public class MultiQueryPackager extends Packager {
+
+    private static final long serialVersionUID = 1L;
+
+    private static int idxPart = 0x7F;
+
+    private List<Packager> packagers = new ArrayList<Packager>();
+
+    /**
+     * If the POLocalRearranges corresponding to the reduce plans in
+     * myPlans (the list of inner plans of the demux) have different key types
+     * then the MultiQueryOptimizer converts all the keys to be of type tuple
+     * by wrapping any non-tuple keys into Tuples (keys which are already tuples
+     * are left alone).
+     * The list below is a list of booleans indicating whether extra tuple wrapping
+     * was done for the key in the corresponding POLocalRearranges and if we need
+     * to "unwrap" the tuple to get to the key
+     */
+    private ArrayList<Boolean> isKeyWrapped = new ArrayList<Boolean>();
+
+    /*
+     * Indicating if all the inner plans have the same
+     * map key type. If not, the keys passed in are
+     * wrapped inside tuples and need to be extracted
+     * out during the reduce phase
+     */
+    private boolean sameMapKeyType = true;
+
+    /*
+     * Indicating if this operator is in a combiner.
+     * If not, this operator is in a reducer and the key
+     * values must first be extracted from the tuple-wrap
+     * before writing out to the disk
+     */
+    private boolean inCombiner = false;
+
+    transient private PigNullableWritable myKey;
+
+    /**
+     * Appends the specified package object to the end of
+     * the package list.
+     *
+     * @param pkgr package to be appended to the list
+     */
+    public void addPackager(Packager pkgr) {
+        packagers.add(pkgr);
+    }
+
+    /**
+     * Appends the specified package object to the end of
+     * the package list.
+     *
+     * @param pkgr package to be appended to the list
+     * @param mapKeyType the map key type associated with the package
+     */
+    public void addPackager(Packager pkgr, byte mapKeyType) {
+        packagers.add(pkgr);
+        // if mapKeyType is already a tuple, we will NOT
+        // be wrapping it in an extra tuple. If it is not
+        // a tuple, we will wrap it in a tuple
+        isKeyWrapped.add(mapKeyType == DataType.TUPLE ? false : true);
+    }
+
+    /**
+     * Returns the list of packages.
+     *
+     * @return the list of the packages
+     */
+    public List<Packager> getPackagers() {
+        return packagers;
+    }
+
+    /**
+     * Constructs the output tuple from the inputs.
+     * <p>
+     * The output is consumed by the demultiplexer operator
+     * (PODemux) in the format (key, {bag of tuples}) where key
+     * is an indexed WritableComparable, not the wrapped value as a pig type.
+     */
+    @Override
+    public Result getNext() throws ExecException {
+
+        byte origIndex = myKey.getIndex();
+
+        int index = (int)origIndex;
+        index &= idxPart;
+
+        if (index >= packagers.size() || index < 0) {
+            int errCode = 2140;
+            String msg = "Invalid package index " + index
+                    + " should be in the range between 0 and "
+                    + packagers.size();
+            throw new ExecException(msg, errCode, PigException.BUG);
+        }
+
+        Packager pkgr = packagers.get(index);
+
+        // check to see if we need to unwrap the key. The keys may be
+        // wrapped inside a tuple by LocalRearrange operator when jobs
+        // with different map key types are merged
+        PigNullableWritable curKey = myKey;
+        if (!sameMapKeyType && !inCombiner && isKeyWrapped.get(index)) {
+            Tuple tup = (Tuple)myKey.getValueAsPigType();
+            curKey = HDataType.getWritableComparableTypes(tup.get(0),
+                    pkgr.getKeyType());
+            curKey.setIndex(origIndex);
+        }
+
+        pkgr.attachInput(curKey, bags, readOnce);
+
+        Result res = pkgr.getNext();
+        pkgr.detachInput();
+
+        Tuple tuple = (Tuple)res.result;
+
+        // the object present in the first field
+        // of the tuple above is the real data without
+        // index information - this is because the
+        // package above, extracts the real data out of
+        // the PigNullableWritable object - we are going to
+        // give this result tuple to a PODemux operator
+        // which needs a PigNullableWritable first field so
+        // it can figure out the index. Therefore we need
+        // to add index to the first field of the tuple.
+
+        Object obj = tuple.get(0);
+        if (obj instanceof PigNullableWritable) {
+            ((PigNullableWritable)obj).setIndex(origIndex);
+        }
+        else {
+            PigNullableWritable myObj = null;
+            if (obj == null) {
+                myObj = new NullableUnknownWritable();
+                myObj.setNull(true);
+            }
+            else {
+                myObj = HDataType.getWritableComparableTypes(obj, HDataType.findTypeFromNullableWritable(curKey));
+            }
+            myObj.setIndex(origIndex);
+            tuple.set(0, myObj);
+        }
+        // illustrator markup has been handled by "pack"
+        return res;
+    }
+
+    /**
+     * Returns the list of booleans that indicates if the
+     * key needs to be unwrapped for the corresponding plan.
+     *
+     * @return the list of isKeyWrapped boolean values
+     */
+    public List<Boolean> getIsKeyWrappedList() {
+        return Collections.unmodifiableList(isKeyWrapped);
+    }
+
+    /**
+     * Adds a list of IsKeyWrapped boolean values
+     *
+     * @param lst the list of boolean values to add
+     */
+    public void addIsKeyWrappedList(List<Boolean> lst) {
+        for (Boolean b : lst) {
+            isKeyWrapped.add(b);
+        }
+    }
+
+    public void setInCombiner(boolean inCombiner) {
+        this.inCombiner = inCombiner;
+    }
+
+    public boolean isInCombiner() {
+        return inCombiner;
+    }
+
+    public void setSameMapKeyType(boolean sameMapKeyType) {
+        this.sameMapKeyType = sameMapKeyType;
+    }
+
+    public boolean isSameMapKeyType() {
+        return sameMapKeyType;
+    }
+
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Nov 28 08:56:33 2013
@@ -26,22 +26,21 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.InternalCachedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.io.PigNullableWritable;
-import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.impl.util.Pair;
@@ -62,13 +61,8 @@ import org.apache.pig.pen.util.LineageTr
  * bags based on the index.
  */
 public class POPackage extends PhysicalOperator {
-    /**
-     *
-     */
-    private static final long serialVersionUID = 1L;
 
-
-    public static enum PackageType { GROUP, JOIN };
+    private static final long serialVersionUID = 1L;
 
     //The iterator of indexed Tuples
     //that is typically provided by
@@ -78,32 +72,11 @@ public class POPackage extends PhysicalO
     //The key being worked on
     Object key;
 
-    // marker to indicate if key is a tuple
-    protected boolean isKeyTuple = false;
-    // marker to indicate if the tuple key is compound in nature
-    protected boolean isKeyCompound = false;
-    // key as a Tuple object (if the key is a tuple)
-    protected Tuple keyAsTuple;
-
-    //key's type
-    byte keyType;
-
     //The number of inputs to this
     //co-group.  0 indicates a distinct, which means there will only be a
     //key, no value.
     int numInputs;
 
-    // If the attaching map-reduce plan use secondary sort key
-    boolean useSecondaryKey = false;
-
-    //Denotes if inner is specified
-    //on a particular input
-    boolean[] inner;
-
-    // flag to denote whether there is a distinct
-    // leading to this package
-    protected boolean distinct = false;
-
     // A mapping of input index to key information got from LORearrange
     // for that index. The Key information is a pair of boolean, Map.
     // The boolean indicates whether there is a lone project(*) in the
@@ -119,7 +92,9 @@ public class POPackage extends PhysicalO
 
     private boolean useDefaultBag = false;
 
-    private PackageType pkgType;
+    protected Packager pkgr;
+
+    private boolean[] readOnce;
 
     public POPackage(OperatorKey k) {
         this(k, -1, null);
@@ -134,16 +109,22 @@ public class POPackage extends PhysicalO
     }
 
     public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        this(k, rp, inp, new Packager());
+    }
+
+    public POPackage(OperatorKey k, int rp, List<PhysicalOperator> inp,
+            Packager pkgr) {
         super(k, rp, inp);
         numInputs = -1;
         keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
+        this.pkgr = pkgr;
     }
 
     @Override
     public String name() {
-        return getAliasString() + "Package" + "["
+        return getAliasString() + "Package" + "(" + pkgr.name() + ")" + "["
                 + DataType.findTypeName(resultType) + "]" + "{"
-                + DataType.findTypeName(keyType) + "}" + " - "
+                + DataType.findTypeName(pkgr.getKeyType()) + "}" + " - "
                 + mKey.toString();
     }
 
@@ -171,16 +152,8 @@ public class POPackage extends PhysicalO
     public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
         try {
             tupIter = inp;
-            key = k.getValueAsPigType();
-            if (useSecondaryKey) {
-                key = ((Tuple)key).get(0);
-
-            }
-            if(isKeyTuple) {
-                // key is a tuple, cache the key as a
-                // tuple for use in the getNext()
-                keyAsTuple = (Tuple)key;
-            }
+            key = pkgr.getKey(k.getValueAsPigType());
+            inputAttached = true;
         } catch (Exception e) {
             throw new RuntimeException(
                     "Error attaching input for key " + k +
@@ -194,6 +167,7 @@ public class POPackage extends PhysicalO
     public void detachInput() {
         tupIter = null;
         key = null;
+        inputAttached = false;
     }
 
     public int getNumInps() {
@@ -202,14 +176,10 @@ public class POPackage extends PhysicalO
 
     public void setNumInps(int numInps) {
         this.numInputs = numInps;
-    }
-
-    public boolean[] getInner() {
-        return inner;
-    }
-
-    public void setInner(boolean[] inner) {
-        this.inner = inner;
+        pkgr.setNumInputs(numInps);
+        readOnce = new boolean[numInputs];
+        for (int i = 0; i < numInputs; i++)
+            readOnce[i] = false;
     }
 
     /**
@@ -219,25 +189,18 @@ public class POPackage extends PhysicalO
      */
     @Override
     public Result getNextTuple() throws ExecException {
-        Tuple res;
-
         if(firstTime){
             firstTime = false;
             if (PigMapReduce.sJobConfInternal.get() != null) {
-                String bagType = PigMapReduce.sJobConfInternal.get().get("pig.cachedbag.type");
+                String bagType = PigMapReduce.sJobConfInternal.get().get(
+                        "pig.cachedbag.type");
                 if (bagType != null && bagType.equalsIgnoreCase("default")) {
                     useDefaultBag = true;
                 }
             }
         }
-
-        if(distinct) {
-            // only set the key which has the whole
-            // tuple
-            res = mTupleFactory.newTuple(1);
-            res.set(0, key);
-        } else {
-            //Create numInputs bags
+        if (isInputAttached()) {
+            // Create numInputs bags
             DataBag[] dbs = null;
             dbs = new DataBag[numInputs];
 
@@ -253,20 +216,24 @@ public class POPackage extends PhysicalO
             } else {
                 // create bag to pull all tuples out of iterator
                 for (int i = 0; i < numInputs; i++) {
-                    dbs[i] = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
+                    dbs[i] = useDefaultBag ? BagFactory.getInstance()
+                            .newDefaultBag()
                     // In a very rare case if there is a POStream after this
-                    // POPackage in the pipeline and is also blocking the pipeline;
-                    // constructor argument should be 2 * numInputs. But for one obscure
+                    // POPackage in the pipeline and is also blocking the
+                    // pipeline;
+                    // constructor argument should be 2 * numInputs. But for one
+                    // obscure
                     // case we don't want to pay the penalty all the time.
                             : new InternalCachedBag(numInputs);
                 }
-                //For each indexed tup in the inp, sort them
-                //into their corresponding bags based
-                //on the index
+                // For each indexed tup in the inp, sort them
+                // into their corresponding bags based
+                // on the index
                 while (tupIter.hasNext()) {
                     NullableTuple ntup = tupIter.next();
                     int index = ntup.getIndex();
-                    Tuple copy = getValueTuple(ntup, index);
+                    Tuple copy = pkgr.getValueTuple(key,
+                            ntup, index);
 
                     if (numInputs == 1) {
 
@@ -278,109 +245,30 @@ public class POPackage extends PhysicalO
                     } else {
                         dbs[index].add(copy);
                     }
-                    if(getReporter()!=null) {
+                    if (getReporter() != null) {
                         getReporter().progress();
                     }
                 }
             }
-
-            //Construct the output tuple by appending
-            //the key and all the above constructed bags
-            //and return it.
-            res = mTupleFactory.newTuple(numInputs+1);
-            res.set(0,key);
-            int i=-1;
-            for (DataBag bag : dbs) {
-                i++;
-                if(inner[i] && !isAccumulative()){
-                    if(bag.size()==0){
-                        detachInput();
-                        Result r = new Result();
-                        r.returnStatus = POStatus.STATUS_NULL;
-                        return r;
-                    }
-                }
-
-                res.set(i+1,bag);
-            }
+            // Construct the output tuple by appending
+            // the key and all the above constructed bags
+            // and return it.
+            pkgr.attachInput(key, dbs, readOnce);
+            detachInput();
         }
-        Result r = new Result();
-        r.returnStatus = POStatus.STATUS_OK;
-        if (!isAccumulative())
-            r.result = illustratorMarkup(null, res, 0);
-        else
-            r.result = res;
-        detachInput();
-        return r;
-    }
 
-    protected Tuple getValueTuple(NullableTuple ntup, int index) throws ExecException {
-     // Need to make a copy of the value, as hadoop uses the same ntup
-        // to represent each value.
-        Tuple val = (Tuple)ntup.getValueAsPigType();
-
-        Tuple copy = null;
-        // The "value (val)" that we just got may not
-        // be the complete "value". It may have some portions
-        // in the "key" (look in POLocalRearrange for more comments)
-        // If this is the case we need to stitch
-        // the "value" together.
-        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo =
-            keyInfo.get(index);
-        boolean isProjectStar = lrKeyInfo.first;
-        Map<Integer, Integer> keyLookup = lrKeyInfo.second;
-        int keyLookupSize = keyLookup.size();
-
-        if( keyLookupSize > 0) {
-
-            // we have some fields of the "value" in the
-            // "key".
-            int finalValueSize = keyLookupSize + val.size();
-            copy = mTupleFactory.newTuple(finalValueSize);
-            int valIndex = 0; // an index for accessing elements from
-                              // the value (val) that we have currently
-            for(int i = 0; i < finalValueSize; i++) {
-                Integer keyIndex = keyLookup.get(i);
-                if(keyIndex == null) {
-                    // the field for this index is not in the
-                    // key - so just take it from the "value"
-                    // we were handed
-                    copy.set(i, val.get(valIndex));
-                    valIndex++;
-                } else {
-                    // the field for this index is in the key
-                    if(isKeyTuple && isKeyCompound) {
-                        // the key is a tuple, extract the
-                        // field out of the tuple
-                        copy.set(i, keyAsTuple.get(keyIndex));
-                    } else {
-                        copy.set(i, key);
-                    }
-                }
-            }
-            copy = illustratorMarkup2(val, copy);
-        } else if (isProjectStar) {
-
-            // the whole "value" is present in the "key"
-            copy = mTupleFactory.newTuple(keyAsTuple.getAll());
-            copy = illustratorMarkup2(keyAsTuple, copy);
-        } else {
-
-            // there is no field of the "value" in the
-            // "key" - so just make a copy of what we got
-            // as the "value"
-            copy = mTupleFactory.newTuple(val.getAll());
-            copy = illustratorMarkup2(val, copy);
-        }
-        return copy;
+        Result r = pkgr.getNext();
+        Tuple packedTup = (Tuple) r.result;
+        packedTup = illustratorMarkup(null, packedTup, 0);
+        return r;
     }
 
-    public byte getKeyType() {
-        return keyType;
+    public Packager getPkgr() {
+        return pkgr;
     }
 
-    public void setKeyType(byte keyType) {
-        this.keyType = keyType;
+    public void setPkgr(Packager pkgr) {
+        this.pkgr = pkgr;
     }
 
     /**
@@ -393,74 +281,11 @@ public class POPackage extends PhysicalO
         clone.mKey = new OperatorKey(mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope));
         clone.requestedParallelism = requestedParallelism;
         clone.resultType = resultType;
-        clone.keyType = keyType;
         clone.numInputs = numInputs;
-        if (inner!=null)
-        {
-            clone.inner = new boolean[inner.length];
-            for (int i = 0; i < inner.length; i++) {
-                clone.inner[i] = inner[i];
-            }
-        }
-        else
-            clone.inner = null;
+        clone.pkgr = (Packager) this.pkgr.clone();
         return clone;
     }
 
-    /**
-     * @param keyInfo the keyInfo to set
-     */
-    public void setKeyInfo(Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
-        this.keyInfo = keyInfo;
-    }
-
-    /**
-     * @param keyTuple the keyTuple to set
-     */
-    public void setKeyTuple(boolean keyTuple) {
-        this.isKeyTuple = keyTuple;
-    }
-
-    /**
-     * @param keyCompound the keyCompound to set
-     */
-    public void setKeyCompound(boolean keyCompound) {
-        this.isKeyCompound = keyCompound;
-    }
-
-    /**
-     * @return the keyInfo
-     */
-    public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
-        return keyInfo;
-    }
-
-    /**
-     * @return the distinct
-     */
-    public boolean isDistinct() {
-        return distinct;
-    }
-
-    /**
-     * @param distinct the distinct to set
-     */
-    public void setDistinct(boolean distinct) {
-        this.distinct = distinct;
-    }
-
-    public void setUseSecondaryKey(boolean useSecondaryKey) {
-        this.useSecondaryKey = useSecondaryKey;
-    }
-
-    public void setPackageType(PackageType type) {
-        this.pkgType = type;
-    }
-
-    public PackageType getPackageType() {
-        return pkgType;
-    }
-
     class POPackageTupleBuffer implements AccumulativeTupleBuffer {
         private List<Tuple>[] bags;
         private Iterator<NullableTuple> iter;
@@ -499,18 +324,17 @@ public class POPackage extends PhysicalO
             key = currKey;
             for(int i=0; i<batchSize; i++) {
                 if (iter.hasNext()) {
-                     NullableTuple ntup = iter.next();
-                     int index = ntup.getIndex();
-                     Tuple copy = getValueTuple(ntup, index);
-                     if (numInputs == 1) {
-
-                            // this is for multi-query merge where
-                            // the numInputs is always 1, but the index
-                            // (the position of the inner plan in the
-                            // enclosed operator) may not be 1.
-                            bags[0].add(copy);
+                    NullableTuple ntup = iter.next();
+                    int index = ntup.getIndex();
+                    Tuple copy = pkgr.getValueTuple(key, ntup, index);
+                    if (numInputs == 1) {
+                        // this is for multi-query merge where
+                         // the numInputs is always 1, but the index
+                        // (the position of the inner plan in the
+                        // enclosed operator) may not be 1.
+                        bags[0].add(copy);
                      } else {
-                            bags[index].add(copy);
+                        bags[index].add(copy);
                      }
                 }else{
                     break;
@@ -532,76 +356,76 @@ public class POPackage extends PhysicalO
         public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
             return POPackage.this.illustratorMarkup(in, out, eqClassIndex);
         }
-       };
+    };
+
+    public Tuple illustratorMarkup2(Object in, Object out) {
+       if(illustrator != null) {
+           ExampleTuple tOut = new ExampleTuple((Tuple) out);
+           illustrator.getLineage().insert(tOut);
+           tOut.synthetic = ((ExampleTuple) in).synthetic;
+           illustrator.getLineage().union(tOut, (Tuple) in);
+           return tOut;
+       } else
+           return (Tuple) out;
+    }
 
-       private Tuple illustratorMarkup2(Object in, Object out) {
-           if(illustrator != null) {
-               ExampleTuple tOut = new ExampleTuple((Tuple) out);
-               illustrator.getLineage().insert(tOut);
-               tOut.synthetic = ((ExampleTuple) in).synthetic;
-               illustrator.getLineage().union(tOut, (Tuple) in);
-               return tOut;
-           } else
-               return (Tuple) out;
-       }
-
-       @Override
-       public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
-           if(illustrator != null) {
-               ExampleTuple tOut = new ExampleTuple((Tuple) out);
-               LineageTracer lineageTracer = illustrator.getLineage();
-               lineageTracer.insert(tOut);
-               Tuple tmp;
-               boolean synthetic = false;
-               if (illustrator.getEquivalenceClasses() == null) {
-                   LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
-                   for (int i = 0; i < numInputs; ++i) {
-                       IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
-                       equivalenceClasses.add(equivalenceClass);
-                   }
-                   illustrator.setEquivalenceClasses(equivalenceClasses, this);
-               }
-
-               if (distinct) {
-                   int count;
-                   for (count = 0; tupIter.hasNext(); ++count) {
-                       NullableTuple ntp = tupIter.next();
-                       tmp = (Tuple) ntp.getValueAsPigType();
-                       if (!tmp.equals(tOut))
-                           lineageTracer.union(tOut, tmp);
-                   }
-                   if (count > 1) // only non-distinct tuples are inserted into the equivalence class
-                       illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
-                   illustrator.addData((Tuple) tOut);
-                   return (Tuple) tOut;
-               }
-               boolean outInEqClass = true;
-               try {
-                   for (int i = 1; i < numInputs+1; i++)
-                   {
-                       DataBag dbs = (DataBag) ((Tuple) out).get(i);
-                       Iterator<Tuple> iter = dbs.iterator();
-                       if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2 records
-                           outInEqClass = false;
-                       while (iter.hasNext()) {
-                           tmp = iter.next();
-                           // any of synthetic data in bags causes the output tuple to be synthetic
-                           if (!synthetic && ((ExampleTuple)tmp).synthetic)
-                               synthetic = true;
-                           lineageTracer.union(tOut, tmp);
-                       }
-                   }
-               } catch (ExecException e) {
-                 // TODO better exception handling
-                 throw new RuntimeException("Illustrator exception :"+e.getMessage());
-               }
-               if (outInEqClass)
-                   illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
-               tOut.synthetic = synthetic;
-               illustrator.addData((Tuple) tOut);
-               return tOut;
-           } else
-               return (Tuple) out;
-       }
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if (illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            Tuple tmp;
+            boolean synthetic = false;
+            if (illustrator.getEquivalenceClasses() == null) {
+                LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                for (int i = 0; i < numInputs; ++i) {
+                    IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+                    equivalenceClasses.add(equivalenceClass);
+                }
+                illustrator.setEquivalenceClasses(equivalenceClasses, this);
+            }
+
+            if (pkgr.isDistinct()) {
+                int count;
+                for (count = 0; tupIter.hasNext(); ++count) {
+                    NullableTuple ntp = tupIter.next();
+                    tmp = (Tuple) ntp.getValueAsPigType();
+                    if (!tmp.equals(tOut))
+                        lineageTracer.union(tOut, tmp);
+                }
+                if (count > 1) // only non-distinct tuples are inserted into the equivalence class
+                    illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+                illustrator.addData((Tuple) tOut);
+                return (Tuple) tOut;
+            }
+            boolean outInEqClass = true;
+            try {
+                for (int i = 1; i < numInputs + 1; i++) {
+                    DataBag dbs = (DataBag) ((Tuple) out).get(i);
+                    Iterator<Tuple> iter = dbs.iterator();
+                    if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2 records
+                        outInEqClass = false;
+                    while (iter.hasNext()) {
+                        tmp = iter.next();
+                        // any synthetic tuple in the bags makes the output tuple synthetic
+                        if (!synthetic && ((ExampleTuple) tmp).synthetic)
+                            synthetic = true;
+                        lineageTracer.union(tOut, tmp);
+                    }
+                }
+            } catch (ExecException e) {
+                // TODO better exception handling
+                throw new RuntimeException("Illustrator exception :"
+                        + e.getMessage());
+            }
+            if (outInEqClass)
+                illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+            tOut.synthetic = synthetic;
+            illustrator.addData((Tuple) tOut);
+            return tOut;
+        } else
+            return (Tuple) out;
+    }
 
 }

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1546314&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Thu Nov 28 08:56:33 2013
@@ -0,0 +1,293 @@
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.util.Pair;
+
+public class Packager implements Serializable, Cloneable {
+
+    private static final long serialVersionUID = 1L;
+
+    protected boolean[] readOnce;
+
+    protected DataBag[] bags;
+
+    public static enum PackageType {
+        GROUP, JOIN
+    };
+
+    // The key being worked on
+    Object key;
+
+    // marker to indicate if key is a tuple
+    protected boolean isKeyTuple = false;
+    // marker to indicate if the tuple key is compound in nature
+    protected boolean isKeyCompound = false;
+
+    // key's type
+    byte keyType;
+
+    // The number of inputs to this
+    // co-group. 0 indicates a distinct, which means there will only be a
+    // key, no value.
+    int numInputs;
+
+    // Whether the map-reduce plan this is attached to uses a secondary sort key
+    boolean useSecondaryKey = false;
+
+    // Denotes if inner is specified
+    // on a particular input
+    boolean[] inner;
+
+    // flag to denote whether there is a distinct
+    // leading to this package
+    protected boolean distinct = false;
+
+    // A mapping of input index to the key information obtained from
+    // LORearrange for that index. The key information is a pair (Boolean, Map).
+    // The Boolean indicates whether there is a lone project(*) in the
+    // cogroup by. If not, the Map maps column numbers in the "value" to
+    // the column numbers in the "key" that hold those fields of the
+    // "value".
+    protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
+
+    private PackageType pkgType;
+
+    protected static final BagFactory mBagFactory = BagFactory.getInstance();
+    protected static final TupleFactory mTupleFactory = TupleFactory
+            .getInstance();
+
+    Object getKey(Object key) throws ExecException {
+        if (useSecondaryKey) {
+            return ((Tuple) key).get(0);
+        } else {
+            return key;
+        }
+    }
+
+    void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+            throws ExecException {
+        this.key = key;
+        this.bags = bags;
+        this.readOnce = readOnce;
+    }
+
+    public Result getNext() throws ExecException {
+        Tuple res;
+
+        if (isDistinct()) {
+            // only set the key, which holds the whole
+            // tuple
+            res = mTupleFactory.newTuple(1);
+            res.set(0, key);
+        } else {
+            // Construct the output tuple by appending
+            // the key and all the bags attached via
+            // attachInput(), and return it.
+            res = mTupleFactory.newTuple(numInputs + 1);
+            res.set(0, key);
+            int i = -1;
+            for (DataBag bag : bags) {
+                i++;
+                if (inner[i]) {
+                    if (bag.size() == 0) {
+                        detachInput();
+                        Result r = new Result();
+                        r.returnStatus = POStatus.STATUS_NULL;
+                        return r;
+                    }
+                }
+
+                if (!readOnce[i]) {
+                    res.set(i + 1, bag);
+                } else {
+                    DataBag readBag = mBagFactory.newDefaultBag();
+                    readBag.addAll(bag);
+                    res.set(i + 1, readBag);
+                }
+            }
+        }
+        Result r = new Result();
+        r.returnStatus = POStatus.STATUS_OK;
+        // if (!isAccumulative())
+        // r.result = illustratorMarkup(null, res, 0);
+        // else
+        r.result = res;
+        return r;
+    }
+
+    void detachInput() {
+        key = null;
+        bags = null;
+    }
+
+    protected Tuple getValueTuple(Object key, NullableTuple ntup, int index)
+            throws ExecException {
+        // Need to make a copy of the value, as hadoop uses the same ntup
+        // to represent each value.
+        Tuple val = (Tuple) ntup.getValueAsPigType();
+
+        Tuple copy = null;
+        // The "value (val)" that we just got may not
+        // be the complete "value". It may have some portions
+        // in the "key" (look in POLocalRearrange for more comments)
+        // If this is the case we need to stitch
+        // the "value" together.
+        Pair<Boolean, Map<Integer, Integer>> lrKeyInfo = keyInfo.get(index);
+        boolean isProjectStar = lrKeyInfo.first;
+        Map<Integer, Integer> keyLookup = lrKeyInfo.second;
+        int keyLookupSize = keyLookup.size();
+
+        if (keyLookupSize > 0) {
+
+            // we have some fields of the "value" in the
+            // "key".
+            int finalValueSize = keyLookupSize + val.size();
+            copy = mTupleFactory.newTuple(finalValueSize);
+            int valIndex = 0; // an index for accessing elements from
+                              // the value (val) that we have currently
+            for (int i = 0; i < finalValueSize; i++) {
+                Integer keyIndex = keyLookup.get(i);
+                if (keyIndex == null) {
+                    // the field for this index is not in the
+                    // key - so just take it from the "value"
+                    // we were handed
+                    copy.set(i, val.get(valIndex));
+                    valIndex++;
+                } else {
+                    // the field for this index is in the key
+                    if (isKeyTuple && isKeyCompound) {
+                        // the key is a tuple, extract the
+                        // field out of the tuple
+                        copy.set(i, ((Tuple) key).get(keyIndex));
+                    } else {
+                        copy.set(i, key);
+                    }
+                }
+            }
+            // copy = illustratorMarkup2(val, copy);
+        } else if (isProjectStar) {
+
+            // the whole "value" is present in the "key"
+            copy = mTupleFactory.newTuple(((Tuple) key).getAll());
+            // copy = illustratorMarkup2((Tuple) key, copy);
+        } else {
+
+            // there is no field of the "value" in the
+            // "key" - so just make a copy of what we got
+            // as the "value"
+            copy = mTupleFactory.newTuple(val.getAll());
+            // copy = illustratorMarkup2(val, copy);
+        }
+        return copy;
+    }
+
+    public byte getKeyType() {
+        return keyType;
+    }
+
+    public void setKeyType(byte keyType) {
+        this.keyType = keyType;
+    }
+
+    public boolean[] getInner() {
+        return inner;
+    }
+
+    public void setInner(boolean[] inner) {
+        this.inner = inner;
+    }
+
+    /**
+     * @param keyInfo the keyInfo to set
+     */
+    public void setKeyInfo(
+            Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo) {
+        this.keyInfo = keyInfo;
+    }
+
+    /**
+     * @param keyTuple the keyTuple to set
+     */
+    public void setKeyTuple(boolean keyTuple) {
+        this.isKeyTuple = keyTuple;
+    }
+
+    /**
+     * @param keyCompound the keyCompound to set
+     */
+    public void setKeyCompound(boolean keyCompound) {
+        this.isKeyCompound = keyCompound;
+    }
+
+    /**
+     * @return the keyInfo
+     */
+    public Map<Integer, Pair<Boolean, Map<Integer, Integer>>> getKeyInfo() {
+        return keyInfo;
+    }
+
+    /**
+     * @return the distinct
+     */
+    public boolean isDistinct() {
+        return distinct;
+    }
+
+    /**
+     * @param distinct the distinct to set
+     */
+    public void setDistinct(boolean distinct) {
+        this.distinct = distinct;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    public void setPackageType(PackageType type) {
+        this.pkgType = type;
+    }
+
+    public PackageType getPackageType() {
+        return pkgType;
+    }
+
+    public int getNumInputs() {
+        return numInputs;
+    }
+
+    public void setNumInputs(int numInputs) {
+        this.numInputs = numInputs;
+    }
+
+    public Packager clone() throws CloneNotSupportedException {
+        Packager clone = (Packager) super.clone();
+        clone.setNumInputs(numInputs);
+        clone.setPackageType(pkgType);
+        clone.setDistinct(distinct);
+        clone.setInner(inner);
+        if (keyInfo != null)
+            clone.setKeyInfo(new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>(
+                    keyInfo));
+        clone.setKeyCompound(isKeyCompound);
+        clone.setKeyTuple(isKeyTuple);
+        clone.setKeyType(keyType);
+        clone.setUseSecondaryKey(useSecondaryKey);
+        return clone;
+    }
+
+    public String name() {
+        return this.getClass().getSimpleName();
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Thu Nov 28 08:56:33 2013
@@ -51,7 +51,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODemux;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PODistinct;
@@ -59,13 +58,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMultiQueryPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POOptimizedForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -209,20 +206,6 @@ public class PlanHelper {
         }
 
         @Override
-        public void visitCombinerPackage(POCombinerPackage pkg)
-                throws VisitorException {
-            super.visitCombinerPackage(pkg);
-            visit(pkg);
-        }
-
-        @Override
-        public void visitMultiQueryPackage(POMultiQueryPackage pkg)
-                throws VisitorException {
-            super.visitMultiQueryPackage(pkg);
-            visit(pkg);
-        }
-
-        @Override
         public void visitPOForEach(POForEach nfe) throws VisitorException {
             super.visitPOForEach(nfe);
             visit(nfe);
@@ -400,13 +383,6 @@ public class PlanHelper {
         }
 
         @Override
-        public void visitJoinPackage(POJoinPackage joinPackage)
-                throws VisitorException {
-            super.visitJoinPackage(joinPackage);
-            visit(joinPackage);
-        }
-
-        @Override
         public void visitCast(POCast cast) {
             super.visitCast(cast);
             visit(cast);
@@ -449,7 +425,6 @@ public class PlanHelper {
             visit(stream);
         }
 
-        @Override
         public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
             super.visitSkewedJoin(sk);
             visit(sk);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Thu Nov 28 08:56:33 2013
@@ -39,6 +39,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCross;
@@ -54,8 +55,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PONative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
@@ -63,6 +62,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager.PackageType;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
@@ -160,7 +160,7 @@ public class TezCompiler extends PhyPlan
     public TezOperPlan getTezPlan() {
         return tezPlan;
     }
-    
+
     // Segment a single DAG into a DAG graph
     public TezPlanContainer getPlanContainer() throws PlanException {
         TezPlanContainer tezPlanContainer = new TezPlanContainer(pigContext);
@@ -345,7 +345,7 @@ public class TezCompiler extends PhyPlan
         tezOp.plan.addAsLeaf(op);
         curTezOp = tezOp;
     }
-    
+
     private void connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
         plan.connect(from, to);
         // Add edge descriptors to old and new operators
@@ -504,7 +504,7 @@ public class TezCompiler extends PhyPlan
             blocking();
 
             POPackage pkg = getPackage();
-            pkg.setDistinct(true);
+            pkg.getPkgr().setDistinct(true);
             curTezOp.plan.add(pkg);
 
             POProject project = new POProject(new OperatorKey(scope, nig.getNextNodeId(scope)));
@@ -659,9 +659,9 @@ public class TezCompiler extends PhyPlan
         try{
             nonBlocking(op);
             phyToTezOpMap.put(op, curTezOp);
-            if (op.getPackageType() == PackageType.JOIN) {
+            if (op.getPkgr().getPackageType() == PackageType.JOIN) {
                 curTezOp.markRegularJoin();
-            } else if (op.getPackageType() == PackageType.GROUP) {
+            } else if (op.getPkgr().getPackageType() == PackageType.GROUP) {
                 if (op.getNumInps() == 1) {
                     curTezOp.markGroupBy();
                 } else if (op.getNumInps() > 1) {
@@ -716,7 +716,7 @@ public class TezCompiler extends PhyPlan
         return new FileSpec(FileLocalizer.getTemporaryPath(pigContext).toString(),
                 new FuncSpec(Utils.getTmpFileCompressorName(pigContext)));
     }
-    
+
     private POStore getStore(){
         POStore st = new POStore(new OperatorKey(scope,nig.getNextNodeId(scope)));
         // mark store as tmp store. These could be removed by the
@@ -724,7 +724,7 @@ public class TezCompiler extends PhyPlan
         st.setIsTmpStore(true);
         return st;
     }
-    
+
     /**
      * Force an end to the current vertex with a store into a temporary
      * file.
@@ -750,19 +750,19 @@ public class TezCompiler extends PhyPlan
         }
         return oper;
     }
-    
+
     private Pair<TezOperator[],Integer> getQuantileJobs(
             POSort inpSort,
             TezOperator prevJob,
             FileSpec lFile,
             FileSpec quantFile,
             int rp) throws PlanException, VisitorException {
-        
+
         POSort sort = new POSort(inpSort.getOperatorKey(), inpSort
                 .getRequestedParallelism(), null, inpSort.getSortPlans(),
                 inpSort.getMAscCols(), inpSort.getMSortFunc());
         sort.addOriginalLocation(inpSort.getAlias(), inpSort.getOriginalLocations());
-        
+
         // Turn the asc/desc array into an array of strings so that we can pass it
         // to the FindQuantiles function.
         List<Boolean> ascCols = inpSort.getMAscCols();
@@ -780,10 +780,10 @@ public class TezCompiler extends PhyPlan
                 ctorArgs[j+1] = ascs[j];
             }
         }
-        
+
         return getSamplingJobs(sort, prevJob, null, lFile, quantFile, rp, null, FindQuantiles.class.getName(), ctorArgs, RandomSampleLoader.class.getName());
     }
-    
+
     /**
      * Create a sampling job to collect statistics by sampling an input file. The sequence of operations is as
      * following:
@@ -794,7 +794,7 @@ public class TezCompiler extends PhyPlan
      * <li>Sorting the bag </li>
      * <li>Invoke UDF with the number of reducers and the sorted bag.</li>
      * <li>Data generated by UDF is stored into a file.</li>
-     * 
+     *
      * @param sort  the POSort operator used to sort the bag
      * @param prevJob  previous job of current sampling job
      * @param transformPlans  PhysicalPlans to transform input samples
@@ -809,44 +809,43 @@ public class TezCompiler extends PhyPlan
      * @throws PlanException
      * @throws VisitorException
      */
-    @SuppressWarnings("deprecation")
     private Pair<TezOperator[],Integer> getSamplingJobs(POSort sort, TezOperator prevJob, List<PhysicalPlan> transformPlans,
-            FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans, 
+            FileSpec lFile, FileSpec sampleFile, int rp, List<PhysicalPlan> sortKeyPlans,
             String udfClassName, String[] udfArgs, String sampleLdrClassName ) throws PlanException, VisitorException {
-        
+
         String[] rslargs = new String[2];
-        // SampleLoader expects string version of FuncSpec 
+        // SampleLoader expects string version of FuncSpec
         // as its first constructor argument.
-        
+
         rslargs[0] = (new FuncSpec(Utils.getTmpFileCompressorName(pigContext))).toString();
-        
+
         rslargs[1] = "100"; // The value is calculated based on the file size for skewed join
         FileSpec quantLdFilName = new FileSpec(lFile.getFileName(),
                 new FuncSpec(sampleLdrClassName, rslargs));
-        
+
         TezOperator[] opers = new TezOperator[2];
-        
+
         TezOperator oper1 = startNew(quantLdFilName, prevJob);
         opers[0] = oper1;
-       
+
         // TODO: Review sort udf
 //        if(sort.isUDFComparatorUsed) {
 //            mro.UDFs.add(sort.getMSortFunc().getFuncSpec().toString());
 //            curMROp.isUDFComparatorUsed = true;
-//        }        
-    
-        List<Boolean> flat1 = new ArrayList<Boolean>();         
+//        }
+
+        List<Boolean> flat1 = new ArrayList<Boolean>();
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
-        
+
         // if transform plans are not specified, project the columns of sorting keys
-        if (transformPlans == null) {           
+        if (transformPlans == null) {
             Pair<POProject, Byte>[] sortProjs = null;
             try{
                 sortProjs = getSortCols(sort.getSortPlans());
             }catch(Exception e) {
                 throw new RuntimeException(e);
             }
-            // Set up the projections of the key columns 
+            // Set up the projections of the key columns
             if (sortProjs == null) {
                 PhysicalPlan ep = new PhysicalPlan();
                 POProject prj = new POProject(new OperatorKey(scope,
@@ -860,7 +859,7 @@ public class TezCompiler extends PhyPlan
             } else {
                 for (Pair<POProject, Byte> sortProj : sortProjs) {
                     // Check for proj being null, null is used by getSortCols for a non POProject
-                    // operator. Since Order by does not allow expression operators, 
+                    // operator. Since Order by does not allow expression operators,
                     //it should never be set to null
                     if(sortProj == null){
                         int errCode = 2174;
@@ -889,10 +888,10 @@ public class TezCompiler extends PhyPlan
             }
         }
 
-        // This foreach will pick the sort key columns from the RandomSampleLoader output 
+        // This foreach will pick the sort key columns from the RandomSampleLoader output
         POForEach nfe1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1,eps1,flat1);
         oper1.plan.addAsLeaf(nfe1);
-        
+
         // Now set up a POLocalRearrange which has "all" as the key and the output of the
         // foreach will be the "value" out of POLocalRearrange
         PhysicalPlan ep1 = new PhysicalPlan();
@@ -900,10 +899,10 @@ public class TezCompiler extends PhyPlan
         ce.setValue("all");
         ce.setResultType(DataType.CHARARRAY);
         ep1.add(ce);
-        
+
         List<PhysicalPlan> eps = new ArrayList<PhysicalPlan>();
         eps.add(ep1);
-        
+
         POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
         try {
             lr.setIndex(0);
@@ -918,46 +917,46 @@ public class TezCompiler extends PhyPlan
         lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
         oper1.plan.add(lr);
         oper1.plan.connect(nfe1, lr);
-        
+
         oper1.setClosed(true);
-        
+
         TezOperator oper2 = getTezOp();
         opers[1] = oper2;
         tezPlan.add(oper2);
         connect(tezPlan, oper1, oper2);
-        
+
         POPackage pkg = new POPackage(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        pkg.setKeyType(DataType.CHARARRAY);
+        pkg.getPkgr().setKeyType(DataType.CHARARRAY);
         pkg.setNumInps(1);
-        boolean[] inner = {false}; 
-        pkg.setInner(inner);
+        boolean[] inner = {false};
+        pkg.getPkgr().setInner(inner);
         oper2.plan.add(pkg);
-        
+
         // Lets start building the plan which will have the sort
         // for the foreach
         PhysicalPlan fe2Plan = new PhysicalPlan();
-        // Top level project which just projects the tuple which is coming 
+        // Top level project which just projects the tuple which is coming
         // from the foreach after the package
         POProject topPrj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         topPrj.setColumn(1);
         topPrj.setResultType(DataType.BAG);
         topPrj.setOverloaded(true);
         fe2Plan.add(topPrj);
-        
+
         // the projections which will form sort plans
-        List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();             
+        List<PhysicalPlan> nesSortPlanLst = new ArrayList<PhysicalPlan>();
         if (sortKeyPlans != null) {
-            for(int i=0; i<sortKeyPlans.size(); i++) {          
-                nesSortPlanLst.add(sortKeyPlans.get(i));            
+            for(int i=0; i<sortKeyPlans.size(); i++) {
+                nesSortPlanLst.add(sortKeyPlans.get(i));
             }
-        }else{   
+        }else{
             Pair<POProject, Byte>[] sortProjs = null;
             try{
                 sortProjs = getSortCols(sort.getSortPlans());
             }catch(Exception e) {
                 throw new RuntimeException(e);
             }
-            // Set up the projections of the key columns 
+            // Set up the projections of the key columns
             if (sortProjs == null) {
                 PhysicalPlan ep = new PhysicalPlan();
                 POProject prj = new POProject(new OperatorKey(scope,
@@ -971,7 +970,7 @@ public class TezCompiler extends PhyPlan
                 for (int i=0; i<sortProjs.length; i++) {
                     POProject prj =
                         new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
-                    
+
                     prj.setResultType(sortProjs[i].second);
                     if(sortProjs[i].first != null && sortProjs[i].first.isProjectToEnd()){
                         if(i != sortProjs.length -1){
@@ -991,14 +990,14 @@ public class TezCompiler extends PhyPlan
                     ep.add(prj);
                     nesSortPlanLst.add(ep);
                 }
-            }                       
+            }
         }
-        
+
         sort.setSortPlans(nesSortPlanLst);
         sort.setResultType(DataType.BAG);
         fe2Plan.add(sort);
         fe2Plan.connect(topPrj, sort);
-        
+
         // The plan which will have a constant representing the
         // degree of parallelism for the final order by map-reduce job
         // this will either come from a "order by parallel x" in the script
@@ -1007,66 +1006,66 @@ public class TezCompiler extends PhyPlan
         PhysicalPlan rpep = new PhysicalPlan();
         ConstantExpression rpce = new ConstantExpression(new OperatorKey(scope,nig.getNextNodeId(scope)));
         rpce.setRequestedParallelism(rp);
-        
+
         // We temporarily set it to rp and will adjust it at runtime, because the final degree of parallelism
         // is unknown until we are ready to submit it. See PIG-2779.
         rpce.setValue(rp);
-        
+
         rpce.setResultType(DataType.INTEGER);
         rpep.add(rpce);
-        
+
         List<PhysicalPlan> genEps = new ArrayList<PhysicalPlan>();
         genEps.add(rpep);
         genEps.add(fe2Plan);
-        
+
         List<Boolean> flattened2 = new ArrayList<Boolean>();
         flattened2.add(false);
         flattened2.add(false);
-        
+
         POForEach nfe2 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),-1, genEps, flattened2);
         oper2.plan.add(nfe2);
         oper2.plan.connect(pkg, nfe2);
-        
+
         // Let's connect the output from the foreach containing
         // number of quantiles and the sorted bag of samples to
         // another foreach with the FindQuantiles udf. The input
-        // to the FindQuantiles udf is a project(*) which takes the 
+        // to the FindQuantiles udf is a project(*) which takes the
         // foreach input and gives it to the udf
         PhysicalPlan ep4 = new PhysicalPlan();
         POProject prjStar4 = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         prjStar4.setResultType(DataType.TUPLE);
         prjStar4.setStar(true);
         ep4.add(prjStar4);
-        
+
         List<PhysicalOperator> ufInps = new ArrayList<PhysicalOperator>();
         ufInps.add(prjStar4);
-      
-        POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps, 
+
+        POUserFunc uf = new POUserFunc(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ufInps,
             new FuncSpec(udfClassName, udfArgs));
         ep4.add(uf);
         ep4.connect(prjStar4, uf);
-        
+
         List<PhysicalPlan> ep4s = new ArrayList<PhysicalPlan>();
         ep4s.add(ep4);
         List<Boolean> flattened3 = new ArrayList<Boolean>();
         flattened3.add(false);
         POForEach nfe3 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), -1, ep4s, flattened3);
-        
+
         oper2.plan.add(nfe3);
         oper2.plan.connect(nfe2, nfe3);
-        
+
         POStore str = getStore();
         str.setSFile(sampleFile);
-        
+
         oper2.plan.add(str);
         oper2.plan.connect(nfe3, str);
-        
+
         oper2.setClosed(true);
         oper2.requestedParallelism = 1;
         // oper2.markSampler();
         return new Pair<TezOperator[], Integer>(opers, rp);
     }
-    
+
     private static class FindKeyTypeVisitor extends PhyPlanVisitor {
 
         byte keyType = DataType.UNKNOWN;
@@ -1081,7 +1080,7 @@ public class TezCompiler extends PhyPlan
             keyType = p.getResultType();
         }
     }
-    
+
     private Pair<POProject,Byte> [] getSortCols(List<PhysicalPlan> plans) throws PlanException, ExecException {
         if(plans!=null){
             @SuppressWarnings("unchecked")
@@ -1105,7 +1104,7 @@ public class TezCompiler extends PhyPlan
         String msg = "No expression plan found in POSort.";
         throw new PlanException(msg, errCode, PigException.BUG);
     }
-    
+
     private TezOperator[] getSortJobs(
             POSort sort,
             TezOperator quantJob,
@@ -1123,11 +1122,11 @@ public class TezCompiler extends PhyPlan
 
         long limit = sort.getLimit();
         oper1.limit = limit;
-        
+
         List<PhysicalPlan> eps1 = new ArrayList<PhysicalPlan>();
 
         byte keyType = DataType.UNKNOWN;
-        
+
         boolean[] sortOrder;
 
         List<Boolean> sortOrderList = sort.getMAscCols();
@@ -1167,7 +1166,7 @@ public class TezCompiler extends PhyPlan
                 throw new PlanException(msg, errCode, PigException.BUG, ve);
             }
         }
-        
+
         POLocalRearrange lr = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
         try {
             lr.setIndex(0);
@@ -1182,23 +1181,26 @@ public class TezCompiler extends PhyPlan
         lr.setResultType(DataType.TUPLE);
         lr.addOriginalLocation(sort.getAlias(), sort.getOriginalLocations());
         oper1.plan.addAsLeaf(lr);
-        
+
         oper1.setClosed(true);
-        
+
         TezOperator oper2 = getTezOp();
         opers[1] = oper2;
         tezPlan.add(oper2);
         connect(tezPlan, oper1, oper2);
-        
+
         if (limit!=-1) {
-            POPackageLite pkg_c = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
-            pkg_c.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
+            POPackage pkg_c = new POPackage(new OperatorKey(scope,
+                    nig.getNextNodeId(scope)));
+            pkg_c.setPkgr(new LitePackager());
+            pkg_c.getPkgr().setKeyType(
+                    (fields.length > 1) ? DataType.TUPLE : keyType);
             pkg_c.setNumInps(1);
             oper2.inEdges.put(oper1.getOperatorKey(), new TezEdgeDescriptor());
             PhysicalPlan combinePlan = oper2.inEdges.get(oper1.getOperatorKey()).combinePlan;
-            
+
             combinePlan.add(pkg_c);
-            
+
             List<PhysicalPlan> eps_c1 = new ArrayList<PhysicalPlan>();
             List<Boolean> flat_c1 = new ArrayList<Boolean>();
             PhysicalPlan ep_c1 = new PhysicalPlan();
@@ -1209,25 +1211,25 @@ public class TezCompiler extends PhyPlan
             ep_c1.add(prj_c1);
             eps_c1.add(ep_c1);
             flat_c1.add(true);
-            POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)), 
+            POForEach fe_c1 = new POForEach(new OperatorKey(scope,nig.getNextNodeId(scope)),
                     -1, eps_c1, flat_c1);
             fe_c1.setResultType(DataType.TUPLE);
-            
+
             combinePlan.addAsLeaf(fe_c1);
-            
+
             POLimit pLimit = new POLimit(new OperatorKey(scope,nig.getNextNodeId(scope)));
             pLimit.setLimit(limit);
             combinePlan.addAsLeaf(pLimit);
-            
+
             List<PhysicalPlan> eps_c2 = new ArrayList<PhysicalPlan>();
             eps_c2.addAll(sort.getSortPlans());
-        
+
             POLocalRearrange lr_c2 = new POLocalRearrange(new OperatorKey(scope,nig.getNextNodeId(scope)));
             try {
                 lr_c2.setIndex(0);
             } catch (ExecException e) {
                 int errCode = 2058;
-                String msg = "Unable to set index on newly created POLocalRearrange.";              
+                String msg = "Unable to set index on newly created POLocalRearrange.";
                 throw new PlanException(msg, errCode, PigException.BUG, e);
             }
             lr_c2.setKeyType((fields.length>1) ? DataType.TUPLE : keyType);
@@ -1235,13 +1237,17 @@ public class TezCompiler extends PhyPlan
             lr_c2.setResultType(DataType.TUPLE);
             combinePlan.addAsLeaf(lr_c2);
         }
-        
-        POPackageLite pkg = new POPackageLite(new OperatorKey(scope,nig.getNextNodeId(scope)));
-        pkg.setKeyType((fields == null || fields.length>1) ? DataType.TUPLE :
+
+        POPackage pkg = new POPackage(new OperatorKey(scope,
+                nig.getNextNodeId(scope)));
+        pkg.setPkgr(new LitePackager());
+        pkg.getPkgr().setKeyType(
+                (fields == null || fields.length > 1) ? DataType.TUPLE
+                        :
             keyType);
-        pkg.setNumInps(1);       
+        pkg.setNumInps(1);
         oper2.plan.add(pkg);
-        
+
         PhysicalPlan ep = new PhysicalPlan();
         POProject prj = new POProject(new OperatorKey(scope,nig.getNextNodeId(scope)));
         prj.setColumn(1);
@@ -1264,7 +1270,7 @@ public class TezCompiler extends PhyPlan
 
         return opers;
     }
-    
+
     @Override
     public void visitSort(POSort op) throws VisitorException {
         try{
@@ -1274,15 +1280,15 @@ public class TezCompiler extends PhyPlan
             FileSpec quantFile = getTempFileSpec();
             int rp = op.getRequestedParallelism();
             Pair<POProject, Byte>[] fields = getSortCols(op.getSortPlans());
-            Pair<TezOperator[], Integer> quantJobParallelismPair = 
+            Pair<TezOperator[], Integer> quantJobParallelismPair =
                 getQuantileJobs(op, oper, fSpec, quantFile, rp);
-            TezOperator[] opers = getSortJobs(op, quantJobParallelismPair.first[1], fSpec, quantFile, 
+            TezOperator[] opers = getSortJobs(op, quantJobParallelismPair.first[1], fSpec, quantFile,
                     quantJobParallelismPair.second, fields);
 
             quantJobParallelismPair.first[1].segmentBelow = true;
-            
+
             curTezOp = opers[1];
-            
+
             // TODO: Review sort udf
 //            if(op.isUDFComparatorUsed){
 //                curTezOp.UDFs.add(op.getMSortFunc().getFuncSpec().toString());
@@ -1396,8 +1402,8 @@ public class TezCompiler extends PhyPlan
     private POPackage getPackage() {
         boolean[] inner = { false };
         POPackage pkg = new POPackage(new OperatorKey(scope, nig.getNextNodeId(scope)));
-        pkg.setInner(inner);
-        pkg.setKeyType(DataType.TUPLE);
+        pkg.getPkgr().setInner(inner);
+        pkg.getPkgr().setKeyType(DataType.TUPLE);
         pkg.setNumInps(1);
         return pkg;
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Nov 28 08:56:33 2013
@@ -71,10 +71,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigIntWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigLongWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -189,7 +189,7 @@ public class TezDagBuilder extends TezOp
 
     private void addCombiner(PhysicalPlan combinePlan, Configuration conf) throws IOException {
         POPackage combPack = (POPackage)combinePlan.getRoots().get(0);
-        setIntermediateInputKeyValue(combPack.getKeyType(), conf);
+        setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf);
 
         POLocalRearrange combRearrange = (POLocalRearrange)combinePlan.getLeaves().get(0);
         setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf);
@@ -232,7 +232,7 @@ public class TezDagBuilder extends TezOp
         for (POStore st: stores) {
             storeLocations.add(st);
             StoreFuncInterface sFunc = st.getStoreFunc();
-            sFunc.setStoreLocation(st.getSFile().getFileName(), job); 
+            sFunc.setStoreLocation(st.getSFile().getFileName(), job);
         }
 
         if (stores.size() == 1){
@@ -298,7 +298,7 @@ public class TezDagBuilder extends TezOp
         List<PhysicalOperator> roots = tezOp.plan.getRoots();
         if (roots.size() == 1 && roots.get(0) instanceof POPackage) {
             POPackage pack = (POPackage) roots.get(0);
-            byte keyType = pack.getKeyType();
+            byte keyType = pack.getPkgr().getKeyType();
             tezOp.plan.remove(pack);
             conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
             conf.set("pig.reduce.key.type", Byte.toString(keyType));
@@ -310,7 +310,7 @@ public class TezDagBuilder extends TezOp
         }
 
         conf.setClass("mapreduce.outputformat.class", PigOutputFormat.class, OutputFormat.class);
-        
+
         if(tezOp.isGlobalSort() || tezOp.isLimitAfterSort()){
             if (tezOp.isGlobalSort()) {
                 FileSystem fs = FileSystem.get(conf);
@@ -326,7 +326,7 @@ public class TezDagBuilder extends TezOp
                 conf.set("pig.quantilesFile", fstat.getPath().toString());
                 conf.set("pig.sortOrder",
                         ObjectSerializer.serialize(tezOp.getSortOrder()));
-                conf.setClass("mapreduce.job.partitioner.class", WeightedRangePartitioner.class, 
+                conf.setClass("mapreduce.job.partitioner.class", WeightedRangePartitioner.class,
                         Partitioner.class);
             }
         }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java?rev=1546314&r1=1546313&r2=1546314&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPOPackageAnnotator.java Thu Nov 28 08:56:33 2013
@@ -27,11 +27,9 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCombinerPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.LitePackager;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackageLite;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.optimizer.OptimizerException;
@@ -102,32 +100,11 @@ public class TezPOPackageAnnotator exten
             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
-         */
         @Override
         public void visitPackage(POPackage pkg) throws VisitorException {
             this.pkg = pkg;
         };
 
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitJoinPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POJoinPackage)
-         */
-        @Override
-        public void visitJoinPackage(POJoinPackage joinPackage)
-                throws VisitorException {
-            this.pkg = joinPackage;
-        }
-
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitCombinerPackage(org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPostCombinerPackage)
-         */
-        @Override
-        public void visitCombinerPackage(POCombinerPackage pkg)
-                throws VisitorException {
-            this.pkg = pkg;
-        }
-
         /**
          * @return the pkg
          */
@@ -154,15 +131,12 @@ public class TezPOPackageAnnotator exten
             this.pkg = pkg;
         }
 
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
-         */
         @Override
         public void visitLocalRearrange(POLocalRearrange lrearrange) throws VisitorException {
             loRearrangeFound++;
             Map<Integer,Pair<Boolean, Map<Integer, Integer>>> keyInfo;
 
-            if (pkg instanceof POPackageLite) {
+            if (pkg.getPkgr() instanceof LitePackager) {
                 if(lrearrange.getIndex() != 0) {
                     // Throw some exception here
                     throw new RuntimeException("POLocalRearrange for POPackageLite cannot have index other than 0, but has index - "+lrearrange.getIndex());
@@ -171,7 +145,7 @@ public class TezPOPackageAnnotator exten
 
             // annotate the package with information from the LORearrange
             // update the keyInfo information if already present in the POPackage
-            keyInfo = pkg.getKeyInfo();
+            keyInfo = pkg.getPkgr().getKeyInfo();
             if(keyInfo == null)
                 keyInfo = new HashMap<Integer, Pair<Boolean, Map<Integer, Integer>>>();
 
@@ -188,9 +162,9 @@ public class TezPOPackageAnnotator exten
             keyInfo.put(Integer.valueOf(lrearrange.getIndex()),
                     new Pair<Boolean, Map<Integer, Integer>>(
                             lrearrange.isProjectStar(), lrearrange.getProjectedColsMap()));
-            pkg.setKeyInfo(keyInfo);
-            pkg.setKeyTuple(lrearrange.isKeyTuple());
-            pkg.setKeyCompound(lrearrange.isKeyCompound());
+            pkg.getPkgr().setKeyInfo(keyInfo);
+            pkg.getPkgr().setKeyTuple(lrearrange.isKeyTuple());
+            pkg.getPkgr().setKeyCompound(lrearrange.isKeyCompound());
         }
 
         /**