You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ch...@apache.org on 2013/11/29 00:58:01 UTC

svn commit: r1546477 [1/2] - in /pig/branches/tez: src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relati...

Author: cheolsoo
Date: Thu Nov 28 23:58:00 2013
New Revision: 1546477

URL: http://svn.apache.org/r1546477
Log:
PIG-3527: Allow PigProcessor to handle multiple inputs (mwagner via cheolsoo)

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POSimpleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLoad.java
Removed:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/FileInputHandler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/InputHandler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/ShuffledInputHandler.java
Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
    pig/branches/tez/src/org/apache/pig/data/BinInterSedes.java
    pig/branches/tez/src/org/apache/pig/data/DefaultTuple.java
    pig/branches/tez/test/e2e/pig/tests/tez.conf

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Nov 28 23:58:00 2013
@@ -2208,8 +2208,7 @@ public class MRCompiler extends PhyPlanV
         POPackage pkg = new POPackage(new OperatorKey(scope,
                 nig.getNextNodeId(scope)));
         LitePackager pkgr = new LitePackager();
-        pkgr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE
-                : keyType);
+        pkgr.setKeyType((fields == null || fields.length > 1) ? DataType.TUPLE : keyType);
         pkg.setPkgr(pkgr);
         pkg.setNumInps(1);
         mro.reducePlan.add(pkg);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigDecimalRawComparator.java Thu Nov 28 23:58:00 2013
@@ -46,14 +46,8 @@ public class PigBigDecimalRawComparator 
 
     @Override
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got "
-                    + conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf)conf;
         try {
-            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+            mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder "

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBigIntegerRawComparator.java Thu Nov 28 23:58:00 2013
@@ -46,14 +46,8 @@ public class PigBigIntegerRawComparator 
 
     @Override
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got "
-                    + conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf)conf;
         try {
-            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+            mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder "

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBooleanRawComparator.java Thu Nov 28 23:58:00 2013
@@ -40,14 +40,8 @@ public class PigBooleanRawComparator ext
         mWrappedComp = new BooleanWritable.Comparator();
     }
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " +
-                conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf)conf;
         try {
-            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+            mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder " +
@@ -76,7 +70,7 @@ public class PigBooleanRawComparator ext
         if (b1[s1] == 0 && b2[s2] == 0) {
             byte byte1 = b1[s1 + 1];
             byte byte2 = b2[s2 + 1];
-            rc = (byte1 < byte2) ? -1 : ((byte1 > byte2) ? 1 : 0); 
+            rc = (byte1 < byte2) ? -1 : ((byte1 > byte2) ? 1 : 0);
         } else {
             // For sorting purposes two nulls are equal.
             if (b1[s1] != 0 && b2[s2] != 0) rc = 0;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigBytesRawComparator.java Thu Nov 28 23:58:00 2013
@@ -42,14 +42,8 @@ public class PigBytesRawComparator exten
     }
 
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " +
-                conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf)conf;
         try {
-            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+            mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder " +

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDateTimeRawComparator.java Thu Nov 28 23:58:00 2013
@@ -45,14 +45,8 @@ public class PigDateTimeRawComparator ex
     }
 
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got "
-                    + conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf) conf;
         try {
-            mAsc = (boolean[]) ObjectSerializer.deserialize(jconf
+            mAsc = (boolean[]) ObjectSerializer.deserialize(conf
                     .get("pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder "

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigDoubleRawComparator.java Thu Nov 28 23:58:00 2013
@@ -43,14 +43,8 @@ public class PigDoubleRawComparator exte
     }
 
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " +
-                conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf)conf;
         try {
-            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+            mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder " +

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigFloatRawComparator.java Thu Nov 28 23:58:00 2013
@@ -43,14 +43,8 @@ public class PigFloatRawComparator exten
     }
 
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " +
-                conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf)conf;
         try {
-            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+            mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder " +

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigIntRawComparator.java Thu Nov 28 23:58:00 2013
@@ -40,14 +40,8 @@ public class PigIntRawComparator extends
     }
 
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " +
-                conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf)conf;
         try {
-            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+            mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder " +
@@ -76,7 +70,7 @@ public class PigIntRawComparator extends
         if (b1[s1] == 0 && b2[s2] == 0) {
             int int1 = readInt(b1, s1 + 1);
             int int2 = readInt(b2, s2 + 1);
-            rc = (int1 < int2) ? -1 : ((int1 > int2) ? 1 : 0); 
+            rc = (int1 < int2) ? -1 : ((int1 > int2) ? 1 : 0);
         } else {
             // For sorting purposes two nulls are equal.
             if (b1[s1] != 0 && b2[s2] != 0) rc = 0;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigLongRawComparator.java Thu Nov 28 23:58:00 2013
@@ -45,14 +45,8 @@ public class PigLongRawComparator extend
 
 
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " +
-                conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf)conf;
         try {
-            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+            mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder " +

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigSecondaryKeyComparator.java Thu Nov 28 23:58:00 2013
@@ -34,11 +34,6 @@ public class PigSecondaryKeyComparator e
 
     @Override
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf) conf;
         try {
             Class<? extends TupleRawComparator> mComparatorClass = TupleFactory.getInstance().tupleRawComparatorClass();
             mComparator = mComparatorClass.newInstance();
@@ -47,7 +42,7 @@ public class PigSecondaryKeyComparator e
         } catch (IllegalAccessException e) {
             throw new RuntimeException(e);
         }
-        mComparator.setConf(jconf);
+        mComparator.setConf(conf);
     }
 
     protected PigSecondaryKeyComparator() {

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTextRawComparator.java Thu Nov 28 23:58:00 2013
@@ -44,14 +44,8 @@ public class PigTextRawComparator extend
 
 
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " +
-                conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf)conf;
         try {
-            mAsc = (boolean[])ObjectSerializer.deserialize(jconf.get(
+            mAsc = (boolean[])ObjectSerializer.deserialize(conf.get(
                 "pig.sortOrder"));
         } catch (IOException ioe) {
             String msg = "Unable to deserialize pig.sortOrder";

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleDefaultRawComparator.java Thu Nov 28 23:58:00 2013
@@ -47,13 +47,8 @@ public class PigTupleDefaultRawComparato
     }
 
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf) conf;
         try {
-            mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+            mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
             throw new RuntimeException(ioe);
@@ -75,7 +70,7 @@ public class PigTupleDefaultRawComparato
     public boolean hasComparedTupleNull() {
         return mHasNullField;
     }
-    
+
     private static final BinInterSedes bis = new BinInterSedes();
 
     /**

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PigTupleSortComparator.java Thu Nov 28 23:58:00 2013
@@ -47,13 +47,8 @@ public class PigTupleSortComparator exte
 
     @Override
     public void setConf(Configuration conf) {
-        if (!(conf instanceof JobConf)) {
-            mLog.warn("Expected jobconf in setConf, got " + conf.getClass().getName());
-            return;
-        }
-        JobConf jconf = (JobConf) conf;
         try {
-            mAsc = (boolean[]) ObjectSerializer.deserialize(jconf.get("pig.sortOrder"));
+            mAsc = (boolean[]) ObjectSerializer.deserialize(conf.get("pig.sortOrder"));
         } catch (IOException ioe) {
             mLog.error("Unable to deserialize pig.sortOrder " + ioe.getMessage());
             throw new RuntimeException(ioe);
@@ -86,7 +81,7 @@ public class PigTupleSortComparator exte
                 throw new RuntimeException(e);
             }
         }
-        ((Configurable)mComparator).setConf(jconf);
+        ((Configurable)mComparator).setConf(conf);
     }
 
     @Override
@@ -165,4 +160,4 @@ public class PigTupleSortComparator exte
             return 0;
         }
     }
-}
\ No newline at end of file
+}

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POSimpleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POSimpleTezLoad.java?rev=1546477&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POSimpleTezLoad.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/POSimpleTezLoad.java Thu Nov 28 23:58:00 2013
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezLoad;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.tez.mapreduce.input.MRInput;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValueReader;
+
+/**
+ * POSimpleTezLoad is used on the backend to read tuples from a Tez MRInput
+ */
+public class POSimpleTezLoad extends POLoad implements TezLoad {
+    /**
+     *
+     */
+    private static final long serialVersionUID = 1L;
+    private String inputKey;
+    private MRInput input;
+    private KeyValueReader reader;
+
+    private Result res;
+
+    public POSimpleTezLoad(OperatorKey k) {
+        super(k);
+        res = new Result();
+    }
+
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs,
+            Configuration conf)
+            throws ExecException {
+        LogicalInput logInput = inputs.get(inputKey);
+        if (logInput == null || !(logInput instanceof MRInput)) {
+            throw new ExecException("POSimpleTezLoad only accepts MRInputs");
+        }
+        input = (MRInput) logInput;
+        try {
+            reader = input.getReader();
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        try {
+            if (!reader.next()) {
+                res.result = null;
+                res.returnStatus = POStatus.STATUS_EOP;
+            } else {
+                Tuple next = (Tuple) reader.getCurrentValue();
+                res.result = next;
+                res.returnStatus = POStatus.STATUS_OK;
+            }
+            return res;
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+    }
+
+    public void setInputKey(String inputKey) {
+        this.inputKey = inputKey;
+    }
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/CombinerPackager.java Thu Nov 28 23:58:00 2013
@@ -102,7 +102,11 @@ public class CombinerPackager extends Pa
 
     @Override
     public Result getNext() throws ExecException {
-        //Create numInputs bags
+        if (bags == null) {
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        // Create numInputs bags
         Object[] fields = new Object[mBags.length];
         for (int i = 0; i < mBags.length; i++) {
             if (mBags[i]) fields[i] = createDataBag(numBags);
@@ -131,6 +135,8 @@ public class CombinerPackager extends Pa
             }
         }
 
+        detachInput();
+
         // The successor of the POCombinerPackage as of
         // now SHOULD be a POForeach which has been adjusted
         // to look for its inputs by projecting from the corresponding
@@ -147,7 +153,7 @@ public class CombinerPackager extends Pa
     }
 
     @Override
-    protected Tuple getValueTuple(Object key, NullableTuple ntup, int index)
+    public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
             throws ExecException {
         return (Tuple) ntup.getValueAsPigType();
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/JoinPackager.java Thu Nov 28 23:58:00 2013
@@ -216,7 +216,7 @@ public class JoinPackager extends Packag
     }
 
     @Override
-    void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
             throws ExecException {
         super.attachInput(key, bags, readOnce);
         this.newKey = true;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/LitePackager.java Thu Nov 28 23:58:00 2013
@@ -112,6 +112,10 @@ public class LitePackager extends Packag
      */
     @Override
     public Result getNext() throws ExecException {
+        if (bags == null) {
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
         Tuple res;
 
         //Construct the output tuple by appending

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/MultiQueryPackager.java Thu Nov 28 23:58:00 2013
@@ -24,6 +24,7 @@ import java.util.List;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -128,6 +129,9 @@ public class MultiQueryPackager extends 
      */
     @Override
     public Result getNext() throws ExecException {
+        if (bags == null) {
+            return new Result(POStatus.STATUS_EOP, null);
+        }
 
         byte origIndex = myKey.getIndex();
 
@@ -159,6 +163,7 @@ public class MultiQueryPackager extends 
 
         Result res = pkgr.getNext();
         pkgr.detachInput();
+        detachInput();
 
         Tuple tuple = (Tuple)res.result;
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Thu Nov 28 23:58:00 2013
@@ -70,12 +70,12 @@ public class POPackage extends PhysicalO
     transient Iterator<NullableTuple> tupIter;
 
     //The key being worked on
-    Object key;
+    protected Object key;
 
     //The number of inputs to this
     //co-group.  0 indicates a distinct, which means there will only be a
     //key, no value.
-    int numInputs;
+    protected int numInputs;
 
     // A mapping of input index to key information got from LORearrange
     // for that index. The Key information is a pair of boolean, Map.
@@ -152,7 +152,7 @@ public class POPackage extends PhysicalO
     public void attachInput(PigNullableWritable k, Iterator<NullableTuple> inp) {
         try {
             tupIter = inp;
-            key = pkgr.getKey(k.getValueAsPigType());
+            key = pkgr.getKey(k);
             inputAttached = true;
         } catch (Exception e) {
             throw new RuntimeException(

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/Packager.java Thu Nov 28 23:58:00 2013
@@ -12,6 +12,7 @@ import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.pig.impl.util.Pair;
 
 public class Packager implements Serializable, Cloneable {
@@ -64,18 +65,18 @@ public class Packager implements Seriali
     private PackageType pkgType;
 
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
-    protected static final TupleFactory mTupleFactory = TupleFactory
-            .getInstance();
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
 
-    Object getKey(Object key) throws ExecException {
+    public Object getKey(PigNullableWritable key) throws ExecException {
+        Object keyObject = key.getValueAsPigType();
         if (useSecondaryKey) {
-            return ((Tuple) key).get(0);
+            return ((Tuple) keyObject).get(0);
         } else {
-            return key;
+            return keyObject;
         }
     }
 
-    void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
+    public void attachInput(Object key, DataBag[] bags, boolean[] readOnce)
             throws ExecException {
         this.key = key;
         this.bags = bags;
@@ -83,6 +84,9 @@ public class Packager implements Seriali
     }
 
     public Result getNext() throws ExecException {
+        if (bags == null) {
+            return new Result(POStatus.STATUS_EOP, null);
+        }
         Tuple res;
 
         if (isDistinct()) {
@@ -117,21 +121,19 @@ public class Packager implements Seriali
                 }
             }
         }
+        detachInput();
         Result r = new Result();
         r.returnStatus = POStatus.STATUS_OK;
-        // if (!isAccumulative())
-        // r.result = illustratorMarkup(null, res, 0);
-        // else
         r.result = res;
         return r;
     }
 
-    void detachInput() {
+    public void detachInput() {
         key = null;
         bags = null;
     }
 
-    protected Tuple getValueTuple(Object key, NullableTuple ntup, int index)
+    public Tuple getValueTuple(Object key, NullableTuple ntup, int index)
             throws ExecException {
         // Need to make a copy of the value, as hadoop uses the same ntup
         // to represent each value.

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1546477&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Thu Nov 28 23:58:00 2013
@@ -0,0 +1,174 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.InternalCachedBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.NullableTuple;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.tez.runtime.api.LogicalInput;
+import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
+
+public class POShuffleTezLoad extends POPackage implements TezLoad {
+
+    private static final long serialVersionUID = 1L;
+
+    List<String> inputKeys = new ArrayList<String>();
+
+    List<ShuffledMergedInput> inputs = new ArrayList<ShuffledMergedInput>();
+    List<KeyValuesReader> readers = new ArrayList<KeyValuesReader>();
+    List<Boolean> finished = new ArrayList<Boolean>();
+
+    private boolean[] readOnce;
+
+    Result res;
+
+    protected static final TupleFactory tf = TupleFactory.getInstance();
+
+    WritableComparator comparator = null;
+
+    public POShuffleTezLoad(OperatorKey k, POPackage pack) {
+        super(k);
+        setPkgr(pack.getPkgr());
+        this.setNumInps(pack.getNumInps());
+    }
+
+    @Override
+    public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
+            throws ExecException {
+        readOnce = new boolean[numInputs];
+        for (int i = 0; i < numInputs; i++) {
+            readOnce[i] = false;
+        }
+
+        try {
+            comparator = ReflectionUtils.newInstance(
+                    TezDagBuilder.comparatorForKeyType(pkgr.getKeyType()), conf);
+        } catch (JobCreationException e) {
+            throw new ExecException(e);
+        }
+        try {
+            // TODO: Only take the inputs which are actually specified.
+            for (LogicalInput input : inputs.values()) {
+                ShuffledMergedInput smInput = (ShuffledMergedInput) input;
+                this.inputs.add(smInput);
+                this.readers.add(smInput.getReader());
+            }
+
+            for (int i = 0; i < numInputs; i++) {
+                finished.add(!readers.get(i).next());
+            }
+        } catch (IOException e) {
+            throw new ExecException(e);
+        }
+    }
+
+    @Override
+    public Result getNextTuple() throws ExecException {
+        Result res = pkgr.getNext();
+        while (res.returnStatus == POStatus.STATUS_EOP) {
+            PigNullableWritable minimum = null;
+            boolean newData = false;
+            try {
+                for (int i = 0; i < numInputs; i++) {
+                    if (!finished.get(i)) {
+                        newData = true;
+                        PigNullableWritable current = (PigNullableWritable) readers
+                                .get(i).getCurrentKey();
+                        if (minimum == null
+                                || comparator.compare(minimum, current) > 0) {
+                            minimum = current;
+                        }
+                    }
+                }
+            } catch (IOException e) {
+                throw new ExecException(e);
+            }
+
+            if (!newData) {
+                return new Result(POStatus.STATUS_EOP, null);
+            }
+
+            key = pkgr.getKey(minimum);
+
+            DataBag[] bags = new DataBag[numInputs];
+
+            try {
+                for (int i = 0; i < numInputs; i++) {
+                    if (!finished.get(i)) {
+                        PigNullableWritable current = (PigNullableWritable) readers
+                                .get(i).getCurrentKey();
+                        if (comparator.compare(minimum, current) == 0) {
+                            DataBag bag = new InternalCachedBag(numInputs);
+                            Iterable<Object> vals = readers.get(i).getCurrentValues();
+                            for (Object val : vals) {
+                                NullableTuple nTup = (NullableTuple) val;
+                                int index = nTup.getIndex();
+                                Tuple tup = pkgr.getValueTuple(key, nTup, index);
+                                bag.add(tup);
+                            }
+                            finished.set(i, !readers.get(i).next());
+                            bags[i] = bag;
+                        } else {
+                            bags[i] = new InternalCachedBag(numInputs);
+                        }
+                    } else {
+                        bags[i] = new InternalCachedBag(numInputs);
+                    }
+                }
+            } catch (IOException e) {
+                throw new ExecException(e);
+            }
+            pkgr.attachInput(key, bags, readOnce);
+            res = pkgr.getNext();
+        }
+        return res;
+    }
+
+    public void setInputKeys(List<String> inputKeys) {
+        this.inputKeys = inputKeys;
+    }
+
+    public void addInputKey(String inputKey) {
+        inputKeys.add(inputKey);
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return true;
+    }
+
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Thu Nov 28 23:58:00 2013
@@ -18,9 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
 import java.io.IOException;
-import java.lang.reflect.Constructor;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -31,12 +31,14 @@ import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POSimpleTezLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.data.SchemaTupleBackend;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -70,8 +72,6 @@ public class PigProcessor implements Log
     private boolean shuffle;
     private byte keyType;
 
-    private InputHandler input;
-
     private Configuration conf;
 
     @Override
@@ -81,8 +81,6 @@ public class PigProcessor implements Log
         conf = TezUtils.createConfFromUserPayload(payload);
         PigContext pc = (PigContext) ObjectSerializer.deserialize(conf.get("pig.pigContext"));
 
-        input = createInputHandler(conf);
-
         UDFContext.getUDFContext().addJobConf(conf);
         UDFContext.getUDFContext().deserialize();
 
@@ -106,7 +104,7 @@ public class PigProcessor implements Log
     public void run(Map<String, LogicalInput> inputs,
             Map<String, LogicalOutput> outputs) throws Exception {
 
-        input.initialize(conf, inputs);
+        initializeInputs(inputs);
 
         initializeOutputs(outputs);
 
@@ -124,34 +122,24 @@ public class PigProcessor implements Log
             }
         }
 
-        while (input.next()){
-            Tuple inputTuple = input.getCurrentTuple();
-            if (execPlan.isEmpty()) {
-                writeResult(inputTuple);
-                continue;
-            }
-
-            for (PhysicalOperator root : roots) {
-                root.attachInput(inputTuple);
-            }
-
-            runPipeline(leaf);
-        }
+        runPipeline(leaf);
 
         for (MROutput fileOutput : fileOutputs){
             fileOutput.commit();
         }
     }
 
-    private  InputHandler createInputHandler(Configuration conf) throws PigException {
-        Class<? extends InputHandler> inputClass;
-        try {
-            inputClass = (Class<? extends InputHandler>)
-                    Class.forName(conf.get("pig.input.handler.class"));
-            Constructor<? extends InputHandler> constructor = inputClass.getConstructor();
-            return constructor.newInstance();
-        } catch (Exception e) {
-            throw new PigException("Could not instantiate input handler", e);
+    private void initializeInputs(Map<String, LogicalInput> inputs)
+            throws IOException {
+        // getPhysicalOperators only accepts C extends PhysicalOperator, so we can't change it to look for TezLoad
+        // TODO: Change that.
+        LinkedList<POSimpleTezLoad> tezLds = PlanHelper.getPhysicalOperators(execPlan, POSimpleTezLoad.class);
+        for (POSimpleTezLoad tezLd : tezLds){
+            tezLd.attachInputs(inputs, conf);
+        }
+        LinkedList<POShuffleTezLoad> shuffles = PlanHelper.getPhysicalOperators(execPlan, POShuffleTezLoad.class);
+        for (POShuffleTezLoad shuffle : shuffles){
+            shuffle.attachInputs(inputs, conf);
         }
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1546477&r1=1546476&r2=1546477&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Nov 28 23:58:00 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableComparator;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -47,34 +48,25 @@ import org.apache.pig.PigException;
 import org.apache.pig.StoreFuncInterface;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
+import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigBigDecimalWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigBigIntegerWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigBooleanWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigCharArrayWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigDBAWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigDateTimeWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigDoubleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigFloatWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigDecimalWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigIntegerWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBooleanWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingCharArrayWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDBAWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDateTimeWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingDoubleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingFloatWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingIntWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingLongWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingTupleWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigIntWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigLongWritableComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigDecimalRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigIntegerRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBooleanRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBytesRawComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigCombiner;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigDateTimeRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigDoubleRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigFloatRawComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigIntRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigLongRawComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextRawComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.WeightedRangePartitioner;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POSimpleTezLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
@@ -89,6 +81,7 @@ import org.apache.pig.impl.io.FileLocali
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.io.NullableTuple;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.ObjectSerializer;
@@ -119,12 +112,20 @@ public class TezDagBuilder extends TezOp
     private DAG dag;
     private Map<String, LocalResource> localResources;
     private PigContext pc;
+    private Configuration globalConf;
 
-    public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag, Map<String, LocalResource> localResources) {
+    private String scope;
+    private NodeIdGenerator nig;
+
+    public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
+            Map<String, LocalResource> localResources) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         this.pc = pc;
+        this.globalConf = ConfigurationUtil.toConfiguration(pc.getProperties(), true);
         this.localResources = localResources;
         this.dag = dag;
+        this.scope = plan.getRoots().get(0).getOperatorKey().getScope();
+        this.nig = NodeIdGenerator.getGenerator();
     }
 
     @Override
@@ -135,11 +136,12 @@ public class TezDagBuilder extends TezOp
             to = newVertex(tezOp);
             dag.addVertex(to);
         } catch (IOException e) {
-            throw new VisitorException("Cannot create vertex for " + tezOp.name(), e);
+            throw new VisitorException("Cannot create vertex for "
+                    + tezOp.name(), e);
         }
 
         // Connect the new vertex with predecessor vertices
-        TezOperPlan tezPlan =  getPlan();
+        TezOperPlan tezPlan = getPlan();
         List<TezOperator> predecessors = tezPlan.getPredecessors(tezOp);
         if (predecessors != null) {
             for (TezOperator predecessor : predecessors) {
@@ -150,8 +152,8 @@ public class TezDagBuilder extends TezOp
                 try {
                     prop = newEdge(predecessor, tezOp);
                 } catch (IOException e) {
-                    throw new VisitorException("Cannot create edge from " +
-                            predecessor.name() + " to " + tezOp.name(), e);
+                    throw new VisitorException("Cannot create edge from "
+                            + predecessor.name() + " to " + tezOp.name(), e);
                 }
                 Edge edge = new Edge(from, to, prop);
                 dag.addEdge(edge);
@@ -161,12 +163,14 @@ public class TezDagBuilder extends TezOp
 
     /**
      * Return EdgeProperty that connects two vertices.
+     *
      * @param from
      * @param to
      * @return EdgeProperty
      * @throws IOException
      */
-    private EdgeProperty newEdge(TezOperator from, TezOperator to) throws IOException {
+    private EdgeProperty newEdge(TezOperator from, TezOperator to)
+            throws IOException {
         TezEdgeDescriptor edge = to.inEdges.get(from.getOperatorKey());
         PhysicalPlan combinePlan = edge.combinePlan;
 
@@ -180,107 +184,127 @@ public class TezDagBuilder extends TezOp
             out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
         }
 
-        return new EdgeProperty(
-                edge.dataMovementType,
-                edge.dataSourceType,
-                edge.schedulingType,
-                out, in);
+        return new EdgeProperty(edge.dataMovementType, edge.dataSourceType,
+                edge.schedulingType, out, in);
     }
 
     private void addCombiner(PhysicalPlan combinePlan, Configuration conf) throws IOException {
         POPackage combPack = (POPackage)combinePlan.getRoots().get(0);
         setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf);
 
-        POLocalRearrange combRearrange = (POLocalRearrange)combinePlan.getLeaves().get(0);
+        POLocalRearrange combRearrange = (POLocalRearrange) combinePlan
+                .getLeaves().get(0);
         setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf);
 
-        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(combinePlan, combPack);
+        LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
+                combinePlan, combPack);
         lrDiscoverer.visit();
 
         combinePlan.remove(combPack);
-        conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS, MRCombiner.class.getName());
-        conf.set(MRJobConfig.COMBINE_CLASS_ATTR, PigCombiner.Combine.class.getName());
+        conf.set(TezJobConfig.TEZ_RUNTIME_COMBINER_CLASS,
+                MRCombiner.class.getName());
+        conf.set(MRJobConfig.COMBINE_CLASS_ATTR,
+                PigCombiner.Combine.class.getName());
         conf.setBoolean("mapred.mapper.new-api", true);
         conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
         conf.set("pig.combinePlan", ObjectSerializer.serialize(combinePlan));
         conf.set("pig.combine.package", ObjectSerializer.serialize(combPack));
-        conf.set("pig.map.keytype", ObjectSerializer.serialize(new byte[] {combRearrange.getKeyType()}));
+        conf.set("pig.map.keytype", ObjectSerializer
+                .serialize(new byte[] { combRearrange.getKeyType() }));
     }
 
     private Vertex newVertex(TezOperator tezOp) throws IOException {
-        ProcessorDescriptor procDesc = new ProcessorDescriptor(tezOp.getProcessorName());
+        ProcessorDescriptor procDesc = new ProcessorDescriptor(
+                tezOp.getProcessorName());
 
         // Pass physical plans to vertex as user payload.
-        Configuration conf = new Configuration();
-        // We won't actually use this job, but we need it to talk with the Load Store funcs
+        Configuration payloadConf = new Configuration();
+        // We won't actually use this job, but we need it to talk with the Load
+        // Store funcs
         @SuppressWarnings("deprecation")
-        Job job = new Job(conf);
+        Job job = new Job(payloadConf);
 
         ArrayList<POStore> storeLocations = new ArrayList<POStore>();
         Path tmpLocation = null;
 
-        boolean loads = processLoads(tezOp, conf, job);
-        conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
-
-        conf.set("udf.import.list", ObjectSerializer.serialize(PigContext.getPackageImportList()));
-        conf.setBoolean("mapred.mapper.new-api", true);
-        conf.setClass("mapreduce.inputformat.class", PigInputFormat.class, InputFormat.class);
+        List<POLoad> loads = processLoads(tezOp, payloadConf, job);
+        payloadConf.set("pig.pigContext", ObjectSerializer.serialize(pc));
 
-        // We need to remove all the POStores from the exec plan and later add them as outputs of the vertex
-        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
+        payloadConf.set("udf.import.list",
+                ObjectSerializer.serialize(PigContext.getPackageImportList()));
+        payloadConf.setBoolean("mapred.mapper.new-api", true);
+        payloadConf.setClass("mapreduce.inputformat.class",
+                PigInputFormat.class, InputFormat.class);
+
+        // We need to remove all the POStores from the exec plan and later add
+        // them as outputs of the vertex
+        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(
+                tezOp.plan, POStore.class);
 
-        for (POStore st: stores) {
+        for (POStore st : stores) {
             storeLocations.add(st);
             StoreFuncInterface sFunc = st.getStoreFunc();
             sFunc.setStoreLocation(st.getSFile().getFileName(), job);
         }
 
-        if (stores.size() == 1){
+        if (stores.size() == 1) {
             POStore st = stores.get(0);
-            if(!pc.inIllustrator)
+            if (!pc.inIllustrator)
                 tezOp.plan.remove(st);
 
             // set out filespecs
             String outputPathString = st.getSFile().getFileName();
-            if (!outputPathString.contains("://") || outputPathString.startsWith("hdfs://")) {
-                conf.set("pig.streaming.log.dir",
-                        new Path(outputPathString, JobControlCompiler.LOG_DIR).toString());
+            if (!outputPathString.contains("://")
+                    || outputPathString.startsWith("hdfs://")) {
+                payloadConf.set("pig.streaming.log.dir", new Path(
+                        outputPathString, JobControlCompiler.LOG_DIR)
+                        .toString());
             } else {
-                String tmpLocationStr =  FileLocalizer
-                        .getTemporaryPath(pc).toString();
+                String tmpLocationStr = FileLocalizer.getTemporaryPath(pc)
+                        .toString();
                 tmpLocation = new Path(tmpLocationStr);
-                conf.set("pig.streaming.log.dir",
-                        new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
+                payloadConf.set("pig.streaming.log.dir", new Path(tmpLocation,
+                        JobControlCompiler.LOG_DIR).toString());
             }
-            conf.set("pig.streaming.task.output.dir", outputPathString);
+            payloadConf.set("pig.streaming.task.output.dir", outputPathString);
         } else if (stores.size() > 0) { // multi store case
             log.info("Setting up multi store job");
-            String tmpLocationStr =  FileLocalizer
-                    .getTemporaryPath(pc).toString();
+            String tmpLocationStr = FileLocalizer.getTemporaryPath(pc)
+                    .toString();
             tmpLocation = new Path(tmpLocationStr);
 
-            boolean disableCounter = conf.getBoolean("pig.disable.counter", false);
+            boolean disableCounter = payloadConf.getBoolean(
+                    "pig.disable.counter", false);
             if (disableCounter) {
                 log.info("Disable Pig custom output counters");
             }
             int idx = 0;
-            for (POStore sto: storeLocations) {
+            for (POStore sto : storeLocations) {
                 sto.setDisableCounter(disableCounter);
                 sto.setMultiStore(true);
                 sto.setIndex(idx++);
             }
 
-            conf.set("pig.streaming.log.dir",
-                    new Path(tmpLocation, JobControlCompiler.LOG_DIR).toString());
-            conf.set("pig.streaming.task.output.dir", tmpLocation.toString());
+            payloadConf.set("pig.streaming.log.dir", new Path(tmpLocation,
+                    JobControlCompiler.LOG_DIR).toString());
+            payloadConf.set("pig.streaming.task.output.dir",
+                    tmpLocation.toString());
         }
 
         if (!pc.inIllustrator) {
-            // Unset inputs for POStore, otherwise, exec plan will be unnecessarily deserialized
-            for (POStore st: stores) { st.setInputs(null); st.setParentPlan(null);}
-            // We put them in the reduce because PigOutputCommitter checks the ID of the task to see if it's a map, and if not, calls the reduce committers.
-            conf.set(JobControlCompiler.PIG_MAP_STORES, ObjectSerializer.serialize(new ArrayList<POStore>()));
-            conf.set(JobControlCompiler.PIG_REDUCE_STORES, ObjectSerializer.serialize(stores));
+            // Unset inputs for POStore, otherwise, exec plan will be
+            // unnecessarily deserialized
+            for (POStore st : stores) {
+                st.setInputs(null);
+                st.setParentPlan(null);
+            }
+            // We put them in the reduce because PigOutputCommitter checks the
+            // ID of the task to see if it's a map, and if not, calls the reduce
+            // committers.
+            payloadConf.set(JobControlCompiler.PIG_MAP_STORES,
+                    ObjectSerializer.serialize(new ArrayList<POStore>()));
+            payloadConf.set(JobControlCompiler.PIG_REDUCE_STORES,
+                    ObjectSerializer.serialize(stores));
         }
 
         // For all shuffle outputs, configure the classes
@@ -290,30 +314,52 @@ public class TezDagBuilder extends TezOp
         // different keys.
         if (leaves.size() == 1 && leaves.get(0) instanceof POLocalRearrange) {
             byte keyType = ((POLocalRearrange)leaves.get(0)).getKeyType();
-            setIntermediateOutputKeyValue(keyType, conf);
-            conf.set("pig.reduce.key.type", Byte.toString(keyType));
+            setIntermediateOutputKeyValue(keyType, payloadConf);
+            payloadConf.set("pig.reduce.key.type", Byte.toString(keyType));
         }
 
         // Configure the classes for incoming shuffles to this TezOp
         List<PhysicalOperator> roots = tezOp.plan.getRoots();
         if (roots.size() == 1 && roots.get(0) instanceof POPackage) {
             POPackage pack = (POPackage) roots.get(0);
+
+            List<PhysicalOperator> succsList = tezOp.plan.getSuccessors(pack);
+            if (succsList != null) {
+                succsList = new ArrayList<PhysicalOperator>(succsList);
+            }
             byte keyType = pack.getPkgr().getKeyType();
             tezOp.plan.remove(pack);
-            conf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
-            conf.set("pig.reduce.key.type", Byte.toString(keyType));
-            setIntermediateInputKeyValue(keyType, conf);
-            conf.setClass("pig.input.handler.class", ShuffledInputHandler.class, InputHandler.class);
-            conf.set("pig.reduce.key.type", Byte.toString(keyType));
-        } else {
-            conf.setClass("pig.input.handler.class", FileInputHandler.class, InputHandler.class);
-        }
+            payloadConf.set("pig.reduce.package",
+                    ObjectSerializer.serialize(pack));
+            payloadConf.set("pig.reduce.key.type", Byte.toString(keyType));
+            setIntermediateInputKeyValue(keyType, payloadConf);
+            // TODO: Move POShuffleTezLoad upstream to Physical Plan generation
+            POShuffleTezLoad shuffleLoad = new POShuffleTezLoad(
+                    new OperatorKey(scope, nig.getNextNodeId(scope)), pack);
+            tezOp.plan.add(shuffleLoad);
+
+            if (succsList != null) {
+                for (PhysicalOperator succs : succsList) {
+                    tezOp.plan.connect(shuffleLoad, succs);
+                }
+            }
 
-        conf.setClass("mapreduce.outputformat.class", PigOutputFormat.class, OutputFormat.class);
+            @SuppressWarnings("rawtypes")
+            Class<? extends WritableComparable> keyClass = HDataType
+                    .getWritableComparableTypes(pack.getPkgr().getKeyType())
+                    .getClass();
+            payloadConf.set(
+                    TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+                    keyClass.getName());
+            payloadConf.set(
+                    TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+                    NullableTuple.class.getName());
+            selectInputComparator(payloadConf, pack.getPkgr().getKeyType());
+        }
 
         if(tezOp.isGlobalSort() || tezOp.isLimitAfterSort()){
             if (tezOp.isGlobalSort()) {
-                FileSystem fs = FileSystem.get(conf);
+                FileSystem fs = FileSystem.get(globalConf);
                 Path quantFilePath = new Path(tezOp.getQuantFile() + "/part-r-00000");
                 FileStatus fstat = fs.getFileStatus(quantFilePath);
                 LocalResource quantFileResource = LocalResource.newInstance(
@@ -323,48 +369,66 @@ public class TezDagBuilder extends TezOp
                         fstat.getLen(),
                         fstat.getModificationTime());
                 localResources.put(quantFilePath.getName(), quantFileResource);
-                conf.set("pig.quantilesFile", fstat.getPath().toString());
-                conf.set("pig.sortOrder",
+                payloadConf.set("pig.quantilesFile", fstat.getPath().toString());
+                payloadConf.set("pig.sortOrder",
                         ObjectSerializer.serialize(tezOp.getSortOrder()));
-                conf.setClass("mapreduce.job.partitioner.class", WeightedRangePartitioner.class,
+                payloadConf.setClass("mapreduce.job.partitioner.class",
+                        WeightedRangePartitioner.class,
                         Partitioner.class);
             }
         }
 
+        payloadConf.setClass("mapreduce.outputformat.class",
+                PigOutputFormat.class, OutputFormat.class);
+
         // Serialize the execution plan
-        conf.set(PigProcessor.PLAN, ObjectSerializer.serialize(tezOp.plan));
-        UDFContext.getUDFContext().serialize(conf);
+        payloadConf.set(PigProcessor.PLAN,
+                ObjectSerializer.serialize(tezOp.plan));
+        UDFContext.getUDFContext().serialize(payloadConf);
 
         // Take our assembled configuration and create a vertex
-        byte[] userPayload = TezUtils.createUserPayloadFromConf(conf);
+        byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
         procDesc.setUserPayload(userPayload);
-        // Can only set parallelism here if the parallelism isn't derived from splits
-        int parallelism = !loads ? tezOp.requestedParallelism : -1;
+        // Can only set parallelism here if the parallelism isn't derived from
+        // splits
+        int parallelism = (loads.size() == 0) ? tezOp.requestedParallelism : -1;
         Vertex vertex = new Vertex(tezOp.name(), procDesc, parallelism,
                 Resource.newInstance(tezOp.requestedMemory, tezOp.requestedCpu));
 
         Map<String, String> env = new HashMap<String, String>();
-        MRHelpers.updateEnvironmentForMRTasks(conf, env, true);
+        MRHelpers.updateEnvironmentForMRTasks(globalConf, env, true);
         vertex.setTaskEnvironment(env);
 
         vertex.setTaskLocalResources(localResources);
 
         // This could also be reduce, but we need to choose one
-        // TODO: Create new or use existing settings that are specifically for Tez.
-        vertex.setJavaOpts(MRHelpers.getMapJavaOpts(conf));
-
-        // Right now there can only be one of each of these. Will need to be more generic when there can be more.
-        if (loads){
-            vertex.addInput("PigInput", new InputDescriptor(MRInput.class.getName()).setUserPayload(MRHelpers.createMRInputPayload(userPayload, null)), MRInputAMSplitGenerator.class);
-        }
-        if (!stores.isEmpty()){
-            vertex.addOutput("PigOutput", new OutputDescriptor(MROutput.class.getName()));
+        // TODO: Create new or use existing settings that are specifically for
+        // Tez.
+        vertex.setJavaOpts(MRHelpers.getMapJavaOpts(globalConf));
+
+        // Right now there can only be one of each of these. Will need to be
+        // more generic when there can be more.
+        for (POLoad ld : loads) {
+
+            // TODO: These should get the globalConf, or a merged version that
+            // keeps settings like pig.maxCombinedSplitSize
+            vertex.addInput(ld.getOperatorKey().toString(),
+                    new InputDescriptor(MRInput.class.getName())
+                            .setUserPayload(MRHelpers.createMRInputPayload(
+                                    userPayload, null)),
+                    MRInputAMSplitGenerator.class);
+        }
+        if (!stores.isEmpty()) {
+            vertex.addOutput("PigOutput",
+                    new OutputDescriptor(MROutput.class.getName()));
         }
         return vertex;
     }
 
     /**
-     * Do the final configuration of LoadFuncs and store what goes where. This will need to be changed as the inputs get un-bundled
+     * Do the final configuration of LoadFuncs and store what goes where. This
+     * will need to be changed as the inputs get un-bundled
+     *
      * @param tezOp
      * @param conf
      * @param job
@@ -372,32 +436,34 @@ public class TezDagBuilder extends TezOp
      * @throws VisitorException
      * @throws IOException
      */
-    private boolean processLoads(TezOperator tezOp, Configuration conf, Job job)
-            throws VisitorException, IOException {
+    private List<POLoad> processLoads(TezOperator tezOp, Configuration conf,
+            Job job) throws VisitorException, IOException {
         ArrayList<FileSpec> inp = new ArrayList<FileSpec>();
         ArrayList<List<OperatorKey>> inpTargets = new ArrayList<List<OperatorKey>>();
         ArrayList<String> inpSignatureLists = new ArrayList<String>();
         ArrayList<Long> inpLimits = new ArrayList<Long>();
 
-        List<POLoad> lds = PlanHelper.getPhysicalOperators(tezOp.plan, POLoad.class);
+        List<POLoad> lds = PlanHelper.getPhysicalOperators(tezOp.plan,
+                POLoad.class);
 
-        if(lds!=null && lds.size()>0){
+        if (lds != null && lds.size() > 0) {
             for (POLoad ld : lds) {
                 LoadFunc lf = ld.getLoadFunc();
                 lf.setLocation(ld.getLFile().getFileName(), job);
 
-                //Store the inp filespecs
+                // Store the inp filespecs
                 inp.add(ld.getLFile());
             }
         }
 
-        if(lds!=null && lds.size()>0){
+        if (lds != null && lds.size() > 0) {
             for (POLoad ld : lds) {
-                //Store the target operators for tuples read
-                //from this input
-                List<PhysicalOperator> ldSucs = tezOp.plan.getSuccessors(ld);
+                // Store the target operators for tuples read
+                // from this input
+                List<PhysicalOperator> ldSucs = new ArrayList<PhysicalOperator>(
+                        tezOp.plan.getSuccessors(ld));
                 List<OperatorKey> ldSucKeys = new ArrayList<OperatorKey>();
-                if(ldSucs!=null){
+                if (ldSucs != null) {
                     for (PhysicalOperator operator2 : ldSucs) {
                         ldSucKeys.add(operator2.getOperatorKey());
                     }
@@ -405,8 +471,18 @@ public class TezDagBuilder extends TezOp
                 inpTargets.add(ldSucKeys);
                 inpSignatureLists.add(ld.getSignature());
                 inpLimits.add(ld.getLimit());
-                //Remove the POLoad from the plan
+                // Remove the POLoad from the plan
                 tezOp.plan.remove(ld);
+                // Now add the input handling operator for the Tez backend
+                // TODO: Move this upstream to the PhysicalPlan generation
+                POSimpleTezLoad tezLoad = new POSimpleTezLoad(new OperatorKey(
+                        scope, nig.getNextNodeId(scope)));
+                tezLoad.setInputKey(ld.getOperatorKey().toString());
+                tezOp.plan.add(tezLoad);
+                for (PhysicalOperator sucs : ldSucs) {
+                    tezOp.plan.connect(tezLoad, sucs);
+                }
+
             }
         }
 
@@ -415,69 +491,69 @@ public class TezDagBuilder extends TezOp
         conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
         conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
 
-        return (lds.size() > 0);
+        return lds;
     }
 
     @SuppressWarnings("rawtypes")
-    private void setIntermediateInputKeyValue(byte keyType, Configuration conf) throws JobCreationException, ExecException {
-        Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(keyType).getClass();
-        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, keyClass.getName());
-        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS, NullableTuple.class.getName());
-        selectInputComparator(keyType, conf);
+    private void setIntermediateInputKeyValue(byte keyType, Configuration conf)
+            throws JobCreationException, ExecException {
+        Class<? extends WritableComparable> keyClass = HDataType
+                .getWritableComparableTypes(keyType).getClass();
+        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+                keyClass.getName());
+        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
+                NullableTuple.class.getName());
+        selectInputComparator(conf, keyType);
     }
 
     @SuppressWarnings("rawtypes")
-    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf) throws JobCreationException, ExecException {
-        Class<? extends WritableComparable> keyClass = HDataType.getWritableComparableTypes(keyType).getClass();
-        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, keyClass.getName());
-        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS, NullableTuple.class.getName());
-        conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS, MRPartitioner.class.getName());
+    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf)
+            throws JobCreationException, ExecException {
+        Class<? extends WritableComparable> keyClass = HDataType
+                .getWritableComparableTypes(keyType).getClass();
+        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
+                keyClass.getName());
+        conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_VALUE_CLASS,
+                NullableTuple.class.getName());
+        conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
+                MRPartitioner.class.getName());
         selectOutputComparator(keyType, conf);
     }
 
-    private void selectInputComparator(byte keyType, Configuration conf) throws JobCreationException {
-        //TODO: Handle sorting like in JobControlCompiler
+    static Class<? extends WritableComparator> comparatorForKeyType(byte keyType)
+            throws JobCreationException {
+        // TODO: Handle sorting like in JobControlCompiler
 
         switch (keyType) {
         case DataType.BOOLEAN:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigBooleanWritableComparator.class, RawComparator.class);
-            break;
+            return PigBooleanRawComparator.class;
 
         case DataType.INTEGER:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigIntWritableComparator.class, RawComparator.class);
-            break;
+            return PigIntRawComparator.class;
 
         case DataType.BIGINTEGER:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigBigIntegerWritableComparator.class, RawComparator.class);
-            break;
+            return PigBigIntegerRawComparator.class;
 
         case DataType.BIGDECIMAL:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigBigDecimalWritableComparator.class, RawComparator.class);
-            break;
+            return PigBigDecimalRawComparator.class;
 
         case DataType.LONG:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigLongWritableComparator.class, RawComparator.class);
-            break;
+            return PigLongRawComparator.class;
 
         case DataType.FLOAT:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigFloatWritableComparator.class, RawComparator.class);
-            break;
+            return PigFloatRawComparator.class;
 
         case DataType.DOUBLE:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigDoubleWritableComparator.class, RawComparator.class);
-            break;
+            return PigDoubleRawComparator.class;
 
         case DataType.DATETIME:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigDateTimeWritableComparator.class, RawComparator.class);
-            break;
+            return PigDateTimeRawComparator.class;
 
         case DataType.CHARARRAY:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigCharArrayWritableComparator.class, RawComparator.class);
-            break;
+            return PigTextRawComparator.class;
 
         case DataType.BYTEARRAY:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigDBAWritableComparator.class, RawComparator.class);
-            break;
+            return PigBytesRawComparator.class;
 
         case DataType.MAP:
             int errCode = 1068;
@@ -485,8 +561,7 @@ public class TezDagBuilder extends TezOp
             throw new JobCreationException(msg, errCode, PigException.INPUT);
 
         case DataType.TUPLE:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS, PigTupleWritableComparator.class, RawComparator.class);
-            break;
+            return PigTupleSortComparator.class;
 
         case DataType.BAG:
             errCode = 1068;
@@ -500,80 +575,19 @@ public class TezDagBuilder extends TezOp
         }
     }
 
-    private void selectOutputComparator(byte keyType, Configuration conf) throws JobCreationException {
-        //TODO: Handle sorting like in JobControlCompiler
-
-        switch (keyType) {
-        case DataType.BOOLEAN:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigBooleanWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingBooleanWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.INTEGER:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigIntWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingIntWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.BIGINTEGER:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigBigIntegerWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingBigIntegerWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.BIGDECIMAL:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigBigDecimalWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingBigDecimalWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.LONG:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigLongWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingLongWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.FLOAT:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigFloatWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingFloatWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.DOUBLE:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigDoubleWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingDoubleWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.DATETIME:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigDateTimeWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingDateTimeWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.CHARARRAY:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigCharArrayWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingCharArrayWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.BYTEARRAY:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigDBAWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingDBAWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.MAP:
-            int errCode = 1068;
-            String msg = "Using Map as key not supported.";
-            throw new JobCreationException(msg, errCode, PigException.INPUT);
-
-        case DataType.TUPLE:
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS, PigTupleWritableComparator.class, RawComparator.class);
-            conf.setClass(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigGroupingTupleWritableComparator.class, RawComparator.class);
-            break;
-
-        case DataType.BAG:
-            errCode = 1068;
-            msg = "Using Bag as key not supported.";
-            throw new JobCreationException(msg, errCode, PigException.INPUT);
+    void selectInputComparator(Configuration conf, byte keyType)
+            throws JobCreationException {
+        // TODO: Handle sorting like in JobControlCompiler
+        conf.setClass(
+                TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+                comparatorForKeyType(keyType), RawComparator.class);
+    }
 
-        default:
-            errCode = 2036;
-            msg = "Unhandled key type " + DataType.findTypeName(keyType);
-            throw new JobCreationException(msg, errCode, PigException.BUG);
-        }
+    void selectOutputComparator(byte keyType, Configuration conf)
+            throws JobCreationException {
+        // TODO: Handle sorting like in JobControlCompiler
+        conf.setClass(
+                TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+                comparatorForKeyType(keyType), RawComparator.class);
     }
 }
-