You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/01/23 02:06:22 UTC

svn commit: r1560565 [1/3] - in /pig/branches/tez: shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/optimizer/ src/org/apach...

Author: rohini
Date: Thu Jan 23 01:06:21 2014
New Revision: 1560565

URL: http://svn.apache.org/r1560565
Log:
PIG-3626: Make combiners, custom partitioners and secondary key sort work for multiple outputs (rohini)

Added:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/SecondaryKeyOptimizerUtil.java
    pig/branches/tez/test/org/apache/pig/test/TestCustomPartitioner.java
    pig/branches/tez/test/org/apache/pig/test/TestSecondarySortMR.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC14.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC15.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
    pig/branches/tez/test/org/apache/pig/tez/TestSecondarySortTez.java
Modified:
    pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
    pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
    pig/branches/tez/src/org/apache/pig/PigConfiguration.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ColumnInfo.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
    pig/branches/tez/src/org/apache/pig/impl/io/NullableTuple.java
    pig/branches/tez/src/org/apache/pig/impl/io/PigNullableWritable.java
    pig/branches/tez/test/org/apache/pig/test/MiniGenericCluster.java
    pig/branches/tez/test/org/apache/pig/test/TestAccumulator.java
    pig/branches/tez/test/org/apache/pig/test/TestCombiner.java
    pig/branches/tez/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/branches/tez/test/org/apache/pig/test/TestPigServer.java
    pig/branches/tez/test/org/apache/pig/test/TestSecondarySort.java
    pig/branches/tez/test/org/apache/pig/test/TestSkewedJoin.java
    pig/branches/tez/test/org/apache/pig/test/TestSplitStore.java
    pig/branches/tez/test/org/apache/pig/test/Util.java
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC5.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC7.gld
    pig/branches/tez/test/org/apache/pig/test/data/GoldenFiles/TEZC8.gld
    pig/branches/tez/test/org/apache/pig/tez/TestTezCompiler.java
    pig/branches/tez/test/tez-tests

Modified: pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java?rev=1560565&r1=1560564&r2=1560565&view=diff
Review note: buildCluster() now passes "mr" to MiniGenericCluster.buildCluster and the class overrides getExecType() to return ExecType.MAPREDUCE; the import hunk only regroups the existing imports and adds ExecType.
==============================================================================
--- pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java (original)
+++ pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/MiniCluster.java Thu Jan 23 01:06:21 2014
@@ -22,10 +22,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.filecache.DistributedCache;
-import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.filecache.DistributedCache;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.pig.ExecType;
 
 /**
  * This class builds a single instance of itself with the Singleton
@@ -48,7 +49,12 @@ public class MiniCluster extends MiniGen
     @Deprecated
     public static MiniCluster buildCluster() {
         System.setProperty("test.exec.type", "mr");
-        return (MiniCluster)MiniGenericCluster.buildCluster();
+        return (MiniCluster)MiniGenericCluster.buildCluster("mr");
+    }
+
+    @Override
+    protected ExecType getExecType() {
+        return ExecType.MAPREDUCE;
     }
 
     @Override

Modified: pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1560565&r1=1560564&r2=1560565&view=diff
Review note: the conf-file cleanup is factored into deleteConfFiles(), called from both setupMiniDfsAndMrClusters() and shutdownMiniMrClusters(); getExecType() returns a shared TezExecType; MAP_JAVA_OPTS=-Xmx384M and TEZ_RUNTIME_IO_SORT_MB=20 are temporary settings marked TODO PIG-3659.
==============================================================================
--- pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/tez/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Thu Jan 23 01:06:21 2014
@@ -27,9 +27,13 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager;
+import org.apache.tez.common.TezJobConfig;
 
 public class TezMiniCluster extends MiniGenericCluster {
     private static final File CONF_DIR = new File("build/classes");
@@ -38,26 +42,21 @@ public class TezMiniCluster extends Mini
     private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
     private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
     private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
+    private static final ExecType TEZ = new TezExecType();
 
     protected MiniMRYarnCluster m_mr = null;
     private Configuration m_dfs_conf = null;
     private Configuration m_mr_conf = null;
 
     @Override
+    protected ExecType getExecType() {
+        return TEZ;
+    }
+
+    @Override
     public void setupMiniDfsAndMrClusters() {
         try {
-            if (TEZ_CONF_FILE.exists()) {
-                TEZ_CONF_FILE.delete();
-            }
-            if (CORE_CONF_FILE.exists()) {
-                CORE_CONF_FILE.delete();
-            }
-            if (HDFS_CONF_FILE.exists()) {
-                HDFS_CONF_FILE.delete();
-            }
-            if (MAPRED_CONF_FILE.exists()) {
-                MAPRED_CONF_FILE.delete();
-            }
+            deleteConfFiles();
             CONF_DIR.mkdirs();
 
             // Build mini DFS cluster
@@ -86,7 +85,8 @@ public class TezMiniCluster extends Mini
             m_mr_conf.set("mapreduce.framework.name", "yarn-tez");
             m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                     System.getProperty("java.class.path"));
-
+            // TODO PIG-3659 - Remove this once memory management is fixed
+            m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx384M");// -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8005 -Xnoagent -Djava.compiler=NONE");
             m_mr_conf.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
             m_fileSys.copyFromLocalFile(
                     new Path(MAPRED_CONF_FILE.getAbsoluteFile().toString()),
@@ -103,6 +103,8 @@ public class TezMiniCluster extends Mini
 
             // Write tez-site.xml
             Configuration tez_conf = new Configuration(false);
+            // TODO PIG-3659 - Remove this once memory management is fixed
+            tez_conf.set(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, "20");
             tez_conf.set("tez.lib.uris", "hdfs:///tez,hdfs:///tez/lib");
             tez_conf.writeXml(new FileOutputStream(TEZ_CONF_FILE));
             m_fileSys.copyFromLocalFile(
@@ -141,6 +143,14 @@ public class TezMiniCluster extends Mini
 
     @Override
     protected void shutdownMiniMrClusters() {
+        deleteConfFiles();
+        if (m_mr != null) {
+            m_mr.stop();
+            m_mr = null;
+        }
+    }
+
+    private void deleteConfFiles() {
         if(TEZ_CONF_FILE.exists()) {
             TEZ_CONF_FILE.delete();
         }
@@ -153,9 +163,5 @@ public class TezMiniCluster extends Mini
         if(MAPRED_CONF_FILE.exists()) {
             MAPRED_CONF_FILE.delete();
         }
-        if (m_mr != null) {
-            m_mr.stop();
-            m_mr = null;
-        }
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/PigConfiguration.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/branches/tez/src/org/apache/pig/PigConfiguration.java Thu Jan 23 01:06:21 2014
@@ -96,13 +96,13 @@ public class PigConfiguration {
      * will be set in the environment.
      */
     public static final String PIG_STREAMING_ENVIRONMENT = "pig.streaming.environment";
-    
+
     /**
      * This key is used to define the default load func. Pig will fallback on PigStorage
      * as default in case this is undefined.
      */
     public static final String PIG_DEFAULT_LOAD_FUNC = "pig.default.load.func";
-    
+
     /**
      * This key is used to define the default store func. Pig will fallback on PigStorage
      * as default in case this is undefined.
@@ -180,5 +180,11 @@ public class PigConfiguration {
      * Smaller files are combined untill this size is reached.
      */
     public static final String PIG_MAX_COMBINED_SPLIT_SIZE = "pig.maxCombinedSplitSize";
+
+    /**
+     * When set to "true", disables the secondary sort key optimization that is
+     * otherwise applied to nested distinct or sort inside a foreach.
+     */
+    public static final String PIG_EXEC_NO_SECONDARY_KEY = "pig.exec.nosecondarykey";
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ColumnInfo.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ColumnInfo.java?rev=1560565&r1=1560564&r2=1560565&view=diff
Review note: adds public getters for resultType, startCol, isRangeProject and columns plus @Override on toString() and equals(); the remaining changes strip trailing whitespace. NOTE(review): getColumns() hands out the internal mutable list; equals() ignores resultType while hashCode() hashes toString(), which appends the result type name, so equal instances can have different hash codes.
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ColumnInfo.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ColumnInfo.java Thu Jan 23 01:06:21 2014
@@ -28,7 +28,7 @@ public class ColumnInfo implements Clone
     byte resultType;
     int startCol = -1;
     boolean isRangeProject = false;
-    
+
     public ColumnInfo(List<Integer> columns, byte type) {
         this.columns = columns;
         this.resultType = type;
@@ -44,7 +44,24 @@ public class ColumnInfo implements Clone
         this.resultType = type;
         this.isRangeProject = true;
     }
-    
+
+    public byte getResultType() {
+        return resultType;
+    }
+
+    public int getStartCol() {
+        return startCol;
+    }
+
+    public boolean isRangeProject() {
+        return isRangeProject;
+    }
+
+    public List<Integer> getColumns() {
+        return columns;
+    }
+
+    @Override
     public String toString() {
         String result;
         if (isStar())
@@ -53,32 +70,33 @@ public class ColumnInfo implements Clone
         result+=DataType.findTypeName(resultType);
         return result;
     }
-    
+
     private boolean isStar() {
         return isRangeProject && startCol == 0;
     }
 
+    @Override
     public boolean equals(Object o2)
     {
-        
+
         if (o2 == null || !(o2 instanceof ColumnInfo))
             return false;
         ColumnInfo c2 = (ColumnInfo)o2;
         if (
                 isRangeProject == c2.isRangeProject &&
                 startCol == c2.startCol &&
-                ((columns == null && c2.columns == null) || 
+                ((columns == null && c2.columns == null) ||
                         (columns != null && columns.equals(c2.columns)))
         )
             return true;
-        
+
         return false;
     }
     @Override
     public int hashCode() {
         return toString().hashCode();
     }
-    
+
     @Override
     public Object clone() throws CloneNotSupportedException{
         ColumnInfo newColInfo = (ColumnInfo)super.clone();
@@ -87,6 +105,6 @@ public class ColumnInfo implements Clone
         cols.addAll(this.columns);
         newColInfo.columns = cols;
         return newColInfo;
-        
+
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1560565&r1=1560564&r2=1560565&view=diff
Review note: adds @Override to kill() and killJob(), reads the flag through PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY instead of the "pig.exec.nosecondarykey" literal, and runs the MR-specific SecondaryKeyOptimizerMR (skipped in illustrate mode or when the flag is "true").
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Thu Jan 23 01:06:21 2014
@@ -95,6 +95,7 @@ public class MapReduceLauncher extends L
 
     private boolean aggregateWarning = false;
 
+    @Override
     public void kill() {
         try {
             log.debug("Receive kill signal");
@@ -111,6 +112,7 @@ public class MapReduceLauncher extends L
         }
     }
 
+    @Override
     public void killJob(String jobID, Configuration conf) throws BackendException {
         try {
             if (conf != null) {
@@ -639,9 +641,9 @@ public class MapReduceLauncher extends L
         la.adjust();
         }
         // Optimize to use secondary sort key if possible
-        prop = pc.getProperties().getProperty("pig.exec.nosecondarykey");
+        prop = pc.getProperties().getProperty(PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY);
         if (!pc.inIllustrator && !("true".equals(prop)))  {
-            SecondaryKeyOptimizer skOptimizer = new SecondaryKeyOptimizer(plan);
+            SecondaryKeyOptimizerMR skOptimizer = new SecondaryKeyOptimizerMR(plan);
             skOptimizer.visit();
         }
 

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/SecondaryKeyOptimizerMR.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Applies the secondary key sort optimization to each MapReduce operator of an
+ * {@link MROperPlan} (global sort operators are skipped). The counters reported
+ * through {@link SecondaryKeyOptimizer} are totals over all visited operators.
+ */
+@InterfaceAudience.Private
+public class SecondaryKeyOptimizerMR extends MROpPlanVisitor implements SecondaryKeyOptimizer {
+    private static Log log = LogFactory.getLog(SecondaryKeyOptimizerMR.class);
+
+    // Running totals over every visited operator. Each SecondaryKeyOptimizerInfo
+    // describes a single operator, so keeping only the latest one would drop the
+    // counts of all earlier operators (or reset them if the last result is null).
+    private int numSortRemoved = 0;
+    private int numDistinctChanged = 0;
+    private int numUseSecondaryKey = 0;
+
+    /**
+     * @param plan
+     *            The MROperPlan to visit to discover keyType
+     */
+    public SecondaryKeyOptimizerMR(MROperPlan plan) {
+        super(plan, new DepthFirstWalker<MapReduceOper, MROperPlan>(plan));
+    }
+
+    @Override
+    public void visitMROp(MapReduceOper mr) throws VisitorException {
+        // Only optimize for Cogroup case; global sort (order by) jobs are skipped
+        if (mr.isGlobalSort())
+            return;
+
+        SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(mr.mapPlan, mr.reducePlan);
+        if (info == null) {
+            return;
+        }
+
+        numSortRemoved += info.getNumSortRemoved();
+        numDistinctChanged += info.getNumDistinctChanged();
+        numUseSecondaryKey += info.getNumUseSecondaryKey();
+
+        if (info.isUseSecondaryKey()) {
+            mr.setUseSecondaryKey(true);
+            mr.setSecondarySortOrder(info.getSecondarySortOrder());
+            log.info("Using Secondary Key Optimization for MapReduce node " + mr.getOperatorKey());
+        }
+    }
+
+    /** @return total number of nested POSort operators removed across all visited operators */
+    @Override
+    public int getNumSortRemoved() {
+        return numSortRemoved;
+    }
+
+    /** @return total number of PODistinct operators changed across all visited operators */
+    @Override
+    public int getNumDistinctChanged() {
+        return numDistinctChanged;
+    }
+
+    /** @return sum of the per-operator secondary key usage counts */
+    @Override
+    public int getNumUseSecondaryKey() {
+        return numUseSecondaryKey;
+    }
+
+}

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/optimizer/SecondaryKeyOptimizer.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.optimizer;
+
+import org.apache.pig.classification.InterfaceAudience;
+
+/**
+ * Removes POSort and changes PODistinct to POSortedDistinct in nested foreach plans
+ * by sorting on a secondary key. Implementations report how many of these rewrites
+ * they performed.
+ */
+@InterfaceAudience.Private
+public interface SecondaryKeyOptimizer {
+
+    /**
+     * @return number of POSort operators removed from nested foreach plans
+     */
+    int getNumSortRemoved();
+
+    /**
+     * @return number of PODistinct operators changed to POSortedDistinct
+     */
+    int getNumDistinctChanged();
+
+    /**
+     * @return number of secondary key usages reported by the optimizer
+     */
+    int getNumUseSecondaryKey();
+
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1560565&r1=1560564&r2=1560565&view=diff
Review note: adds a public setAlias(String) setter and builds the end-of-processing result with new Result(POStatus.STATUS_EOP, null) instead of mutating a default-constructed Result.
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Thu Jan 23 01:06:21 2014
@@ -181,6 +181,10 @@ public abstract class PhysicalOperator e
         return (alias == null) ? "" : (alias + ": ");
     }
 
+    public void setAlias(String alias) {
+        this.alias = alias;
+    }
+
     public void addOriginalLocation(String alias, SourceLocation sourceLocation) {
         this.alias = alias;
         this.originalLocations.add(new OriginalLocation(alias, sourceLocation.line(), sourceLocation.offset()));
@@ -282,9 +286,7 @@ public abstract class PhysicalOperator e
         try {
             if (input == null && (inputs == null || inputs.size() == 0)) {
                 // log.warn("No inputs found. Signaling End of Processing.");
-                Result res = new Result();
-                res.returnStatus = POStatus.STATUS_EOP;
-                return res;
+                return new Result(POStatus.STATUS_EOP, null);
             }
 
             // Should be removed once the model is clear

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1560565&r1=1560564&r2=1560565&view=diff
Review note: adds getMainKeyType(); setKeyType() now assigns mainKeyType in both branches (with and without a secondary key), so getMainKeyType() reflects the last key type set either way.
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Thu Jan 23 01:06:21 2014
@@ -81,7 +81,7 @@ public class POLocalRearrange extends Ph
     protected boolean mIsDistinct = false;
 
     protected boolean isCross = false;
-    
+
     protected Result inp;
 
     // map to store mapping of projected columns to
@@ -529,11 +529,16 @@ public class POLocalRearrange extends Ph
         return keyType;
     }
 
+    public byte getMainKeyType() {
+        return mainKeyType;
+    }
+
     public void setKeyType(byte keyType) {
         if (useSecondaryKey) {
             this.mainKeyType = keyType;
         } else {
             this.keyType = keyType;
+            this.mainKeyType = keyType;
         }
     }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1560565&r1=1560564&r2=1560565&view=diff
Review note: the first three hunks only normalize spacing around operators and keywords. NOTE(review): if numSamples is 0 the fill loop never runs, res is still null, and the following res.returnStatus check throws a NullPointerException; confirm numSamples is always positive.
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Thu Jan 23 01:06:21 2014
@@ -83,9 +83,9 @@ public class POReservoirSample extends P
         // populate the samples array with first numSamples tuples
         Result res = null;
         int rowProcessed = 0;
-        while (rowProcessed<numSamples) {
+        while (rowProcessed < numSamples) {
             res = processInput();
-            if(res.returnStatus == POStatus.STATUS_OK) {
+            if (res.returnStatus == POStatus.STATUS_OK) {
                 samples[rowProcessed] = res;
                 rowProcessed++;
             } else if (res.returnStatus == POStatus.STATUS_NULL) {
@@ -95,14 +95,14 @@ public class POReservoirSample extends P
             }
         }
 
-        int rowNum = numSamples+1;
+        int rowNum = numSamples + 1;
         Random randGen = new Random();
 
-        if(res.returnStatus == POStatus.STATUS_OK){ // did not exhaust all tuples
-            while(true){
+        if (res.returnStatus == POStatus.STATUS_OK) { // did not exhaust all tuples
+            while (true) {
                 // pick this as sample
                 Result sampleResult = processInput();
-                if(sampleResult.returnStatus == POStatus.STATUS_NULL) {
+                if (sampleResult.returnStatus == POStatus.STATUS_NULL) {
                     continue;
                 } else if (sampleResult.returnStatus != POStatus.STATUS_OK) {
                     break;
@@ -110,7 +110,7 @@ public class POReservoirSample extends P
 
                 // collect samples until input is exhausted
                 int rand = randGen.nextInt(rowNum);
-                if(rand < numSamples){
+                if (rand < numSamples) {
                     samples[rand] = sampleResult;
                 }
                 rowNum++;
@@ -125,11 +125,13 @@ public class POReservoirSample extends P
-            if (illustrator != null) {
-                illustratorMarkup(samples[nextSampleIdx].result, samples[nextSampleIdx].result, 0);
-            }
-            return samples[nextSampleIdx++];
+            Result res = samples[nextSampleIdx++];
+            if (res == null) { // fewer input rows than numSamples: unfilled slots are null
+                return new Result(POStatus.STATUS_EOP, null);
+            }
+            if (illustrator != null) {
+                illustratorMarkup(res.result, res.result, 0);
+            }
+            return res;
         }
         else{
-            Result res = new Result();
-            res.returnStatus = POStatus.STATUS_EOP;
+            Result res = new Result(POStatus.STATUS_EOP, null);
             return res;
         }
     }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1560565&r1=1560564&r2=1560565&view=diff
Review note: adds the java.util.List and OperatorKey imports used by the new getLocalRearrangePlanFromSplit() helper, plus an @Override on visitSkewedJoin().
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Thu Jan 23 01:06:21 2014
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.net.URI;
 import java.util.LinkedList;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -76,6 +77,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 
 import com.google.common.collect.Lists;
@@ -128,6 +130,25 @@ public class PlanHelper {
         return finder.getFoundOps();
     }
 
+    /**
+     * Finds the POSplit sub-plan that contains the POLocalRearrange with the given key.
+     * @param plan physical plan
+     * @param rearrangeKey operator key of the POLocalRearrange
+     * @return the POSplit sub-plan containing that POLocalRearrange, or {@code plan} itself if no sub-plan does
+     * @throws VisitorException propagated from PlanHelper.getPhysicalOperators while collecting POSplits
+     */
+    public static PhysicalPlan getLocalRearrangePlanFromSplit(PhysicalPlan plan, OperatorKey rearrangeKey) throws VisitorException {
+        List<POSplit> splits = PlanHelper.getPhysicalOperators(plan, POSplit.class);
+        for (POSplit split : splits) {
+            for (PhysicalPlan subPlan : split.getPlans()) {
+                if (subPlan.getOperator(rearrangeKey) != null) {
+                    return subPlan;
+                }
+            }
+        }
+        return plan;
+    }
+
     private static class OpFinder<C extends PhysicalOperator> extends PhyPlanVisitor {
 
         final Class<C> opClass;
@@ -425,6 +446,7 @@ public class PlanHelper {
             visit(stream);
         }
 
+        @Override
         public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
             super.visitSkewedJoin(sk);
             visit(sk);

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java?rev=1560565&r1=1560564&r2=1560565&view=diff
Review note: drops the POLocalRearrange and PlanException imports and the parentPlan field; the visitor's mPlan is used to find predecessors instead. NOTE(review): in the later hunk, connectingLR stays null when no POLocalRearrangeTez output key matches the successor's operator key, and connectingLR.getOperatorKey() would then throw a NullPointerException; confirm a match is guaranteed or skip that predecessor.
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/CombinerOptimizer.java Thu Jan 23 01:06:21 2014
@@ -20,13 +20,11 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.List;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.util.CombinerOptimizerUtil;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DepthFirstWalker;
-import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 
 /**
@@ -34,7 +32,6 @@ import org.apache.pig.impl.plan.VisitorE
  */
 public class CombinerOptimizer extends TezOpPlanVisitor {
     private CompilationMessageCollector messageCollector = null;
-    private TezOperPlan parentPlan;
     private boolean doMapAgg;
 
     public CombinerOptimizer(TezOperPlan plan, boolean doMapAgg) {
@@ -46,7 +43,6 @@ public class CombinerOptimizer extends T
         super(plan, new DepthFirstWalker<TezOperator, TezOperPlan>(plan));
         this.messageCollector = messageCollector;
         this.doMapAgg = doMapAgg;
-        this.parentPlan = plan;
     }
 
     public CompilationMessageCollector getMessageCollector() {
@@ -60,37 +56,38 @@ public class CombinerOptimizer extends T
             return;
         }
 
-        List<TezOperator> predecessors = parentPlan.getPredecessors(to);
+        List<TezOperator> predecessors = mPlan.getPredecessors(to);
         if (predecessors == null) {
             return;
         }
 
         for (TezOperator from : predecessors) {
-            List<POLocalRearrange> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrange.class);
+            List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
             if (rearranges.isEmpty()) {
                 continue;
             }
 
+            POLocalRearrangeTez connectingLR = null;
+            PhysicalPlan rearrangePlan = from.plan;
+            for (POLocalRearrangeTez lr : rearranges) {
+                if (to.getOperatorKey().toString().equals(lr.getOutputKey())) {
+                    connectingLR = lr;
+                    break;
+                }
+            }
+            if (connectingLR == null) {
+                continue; // none of this predecessor's LRs feed this vertex: nothing to combine
+            } else if (from.plan.getOperator(connectingLR.getOperatorKey()) == null) { // LR lives in a POSplit sub-plan
+                rearrangePlan = PlanHelper.getLocalRearrangePlanFromSplit(from.plan, connectingLR.getOperatorKey());
+            }
+
             // Detected the POLocalRearrange -> POPackage pattern. Let's add
             // combiner if possible.
             PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
-            // TODO: Right now, CombinerOptimzerUtil doesn't handle a single map
-            // plan with multiple POLocalRearrange leaves. i.e. SPLIT + multiple
-            // GROUP BY with different keys.
-            CombinerOptimizerUtil.addCombiner(from.plan, to.plan, combinePlan, messageCollector, doMapAgg);
-
-            //Replace POLocalRearrange with POLocalRearrangeTez
-            if (!combinePlan.isEmpty()) {
-                POLocalRearrange lr = (POLocalRearrange) combinePlan.getLeaves().get(0);
-                POLocalRearrangeTez lrt = new POLocalRearrangeTez(lr);
-                lrt.setOutputKey(to.getOperatorKey().toString());
-                try {
-                    combinePlan.replace(lr, lrt);
-                } catch (PlanException e) {
-                    throw new VisitorException(e);
-                }
-            }
+            CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
+
         }
     }
+
 }
 

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/POShuffleTezLoad.java Thu Jan 23 01:06:21 2014
@@ -25,9 +25,7 @@ import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.WritableComparator;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -39,6 +37,7 @@ import org.apache.pig.impl.io.NullableTu
 import org.apache.pig.impl.io.PigNullableWritable;
 import org.apache.tez.runtime.api.LogicalInput;
 import org.apache.tez.runtime.library.api.KeyValuesReader;
+import org.apache.tez.runtime.library.common.ConfigUtils;
 import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 
 public class POShuffleTezLoad extends POPackage implements TezLoad {
@@ -62,12 +61,8 @@ public class POShuffleTezLoad extends PO
     @Override
     public void attachInputs(Map<String, LogicalInput> inputs, Configuration conf)
             throws ExecException {
-        try {
-            comparator = ReflectionUtils.newInstance(
-                    TezDagBuilder.comparatorForKeyType(pkgr.getKeyType(), isSkewedJoin), conf);
-        } catch (JobCreationException e) {
-            throw new ExecException(e);
-        }
+
+        comparator = (WritableComparator) ConfigUtils.getInputKeySecondaryGroupingComparator(conf);
         try {
             for (String key : inputKeys) {
                 LogicalInput input = inputs.get(key);
@@ -105,7 +100,7 @@ public class POShuffleTezLoad extends PO
         while (res.returnStatus == POStatus.STATUS_EOP) {
             boolean hasData = false;
             Object cur = null;
-            Object min = null;
+            PigNullableWritable min = null;
 
             try {
                 for (int i = 0; i < numInputs; i++) {
@@ -113,11 +108,12 @@ public class POShuffleTezLoad extends PO
                         hasData = true;
                         cur = readers.get(i).getCurrentKey();
                         if (min == null || comparator.compare(min, cur) > 0) {
-                            min = cur;
+                            min = PigNullableWritable.newInstance((PigNullableWritable)cur);
+                            cur = min;
                         }
                     }
                 }
-            } catch (IOException e) {
+            } catch (Exception e) {
                 throw new ExecException(e);
             }
 
@@ -125,7 +121,7 @@ public class POShuffleTezLoad extends PO
                 return new Result(POStatus.STATUS_EOP, null);
             }
 
-            key = pkgr.getKey((PigNullableWritable) min);
+            key = pkgr.getKey(min);
 
             DataBag[] bags = new DataBag[numInputs];
             POPackageTupleBuffer buffer = new POPackageTupleBuffer();
@@ -133,11 +129,13 @@ public class POShuffleTezLoad extends PO
 
             try {
                 for (int i = 0; i < numInputs; i++) {
+
                     DataBag bag = null;
 
                     if (!finished[i]) {
                         cur = readers.get(i).getCurrentKey();
-                        if (comparator.compare(min, cur) == 0) {
+                        // We need to loop in case of Grouping Comparators
+                        while (comparator.compare(min, cur) == 0) {
                             Iterable<Object> vals = readers.get(i).getCurrentValues();
                             if (isAccumulative()) {
                                 // TODO: POPackageTupleBuffer expects the
@@ -150,14 +148,21 @@ public class POShuffleTezLoad extends PO
                                 // into a new list and pass the iterator of this
                                 // new list.
                                 for (Object val : vals) {
-                                    nTups.add((NullableTuple) val);
+                                    // Make a copy of key and val and avoid reference.
+                                    // getCurrentKey() or value iterator resets value
+                                    // on the same object by calling readFields() again.
+                                    nTups.add(new NullableTuple((NullableTuple) val));
                                 }
                                 // Attach input to POPackageTupleBuffer
-                                buffer.setKey(cur);
                                 buffer.setIterator(nTups.iterator());
-                                bag = new AccumulativeBag(buffer, i);
+                                if(bags[i] == null) {
+                                    buffer.setKey(cur);
+                                    bag = new AccumulativeBag(buffer, i);
+                                } else {
+                                    bag = bags[i];
+                                }
                             } else {
-                                bag = new InternalCachedBag(numInputs);
+                                bag = bags[i] == null? new InternalCachedBag(numInputs) : bags[i];
                                 for (Object val : vals) {
                                     NullableTuple nTup = (NullableTuple) val;
                                     int index = nTup.getIndex();
@@ -165,8 +170,12 @@ public class POShuffleTezLoad extends PO
                                     bag.add(tup);
                                 }
                             }
-                            finished[i] = !readers.get(i).next();
                             bags[i] = bag;
+                            finished[i] = !readers.get(i).next();
+                            if (finished[i]) {
+                                break;
+                            }
+                            cur = readers.get(i).getCurrentKey();
                         }
                     }
 

Added: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java?rev=1560565&view=auto
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java (added)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SecondaryKeyOptimizerTez.java Thu Jan 23 01:06:21 2014
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.tez;
+
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.hadoop.executionengine.optimizer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil;
+import org.apache.pig.backend.hadoop.executionengine.util.SecondaryKeyOptimizerUtil.SecondaryKeyOptimizerInfo;
+import org.apache.pig.classification.InterfaceAudience;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * Tez counterpart of the MR secondary key optimization. For every vertex it examines each
+ * incoming edge whose predecessor routes a POLocalRearrangeTez to it and asks
+ * {@link SecondaryKeyOptimizerUtil#applySecondaryKeySort} whether the receiving plan can
+ * use a secondary sort key. When it can, both the receiving vertex and the connecting edge
+ * are flagged, and the edge gets the secondary sort order.
+ */
+@InterfaceAudience.Private
+public class SecondaryKeyOptimizerTez extends TezOpPlanVisitor implements SecondaryKeyOptimizer {
+
+    private static final Log log = LogFactory.getLog(SecondaryKeyOptimizerTez.class);
+
+    // Totals accumulated from every SecondaryKeyOptimizerInfo returned by the util.
+    private int numSortRemoved = 0;
+    private int numDistinctChanged = 0;
+    private int numUseSecondaryKey = 0;
+
+    public SecondaryKeyOptimizerTez(TezOperPlan plan) {
+        super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+    }
+
+    /**
+     * Applies the secondary key optimization to the incoming edges of {@code to}.
+     * Processing of {@code to} stops entirely (remaining predecessors are not examined)
+     * once a predecessor containing a POLocalRearrangeTez is a global sort vertex or its
+     * edge has a custom partitioner.
+     */
+    @Override
+    public void visitTezOp(TezOperator to) throws VisitorException {
+        List<TezOperator> predecessors = mPlan.getPredecessors(to);
+        if (predecessors == null) {
+            return;
+        }
+
+        String toKey = to.getOperatorKey().toString();
+        for (TezOperator from : predecessors) {
+            List<POLocalRearrangeTez> rearranges = PlanHelper.getPhysicalOperators(from.plan, POLocalRearrangeTez.class);
+            if (rearranges.isEmpty()) {
+                continue;
+            }
+
+            // Find the POLocalRearrangeTez whose output is routed to this vertex.
+            POLocalRearrangeTez connectingLR = null;
+            PhysicalPlan rearrangePlan = from.plan;
+            for (POLocalRearrangeTez lr : rearranges) {
+                if (toKey.equals(lr.getOutputKey())) {
+                    connectingLR = lr;
+                    break;
+                }
+            }
+
+            TezEdgeDescriptor inEdge = to.inEdges.get(from.getOperatorKey());
+            // Only optimize for Cogroup case
+            if (from.isGlobalSort()) {
+                return;
+            }
+
+            // If there is a custom partitioner do not do secondary key optimization.
+            // MR SecondaryKeyOptimizer currently does not check for this case.
+            if (inEdge != null && inEdge.partitionerClass != null) {
+                return;
+            }
+
+            // No LR of this predecessor feeds this vertex, or there is no edge descriptor
+            // to record the result on: nothing to optimize for this predecessor.
+            if (connectingLR == null || inEdge == null) {
+                continue;
+            }
+
+            if (from.plan.getOperator(connectingLR.getOperatorKey()) == null) {
+                // The POLocalRearrange is sub-plan of a POSplit
+                rearrangePlan = PlanHelper.getLocalRearrangePlanFromSplit(from.plan, connectingLR.getOperatorKey());
+            }
+
+            //TODO: Case of from plan leaf being POUnion.
+            SecondaryKeyOptimizerInfo info = SecondaryKeyOptimizerUtil.applySecondaryKeySort(rearrangePlan, to.plan);
+            if (info != null) {
+                numSortRemoved += info.getNumSortRemoved();
+                numDistinctChanged += info.getNumDistinctChanged();
+                numUseSecondaryKey += info.getNumUseSecondaryKey();
+                if (info.isUseSecondaryKey()) {
+                    // Set it on the receiving vertex and the connecting edge.
+                    to.setUseSecondaryKey(true);
+                    inEdge.setUseSecondaryKey(true);
+                    inEdge.setSecondarySortOrder(info.getSecondarySortOrder());
+                    log.info("Using Secondary Key Optimization in the edge between vertex - "
+                            + to.getOperatorKey()
+                            + " and vertex - "
+                            + from.getOperatorKey());
+                }
+            }
+        }
+    }
+
+    @Override
+    public int getNumSortRemoved() {
+        return numSortRemoved;
+    }
+
+    @Override
+    public int getNumDistinctChanged() {
+        return numDistinctChanged;
+    }
+
+    @Override
+    public int getNumUseSecondaryKey() {
+        return numUseSecondaryKey;
+    }
+
+}

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/SkewedPartitionerTez.java Thu Jan 23 01:06:21 2014
@@ -43,12 +43,11 @@ public class SkewedPartitionerTez extend
 
         try {
             if (distMap != null) {
-                Integer[] totalReducers = new Integer[1];
 
                 // The distMap is structured as (key, min, max) where min, max
                 // being the index of the reducers
                 DataBag partitionList = (DataBag) distMap.get(PartitionSkewedKeys.PARTITION_LIST);
-                totalReducers[0] = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
+                totalReducers = Integer.valueOf("" + distMap.get(PartitionSkewedKeys.TOTAL_REDUCERS));
                 Iterator<Tuple> it = partitionList.iterator();
                 while (it.hasNext()) {
                     Tuple idxTuple = it.next();
@@ -56,7 +55,7 @@ public class SkewedPartitionerTez extend
                     Integer minIndex = (Integer) idxTuple.get(idxTuple.size() - 2);
                     // Used to replace the maxIndex with the number of reducers
                     if (maxIndex < minIndex) {
-                        maxIndex = totalReducers[0] + maxIndex;
+                        maxIndex = totalReducers + maxIndex;
                     }
 
                     Tuple keyT;

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Thu Jan 23 01:06:21 2014
@@ -92,9 +92,9 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.CompilerUtils;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.relational.LOJoin;
@@ -237,12 +237,6 @@ public class TezCompiler extends PhyPlan
 
         for (TezOperator tezOper : splitsSeen.values()) {
             int idx = 0;
-            List<POLocalRearrange> rearranges = PlanHelper
-                    .getPhysicalOperators(tezOper.plan, POLocalRearrange.class);
-            for (POLocalRearrange op : rearranges) {
-                op.setIndex(idx++);
-            }
-            idx = 0;
             List<POStore> strs = PlanHelper.getPhysicalOperators(tezOper.plan,
                     POStore.class);
             for (POStore op : strs) {
@@ -268,7 +262,6 @@ public class TezCompiler extends PhyPlan
                 parentOper.outEdges.put(entry.getKey(), entry.getValue());
             }
         }
-        // TODO: handle custom partitioner, secondary key on edges
     }
 
     private void connectSoftLink() throws PlanException, IOException {
@@ -405,12 +398,18 @@ public class TezCompiler extends PhyPlan
         tezPlan.add(newTezOp);
         for (TezOperator tezOp : compiledInputs) {
             tezOp.setClosed(true);
-            handleSplitAndConnect(tezOp, newTezOp);
+            handleSplitAndConnect(tezPlan, tezOp, newTezOp);
         }
         curTezOp = newTezOp;
     }
 
-    private void handleSplitAndConnect(TezOperator from, TezOperator to)
+    private TezEdgeDescriptor handleSplitAndConnect(TezOperPlan tezPlan, TezOperator from, TezOperator to)
+            throws PlanException {
+        return handleSplitAndConnect(tezPlan, from, to, true); // addToSplitPlan=true: a split sub-plan is also added to its POSplit
+    }
+
+    private TezEdgeDescriptor handleSplitAndConnect(TezOperPlan tezPlan,
+            TezOperator from, TezOperator to, boolean addToSplitPlan)
             throws PlanException {
         // Add edge descriptors from POLocalRearrange in POSplit
         // sub-plan to new operators
@@ -420,19 +419,23 @@ public class TezCompiler extends PhyPlan
             POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
             lr.setOutputKey(to.getOperatorKey().toString());
         }
+        TezEdgeDescriptor edge = null;
         if (from.isSplitSubPlan()) {
-            // Set inputs to null as POSplit will attach input to roots
-            for (PhysicalOperator root : from.plan.getRoots()) {
-                root.setInputs(null);
-            }
             TezOperator splitOp = splitsSeen.get(from.getSplitOperatorKey());
-            POSplit split = findPOSplit(splitOp, from.getSplitOperatorKey());
-            split.addPlan(from.plan);
-            addSubPlanPropertiesToParent(splitOp, curTezOp);
-            TezCompilerUtil.connect(tezPlan, splitOp, to);
+            if (addToSplitPlan) {
+                // Set inputs to null as POSplit will attach input to roots
+                for (PhysicalOperator root : from.plan.getRoots()) {
+                    root.setInputs(null);
+                }
+                POSplit split = findPOSplit(splitOp, from.getSplitOperatorKey());
+                split.addPlan(from.plan);
+                addSubPlanPropertiesToParent(splitOp, curTezOp);
+            }
+            edge = TezCompilerUtil.connect(tezPlan, splitOp, to);
         } else {
-            TezCompilerUtil.connect(tezPlan, from, to);
+            edge = TezCompilerUtil.connect(tezPlan, from, to);
         }
+        return edge;
     }
 
     private POSplit findPOSplit(TezOperator tezOp, OperatorKey splitKey)
@@ -647,12 +650,13 @@ public class TezCompiler extends PhyPlan
         try {
             POLocalRearrange lr = localRearrangeFactory.create();
             lr.setDistinct(true);
+            lr.setAlias(op.getAlias());
             curTezOp.plan.addAsLeaf(lr);
-            curTezOp.customPartitioner = op.getCustomPartitioner();
             TezOperator lastOp = curTezOp;
 
             // Mark the start of a new TezOperator, connecting the inputs.
             blocking();
+            TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
 
             // Add the DISTINCT plan as the combine plan. In MR Pig, the combiner is implemented
             // with a global variable and a specific DistinctCombiner class. This seems better.
@@ -660,6 +664,7 @@ public class TezCompiler extends PhyPlan
             addDistinctPlan(combinePlan, 1);
 
             POLocalRearrangeTez clr = localRearrangeFactory.create();
+            clr.setOutputKey(curTezOp.getOperatorKey().toString());
             clr.setDistinct(true);
             combinePlan.addAsLeaf(clr);
 
@@ -745,11 +750,14 @@ public class TezCompiler extends PhyPlan
                     lr.setOutputKey(curTezOp.getOperatorKey().toString());
 
                     tezOp.plan.addAsLeaf(lr);
-                    TezCompilerUtil.connect(tezPlan, tezOp, curTezOp);
-                    inputKeys.add(tezOp.getOperatorKey().toString());
+                    TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, tezOp, curTezOp);
+                    if (tezOp.isSplitSubPlan()) { // the vertex holding the POSplit was connected, not tezOp
+                        inputKeys.add(splitsSeen.get(tezOp.getSplitOperatorKey()).getOperatorKey().toString());
+                    } else {
+                        inputKeys.add(tezOp.getOperatorKey().toString());
+                    }
 
                     // Configure broadcast edges for replicated tables
-                    TezEdgeDescriptor edge = curTezOp.inEdges.get(tezOp.getOperatorKey());
                     edge.dataMovementType = DataMovementType.BROADCAST;
                     edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
                     edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
@@ -818,6 +826,7 @@ public class TezCompiler extends PhyPlan
 
             // Need to add POLocalRearrange to the end of the last tezOp before we shuffle.
             POLocalRearrange lr = localRearrangeFactory.create();
+            lr.setAlias(op.getAlias());
             curTezOp.plan.addAsLeaf(lr);
 
             // Mark the start of a new TezOperator, connecting the inputs.
@@ -833,11 +842,14 @@ public class TezCompiler extends PhyPlan
             // Then add a POPackage and a POForEach to the start of the new tezOp.
             POPackage pkg = getPackage(1, DataType.TUPLE);
             POForEach forEach = TezCompilerUtil.getForEachPlain(scope, nig);
+            pkg.setAlias(op.getAlias());
+            forEach.setAlias(op.getAlias());
             curTezOp.plan.add(pkg);
             curTezOp.plan.addAsLeaf(forEach);
 
             if (!pigContext.inIllustrator) {
                 POLimit limitCopy = new POLimit(new OperatorKey(scope, nig.getNextNodeId(scope)));
+                limitCopy.setAlias(op.getAlias());
                 limitCopy.setLimit(op.getLimit());
                 limitCopy.setLimitPlan(op.getLimitPlan());
                 curTezOp.plan.addAsLeaf(limitCopy);
@@ -889,7 +901,7 @@ public class TezCompiler extends PhyPlan
     public void visitGlobalRearrange(POGlobalRearrange op) throws VisitorException {
         try {
             blocking();
-            curTezOp.customPartitioner = op.getCustomPartitioner();
+            TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
             phyToTezOpMap.put(op, curTezOp);
         } catch (Exception e) {
             int errCode = 2034;
@@ -1179,6 +1191,7 @@ public class TezCompiler extends PhyPlan
 
     @Override
     public void visitSkewedJoin(POSkewedJoin op) throws VisitorException {
+        //TODO: handle split and connect
         try {
             // LR that transfers loaded input to partition vertex
             POLocalRearrangeTez lrTez = localRearrangeFactory.create(LocalRearrangeType.NULL);
@@ -1939,21 +1952,18 @@ public class TezCompiler extends PhyPlan
             Pair<TezOperator, Integer> quantJobParallelismPair = getQuantileJobs(op, rp);
             TezOperator[] sortOpers = getSortJobs(op, quantJobParallelismPair.second, keyType, fields);
 
-            lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
-            lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
-
-            TezCompilerUtil.connect(tezPlan, prevOper, sortOpers[0]);
-            TezEdgeDescriptor edge = sortOpers[0].inEdges.get(prevOper.getOperatorKey());
+            TezEdgeDescriptor edge = handleSplitAndConnect(tezPlan, prevOper, sortOpers[0]);
 
             // TODO: Convert to unsorted shuffle after TEZ-661
             // edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
             // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
             edge.partitionerClass = RoundRobinPartitioner.class;
 
-            TezCompilerUtil.connect(tezPlan, prevOper, quantJobParallelismPair.first);
+            handleSplitAndConnect(tezPlan, prevOper, quantJobParallelismPair.first, false);
+            lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
+            lrSample.setOutputKey(quantJobParallelismPair.first.getOperatorKey().toString());
 
-            TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
-            edge = sortOpers[0].inEdges.get(quantJobParallelismPair.first.getOperatorKey());
+            edge = TezCompilerUtil.connect(tezPlan, quantJobParallelismPair.first, sortOpers[0]);
             edge.dataMovementType = DataMovementType.BROADCAST;
             edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
             edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
@@ -1961,8 +1971,7 @@ public class TezCompiler extends PhyPlan
             lr.setOutputKey(sortOpers[0].getOperatorKey().toString());
             sortOpers[0].sampleOperator = quantJobParallelismPair.first;
 
-            TezCompilerUtil.connect(tezPlan, sortOpers[0], sortOpers[1]);
-            edge = sortOpers[1].inEdges.get(sortOpers[0].getOperatorKey());
+            edge = TezCompilerUtil.connect(tezPlan, sortOpers[0], sortOpers[1]);
             edge.partitionerClass = WeightedRangePartitionerTez.class;
 
             curTezOp = sortOpers[1];
@@ -2052,6 +2061,7 @@ public class TezCompiler extends PhyPlan
             // before we broadcast.
             for (int i = 0; i < compiledInputs.length; i++) {
                 POLocalRearrangeTez lr = localRearrangeFactory.create(i, LocalRearrangeType.STAR);
+                lr.setAlias(op.getAlias());
                 lr.setUnion(true);
                 compiledInputs[i].plan.addAsLeaf(lr);
             }
@@ -2062,6 +2072,7 @@ public class TezCompiler extends PhyPlan
 
             // Then add a POPackage to the start of the new tezOp.
             POPackage pkg = getPackage(compiledInputs.length, DataType.TUPLE);
+            pkg.setAlias(op.getAlias());
             curTezOp.markUnion();
             curTezOp.plan.add(pkg);
             // TODO: Union should use OnFileUnorderedKVOutput instead of

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompilerUtil.java Thu Jan 23 01:06:21 2014
@@ -1,5 +1,6 @@
 package org.apache.pig.backend.hadoop.executionengine.tez;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -11,6 +12,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.DataType;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
@@ -19,6 +21,9 @@ import com.google.common.collect.Lists;
 
 public class TezCompilerUtil {
 
+    private TezCompilerUtil() { // static helpers only; prevent instantiation
+    }
+
     // simpleConnectTwoVertex is a utility to end a vertex equivalent to map and start vertex equivalent to
     // reduce in a tez operator:
     // 1. op1 is open
@@ -64,11 +69,13 @@ public class TezCompilerUtil {
         connect(tezPlan, op1, op2);
     }
 
-    static public void connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
+    static public TezEdgeDescriptor connect(TezOperPlan plan, TezOperator from, TezOperator to) throws PlanException {
         plan.connect(from, to);
         // Add edge descriptors to old and new operators
-        to.inEdges.put(from.getOperatorKey(), new TezEdgeDescriptor());
-        from.outEdges.put(to.getOperatorKey(), new TezEdgeDescriptor());
+        TezEdgeDescriptor edge = new TezEdgeDescriptor(); // one instance shared by both ends of the edge
+        to.inEdges.put(from.getOperatorKey(), edge);
+        from.outEdges.put(to.getOperatorKey(), edge);
+        return edge; // callers configure the edge through this shared reference
     }
 
     static public POForEach getForEach(POProject project, int rp, String scope, NodeIdGenerator nig) {
@@ -103,4 +110,12 @@ public class TezCompilerUtil {
         st.setIsTmpStore(true);
         return st;
     }
+
+    static public void setCustomPartitioner(String customPartitioner, TezOperator tezOp) throws IOException {
+        if (customPartitioner != null) {
+            for (TezEdgeDescriptor edge : tezOp.inEdges.values()) {
+                edge.partitionerClass = PigContext.resolveClassName(customPartitioner);
+            }
+        }
+    }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Jan 23 01:06:21 2014
@@ -28,8 +28,6 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparable;
@@ -37,13 +35,9 @@ import org.apache.hadoop.io.WritableComp
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.Partitioner;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.yarn.api.records.LocalResource;
-import org.apache.hadoop.yarn.api.records.LocalResourceType;
-import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
@@ -53,6 +47,8 @@ import org.apache.pig.backend.hadoop.HDa
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigDecimalRawComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigIntegerRawComparator;
@@ -66,8 +62,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigIntRawComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigLongRawComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigOutputFormat;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSecondaryKeyComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextRawComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTupleSortComparator;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.SecondaryKeyPartitioner;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.EndOfAllInputSetter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -193,9 +191,10 @@ public class TezDagBuilder extends TezOp
         for (POLocalRearrangeTez lr : lrs) {
             if (lr.getOutputKey().equals(to.getOperatorKey().toString())) {
                 byte keyType = lr.getKeyType();
-                setIntermediateInputKeyValue(keyType, conf, to.isSkewedJoin());
-                setIntermediateOutputKeyValue(keyType, conf, to.isSkewedJoin());
-                conf.set("pig.reduce.key.type", Byte.toString(keyType));
+                setIntermediateInputKeyValue(keyType, conf, to);
+                setIntermediateOutputKeyValue(keyType, conf, to);
+                // With secondary key sort, the main key type (not getKeyType()) is the actual key type
+                conf.set("pig.reduce.key.type", Byte.toString(lr.getMainKeyType()));
                 break;
             }
         }
@@ -203,28 +202,42 @@ public class TezDagBuilder extends TezOp
         conf.setBoolean("mapred.mapper.new-api", true);
         conf.set("pig.pigContext", ObjectSerializer.serialize(pc));
 
-        if (edge.partitionerClass != null) {
-            conf.setClass("mapreduce.job.partitioner.class",
-                    edge.partitionerClass, Partitioner.class);
-        }
-
         if(from.isGlobalSort() || from.isLimitAfterSort()){
             if (from.isGlobalSort()) {
-                FileSystem fs = FileSystem.get(globalConf);
-                Path quantFilePath = new Path(from.getQuantFile() + "/part-r-00000");
-                FileStatus fstat = fs.getFileStatus(quantFilePath);
-                LocalResource quantFileResource = LocalResource.newInstance(
-                        ConverterUtils.getYarnUrlFromPath(fstat.getPath()),
-                        LocalResourceType.FILE,
-                        LocalResourceVisibility.APPLICATION,
-                        fstat.getLen(),
-                        fstat.getModificationTime());
-                localResources.put(quantFilePath.getName(), quantFileResource);
                 conf.set("pig.sortOrder",
                         ObjectSerializer.serialize(from.getSortOrder()));
             }
         }
 
+        if (edge.isUseSecondaryKey()) { // MR-equivalent secondary sort settings for this edge
+            conf.set("pig.secondarySortOrder",
+                    ObjectSerializer.serialize(edge.getSecondarySortOrder()));
+            conf.set(org.apache.hadoop.mapreduce.MRJobConfig.PARTITIONER_CLASS_ATTR,
+                    SecondaryKeyPartitioner.class.getName()); // may be overridden by edge.partitionerClass below
+            // In MR - job.setSortComparatorClass() or MRJobConfig.KEY_COMPARATOR
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+                    PigSecondaryKeyComparator.class.getName());
+            // In MR - job.setOutputKeyClass() or MRJobConfig.OUTPUT_KEY_CLASS
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS, NullableTuple.class.getName());
+            // These need to be set on the vertex as well so that POShuffleTezLoad picks them up.
+            // The Tez framework also expects them per vertex rather than per edge: IFile.java picks
+            // up keyClass and valueClass from the vertex config. TODO - check with Tez folks
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+                    PigSecondaryKeyComparator.class.getName());
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS, NullableTuple.class.getName());
+            // In MR - job.setGroupingComparatorClass() or MRJobConfig.GROUP_COMPARATOR_CLASS
+            // TODO: Check why the tez-mapreduce ReduceProcessor uses two different Tez settings
+            // for the same MRJobConfig.GROUP_COMPARATOR_CLASS, and whether one would suffice
+            conf.set(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS, PigSecondaryKeyGroupComparator.class.getName());
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
+                    PigSecondaryKeyGroupComparator.class.getName());
+        }
+
+        if (edge.partitionerClass != null) { // set last, so it overrides SecondaryKeyPartitioner. NOTE(review): confirm intended
+            conf.set(org.apache.hadoop.mapreduce.MRJobConfig.PARTITIONER_CLASS_ATTR,
+                    edge.partitionerClass.getName());
+        }
+
         in.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
         out.setUserPayload(TezUtils.createUserPayloadFromConf(conf));
 
@@ -235,11 +248,11 @@ public class TezDagBuilder extends TezOp
     private void addCombiner(PhysicalPlan combinePlan, TezOperator pkgTezOp,
             Configuration conf) throws IOException {
         POPackage combPack = (POPackage) combinePlan.getRoots().get(0);
-        setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf, pkgTezOp.isSkewedJoin());
+        setIntermediateInputKeyValue(combPack.getPkgr().getKeyType(), conf, pkgTezOp); // pkgTezOp flags select key class/comparator
 
         POLocalRearrange combRearrange = (POLocalRearrange) combinePlan
                 .getLeaves().get(0);
-        setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp.isSkewedJoin());
+        setIntermediateOutputKeyValue(combRearrange.getKeyType(), conf, pkgTezOp);
 
         LoRearrangeDiscoverer lrDiscoverer = new LoRearrangeDiscoverer(
                 combinePlan, pkgTezOp, combPack);
@@ -324,8 +337,7 @@ public class TezDagBuilder extends TezOp
             byte keyType = pack.getPkgr().getKeyType();
             tezOp.plan.remove(pack);
             payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
-            payloadConf.set("pig.reduce.key.type", Byte.toString(keyType));
-            setIntermediateInputKeyValue(keyType, payloadConf, tezOp.isSkewedJoin());
+            setIntermediateInputKeyValue(keyType, payloadConf, tezOp); // NOTE(review): pig.reduce.key.type now set on edge conf only; confirm readers
             POShuffleTezLoad newPack;
             if (tezOp.isUnion()) {
                 newPack = new POUnionTezLoad(pack);
@@ -355,7 +367,7 @@ public class TezDagBuilder extends TezOp
             }
 
             setIntermediateInputKeyValue(pack.getPkgr().getKeyType(), payloadConf,
-                    tezOp.isSkewedJoin());
+                    tezOp);
         }
 
         payloadConf.setClass("mapreduce.outputformat.class",
@@ -601,28 +613,32 @@ public class TezDagBuilder extends TezOp
     }
 
     @SuppressWarnings("rawtypes")
-    private void setIntermediateInputKeyValue(byte keyType, Configuration conf, boolean isSkewedJoin)
+    private void setIntermediateInputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
             throws JobCreationException, ExecException {
-        Class<? extends WritableComparable> keyClass = HDataType
-                .getWritableComparableTypes(keyType).getClass();
-        if (isSkewedJoin) {
+        if (tezOp.isUseSecondaryKey()) { // key class precedence: secondary key, then skewed join, then keyType
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
+                    NullableTuple.class.getName());
+        } else if (tezOp.isSkewedJoin()) {
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
                     NullablePartitionWritable.class.getName());
         } else {
+            Class<? extends WritableComparable> keyClass = HDataType
+                    .getWritableComparableTypes(keyType).getClass();
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_CLASS,
                     keyClass.getName());
+
         }
+        selectInputComparator(conf, keyType, tezOp); // comparators follow the same tezOp flags
         conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_VALUE_CLASS,
                 NullableTuple.class.getName());
-        selectInputComparator(conf, keyType, isSkewedJoin);
     }
 
     @SuppressWarnings("rawtypes")
-    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, boolean isSkewedJoin)
+    private void setIntermediateOutputKeyValue(byte keyType, Configuration conf, TezOperator tezOp)
             throws JobCreationException, ExecException {
         Class<? extends WritableComparable> keyClass = HDataType
                 .getWritableComparableTypes(keyType).getClass();
-        if (isSkewedJoin) {
+        if (tezOp.isSkewedJoin()) { // NOTE(review): no secondary-key branch; edge config overrides the output key class
             conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_CLASS,
                     NullablePartitionWritable.class.getName());
         } else {
@@ -633,17 +649,13 @@ public class TezDagBuilder extends TezOp
                 NullableTuple.class.getName());
         conf.set(TezJobConfig.TEZ_RUNTIME_PARTITIONER_CLASS,
                 MRPartitioner.class.getName());
-        selectOutputComparator(keyType, conf, isSkewedJoin);
+        selectOutputComparator(keyType, conf, tezOp);
     }
 
-    static Class<? extends WritableComparator> comparatorForKeyType(byte keyType, boolean isSkewedJoin)
+    static Class<? extends WritableComparator> comparatorForKeyType(byte keyType) // per-type raw comparator; skewed join is handled by callers
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
 
-        if (isSkewedJoin) {
-            return JobControlCompiler.PigGroupingPartitionWritableComparator.class;
-        }
-
         switch (keyType) {
         case DataType.BOOLEAN:
             return PigBooleanRawComparator.class;
@@ -695,19 +707,47 @@ public class TezDagBuilder extends TezOp
         }
     }
 
-    void selectInputComparator(Configuration conf, byte keyType, boolean isSkewedJoin)
+    void selectInputComparator(Configuration conf, byte keyType, TezOperator tezOp)
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
-        conf.setClass(
-                TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
-                comparatorForKeyType(keyType, isSkewedJoin), RawComparator.class);
+        // TODO: Set group comparators for the plain key case as JobControlCompiler does
+        if (tezOp.isUseSecondaryKey()) { // secondary key sort: replace both sort and group comparators
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+                    PigSecondaryKeyComparator.class.getName());
+            conf.set(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS,
+                    PigSecondaryKeyGroupComparator.class.getName());
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
+                    PigSecondaryKeyGroupComparator.class.getName());
+        } else {
+            if (tezOp.isSkewedJoin()) {
+                // TODO: PigGroupingPartitionWritableComparator is only used as the group comparator in MR.
+                // If we follow MR, what should TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS be?
+                conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+                        PigGroupingPartitionWritableComparator.class.getName());
+                conf.set(TezJobConfig.TEZ_RUNTIME_GROUP_COMPARATOR_CLASS,
+                        PigGroupingPartitionWritableComparator.class.getName());
+                conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
+                        PigGroupingPartitionWritableComparator.class.getName());
+            } else { // NOTE(review): no group comparator set here; presumably the Tez default applies
+                conf.setClass(
+                        TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_COMPARATOR_CLASS,
+                        comparatorForKeyType(keyType), RawComparator.class);
+            }
+        }
     }
 
-    void selectOutputComparator(byte keyType, Configuration conf, boolean isSkewedJoin)
+    void selectOutputComparator(byte keyType, Configuration conf, TezOperator tezOp)
             throws JobCreationException {
         // TODO: Handle sorting like in JobControlCompiler
-        conf.setClass(
-                TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
-                comparatorForKeyType(keyType, isSkewedJoin), RawComparator.class);
+        if (tezOp.isSkewedJoin()) { // NOTE(review): no secondary-key branch; edge config sets that output comparator
+            // TODO: PigGroupingPartitionWritableComparator is only used as the group comparator in MR.
+            // If we follow MR, what should TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS be?
+            conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+                    PigGroupingPartitionWritableComparator.class.getName());
+        } else {
+            conf.setClass(
+                    TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_OUTPUT_KEY_COMPARATOR_CLASS,
+                    comparatorForKeyType(keyType), RawComparator.class);
+        }
     }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezEdgeDescriptor.java Thu Jan 23 01:06:21 2014
@@ -34,11 +34,19 @@ public class TezEdgeDescriptor {
 
     public String inputClassName;
     public String outputClassName;
-    public Class<? extends Partitioner> partitionerClass;
+
     public SchedulingType schedulingType;
     public DataSourceType dataSourceType;
     public DataMovementType dataMovementType;
 
+    public Class<? extends Partitioner> partitionerClass; // custom partitioner for this edge; null if none
+
+    // If true, the shuffle on this edge uses secondary key sort
+    private boolean useSecondaryKey = false;
+
+    // Sort order of the secondary keys; presumably true = ascending as for TezOperator.sortOrder (confirm)
+    private boolean[] secondarySortOrder;
+
     public TezEdgeDescriptor() {
         combinePlan = new PhysicalPlan();
 
@@ -50,4 +58,25 @@ public class TezEdgeDescriptor {
         dataSourceType = DataSourceType.PERSISTED;
         dataMovementType = DataMovementType.SCATTER_GATHER;
     }
+
+    public boolean isUseSecondaryKey() {
+        return useSecondaryKey;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
+    public boolean[] getSecondarySortOrder() { // returns the internal array, not a copy
+        return secondarySortOrder;
+    }
+
+    public void setSecondarySortOrder(boolean[] secondarySortOrder) { // stores a copy of the argument
+        if(null == secondarySortOrder) return; // null is ignored; any previously set order is kept
+        this.secondarySortOrder = new boolean[secondarySortOrder.length];
+        for(int i = 0; i < secondarySortOrder.length; ++i) {
+            this.secondarySortOrder[i] = secondarySortOrder[i];
+        }
+    }
+
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Thu Jan 23 01:06:21 2014
@@ -150,7 +150,7 @@ public class TezLauncher extends Launche
             throws PlanException, IOException, VisitorException {
         TezCompiler comp = new TezCompiler(php, pc);
         TezOperPlan tezPlan = comp.compile();
-        Boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
+        boolean nocombiner = Boolean.parseBoolean(pc.getProperties().getProperty(
                 PigConfiguration.PROP_NO_COMBINER, "false"));
 
         // Run CombinerOptimizer on Tez plan
@@ -162,6 +162,15 @@ public class TezLauncher extends Launche
             co.getMessageCollector().logMessages(MessageType.Warning, aggregateWarning, log);
         }
 
+        // Use the secondary sort key where possible for order by and distinct in a nested foreach.
+        // Should run before AccumulatorOptimizer; skipped in illustrate mode or when disabled by config.
+        boolean noSecKeySort = Boolean.parseBoolean(pc.getProperties().getProperty(
+                PigConfiguration.PIG_EXEC_NO_SECONDARY_KEY, "false"));
+        if (!pc.inIllustrator && !noSecKeySort)  {
+            SecondaryKeyOptimizerTez skOptimizer = new SecondaryKeyOptimizerTez(tezPlan);
+            skOptimizer.visit();
+        }
+
         // Run AccumulatorOptimizer on Tez plan
         boolean isAccum = Boolean.parseBoolean(pc.getProperties().getProperty(
                     PigConfiguration.OPT_ACCUMULATOR, "true"));

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperator.java Thu Jan 23 01:06:21 2014
@@ -59,9 +59,6 @@ public class TezOperator extends Operato
     int requestedMemory = 1024;
     int requestedCpu = 1;
 
-    // Name of the Custom Partitioner used
-    String customPartitioner = null;
-
     // Presence indicates that this TezOper is sub-plan of a POSplit.
     // Only POStore or POLocalRearrange leaf can be a sub-plan of POSplit
     private OperatorKey splitOperatorKey = null;
@@ -79,9 +76,6 @@ public class TezOperator extends Operato
     //Indicates if this job is an order by job
     boolean globalSort = false;
 
-    //The quantiles file name if globalSort is true
-    String quantFile;
-
     //The sort order of the columns;
     //asc is true and desc is false
     boolean[] sortOrder;
@@ -100,6 +94,9 @@ public class TezOperator extends Operato
     // If not null, need to collect sample sent from predecessor
     TezOperator sampleOperator = null;
 
+    // If true, this vertex's shuffle input uses secondary key sort (key class and comparators change)
+    private boolean useSecondaryKey = false;
+
     // Types of blocking operators. For now, we only support the following ones.
     private static enum OPER_FEATURE {
         NONE,
@@ -211,6 +208,14 @@ public class TezOperator extends Operato
         feature = OPER_FEATURE.SAMPLER;
     }
 
+    public boolean isUseSecondaryKey() {
+        return useSecondaryKey;
+    }
+
+    public void setUseSecondaryKey(boolean useSecondaryKey) {
+        this.useSecondaryKey = useSecondaryKey;
+    }
+
     @Override
     public String name() {
         String udfStr = getUDFsAsStr();
@@ -261,14 +266,6 @@ public class TezOperator extends Operato
         return segmentBelow;
     }
 
-    public String getQuantFile() {
-        return quantFile;
-    }
-
-    public void setQuantFile(String quantFile) {
-        this.quantFile = quantFile;
-    }
-
     public void setSortOrder(boolean[] sortOrder) {
         if(null == sortOrder) return;
         this.sortOrder = new boolean[sortOrder.length];

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Thu Jan 23 01:06:21 2014
@@ -54,6 +54,7 @@ public class TezPrinter extends TezOpPla
         mStream.println("Tez vertex " + tezOper.getOperatorKey().toString());
         if (tezOper.inEdges.size() > 0) {
             for (Entry<OperatorKey, TezEdgeDescriptor> inEdge : tezOper.inEdges.entrySet()) {
+                // TODO: Also print other edge properties, e.g. the custom partitioner and secondary key sort
                 if (!inEdge.getValue().combinePlan.isEmpty()) {
                     mStream.println("# Combine plan on edge <" + inEdge.getKey() + ">");
                     PlanPrinter<PhysicalOperator, PhysicalPlan> printer =

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java?rev=1560565&r1=1560564&r2=1560565&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/util/CombinerOptimizerUtil.java Thu Jan 23 01:06:21 2014
@@ -26,6 +26,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
+import org.apache.pig.classification.InterfaceAudience;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
@@ -42,10 +43,14 @@ import org.apache.pig.impl.util.Pair;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 
+@InterfaceAudience.Private
 public class CombinerOptimizerUtil {
     private static final String DISTINCT_UDF_CLASSNAME = org.apache.pig.builtin.Distinct.class.getName();
     private static final Log LOG = LogFactory.getLog(CombinerOptimizerUtil.class);
 
+    private CombinerOptimizerUtil() { // utility class: prevent instantiation
+    }
+
     /**
      * Algebraic functions and distinct in nested plan of a foreach are
      * partially computed in the map and combine phase. A new foreach statement