You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/11/01 00:20:56 UTC

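svn commit: r1635881 - in /pig/branches/branch-0.14: ./ shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ src/org/apache/pig/impl/builtin/ src/org/apache/pig/impl/util/ test/e2e/pig/tests/ test/org/apache/pig/test/ test/org/apache/pig/test/data/GoldenFiles/tez/ test/org/apache/pig/tez/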

Author: rohini
Date: Fri Oct 31 23:20:55 2014
New Revision: 1635881

URL: http://svn.apache.org/r1635881
Log:
PIG-4259: Fix few issues related to Union, CROSS and auto parallelism in Tez (rohini)

Modified:
    pig/branches/branch-0.14/CHANGES.txt
    pig/branches/branch-0.14/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
    pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
    pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java
    pig/branches/branch-0.14/src/org/apache/pig/impl/util/Utils.java
    pig/branches/branch-0.14/test/e2e/pig/tests/nightly.conf
    pig/branches/branch-0.14/test/org/apache/pig/test/Util.java
    pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
    pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
    pig/branches/branch-0.14/test/org/apache/pig/tez/TestTezAutoParallelism.java

Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Fri Oct 31 23:20:55 2014
@@ -97,6 +97,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4259: Fix few issues related to Union, CROSS and auto parallelism in Tez (rohini)
+
 PIG-4250: Fix Security Risks found by Coverity (daijy)
 
 PIG-4258: Fix several e2e tests on Windows (daijy)

Modified: pig/branches/branch-0.14/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/branch-0.14/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Fri Oct 31 23:20:55 2014
@@ -96,8 +96,8 @@ public class TezMiniCluster extends Mini
             m_mr_conf = m_mr.getConfig();
             m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                     System.getProperty("java.class.path"));
-            m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx2048m");
-            m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx2048m");
+            m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m");
+            m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx512m");
 
             Configuration mapred_site = new Configuration(false);
             Configuration yarn_site = new Configuration(false);

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Oct 31 23:20:55 2014
@@ -408,9 +408,9 @@ public abstract class PhysicalOperator e
     }
 
     public Result getNextDataBag() throws ExecException {
-        Result ret = null;
+        Result val = new Result();
         DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
-        for (ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
+        for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
             if (ret.returnStatus == POStatus.STATUS_ERR) {
                 return ret;
             } else if (ret.returnStatus == POStatus.STATUS_NULL) {
@@ -419,9 +419,9 @@ public abstract class PhysicalOperator e
                 tmpBag.add((Tuple) ret.result);
             }
         }
-        ret.result = tmpBag;
-        ret.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
-        return ret;
+        val.result = tmpBag;
+        val.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
+        return val;
     }
 
     public Result getNextBigInteger() throws ExecException {

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Fri Oct 31 23:20:55 2014
@@ -114,14 +114,16 @@ public class POLimit extends PhysicalOpe
         }
         Result inp = null;
         while (true) {
+            // illustrator ignore LIMIT before the post processing
+            if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar >= mLimit) {
+                inp = RESULT_EOP;
+                break;
+            }
             inp = processInput();
             if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
                 break;
 
             illustratorMarkup(inp.result, null, 0);
-            // illustrator ignore LIMIT before the post processing
-            if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar>=mLimit)
-            	inp.returnStatus = POStatus.STATUS_EOP;
 
             soFar++;
             break;

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Oct 31 23:20:55 2014
@@ -68,15 +68,36 @@ public class TezJob implements Runnable 
     private Map<String, Map<String, Map<String, Long>>> vertexCounters;
     // Timer for DAG status reporter
     private Timer timer;
+    private TezJobConfig tezJobConf;
 
-    public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources)
-            throws IOException {
+    public TezJob(TezConfiguration conf, DAG dag,
+            Map<String, LocalResource> requestAMResources,
+            int estimatedTotalParallelism) throws IOException {
         this.conf = conf;
         this.dag = dag;
         this.requestAMResources = requestAMResources;
         this.reuseSession = conf.getBoolean(PigConfiguration.TEZ_SESSION_REUSE, true);
         this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
         this.vertexCounters = Maps.newHashMap();
+        tezJobConf = new TezJobConfig(estimatedTotalParallelism);
+    }
+
+    static class TezJobConfig {
+
+        private int estimatedTotalParallelism = -1;
+
+        public TezJobConfig(int estimatedTotalParallelism) {
+            this.estimatedTotalParallelism = estimatedTotalParallelism;
+        }
+
+        public int getEstimatedTotalParallelism() {
+            return estimatedTotalParallelism;
+        }
+
+        public void setEstimatedTotalParallelism(int estimatedTotalParallelism) {
+            this.estimatedTotalParallelism = estimatedTotalParallelism;
+        }
+
     }
 
     public DAG getDAG() {
@@ -129,7 +150,8 @@ public class TezJob implements Runnable 
     @Override
     public void run() {
         try {
-            tezClient = TezSessionManager.getClient(conf, requestAMResources, dag.getCredentials());
+            tezClient = TezSessionManager.getClient(conf, requestAMResources,
+                    dag.getCredentials(), tezJobConf);
             log.info("Submitting DAG " + dag.getName());
             dagClient = tezClient.submitDAG(dag);
             appId = tezClient.getAppMasterApplicationId();
@@ -145,7 +167,7 @@ public class TezJob implements Runnable 
 
         timer = new Timer();
         timer.schedule(new DAGStatusReporter(), 1000, conf.getLong(
-                PigConfiguration.TEZ_DAG_STATUS_REPORT_INTERVAL, 10) * 1000);
+                PigConfiguration.TEZ_DAG_STATUS_REPORT_INTERVAL, 20) * 1000);
 
         while (true) {
             try {
@@ -156,6 +178,7 @@ public class TezJob implements Runnable 
             }
 
             if (dagStatus.isCompleted()) {
+                log.info("DAG Status: " + dagStatus);
                 dagCounters = dagStatus.getDAGCounters();
                 collectVertexCounters();
                 TezSessionManager.freeSession(tezClient);
@@ -182,9 +205,16 @@ public class TezJob implements Runnable 
     }
 
     private class DAGStatusReporter extends TimerTask {
+
+        private final String LINE_SEPARATOR = System.getProperty("line.separator");
+
         @Override
         public void run() {
-            log.info("DAG Status: " + dagStatus);
+            String msg = "status=" + dagStatus.getState()
+              + ", progress=" + dagStatus.getDAGProgress()
+              + ", diagnostics="
+              + StringUtils.join(dagStatus.getDiagnostics(), LINE_SEPARATOR);
+            log.info("DAG Status: " + msg);
         }
     }
 
@@ -193,6 +223,10 @@ public class TezJob implements Runnable 
             String name = v.getName();
             try {
                 VertexStatus s = dagClient.getVertexStatus(name, statusGetOpts);
+                if (s == null) {
+                    log.info("Cannot retrieve counters for vertex " + name);
+                    continue;
+                }
                 TezCounters counters = s.getVertexCounters();
                 Map<String, Map<String, Long>> grpCounters = Maps.newHashMap();
                 Iterator<CounterGroup> grpIt = counters.iterator();

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Fri Oct 31 23:20:55 2014
@@ -107,7 +107,7 @@ public class TezJobCompiler {
                 log.info("Local resource: " + entry.getKey());
             }
             DAG tezDag = buildDAG(tezPlan, localResources);
-            return new TezJob(tezConf, tezDag, localResources);
+            return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
         } catch (Exception e) {
             int errCode = 2017;
             String msg = "Internal error creating job configuration.";

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Oct 31 23:20:55 2014
@@ -367,6 +367,7 @@ public class TezLauncher extends Launche
 
             ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
             parallelismSetter.visit();
+            tezPlan.setEstimatedParallelism(parallelismSetter.getEstimatedTotalParallelism());
         }
     }
 

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Oct 31 23:20:55 2014
@@ -29,8 +29,10 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
 import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.Utils;
 import org.apache.tez.client.TezAppMasterStatus;
 import org.apache.tez.client.TezClient;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -80,9 +82,11 @@ public class TezSessionManager {
     private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
 
     private static SessionInfo createSession(Configuration conf,
-            Map<String, LocalResource> requestedAMResources, Credentials creds)
-            throws TezException, IOException, InterruptedException {
+            Map<String, LocalResource> requestedAMResources, Credentials creds,
+            TezJobConfig tezJobConf) throws TezException, IOException,
+            InterruptedException {
         TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+        adjustAMConfig(amConf, tezJobConf);
         String jobName = conf.get(PigContext.JOB_NAME, "pig");
         TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
         tezClient.start();
@@ -94,6 +98,56 @@ public class TezSessionManager {
         return new SessionInfo(tezClient, requestedAMResources);
     }
 
+    private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
+        int requiredAMMaxHeap = -1;
+        int requiredAMResourceMB = -1;
+        int configuredAMMaxHeap = Utils.extractHeapSizeInMB(amConf.get(
+                TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
+                TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT));
+        int configuredAMResourceMB = amConf.getInt(
+                TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
+                TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT);
+
+        if (tezJobConf.getEstimatedTotalParallelism() > 0) {
+
+            int minAMMaxHeap = 3584;
+            int minAMResourceMB = 4096;
+
+            // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
+            // Increment by 512 mb for every additional 5K tasks.
+            for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) {
+                if (tezJobConf.getEstimatedTotalParallelism() > taskCount) {
+                    requiredAMMaxHeap = minAMMaxHeap;
+                    requiredAMResourceMB = minAMResourceMB;
+                    break;
+                }
+                minAMMaxHeap = minAMMaxHeap - 512;
+                minAMResourceMB = minAMResourceMB - 512;
+            }
+
+            if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) {
+                amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB);
+                log.info("Increasing "
+                        + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from "
+                        + configuredAMResourceMB + " to "
+                        + requiredAMResourceMB
+                        + " as the number of total estimated tasks is "
+                        + tezJobConf.getEstimatedTotalParallelism());
+
+                if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) {
+                    amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
+                            amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS)
+                                    + " -Xmx" + requiredAMMaxHeap + "M");
+                    log.info("Increasing Tez AM Heap Size from "
+                            + configuredAMMaxHeap + "M to "
+                            + requiredAMMaxHeap
+                            + "M as the number of total estimated tasks is "
+                            + tezJobConf.getEstimatedTotalParallelism());
+                }
+            }
+        }
+    }
+
     private static boolean validateSessionResources(SessionInfo currentSession,
             Map<String, LocalResource> requestedAMResources)
             throws TezException, IOException {
@@ -106,7 +160,7 @@ public class TezSessionManager {
     }
 
     static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources,
-            Credentials creds) throws TezException, IOException, InterruptedException {
+            Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException {
         List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
         SessionInfo newSession = null;
         sessionPoolLock.readLock().lock();
@@ -135,7 +189,7 @@ public class TezSessionManager {
         // We cannot find available AM, create new one
         // Create session outside of locks so that getClient/freeSession is not
         // blocked for parallel embedded pig runs
-        newSession = createSession(conf, requestedAMResources, creds);
+        newSession = createSession(conf, requestedAMResources, creds, tezJobConf);
         newSession.inUse = true;
         sessionPoolLock.writeLock().lock();
         try {

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Oct 31 23:20:55 2014
@@ -802,7 +802,7 @@ public class TezCompiler extends PhyPlan
 
             // If the parallelism of the current vertex is one and it doesn't do a LOAD (whose
             // parallelism is determined by the InputFormat), we don't need another vertex.
-            if (curTezOp.getRequestedParallelism() == 1) {
+            if (curTezOp.getRequestedParallelism() == 1 || curTezOp.isUnion()) {
                 boolean canStop = true;
                 for (PhysicalOperator planOp : curTezOp.plan.getRoots()) {
                     if (planOp instanceof POLoad) {
@@ -811,6 +811,12 @@ public class TezCompiler extends PhyPlan
                     }
                 }
                 if (canStop) {
+                    // Let's piggyback on the Union operator. UnionOptimizer
+                    // will skip this union operator as it is a waste to
+                    // do a vertex group followed by another limit in this case
+                    if (curTezOp.isUnion()) {
+                        curTezOp.setRequestedParallelism(1);
+                    }
                     curTezOp.setDontEstimateParallelism(true);
                     if (limitAfterSort) {
                         curTezOp.markLimitAfterSort();
@@ -916,7 +922,7 @@ public class TezCompiler extends PhyPlan
             TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
             curTezOp.setRequestedParallelism(op.getRequestedParallelism());
             if (op.isCross()) {
-                curTezOp.setCrossKey(op.getOperatorKey().toString());
+                curTezOp.addCrossKey(op.getOperatorKey().toString());
             }
             phyToTezOpMap.put(op, curTezOp);
         } catch (Exception e) {

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Fri Oct 31 23:20:55 2014
@@ -52,9 +52,19 @@ public class TezOperPlan extends Operato
 
     private Map<String, Path> extraResources = new HashMap<String, Path>();
 
+    private int estimatedTotalParallelism = -1;
+
     public TezOperPlan() {
     }
 
+    public int getEstimatedTotalParallelism() {
+        return estimatedTotalParallelism;
+    }
+
+    public void setEstimatedParallelism(int estimatedTotalParallelism) {
+        this.estimatedTotalParallelism = estimatedTotalParallelism;
+    }
+
     @Override
     public String toString() {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Fri Oct 31 23:20:55 2014
@@ -74,6 +74,12 @@ public class TezOperator extends Operato
     // etc which should always be one
     private boolean dontEstimateParallelism = false;
 
+    // Override user specified intermediate parallelism for cases
+    // like skewed join followed by group by + combiner if estimation is higher
+    // In mapreduce group by + combiner runs in map phase and uses more maps (default 128MB per map)
+    // while skewed join reducers process more (default 1G per reducer) which makes MRR a disadvantage
+    private boolean overrideIntermediateParallelism = false;
+
     // This is the parallelism of the vertex, it take account of:
     // 1. default_parallel
     // 2. -1 parallelism for one_to_one edge
@@ -123,7 +129,7 @@ public class TezOperator extends Operato
     // If true, we will use secondary key sort in the job
     private boolean useSecondaryKey = false;
 
-    private String crossKey = null;
+    private List<String> crossKeys = null;
 
     private boolean useMRMapSettings = false;
 
@@ -270,6 +276,15 @@ public class TezOperator extends Operato
         this.dontEstimateParallelism = dontEstimateParallelism;
     }
 
+    public boolean isOverrideIntermediateParallelism() {
+        return overrideIntermediateParallelism;
+    }
+
+    public void setOverrideIntermediateParallelism(
+            boolean overrideIntermediateParallelism) {
+        this.overrideIntermediateParallelism = overrideIntermediateParallelism;
+    }
+
     public OperatorKey getSplitParent() {
         return splitParent;
     }
@@ -555,12 +570,15 @@ public class TezOperator extends Operato
         return combineSmallSplits;
     }
 
-    public void setCrossKey(String key) {
-        crossKey = key;
+    public void addCrossKey(String key) {
+        if (crossKeys == null) {
+            crossKeys = new ArrayList<String>();
+        }
+        crossKeys.add(key);
     }
 
-    public String getCrossKey() {
-        return crossKey;
+    public List<String> getCrossKeys() {
+        return crossKeys;
     }
 
     public boolean isUseMRMapSettings() {

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Fri Oct 31 23:20:55 2014
@@ -93,6 +93,12 @@ public class CombinerOptimizer extends T
             PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
             CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
 
+            if(!combinePlan.isEmpty()) {
+                // Override the requested parallelism for intermediate reducers
+                // when combiners are involved so that there are more tasks doing the combine
+                from.setOverrideIntermediateParallelism(true);
+            }
+
         }
     }
 

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Oct 31 23:20:55 2014
@@ -178,6 +178,15 @@ public class MultiQueryOptimizerTez exte
     }
 
     static public void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) {
+        // Copy only map side properties. For eg: crossKeys.
+        // Do not copy reduce side specific properties. For eg: useSecondaryKey, segmentBelow, sortOrder, etc
+        if (subPlanOper.getCrossKeys() != null) {
+            for (String key : subPlanOper.getCrossKeys()) {
+                parentOper.addCrossKey(key);
+            }
+        }
+        parentOper.copyFeatures(subPlanOper, null);
+
         if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
             parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
         }
@@ -189,6 +198,5 @@ public class MultiQueryOptimizerTez exte
                 parentOper.outEdges.put(entry.getKey(), entry.getValue());
             }
         }
-        parentOper.copyFeatures(subPlanOper, null);
     }
 }

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Oct 31 23:20:55 2014
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
 import java.util.LinkedList;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.executionengine.ExecException;
@@ -41,10 +43,13 @@ import org.apache.pig.impl.plan.VisitorE
 import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
 
 public class ParallelismSetter extends TezOpPlanVisitor {
+
+    private static final Log LOG = LogFactory.getLog(ParallelismSetter.class);
     private Configuration conf;
     private PigContext pc;
     private TezParallelismEstimator estimator;
     private boolean autoParallelismEnabled;
+    private int estimatedTotalParallelism = 0;
 
     public ParallelismSetter(TezOperPlan plan, PigContext pigContext) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
@@ -63,6 +68,10 @@ public class ParallelismSetter extends T
         }
     }
 
+    public int getEstimatedTotalParallelism() {
+        return estimatedTotalParallelism;
+    }
+
     @Override
     public void visitTezOp(TezOperator tezOp) throws VisitorException {
         if (tezOp instanceof NativeTezOper) {
@@ -100,6 +109,7 @@ public class ParallelismSetter extends T
                         tezOp.setRequestedParallelism(pred.getRequestedParallelism());
                         tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
                         isOneToOneParallelism = true;
+                        incrementTotalParallelism(tezOp, parallelism);
                         parallelism = -1;
                     }
                 }
@@ -109,66 +119,84 @@ public class ParallelismSetter extends T
                     } else if (pc.defaultParallel != -1) {
                         parallelism = pc.defaultParallel;
                     }
-                    if (autoParallelismEnabled &&
-                            ((parallelism == -1 || intermediateReducer) && !tezOp.isDontEstimateParallelism())) {
+                    boolean overrideRequestedParallelism = false;
+                    if (parallelism != -1
+                            && autoParallelismEnabled
+                            && intermediateReducer
+                            && !tezOp.isDontEstimateParallelism()
+                            && tezOp.isOverrideIntermediateParallelism()) {
+                        overrideRequestedParallelism = true;
+                    }
+                    if (parallelism == -1 || overrideRequestedParallelism) {
                         if (tezOp.getEstimatedParallelism() == -1) {
                             // Override user specified parallelism with the estimated value
                             // if it is intermediate reducer
                             parallelism = estimator.estimateParallelism(mPlan, tezOp, conf);
-                            tezOp.setEstimatedParallelism(parallelism);
+                            if (overrideRequestedParallelism) {
+                                tezOp.setRequestedParallelism(parallelism);
+                            } else {
+                                tezOp.setEstimatedParallelism(parallelism);
+                            }
                         } else {
                             parallelism = tezOp.getEstimatedParallelism();
                         }
                         if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
-                            // Vertex manager will set parallelism
-                            parallelism = -1;
+                            if (!overrideRequestedParallelism) {
+                                incrementTotalParallelism(tezOp, parallelism);
+                                // PartitionerDefinedVertexManager will determine parallelism.
+                                // So call setVertexParallelism with -1
+                                // setEstimatedParallelism still needs to have some positive value
+                                // so that TezDAGBuilder sets the PartitionerDefinedVertexManager
+                                parallelism = -1;
+                            } else {
+                                // We are overriding the parallelism. We need to update the
+                                // Constant value in sampleAggregator to same parallelism
+                                // Currently will happen when you have orderby or
+                                // skewed join followed by group by with combiner
+                                for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
+                                    if (pred.isSampleBasedPartitioner()) {
+                                        for (TezOperator partitionerPred : mPlan.getPredecessors(pred)) {
+                                            if (partitionerPred.isSampleAggregation()) {
+                                                LOG.debug("Updating constant value to " + parallelism + " in " + partitionerPred.plan);
+                                                LOG.info("Increased requested parallelism of " + partitionerPred.getOperatorKey() + " to " + parallelism);
+                                                ParallelConstantVisitor visitor =
+                                                        new ParallelConstantVisitor(partitionerPred.plan, parallelism);
+                                                visitor.visit();
+                                                break;
+                                            }
+                                        }
+                                        break;
+                                    }
+                                }
+                            }
                         }
                     }
                 }
             }
 
-            // Once we decide the parallelism of the sampler, propagate to
-            // downstream operators if necessary
-            if (tezOp.isSampler() && autoParallelismEnabled) {
-                // There could be multiple sampler and share the same sample aggregation job
-                // and partitioner job
-                TezOperator sampleAggregationOper = null;
-                TezOperator sampleBasedPartionerOper = null;
-                TezOperator sortOper = null;
-                for (TezOperator succ : mPlan.getSuccessors(tezOp)) {
-                    if (succ.isVertexGroup()) {
-                        succ = mPlan.getSuccessors(succ).get(0);
-                    }
-                    if (succ.isSampleAggregation()) {
-                        sampleAggregationOper = succ;
-                    } else if (succ.isSampleBasedPartitioner()) {
-                        sampleBasedPartionerOper = succ;
-                    }
-                }
-                sortOper = mPlan.getSuccessors(sampleBasedPartionerOper).get(0);
-
-                if ((sortOper.getRequestedParallelism() == -1 && pc.defaultParallel == -1) || TezCompilerUtil.isIntermediateReducer(sortOper)) {
-                    // set estimate parallelism for order by/skewed join to sampler parallelism
-                    // that include:
-                    // 1. sort operator
-                    // 2. constant for sample aggregation oper
-                    sortOper.setEstimatedParallelism(parallelism);
-                    ParallelConstantVisitor visitor =
-                            new ParallelConstantVisitor(sampleAggregationOper.plan, parallelism);
-                    visitor.visit();
-                    sampleAggregationOper.setNeedEstimatedQuantile(true);
-                }
-            }
-
+            incrementTotalParallelism(tezOp, parallelism);
             tezOp.setVertexParallelism(parallelism);
 
-            if (tezOp.getCrossKey()!=null) {
-                pc.getProperties().put(PigImplConstants.PIG_CROSS_PARALLELISM + "." + tezOp.getCrossKey(),
-                        Integer.toString(tezOp.getVertexParallelism()));
+            // TODO: Fix case where vertex parallelism is -1 for auto parallelism with PartitionerDefinedVertexManager.
+            // i.e order by or skewed join followed by cross
+            if (tezOp.getCrossKeys() != null) {
+                for (String key : tezOp.getCrossKeys()) {
+                    pc.getProperties().put(PigImplConstants.PIG_CROSS_PARALLELISM + "." + key,
+                            Integer.toString(tezOp.getVertexParallelism()));
+                }
             }
         } catch (Exception e) {
             throw new VisitorException(e);
         }
     }
 
+    /**
+     * Adds an operator's parallelism to the running estimated total.
+     * Vertex groups are never counted, and a parallelism of -1
+     * contributes nothing.
+     */
+    private void incrementTotalParallelism(TezOperator tezOp, int tezOpParallelism) {
+        boolean countable = !tezOp.isVertexGroup() && tezOpParallelism != -1;
+        if (countable) {
+            estimatedTotalParallelism += tezOpParallelism;
+        }
+    }
+
 }

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Oct 31 23:20:55 2014
@@ -65,8 +65,6 @@ public class TezOperDependencyParallelis
     static final double DEFAULT_FILTER_FACTOR = 0.7;
     static final double DEFAULT_LIMIT_FACTOR = 0.1;
 
-    static final int DEFAULT_MAX_INTERMEDIATE_REDUCER_COUNT_PARAM = 2999;
-
     private PigContext pc;
 
     @Override
@@ -83,6 +81,8 @@ public class TezOperDependencyParallelis
 
         boolean intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOper);
 
+        // TODO: If map opts and reduce opts are same estimate higher parallelism
+        // for tasks based on the count of number of map tasks else be conservative as now
         maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
                 PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
 
@@ -117,7 +117,10 @@ public class TezOperDependencyParallelis
                             + ", effective parallelism for predecessor " + tezOper.getOperatorKey().toString()
                             + " is -1");
                 }
-                if (pred.plan!=null) { // pred.plan can be null if it is a VertexGroup
+
+                //For cases like Union we can just limit to sum of pred vertices parallelism
+                boolean applyFactor = !tezOper.isUnion();
+                if (pred.plan!=null && applyFactor) { // pred.plan can be null if it is a VertexGroup
                     TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(pred.plan, tezOper.getOperatorKey().toString());
                     parallelismFactorVisitor.visit();
                     predParallelism = predParallelism * parallelismFactorVisitor.getFactor();
@@ -128,17 +131,18 @@ public class TezOperDependencyParallelis
 
         int roundedEstimatedParallelism = (int)Math.ceil(estimatedParallelism);
 
-        if (intermediateReducer) {
+        if (intermediateReducer && tezOper.isOverrideIntermediateParallelism()) {
             // Estimated reducers should not be more than the configured limit
-            roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, Math.max(DEFAULT_MAX_INTERMEDIATE_REDUCER_COUNT_PARAM, maxTaskCount));
+            roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
             int userSpecifiedParallelism = pc.defaultParallel;
             if (tezOper.getRequestedParallelism() != -1) {
                 userSpecifiedParallelism = tezOper.getRequestedParallelism();
             }
             int intermediateParallelism = Math.max(userSpecifiedParallelism, roundedEstimatedParallelism);
-            if (userSpecifiedParallelism != -1 && intermediateParallelism > (2 * userSpecifiedParallelism)) {
+            if (userSpecifiedParallelism != -1 &&
+                    (intermediateParallelism > 200 && intermediateParallelism > (2 * userSpecifiedParallelism))) {
                 // Estimated reducers shall not be more than 2x of requested parallelism
-                // when we are overriding user specified values
+                // if greater than 200 and we are overriding user specified values
                 intermediateParallelism = 2 * userSpecifiedParallelism;
             }
             roundedEstimatedParallelism = intermediateParallelism;

Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Fri Oct 31 23:20:55 2014
@@ -73,6 +73,10 @@ public class UnionOptimizer extends TezO
             return;
         }
 
+        if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
+            return;
+        }
+
         TezOperator unionOp = tezOp;
         String unionOpKey = unionOp.getOperatorKey().toString();
         String scope = unionOp.getOperatorKey().scope;
@@ -264,9 +268,16 @@ public class UnionOptimizer extends TezO
     }
 
+    /**
+     * Copies operator-level properties from the union operator onto one of
+     * its predecessors: UDFs, scalars and any cross keys, followed by a
+     * copyFeatures call that is passed OPER_FEATURE.UNION.
+     * NOTE(review): the UNION argument is presumably the list of features
+     * to leave out - confirm against TezOperator.copyFeatures.
+     *
+     * @param pred    operator that receives the properties
+     * @param unionOp operator whose properties are read
+     */
     private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) {
-        pred.setUseSecondaryKey(unionOp.isUseSecondaryKey());
         pred.UDFs.addAll(unionOp.UDFs);
         pred.scalars.addAll(unionOp.scalars);
+        // Copy only map side properties. For eg: crossKeys.
+        // Do not copy reduce side specific properties. For eg: useSecondaryKey, segmentBelow, sortOrder, etc
+        // Also ignore parallelism settings
+        if (unionOp.getCrossKeys() != null) {
+            for (String key : unionOp.getCrossKeys()) {
+                pred.addCrossKey(key);
+            }
+        }
         pred.copyFeatures(unionOp, Arrays.asList(new OPER_FEATURE[]{OPER_FEATURE.UNION}));
     }
 

Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java Fri Oct 31 23:20:55 2014
@@ -60,6 +60,9 @@ public class GFCross extends EvalFunc<Da
                     throw new IOException("Unable to get parallelism hint from job conf");
                 }
                 parallelism = Integer.valueOf(s);
+                if (parallelism < 0) {
+                    throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey  + " was " + parallelism);
+                }
             }
 
             numInputs = (Integer)input.get(0);

Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/util/Utils.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/util/Utils.java Fri Oct 31 23:20:55 2014
@@ -23,10 +23,8 @@ import java.io.FileInputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.io.PrintStream;
 import java.io.SequenceInputStream;
-import java.net.URL;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Comparator;
@@ -42,12 +40,10 @@ import java.util.regex.Pattern;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.compress.BZip2Codec;
 import org.apache.hadoop.io.compress.GzipCodec;
 import org.apache.hadoop.mapred.JobConf;
@@ -65,7 +61,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.PigImplConstants;
 import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.io.ReadToEndLoader;
@@ -85,13 +80,15 @@ import com.google.common.primitives.Long
  */
 public class Utils {
     private static final Log log = LogFactory.getLog(Utils.class);
-    
+    private static final Pattern JAVA_MAXHEAPSIZE_PATTERN = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
+
+
     /**
      * This method checks whether JVM vendor is IBM
      * @return true if IBM JVM is being used
      * false otherwise
      */
-    public static boolean isVendorIBM() {    	
+    public static boolean isVendorIBM() {
     	  return System.getProperty("java.vendor").contains("IBM");
     }
 
@@ -251,7 +248,7 @@ public class Utils {
     }
 
     public static LogicalSchema parseSchema(String schemaString) throws ParserException {
-        QueryParserDriver queryParser = new QueryParserDriver( new PigContext(), 
+        QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
                 "util", new HashMap<String, String>() ) ;
         LogicalSchema schema = queryParser.parseSchema(schemaString);
         return schema;
@@ -262,7 +259,7 @@ public class Utils {
      * field. This will be called only when PigStorage is invoked with
      * '-tagFile' or '-tagPath' option and the schema file is present to be
      * loaded.
-     * 
+     *
      * @param schema
      * @param fieldName
      * @return ResourceSchema
@@ -396,7 +393,7 @@ public class Utils {
         } else if (TEMPFILE_STORAGE.TFILE.lowerName().equals(tmpFileCompressionStorage)) {
             return TEMPFILE_STORAGE.TFILE;
         } else {
-            throw new IllegalArgumentException("Unsupported storage format " + tmpFileCompressionStorage + 
+            throw new IllegalArgumentException("Unsupported storage format " + tmpFileCompressionStorage +
                     ". Should be one of " + Arrays.toString(TEMPFILE_STORAGE.values()));
         }
     }
@@ -595,7 +592,7 @@ public class Utils {
             // substitute
             eval = eval.substring(0, match.start())+val+eval.substring(match.end());
         }
-        throw new IllegalStateException("Variable substitution depth too large: " 
+        throw new IllegalStateException("Variable substitution depth too large: "
                 + MAX_SUBST + " " + expr);
     }
 
@@ -661,4 +658,27 @@ public class Utils {
       return null;
 
     }
+
+    /**
+     * Extracts the maximum heap size, in megabytes, from a string of JVM options.
+     * Only -Xmx values matched by JAVA_MAXHEAPSIZE_PATTERN are recognised;
+     * when several are present the last one is used.
+     *
+     * @param input JVM options string; may be null or empty
+     * @return heap size in MB, or 0 when input is null, empty or contains
+     *         no matching -Xmx option
+     */
+    public static int extractHeapSizeInMB(String input) {
+        if (input == null || input.equals("")) {
+            return 0;
+        }
+        String lastSpec = null;
+        String lastDigits = null;
+        // Keep only the final match: the last -Xmx is the one that takes effect
+        Matcher matcher = JAVA_MAXHEAPSIZE_PATTERN.matcher(input);
+        while (matcher.find()) {
+            lastSpec = matcher.group(1);
+            lastDigits = matcher.group(2);
+        }
+        if (lastSpec == null) {
+            return 0;
+        }
+        int size = Integer.parseInt(lastDigits);
+        // A gigabyte suffix is converted to megabytes
+        boolean inGigabytes = lastSpec.endsWith("g") || lastSpec.endsWith("G");
+        return inGigabytes ? size * 1024 : size;
+    }
 }

Modified: pig/branches/branch-0.14/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/e2e/pig/tests/nightly.conf?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/branch-0.14/test/e2e/pig/tests/nightly.conf Fri Oct 31 23:20:55 2014
@@ -1414,6 +1414,7 @@ b = load ':INPATH:/singlefile/studentcol
 c = union a, b;
 d = order c by name PARALLEL 2;
 store d into ':OUTPATH:';\,
+            'sortArgs' => ['-t', '	', '-k', '1,1'],
             },
             {
             'num' => 5,
@@ -1504,6 +1505,38 @@ e = load ':INPATH:/singlefile/votertab10
 f = join d by group, e by name using 'replicated';
 store f into ':OUTPATH:';\,
             },
+            {  ## Secondary Key
+            'num' => 14,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age, gpa);
+c = group a by name;
+d = foreach c {
+    sorted = order a by name,age;
+    lmt = limit sorted 1;
+    generate lmt as c1;
+};
+e = foreach d generate flatten(c1) as (name:chararray, age, gpa);
+f = group b by name;
+g = foreach f {
+    sorted = order b by name,age;
+    lmt = limit sorted 1;
+    generate lmt as f1;
+};
+h = foreach g generate flatten(f1) as (name:chararray, age, gpa);
+i = union e, h;
+j = order i by name parallel 1;
+store j into ':OUTPATH:';\,
+            'sortArgs' => ['-t', '	', '-k', '1,1'],
+            },
+            {
+            'num' => 15,
+            'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa:float);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa:float);
+c = filter a by gpa >= 4;
+d = cross a, c;
+e = union b, d;
+store e into ':OUTPATH:';\,
+            },
 		]
 		},
 		{

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/Util.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/Util.java Fri Oct 31 23:20:55 2014
@@ -37,6 +37,7 @@ import java.io.OutputStreamWriter;
 import java.io.PrintStream;
 import java.io.PrintWriter;
 import java.io.StringReader;
+import java.io.Writer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -53,10 +54,13 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.Appender;
 import org.apache.log4j.FileAppender;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
 import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.WriterAppender;
 import org.apache.pig.ExecType;
 import org.apache.pig.ExecTypeProvider;
 import org.apache.pig.LoadCaster;
@@ -1367,4 +1371,18 @@ public class Util {
             return ExecTypeProvider.fromString("local");
         }
     }
+
+    /**
+     * Attaches a named WriterAppender to the log4j logger of the given class
+     * so that its log output is also written to the supplied writer.
+     *
+     * @param clazz        class whose logger receives the appender
+     * @param appenderName name given to the new appender
+     * @param writer       destination for the formatted log lines
+     */
+    public static void createLogAppender(Class clazz, String appenderName, Writer writer) {
+        Logger target = Logger.getLogger(clazz);
+        PatternLayout layout = new PatternLayout("%d [%t] %-5p %c %x - %m%n");
+        WriterAppender appender = new WriterAppender(layout, writer);
+        appender.setName(appenderName);
+        target.addAppender(appender);
+    }
+
+    /**
+     * Closes and detaches the named appender from the log4j logger of the
+     * given class. Does nothing when no appender with that name is attached.
+     *
+     * @param clazz        class whose logger holds the appender
+     * @param appenderName name of the appender to remove
+     */
+    public static void removeLogAppender(Class clazz, String appenderName) {
+        Logger logger = Logger.getLogger(clazz);
+        Appender appender = logger.getAppender(appenderName);
+        // getAppender returns null for an unknown name; avoid a NullPointerException
+        if (appender == null) {
+            return;
+        }
+        appender.close();
+        logger.removeAppender(appenderName);
+    }
 }

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld Fri Oct 31 23:20:55 2014
@@ -2,78 +2,70 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-88
+# TEZ DAG plan: scope-75
 #--------------------------------------------------
-Tez vertex scope-70	->	Tez vertex scope-72,
-Tez vertex scope-72	->	Tez vertex scope-80,
-Tez vertex scope-75	->	Tez vertex scope-77,
-Tez vertex scope-77	->	Tez vertex scope-80,
-Tez vertex scope-80	->	Tez vertex scope-85,
-Tez vertex scope-85
+Tez vertex scope-61	->	Tez vertex scope-63,
+Tez vertex scope-63	->	Tez vertex scope-71,
+Tez vertex scope-66	->	Tez vertex scope-68,
+Tez vertex scope-68	->	Tez vertex scope-71,
+Tez vertex scope-71
 
-Tez vertex scope-70
+Tez vertex scope-61
 # Plan on vertex
-POValueOutputTez - scope-71	->	 [scope-72]
+POValueOutputTez - scope-62	->	 [scope-63]
 |
-|---Limit - scope-48
+|---Limit - scope-39
     |
-    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-47
-Tez vertex scope-72
+    |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-38
+Tez vertex scope-63
 # Plan on vertex
-POValueOutputTez - scope-82	->	 [scope-80]
+POValueOutputTez - scope-73	->	 [scope-71]
 |
-|---a: New For Each(false,false)[bag] - scope-56
+|---a: New For Each(false,false)[bag] - scope-47
     |   |
-    |   Cast[int] - scope-51
+    |   Cast[int] - scope-42
     |   |
-    |   |---Project[bytearray][0] - scope-50
+    |   |---Project[bytearray][0] - scope-41
     |   |
-    |   Cast[chararray] - scope-54
+    |   Cast[chararray] - scope-45
     |   |
-    |   |---Project[bytearray][1] - scope-53
+    |   |---Project[bytearray][1] - scope-44
     |
-    |---Limit - scope-49
+    |---Limit - scope-40
         |
-        |---Limit - scope-74
+        |---Limit - scope-65
             |
-            |---POValueInputTez - scope-73	<-	 scope-70
-Tez vertex scope-75
+            |---POValueInputTez - scope-64	<-	 scope-61
+Tez vertex scope-66
 # Plan on vertex
-POValueOutputTez - scope-76	->	 [scope-77]
+POValueOutputTez - scope-67	->	 [scope-68]
 |
-|---Limit - scope-58
+|---Limit - scope-49
     |
-    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-57
-Tez vertex scope-77
+    |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-48
+Tez vertex scope-68
 # Plan on vertex
-POValueOutputTez - scope-83	->	 [scope-80]
+POValueOutputTez - scope-74	->	 [scope-71]
 |
-|---c: New For Each(false,false)[bag] - scope-66
+|---c: New For Each(false,false)[bag] - scope-57
     |   |
-    |   Cast[int] - scope-61
+    |   Cast[int] - scope-52
     |   |
-    |   |---Project[bytearray][1] - scope-60
+    |   |---Project[bytearray][1] - scope-51
     |   |
-    |   Cast[chararray] - scope-64
+    |   Cast[chararray] - scope-55
     |   |
-    |   |---Project[bytearray][0] - scope-63
+    |   |---Project[bytearray][0] - scope-54
     |
-    |---Limit - scope-59
+    |---Limit - scope-50
         |
-        |---Limit - scope-79
+        |---Limit - scope-70
             |
-            |---POValueInputTez - scope-78	<-	 scope-75
-Tez vertex scope-80
+            |---POValueInputTez - scope-69	<-	 scope-66
+Tez vertex scope-71
 # Plan on vertex
-POValueOutputTez - scope-84	->	 [scope-85]
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-60
 |
-|---d: Limit - scope-68
+|---d: Limit - scope-59
     |
-    |---POShuffledValueInputTez - scope-81	<-	 [scope-77, scope-72]
-Tez vertex scope-85
-# Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-69
-|
-|---d: Limit - scope-87
-    |
-    |---POValueInputTez - scope-86	<-	 scope-80
+    |---POShuffledValueInputTez - scope-72	<-	 [scope-68, scope-63]

Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld Fri Oct 31 23:20:55 2014
@@ -2,14 +2,13 @@
 # There are 1 DAGs in the session
 #--------------------------------------------------
 #--------------------------------------------------
-# TEZ DAG plan: scope-41
+# TEZ DAG plan: scope-37
 #--------------------------------------------------
 Tez vertex scope-23	->	Tez vertex scope-25,
-Tez vertex scope-25	->	Tez vertex group scope-42,
+Tez vertex scope-25	->	Tez vertex scope-33,
 Tez vertex scope-28	->	Tez vertex scope-30,
-Tez vertex scope-30	->	Tez vertex group scope-42,
-Tez vertex group scope-42	->	Tez vertex scope-38,
-Tez vertex scope-38
+Tez vertex scope-30	->	Tez vertex scope-33,
+Tez vertex scope-33
 
 Tez vertex scope-23
 # Plan on vertex
@@ -20,25 +19,23 @@ POValueOutputTez - scope-24	->	 [scope-2
     |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
 Tez vertex scope-25
 # Plan on vertex
-POValueOutputTez - scope-43	->	 [scope-38]
+POValueOutputTez - scope-35	->	 [scope-33]
 |
-|---d: Limit - scope-44
+|---a: New For Each(false,false)[bag] - scope-9
+    |   |
+    |   Cast[int] - scope-4
+    |   |
+    |   |---Project[bytearray][0] - scope-3
+    |   |
+    |   Cast[chararray] - scope-7
+    |   |
+    |   |---Project[bytearray][1] - scope-6
     |
-    |---a: New For Each(false,false)[bag] - scope-9
-        |   |
-        |   Cast[int] - scope-4
-        |   |
-        |   |---Project[bytearray][0] - scope-3
-        |   |
-        |   Cast[chararray] - scope-7
-        |   |
-        |   |---Project[bytearray][1] - scope-6
+    |---Limit - scope-2
         |
-        |---Limit - scope-2
+        |---Limit - scope-27
             |
-            |---Limit - scope-27
-                |
-                |---POValueInputTez - scope-26	<-	 scope-23
+            |---POValueInputTez - scope-26	<-	 scope-23
 Tez vertex scope-28
 # Plan on vertex
 POValueOutputTez - scope-29	->	 [scope-30]
@@ -48,31 +45,27 @@ POValueOutputTez - scope-29	->	 [scope-3
     |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-10
 Tez vertex scope-30
 # Plan on vertex
-POValueOutputTez - scope-45	->	 [scope-38]
+POValueOutputTez - scope-36	->	 [scope-33]
 |
-|---d: Limit - scope-46
+|---c: New For Each(false,false)[bag] - scope-19
+    |   |
+    |   Cast[int] - scope-14
+    |   |
+    |   |---Project[bytearray][1] - scope-13
+    |   |
+    |   Cast[chararray] - scope-17
+    |   |
+    |   |---Project[bytearray][0] - scope-16
     |
-    |---c: New For Each(false,false)[bag] - scope-19
-        |   |
-        |   Cast[int] - scope-14
-        |   |
-        |   |---Project[bytearray][1] - scope-13
-        |   |
-        |   Cast[chararray] - scope-17
-        |   |
-        |   |---Project[bytearray][0] - scope-16
+    |---Limit - scope-12
         |
-        |---Limit - scope-12
+        |---Limit - scope-32
             |
-            |---Limit - scope-32
-                |
-                |---POValueInputTez - scope-31	<-	 scope-28
-Tez vertex group scope-42	<-	 [scope-25, scope-30]	->	 scope-38
-# No plan on vertex group
-Tez vertex scope-38
+            |---POValueInputTez - scope-31	<-	 scope-28
+Tez vertex scope-33
 # Plan on vertex
 d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-22
 |
-|---d: Limit - scope-40
+|---d: Limit - scope-21
     |
-    |---POValueInputTez - scope-39	<-	 scope-42
+    |---POShuffledValueInputTez - scope-34	<-	 [scope-30, scope-25]

Modified: pig/branches/branch-0.14/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/tez/TestTezAutoParallelism.java Fri Oct 31 23:20:55 2014
@@ -18,10 +18,15 @@
 package org.apache.pig.tez;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.List;
 import java.util.Properties;
 import java.util.Random;
 
@@ -33,6 +38,9 @@ import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.test.MiniGenericCluster;
 import org.apache.pig.test.Util;
 import org.junit.After;
@@ -98,7 +106,7 @@ public class TestTezAutoParallelism {
         }
         w.close();
         Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
-        
+
         w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));
         for (String name : boyNames) {
             w.println(name + "\t" + "M");
@@ -119,13 +127,14 @@ public class TestTezAutoParallelism {
         // parallelism is 3 originally, reduce to 1
         pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
         pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name;");
         pigServer.store("B", "output1");
         FileSystem fs = cluster.getFileSystem();
         FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
+            @Override
             public boolean accept(Path path) {
                 if (path.getName().startsWith("part")) {
                     return true;
@@ -141,7 +150,7 @@ public class TestTezAutoParallelism {
         // order by parallelism is 3 originally, reduce to 1
         pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
         pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = group A by name parallel 3;");
@@ -150,6 +159,7 @@ public class TestTezAutoParallelism {
         pigServer.store("D", "output2");
         FileSystem fs = cluster.getFileSystem();
         FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){
+            @Override
             public boolean accept(Path path) {
                 if (path.getName().startsWith("part")) {
                     return true;
@@ -173,6 +183,7 @@ public class TestTezAutoParallelism {
         pigServer.store("D", "output3");
         FileSystem fs = cluster.getFileSystem();
         FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){
+            @Override
             public boolean accept(Path path) {
                 if (path.getName().startsWith("part")) {
                     return true;
@@ -188,7 +199,7 @@ public class TestTezAutoParallelism {
         // skewed join parallelism is 4 originally, reduce to 1
         pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
         pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
-        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, 
+        pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
                 Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
         pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
         pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
@@ -196,6 +207,7 @@ public class TestTezAutoParallelism {
         pigServer.store("C", "output4");
         FileSystem fs = cluster.getFileSystem();
         FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){
+            @Override
             public boolean accept(Path path) {
                 if (path.getName().startsWith("part")) {
                     return true;
@@ -218,6 +230,7 @@ public class TestTezAutoParallelism {
         pigServer.store("C", "output5");
         FileSystem fs = cluster.getFileSystem();
         FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
+            @Override
             public boolean accept(Path path) {
                 if (path.getName().startsWith("part")) {
                     return true;
@@ -227,4 +240,38 @@ public class TestTezAutoParallelism {
         });
         assertEquals(files.length, 4);
     }
+
+    @Test
+    public void testSkewedJoinIncreaseIntermediateParallelism() throws IOException{
+        NodeIdGenerator.reset();
+        PigServer.resetScope();
+        StringWriter writer = new StringWriter();
+        // When there is a combiner operation involved, user-specified parallelism is overridden
+        Util.createLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism", writer);
+        try {
+            pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+            pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+            pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
+            pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+            pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+            pigServer.registerQuery("C = join A by name, B by name using 'skewed' parallel 1;");
+            pigServer.registerQuery("D = group C by A::name;");
+            pigServer.registerQuery("E = foreach D generate group, COUNT(C.A::name);");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+            List<Tuple> expectedResults = Util
+                    .getTuplesFromConstantTupleStrings(new String[] {
+                            "('Abigail',56L)", "('Alexander',45L)", "('Ava',60L)",
+                            "('Daniel',68L)", "('Elizabeth',42L)",
+                            "('Emily',57L)", "('Emma',50L)", "('Ethan',50L)",
+                            "('Isabella',43L)", "('Jacob',43L)", "('Jayden',59L)",
+                            "('Liam',46L)", "('Madison',46L)", "('Mason',54L)",
+                            "('Mia',51L)", "('Michael',47L)", "('Noah',38L)",
+                            "('Olivia',50L)", "('Sophia',52L)", "('William',43L)" });
+
+            Util.checkQueryOutputsAfterSort(iter, expectedResults);
+            assertTrue(writer.toString().contains("Increased requested parallelism of scope-40 to 4"));
+        } finally {
+            Util.removeLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism");
+        }
+    }
 }