You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/09/24 19:55:35 UTC

svn commit: r1627376 - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/ap...

Author: rohini
Date: Wed Sep 24 17:55:34 2014
New Revision: 1627376

URL: http://svn.apache.org/r1627376
Log:
PIG-4162: Intermediate reducer parallelism in Tez should be higher (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/Main.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezParallelismEstimator.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java
    pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
    pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java
    pig/trunk/test/e2e/pig/tests/bigdata.conf
    pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java
    pig/trunk/test/org/apache/pig/test/TestForEachNestedPlan.java
    pig/trunk/test/tez-tests

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Sep 24 17:55:34 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4162: Intermediate reducer parallelism in Tez should be higher (rohini)
+
 PIG-4186: Fix e2e run against new build of pig and some enhancements (rohini)
 
 PIG-3838: Organize tez code into subpackages (rohini)

Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Wed Sep 24 17:55:34 2014
@@ -167,26 +167,17 @@ public class Main {
      * @throws IOException
      */
     public static void main(String args[]) {
-        DateTime startTime = new DateTime();
-
-        int exitcode = run(args, null);
-
-        DateTime endTime = new DateTime();
-        Duration duration = new Duration(startTime, endTime);
-        Period period = duration.toPeriod().normalizedStandard(PeriodType.time());
-        log.info("Pig script completed in "
-                + PeriodFormat.getDefault().print(period)
-                + " (" + duration.getMillis() + " ms)");
-
-        System.exit(exitcode);
+        System.exit(run(args, null));
     }
 
     static int run(String args[], PigProgressNotificationListener listener) {
+        DateTime startTime = new DateTime();
         int rc = 1;
         boolean verbose = false;
         boolean gruntCalled = false;
         boolean deleteTempFiles = true;
         String logFileName = null;
+        boolean printScriptRunTime = true;
 
         try {
             Configuration conf = new Configuration(false);
@@ -301,6 +292,7 @@ public class Main {
                     return ReturnCode.SUCCESS;
 
                 case 'i':
+                    printScriptRunTime = false;
                     System.out.println(getVersionString());
                     return ReturnCode.SUCCESS;
 
@@ -670,6 +662,9 @@ public class Main {
                 LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
             }
         } finally {
+            if (printScriptRunTime) {
+                printScriptRunTime(startTime);
+            }
             if (deleteTempFiles) {
                 // clear temp files
                 FileLocalizer.deleteTempFiles();
@@ -680,6 +675,15 @@ public class Main {
         return rc;
     }
 
+    private static void printScriptRunTime(DateTime startTime) {
+        DateTime endTime = new DateTime();
+        Duration duration = new Duration(startTime, endTime);
+        Period period = duration.toPeriod().normalizedStandard(PeriodType.time());
+        log.info("Pig script completed in "
+                + PeriodFormat.getDefault().print(period)
+                + " (" + duration.getMillis() + " ms)");
+    }
+
     protected static PigProgressNotificationListener makeListener(Properties properties) {
 
         try {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 24 17:55:34 2014
@@ -1039,9 +1039,9 @@ public class JobControlCompiler{
         Configuration conf = nwJob.getConfiguration();
 
         // set various parallelism into the job conf for later analysis, PIG-2779
-        conf.setInt("pig.info.reducers.default.parallel", pigContext.defaultParallel);
-        conf.setInt("pig.info.reducers.requested.parallel", mro.requestedParallelism);
-        conf.setInt("pig.info.reducers.estimated.parallel", mro.estimatedParallelism);
+        conf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pigContext.defaultParallel);
+        conf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, mro.requestedParallelism);
+        conf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, mro.estimatedParallelism);
 
         // this is for backward compatibility, and we encourage to use runtimeParallelism at runtime
         mro.requestedParallelism = jobParallelism;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Sep 24 17:55:34 2014
@@ -29,6 +29,7 @@ import java.util.TreeMap;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.RawComparator;
 import org.apache.hadoop.io.WritableComparable;
@@ -88,6 +89,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -102,8 +104,10 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.NullablePartitionWritable;
 import org.apache.pig.impl.io.NullableTuple;
@@ -156,6 +160,7 @@ public class TezDagBuilder extends TezOp
     private Map<String, LocalResource> localResources;
     private PigContext pc;
     private Configuration globalConf;
+    private long intermediateTaskInputSize;
 
     public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
             Map<String, LocalResource> localResources) {
@@ -172,6 +177,19 @@ public class TezDagBuilder extends TezOp
         } catch (IOException e) {
             throw new RuntimeException("Error while fetching delegation tokens", e);
         }
+
+        try {
+            intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(FileSystem.get(globalConf), FileLocalizer.getTemporaryResourcePath(pc));
+        } catch (Exception e) {
+            log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
+            intermediateTaskInputSize = 134217728L;
+        }
+        // At least 128MB. Else we will end up with too many tasks
+        intermediateTaskInputSize = Math.max(intermediateTaskInputSize, 134217728L);
+        intermediateTaskInputSize = Math.min(intermediateTaskInputSize,
+                globalConf.getLong(
+                        InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                        InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
     }
 
     @Override
@@ -410,7 +428,19 @@ public class TezDagBuilder extends TezOp
         }
 
         if (tezOp.getSortOperator() != null) {
+            // Required by Sample Aggregation job for estimating quantiles
             payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.getSortOperator().getOperatorKey().toString());
+            // PIG-4162: Order by/Skewed Join in an intermediate stage.
+            // Increasing the parallelism of an order by may not be required, as
+            // it is usually followed by a limit (other than a store). But it
+            // would benefit cases like a skewed join followed by a group by.
+            if (tezOp.getSortOperator().getEstimatedParallelism() != -1
+                    && TezCompilerUtil.isIntermediateReducer(tezOp.getSortOperator())) {
+                payloadConf.setLong(
+                        InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                        intermediateTaskInputSize);
+            }
+
         }
 
         payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
@@ -453,8 +483,7 @@ public class TezDagBuilder extends TezOp
             tezOp.plan.remove(pack);
             payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
             setIntermediateOutputKeyValue(keyType, payloadConf, tezOp);
-            POShuffleTezLoad newPack;
-            newPack = new POShuffleTezLoad(pack);
+            POShuffleTezLoad newPack = new POShuffleTezLoad(pack);
             if (tezOp.isSkewedJoin()) {
                 newPack.setSkewedJoins(true);
             }
@@ -556,9 +585,9 @@ public class TezDagBuilder extends TezOp
         }
 
         // set various parallelism into the job conf for later analysis, PIG-2779
-        payloadConf.setInt("pig.info.reducers.default.parallel", pc.defaultParallel);
-        payloadConf.setInt("pig.info.reducers.requested.parallel", tezOp.getRequestedParallelism());
-        payloadConf.setInt("pig.info.reducers.estimated.parallel", tezOp.getEstimatedParallelism());
+        payloadConf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pc.defaultParallel);
+        payloadConf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, tezOp.getRequestedParallelism());
+        payloadConf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, tezOp.getEstimatedParallelism());
 
         // Take our assembled configuration and create a vertex
         UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
@@ -670,8 +699,12 @@ public class TezDagBuilder extends TezOp
                     vmPluginName = ShuffleVertexManager.class.getName();
                     vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
                     vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
-                    if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
-                            InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)!=
+                    if (stores.size() <= 0) {
+                        // Intermediate reduce. Set the bytes per reducer to be block size.
+                        vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+                                        intermediateTaskInputSize);
+                    } else if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+                                    InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
                                     InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
                         vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
                                 vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Wed Sep 24 17:55:34 2014
@@ -270,7 +270,11 @@ public class TezLauncher extends Launche
                     TezTaskStats tts = tezStats.getVertexStats(v.getName());
                     tezScriptState.emitjobFinishedNotification(tts);
                     Map<String, Map<String, Long>> counterGroups = runningJob.getVertexCounters(v.getName());
-                    computeWarningAggregate(counterGroups, warningAggMap);
+                    if (counterGroups == null) {
+                        log.warn("Counters are not available for vertex " + v.getName() + ". Not computing warning aggregates.");
+                    } else {
+                        computeWarningAggregate(counterGroups, warningAggMap);
+                    }
                 }
                 if (aggregateWarning) {
                     CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Wed Sep 24 17:55:34 2014
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.pig.backend.hadoop.datastorage.HPath;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 
@@ -54,7 +53,7 @@ public class TezResourceManager {
 
     public void init(PigContext pigContext, Configuration conf) throws IOException {
         if (!inited) {
-            this.stagingDir = ((HPath)FileLocalizer.getTemporaryResourcePath(pigContext)).getPath();
+            this.stagingDir = FileLocalizer.getTemporaryResourcePath(pigContext);
             this.remoteFs = FileSystem.get(conf);
             this.conf = conf;
             this.inited = true;

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Wed Sep 24 17:55:34 2014
@@ -810,6 +810,7 @@ public class TezCompiler extends PhyPlan
                     }
                 }
                 if (canStop) {
+                    curTezOp.setDontEstimateParallelism(true);
                     if (limitAfterSort) {
                         curTezOp.markLimitAfterSort();
                     } else {
@@ -830,6 +831,7 @@ public class TezCompiler extends PhyPlan
 
             // Explicitly set the parallelism for the new vertex to 1.
             curTezOp.setRequestedParallelism(1);
+            curTezOp.setDontEstimateParallelism(true);
 
             output.addOutputKey(curTezOp.getOperatorKey().toString());
             // LIMIT does not make any ordering guarantees and this is unsorted shuffle.
@@ -1094,6 +1096,7 @@ public class TezCompiler extends PhyPlan
         indexAggrOper.segmentBelow = true;
 
         indexerTezOp.setRequestedParallelism(1); // we need exactly one reducer for indexing job.
+        indexerTezOp.setDontEstimateParallelism(true);
 
         POStore st = TezCompilerUtil.getStore(scope, nig);
         FileSpec strFile = getTempFileSpec();
@@ -1261,6 +1264,7 @@ public class TezCompiler extends PhyPlan
                 tezPlan.add(rightTezOprAggr);
                 TezCompilerUtil.simpleConnectTwoVertex(tezPlan, rightTezOpr, rightTezOprAggr, scope, nig);
                 rightTezOprAggr.setRequestedParallelism(1); // we need exactly one task for indexing job.
+                rightTezOprAggr.setDontEstimateParallelism(true);
 
                 POStore st = TezCompilerUtil.getStore(scope, nig);
                 FileSpec strFile = getTempFileSpec();
@@ -1401,6 +1405,7 @@ public class TezCompiler extends PhyPlan
             POCounterStatsTez counterStatsTez = new POCounterStatsTez(OperatorKey.genOpKey(scope));
             statsOper.plan.addAsLeaf(counterStatsTez);
             statsOper.setRequestedParallelism(1);
+            statsOper.setDontEstimateParallelism(true);
 
             //Construct Vertex 3
             TezOperator rankOper = getTezOp();
@@ -2037,6 +2042,7 @@ public class TezCompiler extends PhyPlan
         oper.setClosed(true);
 
         oper.setRequestedParallelism(1);
+        oper.setDontEstimateParallelism(true);
         oper.markSampleAggregation();
         return new Pair<TezOperator, Integer>(oper, rp);
     }
@@ -2286,6 +2292,7 @@ public class TezCompiler extends PhyPlan
 
                 // Explicitly set the parallelism for the new vertex to 1.
                 limitOper.setRequestedParallelism(1);
+                limitOper.setDontEstimateParallelism(true);
                 limitOper.markLimitAfterSort();
 
                 edge = TezCompilerUtil.connect(tezPlan, sortOpers[1], limitOper);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Wed Sep 24 17:55:34 2014
@@ -70,6 +70,10 @@ public class TezOperator extends Operato
 
     private int estimatedParallelism = -1;
 
+    // Do not estimate parallelism for specific vertices (limit, indexer,
+    // etc.) whose parallelism should always be one
+    private boolean dontEstimateParallelism = false;
+
     // This is the parallelism of the vertex, it take account of:
     // 1. default_parallel
     // 2. -1 parallelism for one_to_one edge
@@ -251,7 +255,17 @@ public class TezOperator extends Operato
     }
 
     public int getEffectiveParallelism() {
-        return getRequestedParallelism()!=-1? getRequestedParallelism() : getEstimatedParallelism();
+        // PIG-4162: For intermediate reducers, use estimated parallelism over user set parallelism.
+        return getEstimatedParallelism() == -1 ? getRequestedParallelism()
+                : getEstimatedParallelism();
+    }
+
+    public boolean isDontEstimateParallelism() {
+        return dontEstimateParallelism;
+    }
+
+    public void setDontEstimateParallelism(boolean dontEstimateParallelism) {
+        this.dontEstimateParallelism = dontEstimateParallelism;
     }
 
     public OperatorKey getSplitParent() {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Wed Sep 24 17:55:34 2014
@@ -17,17 +17,21 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
 
-import java.io.IOException;
+import java.util.LinkedList;
 import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -36,12 +40,26 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 
 public class ParallelismSetter extends TezOpPlanVisitor {
-    Configuration conf;
-    PigContext pc;
+    private Configuration conf;
+    private PigContext pc;
+    private TezParallelismEstimator estimator;
+    private boolean autoParallelismEnabled;
+
     public ParallelismSetter(TezOperPlan plan, PigContext pigContext) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         this.pc = pigContext;
         this.conf = ConfigurationUtil.toConfiguration(pc.getProperties());
+        this.autoParallelismEnabled = conf.getBoolean(PigConfiguration.TEZ_AUTO_PARALLELISM, true);
+        try {
+            this.estimator = conf.get(PigConfiguration.REDUCER_ESTIMATOR_KEY) == null ? new TezOperDependencyParallelismEstimator()
+            : PigContext.instantiateObjectFromParams(conf,
+                    PigConfiguration.REDUCER_ESTIMATOR_KEY, PigConfiguration.REDUCER_ESTIMATOR_ARG_KEY,
+                    TezParallelismEstimator.class);
+            this.estimator.setPigContext(pc);
+
+        } catch (ExecException e) {
+            throw new RuntimeException("Error instantiating TezParallelismEstimator", e);
+        }
     }
 
     @Override
@@ -53,6 +71,11 @@ public class ParallelismSetter extends T
             // Can only set parallelism here if the parallelism isn't derived from
             // splits
             int parallelism = -1;
+            boolean intermediateReducer = false;
+            LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
+            if (stores.size() <= 0) {
+                intermediateReducer = true;
+            }
             if (tezOp.getLoaderInfo().getLoads() != null && tezOp.getLoaderInfo().getLoads().size() > 0) {
                 // TODO: Can be set to -1 if TEZ-601 gets fixed and getting input
                 // splits can be moved to if(loads) block below
@@ -61,6 +84,8 @@ public class ParallelismSetter extends T
             } else {
                 int prevParallelism = -1;
                 boolean isOneToOneParallelism = false;
+                intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOp);
+
                 for (Map.Entry<OperatorKey,TezEdgeDescriptor> entry : tezOp.inEdges.entrySet()) {
                     if (entry.getValue().dataMovementType == DataMovementType.ONE_TO_ONE) {
                         TezOperator pred = mPlan.getOperator(entry.getKey());
@@ -71,24 +96,29 @@ public class ParallelismSetter extends T
                             throw new VisitorException("one to one sources parallelism for vertex "
                                     + tezOp.getOperatorKey().toString() + " are not equal");
                         }
-                        if (pred.getRequestedParallelism()!=-1) {
-                            tezOp.setRequestedParallelism(pred.getRequestedParallelism());
-                        } else {
-                            tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
-                        }
+                        tezOp.setRequestedParallelism(pred.getRequestedParallelism());
+                        tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
                         isOneToOneParallelism = true;
                         parallelism = -1;
                     }
                 }
                 if (!isOneToOneParallelism) {
-                    if (tezOp.getRequestedParallelism()!=-1) {
+                    if (tezOp.getRequestedParallelism() != -1) {
                         parallelism = tezOp.getRequestedParallelism();
-                    } else if (pc.defaultParallel!=-1) {
+                    } else if (pc.defaultParallel != -1) {
                         parallelism = pc.defaultParallel;
-                    } else {
-                        parallelism = estimateParallelism(mPlan, tezOp);
-                        tezOp.setEstimatedParallelism(parallelism);
-                        if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
+                    }
+                    if (autoParallelismEnabled &&
+                            ((parallelism == -1 || intermediateReducer) && !tezOp.isDontEstimateParallelism())) {
+                        if (tezOp.getEstimatedParallelism() == -1) {
+                            // Override user-specified parallelism with the estimated value
+                            // if it is an intermediate reducer
+                            parallelism = estimator.estimateParallelism(mPlan, tezOp, conf);
+                            tezOp.setEstimatedParallelism(parallelism);
+                        } else {
+                            parallelism = tezOp.getEstimatedParallelism();
+                        }
+                        if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
                             // Vertex manager will set parallelism
                             parallelism = -1;
                         }
@@ -98,7 +128,7 @@ public class ParallelismSetter extends T
 
             // Once we decide the parallelism of the sampler, propagate to
             // downstream operators if necessary
-            if (tezOp.isSampler()) {
+            if (tezOp.isSampler() && autoParallelismEnabled) {
                 // There could be multiple sampler and share the same sample aggregation job
                 // and partitioner job
                 TezOperator sampleAggregationOper = null;
@@ -116,7 +146,7 @@ public class ParallelismSetter extends T
                 }
                 sortOper = mPlan.getSuccessors(sampleBasedPartionerOper).get(0);
 
-                if (sortOper.getRequestedParallelism()==-1 && pc.defaultParallel==-1) {
+                if ((sortOper.getRequestedParallelism() == -1 && pc.defaultParallel == -1) || TezCompilerUtil.isIntermediateReducer(sortOper)) {
                     // set estimate parallelism for order by/skewed join to sampler parallelism
                     // that include:
                     // 1. sort operator
@@ -125,6 +155,7 @@ public class ParallelismSetter extends T
                     ParallelConstantVisitor visitor =
                             new ParallelConstantVisitor(sampleAggregationOper.plan, parallelism);
                     visitor.visit();
+                    sampleAggregationOper.setNeedEstimatedQuantile(true);
                 }
             }
 
@@ -139,14 +170,4 @@ public class ParallelismSetter extends T
         }
     }
 
-    private int estimateParallelism(TezOperPlan tezPlan, TezOperator tezOp) throws IOException {
-
-        TezParallelismEstimator estimator = conf.get(PigConfiguration.REDUCER_ESTIMATOR_KEY) == null ? new TezOperDependencyParallelismEstimator()
-                : PigContext.instantiateObjectFromParams(conf,
-                        PigConfiguration.REDUCER_ESTIMATOR_KEY, PigConfiguration.REDUCER_ESTIMATOR_ARG_KEY,
-                        TezParallelismEstimator.class);
-
-        int numberOfReducers = estimator.estimateParallelism(tezPlan, tezOp, conf);
-        return numberOfReducers;
-    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Wed Sep 24 17:55:34 2014
@@ -42,7 +42,8 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -64,6 +65,15 @@ public class TezOperDependencyParallelis
     static final double DEFAULT_FILTER_FACTOR = 0.7;
     static final double DEFAULT_LIMIT_FACTOR = 0.1;
 
+    static final int DEFAULT_MAX_INTERMEDIATE_REDUCER_COUNT_PARAM = 2999;
+
+    private PigContext pc;
+
+    @Override
+    public void setPigContext(PigContext pc) {
+        this.pc = pc;
+    }
+
     @Override
     public int estimateParallelism(TezOperPlan plan, TezOperator tezOper, Configuration conf) throws IOException {
 
@@ -71,11 +81,13 @@ public class TezOperDependencyParallelis
             return -1;
         }
 
+        boolean intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOper);
+
         maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
                 PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
 
         // If parallelism is set explicitly, respect it
-        if (tezOper.getRequestedParallelism()!=-1) {
+        if (!intermediateReducer && tezOper.getRequestedParallelism()!=-1) {
             return tezOper.getRequestedParallelism();
         }
 
@@ -115,28 +127,26 @@ public class TezOperDependencyParallelis
         }
 
         int roundedEstimatedParallelism = (int)Math.ceil(estimatedParallelism);
-        if (tezOper.isSampler()) {
-            TezOperator sampleAggregationOper = null;
-            TezOperator rangePartionerOper = null;
-            TezOperator sortOper = null;
-            for (TezOperator succ : plan.getSuccessors(tezOper)) {
-                if (succ.isSampleAggregation()) {
-                    sampleAggregationOper = succ;
-                } else if (succ.isSampleBasedPartitioner()) {
-                    rangePartionerOper = succ;
-                }
-            }
-            sortOper = plan.getSuccessors(rangePartionerOper).get(0);
 
-            if (sortOper.getRequestedParallelism()!=-1) {
-
-                ParallelConstantVisitor visitor =
-                        new ParallelConstantVisitor(sampleAggregationOper.plan, roundedEstimatedParallelism);
-                visitor.visit();
-            }
+        if (intermediateReducer) {
+            // Estimate is capped at max(DEFAULT_MAX_INTERMEDIATE_REDUCER_COUNT_PARAM, maxTaskCount)
+            roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, Math.max(DEFAULT_MAX_INTERMEDIATE_REDUCER_COUNT_PARAM, maxTaskCount));
+            int userSpecifiedParallelism = pc.defaultParallel;
+            if (tezOper.getRequestedParallelism() != -1) {
+                userSpecifiedParallelism = tezOper.getRequestedParallelism();
+            }
+            int intermediateParallelism = Math.max(userSpecifiedParallelism, roundedEstimatedParallelism);
+            if (userSpecifiedParallelism != -1 && intermediateParallelism > (2 * userSpecifiedParallelism)) {
+                // Estimated reducers shall not be more than 2x of requested parallelism
+                // when we are overriding user specified values
+                intermediateParallelism = 2 * userSpecifiedParallelism;
+            }
+            roundedEstimatedParallelism = intermediateParallelism;
+        } else {
+            roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
         }
 
-        return Math.min(roundedEstimatedParallelism, maxTaskCount);
+        return roundedEstimatedParallelism;
     }
 
     private static TezOperator getPredecessorWithKey(TezOperPlan plan, TezOperator tezOper, String inputKey) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezParallelismEstimator.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezParallelismEstimator.java Wed Sep 24 17:55:34 2014
@@ -22,7 +22,12 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.PigContext;
 
 public interface TezParallelismEstimator {
-    public int estimateParallelism(TezOperPlan plan, TezOperator tezOper, Configuration conf) throws IOException;
+
+    public void setPigContext(PigContext pc);
+
+    public int estimateParallelism(TezOperPlan plan, TezOperator tezOper,
+            Configuration conf) throws IOException;
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Wed Sep 24 17:55:34 2014
@@ -2,6 +2,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.pig.PigException;
@@ -12,6 +13,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
@@ -25,6 +27,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
 import org.apache.tez.runtime.library.output.UnorderedKVOutput;
@@ -172,4 +175,19 @@ public class TezCompilerUtil {
         edge.setIntermediateOutputValueClass(TUPLE_CLASS);
     }
 
+    /**
+     * Returns true if tezOper.plan contains no POStore and its loader info has no loads (null or empty).
+     * To be called only after LoaderProcessor is called.
+     */
+    static public boolean isIntermediateReducer(TezOperator tezOper) throws VisitorException {
+        boolean intermediateReducer = false;
+        LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOper.plan, POStore.class);
+        // No stores and no loads: not a map and not a final reducer
+        if (stores.size() <= 0 &&
+                (tezOper.getLoaderInfo().getLoads() == null || tezOper.getLoaderInfo().getLoads().size() <= 0)) {
+            intermediateReducer = true;
+        }
+        return intermediateReducer;
+    }
+
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java Wed Sep 24 17:55:34 2014
@@ -38,18 +38,16 @@ public class ParallelConstantVisitor ext
 
     @Override
     public void visitConstant(ConstantExpression cnst) throws VisitorException {
-        if (cnst.getRequestedParallelism() == -1) {
-            Object obj = cnst.getValue();
-            if (obj instanceof Integer) {
-                if (replaced) {
-                    // sample job should have only one ConstantExpression
-                    throw new VisitorException("Invalid reduce plan: more " +
-                            "than one ConstantExpression found in sampling job");
-                }
-                cnst.setValue(rp);
-                cnst.setRequestedParallelism(rp);
-                replaced = true;
+        Object obj = cnst.getValue();
+        if (obj instanceof Integer) {
+            if (replaced) {
+                // sample job should have only one ConstantExpression
+                throw new VisitorException("Invalid reduce plan: more " +
+                        "than one ConstantExpression found in sampling job");
             }
+            cnst.setValue(rp);
+            cnst.setRequestedParallelism(rp);
+            replaced = true;
         }
     }
 }

Modified: pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigImplConstants.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigImplConstants.java Wed Sep 24 17:55:34 2014
@@ -51,4 +51,11 @@ public class PigImplConstants {
      * Indicate the split index of the task. Used by merge cogroup
      */
     public static final String PIG_SPLIT_INDEX = "pig.split.index";
+
+    /**
+     * Parallelism for the reducer
+     */
+    public static final String REDUCER_DEFAULT_PARALLELISM = "pig.info.reducers.default.parallel";
+    public static final String REDUCER_REQUESTED_PARALLELISM = "pig.info.reducers.requested.parallel";
+    public static final String REDUCER_ESTIMATED_PARALLELISM = "pig.info.reducers.estimated.parallel";
 }

Modified: pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Wed Sep 24 17:55:34 2014
@@ -478,15 +478,15 @@ public class FileLocalizer {
      * since resourthPath should be available in the entire session
      *
      * @param pigContext
-     * @return
+     * @return temporary resource path
      * @throws DataStorageException
      */
-    public static synchronized ContainerDescriptor getTemporaryResourcePath(final PigContext pigContext)
+    public static synchronized Path getTemporaryResourcePath(final PigContext pigContext)
             throws DataStorageException {
         if (resourcePath == null) {
             resourcePath = getTempContainer(pigContext);
         }
-        return resourcePath;
+        return ((HPath)resourcePath).getPath();
     }
 
     private static synchronized ContainerDescriptor getTempContainer(final PigContext pigContext)
@@ -800,7 +800,7 @@ public class FileLocalizer {
                 && uri.getScheme() == null )||
                 // For Windows local files
                 (uri.getScheme() == null && uri.getPath().matches("^/[A-Za-z]:.*")) ||
-                (uri.getScheme() != null && uri.getScheme().equals("local")) 
+                (uri.getScheme() != null && uri.getScheme().equals("local"))
             ) {
             srcFs = localFs;
         } else {

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java Wed Sep 24 17:55:34 2014
@@ -230,14 +230,8 @@ public class TezStats extends PigStats {
         stats.setId(tezOpName);
         stats.setSuccessful(succeeded);
         stats.setParallelism(parallelism);
-        if (map == null) {
-            if (stats.hasLoadOrStore()) {
-                LOG.warn("Unable to get input(s)/output(s) of the job");
-            }
-        } else {
-            stats.addInputStatistics(map);
-            stats.addOutputStatistics(map);
-        }
+        stats.addInputStatistics(map);
+        stats.addOutputStatistics(map);
     }
 
     private static String getDisplayString(TezJob tezJob) {

Modified: pig/trunk/test/e2e/pig/tests/bigdata.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/bigdata.conf?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/bigdata.conf (original)
+++ pig/trunk/test/e2e/pig/tests/bigdata.conf Wed Sep 24 17:55:34 2014
@@ -33,6 +33,7 @@ $cfg = {
 			{
 			'num' => 1,
             ,'floatpostprocess' => 1
+            ,'java_params' => ['-Dpig.tez.auto.parallelism=false']
             ,'delimiter' => '	',
 			'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
 b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);

Modified: pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java Wed Sep 24 17:55:34 2014
@@ -63,7 +63,7 @@ public class TestAlgebraicEval {
     public void testGroupCountWithMultipleFields() throws Throwable {
         File tmpFile = File.createTempFile("test", "txt");
         for (int k = 0; k < nullFlags.length; k++) {
-            System.err.println("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k]);
+            System.out.println("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k]);
             // flag to indicate if both the keys forming
             // the group key are null
             int groupKeyWithNulls = 0;

Modified: pig/trunk/test/org/apache/pig/test/TestForEachNestedPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlan.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestForEachNestedPlan.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestForEachNestedPlan.java Wed Sep 24 17:55:34 2014
@@ -59,7 +59,7 @@ public class TestForEachNestedPlan {
     @Test
     public void testInnerOrderBy() throws Exception {
         for (int i = 0; i < nullFlags.length; i++) {
-            System.err.println("Running testInnerOrderBy with nullFlags set to :"
+            System.out.println("Running testInnerOrderBy with nullFlags set to :"
                             + nullFlags[i]);
             File tmpFile = genDataSetFile1(nullFlags[i]);
             pig.registerQuery("a = load '"

Modified: pig/trunk/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/trunk/test/tez-tests?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/test/tez-tests (original)
+++ pig/trunk/test/tez-tests Wed Sep 24 17:55:34 2014
@@ -1,3 +1,8 @@
+**/TestAccumuloPigCluster.java
+**/TestBigTypeSort.java
+**/TestCurrentTime.java
+**/TestInvokerGenerator.java
+**/TestStreamingUDF.java
 **/TestAccumulator.java
 **/TestAlgebraicEval.java
 **/TestBZip.java
@@ -11,11 +16,11 @@
 **/TestCustomPartitioner.java
 **/TestEvalPipeline.java
 **/TestEvalPipeline2.java
+**/TestFRJoin.java
+**/TestFRJoinNullValue.java
 **/TestFilterUDF.java
 **/TestFinish.java
 **/TestForEachNestedPlan.java
-**/TestFRJoin.java
-**/TestFRJoinNullValue.java
 **/TestGrunt.java
 **/TestImplicitSplit.java
 **/TestInputOutputMiniClusterFileValidator.java
@@ -29,11 +34,14 @@
 **/TestMapReduce.java
 **/TestMapSideCogroup.java
 **/TestMapReduce2.java
+**/TestMergeJoin.java
 **/TestMergeJoinOuter.java
+**/TestNativeMapReduce.java
 **/TestNestedForeach.java
 **/TestNewPlanImplicitSplit.java
 **/TestParser.java
 **/TestPigContext.java
+**/TestPigProgressReporting.java
 **/TestPigServer.java
 **/TestPigServerWithMacros.java
 **/TestPigSplit.java
@@ -52,22 +60,14 @@
 **/TestStoreInstances.java
 **/TestStoreOld.java
 **/TestStreaming.java
-**/TestStreamingUDF.java
 **/TestToolsPigServer.java
 **/TestUDF.java
 **/TestUDFContext.java
+**/TestGroupConstParallelTez.java
+**/TestJobSubmissionTez.java
+**/TestLoaderStorerShipCacheFilesTez.java
 **/TestSecondarySortTez.java
 **/TestTezAutoParallelism.java
 **/TestTezCompiler.java
 **/TestTezJobControlCompiler.java
 **/TestTezLauncher.java
-**/TestAccumuloPigCluster.java
-**/TestBigTypeSort.java
-**/TestCurrentTime.java
-**/TestInvokerGenerator.java
-**/TestGroupConstParallelTez.java
-**/TestJobSubmissionTez.java
-**/TestMergeJoin.java
-**/TestNativeMapReduce.java
-**/TestPigProgressReporting.java
-**/TestLoaderStorerShipCacheFilesTez.java