You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text copy).
Posted to commits@pig.apache.org by ro...@apache.org on 2014/07/07 22:08:16 UTC

svn commit: r1608593 [1/3] - in /pig/trunk: ./ shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ src/org...

Author: rohini
Date: Mon Jul  7 20:08:14 2014
New Revision: 1608593

URL: http://svn.apache.org/r1608593
Log:
PIG-3935: Port more mini cluster tests to Tez - part 5 (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/build.xml
    pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
    pig/trunk/src/org/apache/pig/PigConfiguration.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
    pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
    pig/trunk/test/e2e/pig/tests/nightly.conf
    pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
    pig/trunk/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/trunk/test/org/apache/pig/test/TestGrunt.java
    pig/trunk/test/org/apache/pig/test/TestLimitAdjuster.java
    pig/trunk/test/org/apache/pig/test/TestLimitVariable.java
    pig/trunk/test/org/apache/pig/test/TestParser.java
    pig/trunk/test/org/apache/pig/test/TestPigSplit.java
    pig/trunk/test/org/apache/pig/test/TestScalarAliases.java
    pig/trunk/test/org/apache/pig/test/TestStore.java
    pig/trunk/test/org/apache/pig/test/TestTmpFileCompression.java
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/TEZC16.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/TEZC4.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-MQ-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Rank-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-1.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-10.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-2.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-3.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-5.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-6.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-7.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-9.gld
    pig/trunk/test/tez-tests

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Jul  7 20:08:14 2014
@@ -42,6 +42,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-3935: Port more mini cluster tests to Tez - part 5 (rohini)
+
 PIG-3984: PigServer.shutdown removes the tez resource folder (daijy via rohini)
 
 PIG-4048: TEZ-692 has a incompatible API change removing TezSession (rohini)

Modified: pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/pig/trunk/build.xml?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/build.xml (original)
+++ pig/trunk/build.xml Mon Jul  7 20:08:14 2014
@@ -157,6 +157,15 @@
     <condition property="isWindows">
       <equals arg1="${windows}" arg2="true" />
     </condition>
+	
+    <target name="setTezEnv">
+        <propertyreset name="hadoopversion" value="23" />
+        <propertyreset name="isHadoop23" value="true" />
+        <propertyreset name="src.shims.dir" value="${basedir}/shims/src/hadoop${hadoopversion}" />
+        <propertyreset name="src.shims.test.dir" value="${basedir}/shims/test/hadoop${hadoopversion}" />
+        <propertyreset name="src.exclude.dir" value="" />
+        <propertyreset name="test.exec.type" value="tez" />
+    </target>
 
     <target name="setWindowsPath" if="${isWindows}">
       <property name="build.path" value="${env.Path};${hadoop.root}\bin" />
@@ -869,10 +878,10 @@
         <macro-test-runner test.file="${test.smoke.file}" />
     </target>
 
-    <target name="test-tez" depends="setWindowsPath,setLinuxPath,compile-test,jar-withouthadoop,debugger.check" description="Run tez unit tests">
+    <target name="test-tez" depends="setTezEnv,setWindowsPath,setLinuxPath,compile-test,jar-withouthadoop,debugger.check" description="Run tez unit tests">
         <macro-test-runner test.file="${test.tez.file}" />
     </target>
-
+	
     <target name="debugger.check" depends="debugger.set,debugger.unset"/>
     <target name="debugger.set" if="debugPort">
         <property name="debugArgs" value="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=${debugPort}"/>

Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Mon Jul  7 20:08:14 2014
@@ -105,8 +105,6 @@ public class TezMiniCluster extends Mini
 
             // Write tez-site.xml
             Configuration tez_conf = new Configuration(false);
-            // TODO Remove this once TezSession reuse in mini cluster is fixed
-            tez_conf.set(PigConfiguration.TEZ_SESSION_REUSE, "false");
             // TODO PIG-3659 - Remove this once memory management is fixed
             tez_conf.set(TezJobConfig.TEZ_RUNTIME_IO_SORT_MB, "20");
             tez_conf.set("tez.lib.uris", "hdfs:///tez,hdfs:///tez/lib");

Modified: pig/trunk/src/org/apache/pig/PigConfiguration.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigConfiguration.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigConfiguration.java (original)
+++ pig/trunk/src/org/apache/pig/PigConfiguration.java Mon Jul  7 20:08:14 2014
@@ -81,6 +81,12 @@ public class PigConfiguration {
      */
     public static final String OPT_ACCUMULATOR = "opt.accumulator";
 
+
+    /**
+     * This key is used to configure auto parallelism in tez. Default is true.
+     */
+    public static final String TEZ_AUTO_PARALLELISM = "pig.tez.auto.parallelism";
+
     /**
      * This key is used to enable union optimization.
      */

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Mon Jul  7 20:08:14 2014
@@ -690,7 +690,7 @@ public class JobControlCompiler{
                 }
             }
 
-            setOutputFormat((JobConf) nwJob.getConfiguration());
+            setOutputFormat(nwJob);
 
             if (mapStores.size() + reduceStores.size() == 1) { // single store case
                 log.info("Setting up single store job");
@@ -1877,24 +1877,24 @@ public class JobControlCompiler{
         }
     }
 
-    public static void setOutputFormat(JobConf conf) {
+    public static void setOutputFormat(org.apache.hadoop.mapreduce.Job job) {
         // the OutputFormat we report to Hadoop is always PigOutputFormat which
         // can be wrapped with LazyOutputFormat provided if it is supported by
         // the Hadoop version and PigConfiguration.PIG_OUTPUT_LAZY is set
-        if ("true".equalsIgnoreCase(conf.get(PigConfiguration.PIG_OUTPUT_LAZY))) {
+        if ("true".equalsIgnoreCase(job.getConfiguration().get(PigConfiguration.PIG_OUTPUT_LAZY))) {
             try {
                 Class<?> clazz = PigContext
                         .resolveClassName("org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat");
                 Method method = clazz.getMethod("setOutputFormatClass",
-                        JobConf.class, Class.class);
-                method.invoke(null, conf, PigOutputFormat.class);
+                        org.apache.hadoop.mapreduce.Job.class, Class.class);
+                method.invoke(null, job, PigOutputFormat.class);
             } catch (Exception e) {
-                conf.set("mapreduce.outputformat.class", PigOutputFormat.class.getName());
+                job.setOutputFormatClass(PigOutputFormat.class);
                 log.warn(PigConfiguration.PIG_OUTPUT_LAZY
                         + " is set but LazyOutputFormat couldn't be loaded. Default PigOutputFormat will be used");
             }
         } else {
-            conf.set("mapreduce.outputformat.class", PigOutputFormat.class.getName());
+            job.setOutputFormatClass(PigOutputFormat.class);
         }
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Mon Jul  7 20:08:14 2014
@@ -38,6 +38,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
@@ -218,7 +219,13 @@ public class PlanPrinter<O extends Opera
                   inner_plans.addAll(joinPlans.values());
                   sb.append(planString(inner_plans));
               }
-            }
+          }
+          else if(node instanceof POLimit) {
+              PhysicalPlan limitPlan = ((POLimit)node).getLimitPlan();
+              if (limitPlan != null) {
+                  sb.append(planString(limitPlan));
+              }
+          }
         }
 
         if (node instanceof POSplit) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/MultiQueryOptimizerTez.java Mon Jul  7 20:08:14 2014
@@ -22,6 +22,7 @@ import java.util.List;
 import java.util.Map.Entry;
 
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
@@ -107,9 +108,9 @@ public class MultiQueryOptimizerTez exte
                     addSubPlanPropertiesToParent(tezOp, splitee);
 
                     removeSplittee(getPlan(), tezOp, splitee);
-                    valueOutput.outputKeys.remove(splitee.getOperatorKey().toString());
+                    valueOutput.removeOutputKey(splitee.getOperatorKey().toString());
                 }
-                if (!valueOutput.outputKeys.isEmpty()) {
+                if (valueOutput.getTezOutputs().length > 0) {
                     // We still need valueOutput
                     PhysicalPlan phyPlan = new PhysicalPlan();
                     phyPlan.addAsLeaf(valueOutput);
@@ -145,6 +146,15 @@ public class MultiQueryOptimizerTez exte
                         input.replaceInput(splittee.getOperatorKey().toString(),
                                 splitter.getOperatorKey().toString());
                     }
+                    List<POUserFunc> userFuncs = PlanHelper.getPhysicalOperators(succTezOperator.plan, POUserFunc.class);
+                    for (POUserFunc userFunc : userFuncs) {
+                        if (userFunc.getFunc() instanceof ReadScalarsTez) {
+                            TezInput tezInput = (TezInput)userFunc.getFunc();
+                            tezInput.replaceInput(splittee.getOperatorKey().toString(),
+                                    splitter.getOperatorKey().toString());
+                            userFunc.getFuncSpec().setCtorArgs(tezInput.getTezInputs());
+                        }
+                    }
                 } catch (VisitorException e) {
                     throw new PlanException(e);
                 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POLocalRearrangeTez.java Mon Jul  7 20:08:14 2014
@@ -21,6 +21,8 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.HDataType;
@@ -34,8 +36,6 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.tez.runtime.api.LogicalOutput;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
-import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 
 /**
  * POLocalRearrangeTez is used to write to a Tez OnFileSortedOutput
@@ -44,6 +44,7 @@ import org.apache.tez.runtime.library.ou
 public class POLocalRearrangeTez extends POLocalRearrange implements TezOutput {
 
     private static final long serialVersionUID = 1L;
+    private static final Log LOG = LogFactory.getLog(POLocalRearrangeTez.class);
 
     protected String outputKey;
     protected transient KeyValueWriter writer;
@@ -108,28 +109,13 @@ public class POLocalRearrangeTez extends
     @Override
     public void attachOutputs(Map<String, LogicalOutput> outputs,
             Configuration conf) throws ExecException {
-        LogicalOutput logicalOut = outputs.get(outputKey);
-        if (logicalOut == null) {
-            throw new ExecException(
-                    "POLocalRearrangeTez only accepts OnFileSortedOutput (shuffle)"
-                            + " or OnFileUnorderedKVOutput(broadcast). key =  "
-                            + outputKey + ", outputs = " + outputs
-                            + ", operator = " + this);
+        LogicalOutput output = outputs.get(outputKey);
+        if (output == null) {
+            throw new ExecException("Output to vertex " + outputKey + " is missing");
         }
         try {
-            if (logicalOut instanceof OnFileSortedOutput) {
-                writer = ((OnFileSortedOutput) logicalOut).getWriter();
-            } else if (logicalOut instanceof OnFileUnorderedKVOutput) {
-                writer = ((OnFileUnorderedKVOutput) logicalOut).getWriter();
-            } else {
-                throw new ExecException(
-                        "POLocalRearrangeTez only accepts OnFileSortedOutput (shuffle)"
-                                + " or OnFileUnorderedKVOutput(broadcast). key =  "
-                                + outputKey + ", outputs = " + outputs
-                                + ", operator = " + this);
-            }
-        } catch (ExecException e) {
-            throw e;
+            writer = (KeyValueWriter) output.getWriter();
+            LOG.info("Attached output to vertex " + outputKey + " : output=" + output + ", writer=" + writer);
         } catch (Exception e) {
             throw new ExecException(e);
         }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/POValueOutputTez.java Mon Jul  7 20:08:14 2014
@@ -29,7 +29,6 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.Writable;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
@@ -37,21 +36,31 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.runtime.api.LogicalOutput;
+import org.apache.tez.runtime.api.TezProcessorContext;
 import org.apache.tez.runtime.library.api.KeyValueWriter;
 
-public class POValueOutputTez extends PhysicalOperator implements TezOutput {
+public class POValueOutputTez extends PhysicalOperator implements TezOutput, TezTaskConfigurable {
 
     private static final long serialVersionUID = 1L;
     private static final Log LOG = LogFactory.getLog(POValueOutputTez.class);
+
+    private static final TupleFactory tupleFactory = TupleFactory.getInstance();
+
+    private boolean taskIndexWithRecordIndexAsKey;
     // TODO Change this to outputKey and write only once
     // when a shared edge support is available in Tez
-    protected Set<String> outputKeys = new HashSet<String>();
+    private Set<String> outputKeys = new HashSet<String>();
     // TODO Change this to value only writer after implementing
     // value only input output
-    protected transient List<KeyValueWriter> writers;
+    private transient List<KeyValueWriter> writers;
+    private transient Object key;
+    private transient int taskIndex;
+    private transient long count;
+
 
     public static EmptyWritable EMPTY_KEY = new EmptyWritable();
 
@@ -59,6 +68,24 @@ public class POValueOutputTez extends Ph
         super(k);
     }
 
+    public boolean isTaskIndexWithRecordIndexAsKey() {
+        return taskIndexWithRecordIndexAsKey;
+    }
+
+    /*
+     * Sets tuple with task index and record index as the key. For eg: (0,1), (0,2), etc
+     * Default is empty key
+     */
+    public void setTaskIndexWithRecordIndexAsKey(boolean taskIndexWithRecordIndexAsKey) {
+        this.taskIndexWithRecordIndexAsKey = taskIndexWithRecordIndexAsKey;
+    }
+
+    @Override
+    public void initialize(TezProcessorContext processorContext)
+            throws ExecException {
+        taskIndex = processorContext.getTaskIndex();
+    }
+
     @Override
     public String[] getTezOutputs() {
         return outputKeys.toArray(new String[outputKeys.size()]);
@@ -89,6 +116,10 @@ public class POValueOutputTez extends Ph
                 throw new ExecException(e);
             }
         }
+        count = 0;
+        if (!taskIndexWithRecordIndexAsKey) {
+            key = EMPTY_KEY;
+        }
     }
 
     public void addOutputKey(String outputKey) {
@@ -117,7 +148,13 @@ public class POValueOutputTez extends Ph
             }
             for (KeyValueWriter writer : writers) {
                 try {
-                    writer.write(EMPTY_KEY, inp.result);
+                    if (taskIndexWithRecordIndexAsKey) {
+                        Tuple tuple = tupleFactory.newTuple(2);
+                        tuple.set(0, taskIndex);
+                        tuple.set(1, count++);
+                        key = tuple;
+                    }
+                    writer.write(key, inp.result);
                 } catch (IOException e) {
                     throw new ExecException(e);
                 }
@@ -164,24 +201,4 @@ public class POValueOutputTez extends Ph
         }
     }
 
-    //TODO: Remove after PIG-3775/TEZ-661
-    public static class EmptyWritableComparator implements RawComparator<EmptyWritable> {
-
-        @Override
-        public int compare(EmptyWritable o1, EmptyWritable o2) {
-            return 0;
-        }
-
-        @Override
-        public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
-            // 0 - Reverses the input order. 0 groups all values into
-            // single record on reducer which is additional overhead.
-            // -1, 1 - Returns input in random order. But comparator is invoked way more
-            // times than 0. Compared to 1, -1 invokes comparator even more.
-            // Going with 0 for now. After TEZ-661 this will not be required any more.
-            return 0;
-        }
-
-    }
-
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/PigProcessor.java Mon Jul  7 20:08:14 2014
@@ -91,7 +91,7 @@ public class PigProcessor implements Log
 
     private Configuration conf;
     private PigHadoopLogger pigHadoopLogger;
-    
+
     private TezProcessorContext processorContext;
 
     public static String sampleVertex;
@@ -102,7 +102,7 @@ public class PigProcessor implements Log
     public void initialize(TezProcessorContext processorContext)
             throws Exception {
         this.processorContext = processorContext;
-        
+
         // Reset any static variables to avoid conflict in container-reuse.
         sampleVertex = null;
         sampleMap = null;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/ReadScalarsTez.java Mon Jul  7 20:08:14 2014
@@ -78,9 +78,10 @@ public class ReadScalarsTez extends Eval
             KeyValueReader reader = (KeyValueReader) input.getReader();
             if (reader.next()) {
                 t = (Tuple) reader.getCurrentValue();
+                String first = t == null ? null : t.toString();
                 if (reader.next()) {
                     String msg = "Scalar has more than one row in the output. "
-                            + "1st : " + t + ", 2nd :"
+                            + "1st : " + first + ", 2nd :"
                             + reader.getCurrentValue();
                     throw new ExecException(msg);
                 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezCompiler.java Mon Jul  7 20:08:14 2014
@@ -42,6 +42,7 @@ import org.apache.pig.OrderedLoadFunc;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigTupleWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompilerException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MergeJoinIndexer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.ScalarPhyFinder;
@@ -107,6 +108,7 @@ import org.apache.pig.impl.util.Pair;
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.relational.LOJoin;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
 import org.apache.tez.runtime.library.output.OnFileUnorderedKVOutput;
 
@@ -260,12 +262,11 @@ public class TezCompiler extends PhyPlan
 
                     TezOperator from = phyToTezOpMap.get(store);
 
-                    FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
-                    userFunc.setFuncSpec(newSpec);
-
                     if (storeSeen.containsKey(store)) {
-                        storeSeen.get(store).outputKeys.add(tezOp.getOperatorKey().toString());
+                        storeSeen.get(store).addOutputKey(tezOp.getOperatorKey().toString());
                     } else {
+                        FuncSpec newSpec = new FuncSpec(ReadScalarsTez.class.getName(), from.getOperatorKey().toString());
+                        userFunc.setFuncSpec(newSpec);
                         POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
                         output.addOutputKey(tezOp.getOperatorKey().toString());
                         from.plan.remove(from.plan.getOperator(store.getOperatorKey()));
@@ -771,28 +772,28 @@ public class TezCompiler extends PhyPlan
                 }
             }
 
-            // Need to add POLocalRearrange to the end of the last tezOp before we shuffle.
-            POLocalRearrange lr = localRearrangeFactory.create();
-            lr.setAlias(op.getAlias());
-            curTezOp.plan.addAsLeaf(lr);
+            // Need to add POValueOutputTez to the end of the last tezOp
+            POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+            output.setAlias(op.getAlias());
+            curTezOp.plan.addAsLeaf(output);
+            TezOperator prevOp = curTezOp;
 
-            // Mark the start of a new TezOperator, connecting the inputs.
+            // Mark the start of a new TezOperator which will do the actual limiting with 1 task.
             blocking();
 
-            // As an optimization, don't do any sorting in the shuffle, as LIMIT does not make any
-            // ordering guarantees.
-            // TODO Enable this after TEZ-661
-            // TezEdgeDescriptor edge = curTezOp.inEdges.get(lastOp.getOperatorKey());
-            // edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
-            // edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
+            // Explicitly set the parallelism for the new vertex to 1.
+            curTezOp.setRequestedParallelism(1);
 
-            // Then add a POPackage and a POForEach to the start of the new tezOp.
-            POPackage pkg = getPackage(1, DataType.TUPLE);
-            POForEach forEach = TezCompilerUtil.getForEachPlain(scope, nig);
-            pkg.setAlias(op.getAlias());
-            forEach.setAlias(op.getAlias());
-            curTezOp.plan.add(pkg);
-            curTezOp.plan.addAsLeaf(forEach);
+            output.addOutputKey(curTezOp.getOperatorKey().toString());
+            // LIMIT does not make any ordering guarantees and this is unsorted shuffle.
+            TezEdgeDescriptor edge = curTezOp.inEdges.get(prevOp.getOperatorKey());
+            TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.SCATTER_GATHER);
+
+            // Then add a POValueInputTez to the start of the new tezOp.
+            POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
+            input.setAlias(op.getAlias());
+            input.setInputKey(prevOp.getOperatorKey().toString());
+            curTezOp.plan.addAsLeaf(input);
 
             if (!pigContext.inIllustrator) {
                 POLimit limitCopy = new POLimit(OperatorKey.genOpKey(scope));
@@ -804,8 +805,6 @@ public class TezCompiler extends PhyPlan
                 curTezOp.plan.addAsLeaf(op);
             }
 
-            // Explicitly set the parallelism for the new vertex to 1.
-            curTezOp.setRequestedParallelism(1);
         } catch (Exception e) {
             int errCode = 2034;
             String msg = "Error compiling operator " + op.getClass().getSimpleName();
@@ -1972,6 +1971,7 @@ public class TezCompiler extends PhyPlan
             if (rp == -1) {
                 rp = pigContext.defaultParallel;
             }
+
             // if rp is still -1, let it be, TezParallelismEstimator will set it to an estimated rp
             Pair<TezOperator, Integer> quantJobParallelismPair = getOrderbySamplingAggregationJob(op, rp);
             TezOperator[] sortOpers = getSortJobs(prevOper, lr, op, keyType, fields);
@@ -2020,6 +2020,48 @@ public class TezCompiler extends PhyPlan
 //                curTezOp.isUDFComparatorUsed = true;
 //            }
             quantJobParallelismPair.first.sortOperator = sortOpers[1];
+
+            // If Order by followed by Limit and parallelism of order by is not 1
+            // add a new vertex for Limit with parallelism 1.
+            // Equivalent of LimitAdjuster.java in MR
+            if (op.isLimited() && rp != 1) {
+                POValueOutputTez output = new POValueOutputTez(OperatorKey.genOpKey(scope));
+                output.setAlias(op.getAlias());
+                sortOpers[1].plan.addAsLeaf(output);
+
+                TezOperator limitOper = getTezOp();
+                tezPlan.add(limitOper);
+                curTezOp = limitOper;
+
+                // Explicitly set the parallelism for the new vertex to 1.
+                limitOper.setRequestedParallelism(1);
+
+                edge = TezCompilerUtil.connect(tezPlan, sortOpers[1], limitOper);
+                // LIMIT in this case should be ordered. So we output unordered with key as task index
+                // and on the input we use ShuffledMergedInput to do ordered merge to retain sorted order.
+                output.addOutputKey(limitOper.getOperatorKey().toString());
+                output.setTaskIndexWithRecordIndexAsKey(true);
+                edge = curTezOp.inEdges.get(sortOpers[1].getOperatorKey());
+                TezCompilerUtil.configureValueOnlyTupleOutput(edge, DataMovementType.SCATTER_GATHER);
+                // POValueOutputTez will write key (task index, record index) in
+                // sorted order. So using OnFileUnorderedKVOutput instead of OnFileSortedOutput.
+                // But input needs to be merged in sorted order and requires ShuffledMergedInput
+                edge.outputClassName = OnFileUnorderedKVOutput.class.getName();
+                edge.inputClassName = ShuffledMergedInput.class.getName();
+                edge.setIntermediateOutputKeyClass(TezCompilerUtil.TUPLE_CLASS);
+                edge.setIntermediateOutputKeyComparatorClass(PigTupleWritableComparator.class.getName());
+
+                // Then add a POValueInputTez to the start of the new tezOp followed by a LIMIT
+                POValueInputTez input = new POValueInputTez(OperatorKey.genOpKey(scope));
+                input.setAlias(op.getAlias());
+                input.setInputKey(sortOpers[1].getOperatorKey().toString());
+                curTezOp.plan.addAsLeaf(input);
+
+                POLimit limit = new POLimit(OperatorKey.genOpKey(scope));
+                limit.setLimit(op.getLimit());
+                curTezOp.plan.addAsLeaf(limit);
+            }
+
             phyToTezOpMap.put(op, curTezOp);
         }catch(Exception e){
             int errCode = 2034;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Mon Jul  7 20:08:14 2014
@@ -47,6 +47,7 @@ import org.apache.pig.backend.executione
 import org.apache.pig.backend.hadoop.HDataType;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigDecimalWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingBigIntegerWritableComparator;
@@ -61,7 +62,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingPartitionWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigGroupingTupleWritableComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler.PigSecondaryKeyGroupComparator;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigDecimalRawComparator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigBigIntegerRawComparator;
@@ -130,6 +130,7 @@ import org.apache.tez.mapreduce.input.MR
 import org.apache.tez.mapreduce.output.MROutput;
 import org.apache.tez.mapreduce.partition.MRPartitioner;
 import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValueInput;
+import org.apache.tez.runtime.library.input.ShuffledMergedInput;
 import org.apache.tez.runtime.library.input.SortedGroupedMergedInput;
 
 /**
@@ -242,10 +243,15 @@ public class TezDagBuilder extends TezOp
 
         EdgeProperty edgeProperty = newEdge(fromOp, toOp);
 
-        String groupInputClass = edgeProperty.getDataMovementType().equals(
-                DataMovementType.SCATTER_GATHER)
-                        ? SortedGroupedMergedInput.class.getName()
-                        : ConcatenatedMergedKeyValueInput.class.getName();
+        String groupInputClass = ConcatenatedMergedKeyValueInput.class.getName();
+
+        // In case of SCATTER_GATHER and ShuffledUnorderedKVInput it will still be
+        // ConcatenatedMergedKeyValueInput
+        if(edgeProperty.getDataMovementType().equals(DataMovementType.SCATTER_GATHER)
+                && edgeProperty.getEdgeDestination().getClassName().equals(ShuffledMergedInput.class.getName())) {
+            groupInputClass = SortedGroupedMergedInput.class.getName();
+        }
+
         return new GroupInputEdge(from, to, edgeProperty,
                 new InputDescriptor(groupInputClass).setUserPayload(edgeProperty.getEdgeDestination().getUserPayload()));
     }
@@ -533,7 +539,7 @@ public class TezDagBuilder extends TezOp
                 }
             }
         }
-        JobControlCompiler.setOutputFormat(payloadConf);
+        JobControlCompiler.setOutputFormat(job);
 
         // set parent plan in all operators. currently the parent plan is really
         // used only when POStream, POSplit are present in the plan
@@ -631,7 +637,7 @@ public class TezDagBuilder extends TezOp
                 }
             }
             sortOper = mPlan.getSuccessors(sampleBasedPartionerOper).get(0);
-            
+
             if (sortOper.getRequestedParallelism()==-1 && pc.defaultParallel==-1) {
                 // set estimate parallelism for order by/skewed join to sampler parallelism
                 // that include:
@@ -652,7 +658,7 @@ public class TezDagBuilder extends TezOp
         // Take our assembled configuration and create a vertex
         byte[] userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
         procDesc.setUserPayload(userPayload);
-        
+
         Vertex vertex = new Vertex(tezOp.getOperatorKey().toString(), procDesc, parallelism,
                 isMap ? MRHelpers.getMapResource(globalConf) : MRHelpers.getReduceResource(globalConf));
 
@@ -728,7 +734,7 @@ public class TezDagBuilder extends TezOp
         if (stores.size() > 0) {
             new PigOutputFormat().checkOutputSpecs(job);
         }
-        
+
         // Set the right VertexManagerPlugin
         if (tezOp.getEstimatedParallelism() != -1) {
             if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
@@ -1147,7 +1153,7 @@ public class TezDagBuilder extends TezOp
         conf.set(TezJobConfig.TEZ_RUNTIME_INTERMEDIATE_INPUT_KEY_SECONDARY_COMPARATOR_CLASS,
                 comparatorClass);
     }
-    
+
     public static int estimateParallelism(Job job, TezOperPlan tezPlan,
             TezOperator tezOp) throws IOException {
         Configuration conf = job.getConfiguration();

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Mon Jul  7 20:08:14 2014
@@ -41,19 +41,19 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.optimizers.UnionOptimizer;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.plan.CompilationMessageCollector.MessageType;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
+import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.pig.tools.pigstats.tez.TezStats;
 import org.apache.pig.tools.pigstats.tez.TezTaskStats;
-import org.apache.pig.tools.pigstats.tez.TezScriptState;
 import org.apache.tez.common.TezUtils;
-import org.apache.tez.dag.api.Vertex;
 import org.apache.tez.dag.api.TezConfiguration;
+import org.apache.tez.dag.api.Vertex;
 
 /**
  * Main class that launches pig for Tez
@@ -67,8 +67,8 @@ public class TezLauncher extends Launche
 
     @Override
     public PigStats launchPig(PhysicalPlan php, String grpName, PigContext pc) throws Exception {
-        if (pc.defaultParallel == -1 && 
-                !Boolean.parseBoolean(pc.getProperties().getProperty("pig.tez.auto.parallelism", "true"))) {
+        if (pc.defaultParallel == -1 &&
+                !Boolean.parseBoolean(pc.getProperties().getProperty(PigConfiguration.TEZ_AUTO_PARALLELISM, "true"))) {
             pc.defaultParallel = 1;
         }
         aggregateWarning = Boolean.parseBoolean(pc.getProperties().getProperty("aggregate.warning", "false"));

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezPrinter.java Mon Jul  7 20:08:14 2014
@@ -99,12 +99,20 @@ public class TezPrinter extends TezOpPla
 
         @Override
         public void visitTezOp(TezOperator tezOper) throws VisitorException {
-            buf.append("Tez vertex " + tezOper.getOperatorKey().toString());
+            if (tezOper.isVertexGroup()) {
+                buf.append("Tez vertex group " + tezOper.getOperatorKey().toString());
+            } else {
+                buf.append("Tez vertex " + tezOper.getOperatorKey().toString());
+            }
             List<TezOperator> succs = mPlan.getSuccessors(tezOper);
             if (succs != null) {
                 buf.append("\t->\t");
                 for (TezOperator op : succs) {
-                    buf.append("Tez vertex " + op.getOperatorKey().toString()).append(",");
+                    if (op.isVertexGroup()) {
+                        buf.append("Tez vertex group " + op.getOperatorKey().toString()).append(",");
+                    } else {
+                        buf.append("Tez vertex " + op.getOperatorKey().toString()).append(",");
+                    }
                 }
             }
             buf.append("\n");

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/UnionOptimizer.java Mon Jul  7 20:08:14 2014
@@ -22,7 +22,6 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map.Entry;
 
-import org.apache.commons.lang.ArrayUtils;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -41,8 +40,8 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.ReverseDependencyOrderWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
-import org.apache.tez.runtime.library.input.ShuffledMergedInput;
-import org.apache.tez.runtime.library.output.OnFileSortedOutput;
+import org.apache.tez.runtime.library.input.ShuffledUnorderedKVInput;
+import org.apache.tez.runtime.library.output.OnFileUnorderedPartitionedKVOutput;
 
 /**
  * Optimizes union by removing the intermediate union vertex and making the
@@ -207,15 +206,8 @@ public class UnionOptimizer extends TezO
                 if (edge.dataMovementType == DataMovementType.ONE_TO_ONE) {
                     edge.dataMovementType = DataMovementType.SCATTER_GATHER;
                     edge.partitionerClass = RoundRobinPartitioner.class;
-                    edge.outputClassName = OnFileSortedOutput.class.getName();
-                    edge.inputClassName = ShuffledMergedInput.class.getName();
-
-                    for (TezOutput tezOutput : valueOnlyOutputs) {
-                        if (ArrayUtils.contains(tezOutput.getTezOutputs(), entry.getKey().toString())) {
-                            edge.setIntermediateOutputKeyComparatorClass(
-                                    POValueOutputTez.EmptyWritableComparator.class.getName());
-                        }
-                    }
+                    edge.outputClassName = OnFileUnorderedPartitionedKVOutput.class.getName();
+                    edge.inputClassName = ShuffledUnorderedKVInput.class.getName();
                 }
                 TezOperator vertexGroupOp = outputVertexGroupOps[unionOutputKeys.indexOf(entry.getKey().toString())];
                 for (OperatorKey predKey : vertexGroupOp.getVertexGroupMembers()) {

Modified: pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java (original)
+++ pig/trunk/src/org/apache/pig/tools/grunt/GruntParser.java Mon Jul  7 20:08:14 2014
@@ -177,7 +177,8 @@ public class GruntParser extends PigScri
                         mNumFailedJobs++;
                         Exception exp = (js.getException() != null) ? js.getException()
                                 : new ExecException(
-                                        "Job " + js.getName() + " failed, hadoop does not return any error message",
+                                        "Job " + (js.getJobId() == null ? "" : js.getJobId() + " ") +
+                                        "failed, hadoop does not return any error message",
                                         2244);
                         LogUtils.writeLog(exp,
                                 mPigServer.getPigContext().getProperties().getProperty("pig.logfile"),

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Mon Jul  7 20:08:14 2014
@@ -120,8 +120,8 @@ public class TezTaskStats extends JobSta
             String filename = sto.getSFile().getFileName();
             if (map != null) {
                 if (sto.isMultiStore()) {
-                    Long n = map.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP)
-                            .get(PigStatsUtil.getMultiStoreCounterName(sto));
+                    Map<String, Long> msGroup = map.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+                    Long n = msGroup == null ? null: msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
                     if (n != null) records = n;
                 } else if (map.get(TASK_COUNTER_GROUP) != null
                         && map.get(TASK_COUNTER_GROUP).get(TaskCounter.OUTPUT_RECORDS.name()) != null) {

Modified: pig/trunk/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/nightly.conf?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/nightly.conf (original)
+++ pig/trunk/test/e2e/pig/tests/nightly.conf Mon Jul  7 20:08:14 2014
@@ -2056,12 +2056,11 @@ store d into ':OUTPATH:';\,
 			},
 			{
 				'num' => 2,
-                                'ignore23' => 'The record limit pick is different in 23',
 				'pig' =>q\a = load ':INPATH:/singlefile/studentnulltab10k';
-b = order a by $0, $1;
+b = order a by $0, $1, $2;
 c = limit b 100;
 store c into ':OUTPATH:';\,
-            	'sortArgs' => ['-t', '	', '-k', '1,2'],
+            	'sortArgs' => ['-t', '	', '-k', '1,3'],
 			},
 			{
 				# Make sure that limit higher than number of rows doesn't mess stuff up
@@ -2604,7 +2603,7 @@ register :FUNCPATH:/testudf.jar;
 a = load ':INPATH:/singlefile/studenttab10k' using PigStorage() as (name:chararray, age:int, gpa: double);
 b = foreach a generate CONCAT('(', name), CONCAT((chararray)age, ' )');
 store b into ':OUTPATH:.intermediate' using PigStorage(',');
-c = load ':OUTPATH:.intermediate' using org.apache.pig.test.udf.storefunc.DumpLoader();
+c = load ':OUTPATH:.intermediate' using DumpLoader();
 store c into ':OUTPATH:';\,
 
             'notmq' => 1,

Modified: pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java?rev=1608593&r1=1608592&r2=1608593&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java (original)
+++ pig/trunk/test/org/apache/pig/test/MiniGenericCluster.java Mon Jul  7 20:08:14 2014
@@ -58,7 +58,9 @@ abstract public class MiniGenericCluster
         if (INSTANCE == null) {
             String execType = System.getProperty("test.exec.type");
             if (execType == null) {
-                throw new RuntimeException("test.exec.type is not set");
+                // Default to MR
+                System.setProperty("test.exec.type", EXECTYPE_MR);
+                return buildCluster(EXECTYPE_MR);
             }
 
             return buildCluster(execType);