You are viewing a plain-text version of this content. (The hyperlink to the canonical version, originally labeled "here", was lost in the plain-text conversion.)
Posted to commits@pig.apache.org by da...@apache.org on 2014/09/04 02:11:52 UTC

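svn commit: r1622382 - in /pig/trunk: ./ shims/test/hadoop23/org/apache/pig/test/ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/ src/org/apache/pig/backend/hadoop/executionengine/tez/util/ src/org/apache/pig/impl/plan/ src/org/apache/pig/tools/pigstats/tez/ test/ test/org/apache/pig/test/ test/org/apache/pig/test/utils/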

Author: daijy
Date: Thu Sep  4 00:11:51 2014
New Revision: 1622382

URL: http://svn.apache.org/r1622382
Log:
PIG-4143: Port more mini cluster tests to Tez - part 7

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
    pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java
    pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
    pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
    pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
    pig/trunk/test/org/apache/pig/test/TestPigProgressReporting.java
    pig/trunk/test/org/apache/pig/test/Util.java
    pig/trunk/test/org/apache/pig/test/utils/ReportingUDF.java
    pig/trunk/test/tez-tests

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Sep  4 00:11:51 2014
@@ -70,6 +70,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4143: Port more mini cluster tests to Tez - part 7 (daijy)
+
 PIG-4149: Rounding issue in FindQuantiles (daijy)
 
 PIG-4145: Port local mode tests to Tez - part1 (daijy)

Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Thu Sep  4 00:11:51 2014
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.v2.Mi
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -97,7 +96,6 @@ public class TezMiniCluster extends Mini
             m_mr.init(m_dfs_conf);
             m_mr.start();
             m_mr_conf = m_mr.getConfig();
-            m_mr_conf.set(MRConfiguration.FRAMEWORK_NAME, "yarn-tez");
             m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                     System.getProperty("java.class.path"));
             m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx2048m");

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Sep  4 00:11:51 2014
@@ -403,20 +403,6 @@ public class TezDagBuilder extends TezOp
             payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.sortOperator.getOperatorKey().toString());
         }
 
-        String tmp;
-        long maxCombinedSplitSize = 0;
-        if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))
-            payloadConf.setBoolean(PigConfiguration.PIG_NO_SPLIT_COMBINATION, true);
-        else if ((tmp = pc.getProperties().getProperty(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, null)) != null) {
-            try {
-                maxCombinedSplitSize = Long.parseLong(tmp);
-            } catch (NumberFormatException e) {
-                log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
-            }
-        }
-        if (maxCombinedSplitSize > 0)
-            payloadConf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
-
         payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
         payloadConf.set("pig.inpSignatures", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
         payloadConf.set("pig.inpLimits", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
@@ -559,6 +545,11 @@ public class TezDagBuilder extends TezOp
             log.info("Estimate quantile for sample aggregation vertex " + tezOp.getOperatorKey().toString());
         }
 
+        // set various parallelism into the job conf for later analysis, PIG-2779
+        payloadConf.setInt("pig.info.reducers.default.parallel", pc.defaultParallel);
+        payloadConf.setInt("pig.info.reducers.requested.parallel", tezOp.getRequestedParallelism());
+        payloadConf.setInt("pig.info.reducers.estimated.parallel", tezOp.getEstimatedParallelism());
+
         // Take our assembled configuration and create a vertex
         UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
         procDesc.setUserPayload(userPayload);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Thu Sep  4 00:11:51 2014
@@ -24,15 +24,21 @@ import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 
 /**
  * A Plan used to create the plan of Tez operators which can be converted into
@@ -156,5 +162,65 @@ public class TezOperPlan extends Operato
         return super.disconnect(from, to);
     }
 
+    
+    /**
+     * Move everything below a given operator to the new operator plan.  The specified operator will
+     * be moved and will be the root of the new operator plan
+     * @param root Operator to move everything under including the root operator
+     * @param newPlan new operator plan to move things into
+     * @throws PlanException 
+     */
+    public void moveTree(TezOperator root, TezOperPlan newPlan) throws PlanException {
+        List<TezOperator> list = new ArrayList<TezOperator>();
+        list.add(root);
+        int prevSize = 0;
+        int pos = 0;
+        while (list.size() > prevSize) {
+            prevSize = list.size();
+            TezOperator node = list.get(pos);
+            if (getSuccessors(node)!=null) {
+                for (TezOperator succ : getSuccessors(node)) {
+                    if (!list.contains(succ)) {
+                        list.add(succ);
+                    }
+                }
+            }
+            if (getPredecessors(node)!=null) {
+                for (TezOperator pred : getPredecessors(node)) {
+                    if (!list.contains(pred)) {
+                        list.add(pred);
+                    }
+                }
+            }
+            pos++;
+        }
+
+        for (TezOperator node: list) {
+            newPlan.add(node);
+        }
+
+        Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>();
+        for (TezOperator from : mFromEdges.keySet()) {
+            List<TezOperator> tos = mFromEdges.get(from);
+            for (TezOperator to : tos) {
+                if (list.contains(from) || list.contains(to)) {
+                    toReconnect.add(new Pair<TezOperator, TezOperator>(from, to));
+                }
+            }
+        }
+
+        for (Pair<TezOperator, TezOperator> pair : toReconnect) {
+            if (list.contains(pair.first) && list.contains(pair.second)) {
+                // Need to reconnect in newPlan
+                TezEdgeDescriptor edge = pair.second.inEdges.get(pair.first.getOperatorKey());
+                TezCompilerUtil.connect(newPlan, pair.first, pair.second, edge);
+            }
+        }
+
+        for (TezOperator node : list) {
+            // Simply remove from plan, don't deal with inEdges/outEdges
+            super.remove(node);
+        }
+    }
 }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java Thu Sep  4 00:11:51 2014
@@ -21,10 +21,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -46,6 +49,7 @@ import org.apache.tez.mapreduce.hadoop.M
 public class LoaderProcessor extends TezOpPlanVisitor {
     private Configuration conf;
     private PigContext pc;
+    private static final Log log = LogFactory.getLog(LoaderProcessor.class);
     public LoaderProcessor(TezOperPlan plan, PigContext pigContext) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         this.pc = pigContext;
@@ -125,6 +129,19 @@ public class LoaderProcessor extends Tez
             conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
             conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
             conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+            String tmp;
+            long maxCombinedSplitSize = 0;
+            if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))
+                conf.setBoolean(PigConfiguration.PIG_NO_SPLIT_COMBINATION, true);
+            else if ((tmp = pc.getProperties().getProperty(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, null)) != null) {
+                try {
+                    maxCombinedSplitSize = Long.parseLong(tmp);
+                } catch (NumberFormatException e) {
+                    log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
+                }
+            }
+            if (maxCombinedSplitSize > 0)
+                conf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
             tezOp.getLoaderInfo().setInpSignatureLists(inpSignatureLists);
             tezOp.getLoaderInfo().setInp(inp);
             tezOp.getLoaderInfo().setInpLimits(inpLimits);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Thu Sep  4 00:11:51 2014
@@ -103,11 +103,13 @@ public class TezCompilerUtil {
 
     static public void connect(TezOperPlan plan, TezOperator from, TezOperator to, TezEdgeDescriptor edge) throws PlanException {
         plan.connect(from, to);
-        PhysicalOperator leaf = from.plan.getLeaves().get(0);
-        // It could be POStoreTez incase of sampling job in order by
-        if (leaf instanceof POLocalRearrangeTez) {
-            POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
-            lr.setOutputKey(to.getOperatorKey().toString());
+        if (from.plan.getLeaves()!=null && !from.plan.getLeaves().isEmpty()) {
+            PhysicalOperator leaf = from.plan.getLeaves().get(0);
+            // It could be POStoreTez incase of sampling job in order by
+            if (leaf instanceof POLocalRearrangeTez) {
+                POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
+                lr.setOutputKey(to.getOperatorKey().toString());
+            }
         }
         // Add edge descriptors to old and new operators
         to.inEdges.put(from.getOperatorKey(), edge);

Modified: pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Thu Sep  4 00:11:51 2014
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
 
 
 //import org.apache.commons.collections.map.MultiValueMap;
@@ -284,42 +286,6 @@ public abstract class OperatorPlan<E ext
             }
         }
     }
-    
-    /**
-     * Move everything below a given operator to the new operator plan.  The specified operator will
-     * be moved and will be the root of the new operator plan
-     * @param root Operator to move everything after
-     * @param newPlan new operator plan to move things into
-     * @throws PlanException 
-     */
-    public void moveTree(E root, OperatorPlan<E> newPlan) throws PlanException {
-        Deque<E> queue = new ArrayDeque<E>();
-        queue.addLast(root);
-        while (!queue.isEmpty()) {
-            E node = queue.poll();
-            if (getSuccessors(node)!=null) {
-                for (E succ : getSuccessors(node)) {
-                    if (!queue.contains(succ)) {
-                        queue.addLast(succ);
-                    }
-                }
-            }
-            newPlan.add(node);
-        }
-
-        for (E from : mFromEdges.keySet()) {
-            if (newPlan.mOps.containsKey(from)) {
-                for (E to : mFromEdges.get(from)) {
-                    if (newPlan.mOps.containsKey(to)) {
-                        newPlan.connect(from, to);
-                    }
-                }
-            }
-        }
-            
-        trimBelow(root);
-        remove(root);
-    }
 
     /**
      * Trim everything above a given operator.  The specified operator will

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java Thu Sep  4 00:11:51 2014
@@ -218,7 +218,7 @@ public class TezStats extends PigStats {
             if (v != null) {
                 UserPayload payload = v.getProcessorDescriptor().getUserPayload();
                 Configuration conf = TezUtils.createConfFromUserPayload(payload);
-                addVertexStats(name, conf, succeeded, tezJob.getVertexCounters(name));
+                addVertexStats(name, conf, succeeded, v.getParallelism(), tezJob.getVertexCounters(name));
             }
         }
         if (!succeeded) {
@@ -226,12 +226,13 @@ public class TezStats extends PigStats {
         }
     }
 
-    private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded,
+    private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded, int parallelism,
             Map<String, Map<String, Long>> map) {
         TezTaskStats stats = tezOpVertexMap.get(tezOpName);
         stats.setConf(conf);
         stats.setId(tezOpName);
         stats.setSuccessful(succeeded);
+        stats.setParallelism(parallelism);
         if (map == null) {
             if (stats.hasLoadOrStore()) {
                 LOG.warn("Unable to get input(s)/output(s) of the job");

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Thu Sep  4 00:11:51 2014
@@ -29,6 +29,7 @@ public class TezTaskStats extends JobSta
     private static final Log LOG = LogFactory.getLog(TezTaskStats.class);
 
     private String vertexName;
+    private int parallelism;
     private List<POStore> stores = null;
     private List<FileSpec> loads = null;
 
@@ -40,6 +41,10 @@ public class TezTaskStats extends JobSta
         this.vertexName = vertexName;
     }
 
+    public void setParallelism(int p) {
+        this.parallelism = p;
+    }
+
     @Override
     public String getJobId() {
         return (vertexName == null) ? "" : vertexName;
@@ -253,6 +258,10 @@ public class TezTaskStats extends JobSta
         throw new UnsupportedOperationException();
     }
 
+    public int getParallelism() {
+        return parallelism;
+    }
+
     public boolean hasLoadOrStore() {
         if ((loads != null && !loads.isEmpty())
                 || (stores != null && !stores.isEmpty())) {

Modified: pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java Thu Sep  4 00:11:51 2014
@@ -17,44 +17,32 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ConfigurationValidator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
 import org.apache.pig.newplan.optimizer.Rule;
-import org.apache.pig.test.utils.GenPhyOp;
-import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
-public class TestGroupConstParallel {
+@Ignore
+public abstract class TestGroupConstParallel {
 
     private static final String INPUT_FILE = "TestGroupConstParallelInp";
-    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     
     @BeforeClass
@@ -79,7 +67,7 @@ public class TestGroupConstParallel {
      */
     @Test
     public void testGroupAllWithParallel() throws Exception {
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster
                 .getProperties());
         
         
@@ -95,42 +83,30 @@ public class TestGroupConstParallel {
             Util.checkQueryOutputsAfterSort(iter, expectedRes);
             
             JobGraph jGraph = PigStats.get().getJobGraph();
-            assertEquals(1, jGraph.size());
-            // find added map-only concatenate job 
-            MRJobStats js = (MRJobStats)jGraph.getSources().get(0);
-            assertEquals(1, js.getNumberMaps());   
-            assertEquals(1, js.getNumberReduces()); 
+            checkGroupAllWithParallelGraphResult(jGraph);
         }
-
     }
-    
-    
+
+    abstract protected void checkGroupAllWithParallelGraphResult(JobGraph jGraph);
+
     /**
      * Test parallelism for group by constant
      * @throws Throwable
      */
     @Test
     public void testGroupConstWithParallel() throws Throwable {
-        PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+        PigContext pc = new PigContext(cluster.getExecType(), cluster.getProperties());
         pc.defaultParallel = 100;
         pc.connect();
         
-        String query = "a = load 'input';\n" + "b = group a by 1;" + "store b into 'output';";
-        PigServer pigServer = new PigServer( ExecType.MAPREDUCE, cluster.getProperties() );
+        String query = "a = load '" + INPUT_FILE + "';\n" + "b = group a by 1;" + "store b into 'output';";
+        PigServer pigServer = new PigServer( cluster.getExecType(), cluster.getProperties() );
         PhysicalPlan pp = Util.buildPp( pigServer, query );
-        
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
-        ConfigurationValidator.validatePigProperties(pc.getProperties());
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        
-        JobControl jobControl = jcc.compile(mrPlan, "Test");
-        Job job = jobControl.getWaitingJobs().get(0);
-        int parallel = job.getJobConf().getNumReduceTasks();
-
-        assertEquals("parallism", 1, parallel);
+        checkGroupConstWithParallelResult(pp, pc);
     }
+
+    abstract protected void checkGroupConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
     
     /**
      *  Test parallelism for group by column
@@ -138,27 +114,20 @@ public class TestGroupConstParallel {
      */
     @Test
     public void testGroupNonConstWithParallel() throws Throwable {
-        PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+        PigContext pc = new PigContext(cluster.getExecType(), cluster.getProperties());
         pc.defaultParallel = 100;
         pc.connect();
         
-        PigServer pigServer = new PigServer( ExecType.MAPREDUCE, cluster.getProperties() );
-        String query =  "a = load 'input';\n" + "b = group a by $0;" + "store b into 'output';";
+        PigServer pigServer = new PigServer( cluster.getExecType(), cluster.getProperties() );
+        String query =  "a = load '" + INPUT_FILE + "';\n" + "b = group a by $0;" + "store b into 'output';";
         
         PhysicalPlan pp = Util.buildPp( pigServer, query );
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
-        ConfigurationValidator.validatePigProperties(pc.getProperties());
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        
-        JobControl jobControl = jcc.compile(mrPlan, "Test");
-        Job job = jobControl.getWaitingJobs().get(0);
-        int parallel = job.getJobConf().getNumReduceTasks();
-        
-        assertEquals("parallism", 100, parallel);
+        checkGroupNonConstWithParallelResult(pp, pc);
     }
 
+    abstract protected void checkGroupNonConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
 
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {

Modified: pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Thu Sep  4 00:11:51 2014
@@ -37,31 +37,28 @@ import org.apache.hadoop.hbase.MiniHBase
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.util.ConfigurationValidator;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
-public class TestJobSubmission {
+@Ignore
+abstract public class TestJobSubmission {
 
 
     static PigContext pc;
@@ -75,11 +72,11 @@ public class TestJobSubmission {
     String curDir;
     String inpDir;
     String golDir;
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     @BeforeClass
     public static void onetimeSetUp() throws Exception {
-        pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+        pc = new PigContext(cluster.getExecType(), cluster.getProperties());
         try {
             pc.connect();
         } catch (ExecException e) {
@@ -115,63 +112,35 @@ public class TestJobSubmission {
 
     @Test
     public void testJobControlCompilerErr() throws Exception {
-        String query = "a = load 'input';" + "b = order a by $0;" + "store b into 'output';";
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String query = "a = load '/passwd' as (a1:bag{(t:chararray)});" + "b = order a by a1;" + "store b into 'output';";
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(pigServer, query);
-        POStore store = GenPhyOp.dummyPigStorageOp();
-        pp.addAsLeaf(store);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
-        for(MapReduceOper mro: mrPlan.getLeaves()) {
-            if(mro.reducePlan != null) {
-                PhysicalOperator po = mro.reducePlan.getRoots().get(0);
-                if (po instanceof POPackage) {
-                    ((POPackage) po).getPkgr().setKeyType(DataType.BAG);
-                    mro.setGlobalSort(true);
-                }
-            }
-        }
-
-        ConfigurationValidator.validatePigProperties(pc.getProperties());
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        try {
-            jcc.compile(mrPlan, "Test");
-        } catch (JobCreationException jce) {
-            assertTrue(jce.getErrorCode() == 1068);
-        }
+        checkJobControlCompilerErrResult(pp, pc);
     }
 
+    abstract protected void checkJobControlCompilerErrResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
     @Test
     public void testDefaultParallel() throws Throwable {
         pc.defaultParallel = 100;
 
-        String query = "a = load 'input';" + "b = group a by $0;" + "store b into 'output';";
-        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String query = "a = load '/passwd';" + "b = group a by $0;" + "store b into 'output';";
+        PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
-        ConfigurationValidator.validatePigProperties(pc.getProperties());
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
-        JobControl jobControl = jcc.compile(mrPlan, "Test");
-        Job job = jobControl.getWaitingJobs().get(0);
-        int parallel = job.getJobConf().getNumReduceTasks();
-
-        assertEquals(100, parallel);
-        Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());
+        checkDefaultParallelResult(pp, pc);
 
         pc.defaultParallel = -1;
     }
 
+    abstract protected void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
     @Test
     public void testDefaultParallelInSort() throws Throwable {
         // default_parallel is considered only at runtime, so here we only test requested parallel
         // more thorough tests can be found in TestNumberOfReducers.java
 
         String query = "a = load 'input';" + "b = order a by $0 parallel 100;" + "store b into 'output';";
-        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
@@ -198,7 +167,7 @@ public class TestJobSubmission {
                 "b = load 'input';" +
                 "c = join a by $0, b by $0 using 'skewed' parallel 100;" +
                 "store c into 'output';";
-        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
@@ -219,6 +188,10 @@ public class TestJobSubmission {
 
     @Test
     public void testReducerNumEstimation() throws Exception{
+        // Skip the test for Tez. Tez uses a different mechanism.
+        // Equivalent test is in TestTezAutoParallelism
+        Assume.assumeTrue("Skip this test for TEZ",
+                Util.isMapredExecType(cluster.getExecType()));
         // use the estimation
         Configuration conf = HBaseConfiguration.create(new Configuration());
         HBaseTestingUtility util = new HBaseTestingUtility(conf);
@@ -228,7 +201,7 @@ public class TestJobSubmission {
         String query = "a = load '/passwd';" +
                 "b = group a by $0;" +
                 "store b into 'output';";
-        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
@@ -295,6 +268,10 @@ public class TestJobSubmission {
 
     @Test
     public void testReducerNumEstimationForOrderBy() throws Exception{
+        // Skip the test for Tez. Tez uses a different mechanism.
+        // Equivalent test is in TestTezAutoParallelism
+        Assume.assumeTrue("Skip this test for TEZ",
+                Util.isMapredExecType(cluster.getExecType()));
         // use the estimation
         pc.getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
         pc.getProperties().setProperty("pig.exec.reducers.max", "10");
@@ -302,7 +279,7 @@ public class TestJobSubmission {
         String query = "a = load '/passwd';" +
                 "b = order a by $0;" +
                 "store b into 'output';";
-        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
 
         MROperPlan mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);

Modified: pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Thu Sep  4 00:11:51 2014
@@ -53,14 +53,14 @@ public class TestMergeJoin {
     private static final String INPUT_FILE = "testMergeJoinInput.txt";
     private static final String INPUT_FILE2 = "testMergeJoinInput2.txt";
     private PigServer pigServer;
-    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     public TestMergeJoin() throws ExecException{
 
         Properties props = cluster.getProperties();
         props.setProperty(MRConfiguration.MAP_MAX_ATTEMPTS, "1");
         props.setProperty(MRConfiguration.REDUCE_MAX_ATTEMPTS, "1");
-        pigServer = new PigServer(ExecType.MAPREDUCE, props);
+        pigServer = new PigServer(cluster.getExecType(), props);
     }
     /**
      * @throws java.lang.Exception

Modified: pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Thu Sep  4 00:11:51 2014
@@ -27,10 +27,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.ExecType;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.AfterClass;
@@ -58,7 +59,7 @@ public class TestNativeMapReduce  {
      * file if specified will be skipped by the wordcount udf
      */
     final static String STOPWORD_FILE = "TestNMapReduceStopwFile";
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
     private PigServer pigServer = null;
 
     /**
@@ -97,7 +98,7 @@ public class TestNativeMapReduce  {
 
     @Before
     public void setUp() throws Exception{
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
 
         //createWordCountJar();
     }
@@ -206,6 +207,9 @@ public class TestNativeMapReduce  {
 
             assertTrue("job failed", PigStats.get().getReturnCode() != 0);
 
+        } catch (JobCreationException e) {
+            // In Tez mode a JobCreationException is thrown, caused by FileAlreadyExistsException
+            assertTrue(e.getCause() instanceof FileAlreadyExistsException);
         }
         finally{
             // We have to manually delete intermediate mapreduce files

Modified: pig/trunk/test/org/apache/pig/test/TestPigProgressReporting.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigProgressReporting.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigProgressReporting.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigProgressReporting.java Thu Sep  4 00:11:51 2014
@@ -32,7 +32,7 @@ import org.junit.Test;
 
 public class TestPigProgressReporting {
 
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     @Test
     public void testProgressReportingWithStatusMessage() throws Exception {
@@ -46,7 +46,7 @@ public class TestPigProgressReporting {
 
             Util.createInputFile(cluster, "a.txt", new String[] { "dummy"});
 
-            PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+            PigServer pig = new PigServer(cluster.getExecType(), cluster.getProperties());
 
             String filename = prepareTempFile();
             filename = filename.replace("\\", "\\\\");

Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu Sep  4 00:11:51 2014
@@ -1270,7 +1270,7 @@ public class Util {
         assertConfLong(conf, MRConfiguration.REDUCE_TASKS, runtimeParallel);
     }
 
-    private static void assertConfLong(Configuration conf, String param, long expected) {
+    public static void assertConfLong(Configuration conf, String param, long expected) {
         assertEquals("Unexpected value found in configs for " + param, expected, conf.getLong(param, -1));
     }
 

Modified: pig/trunk/test/org/apache/pig/test/utils/ReportingUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/utils/ReportingUDF.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/utils/ReportingUDF.java (original)
+++ pig/trunk/test/org/apache/pig/test/utils/ReportingUDF.java Thu Sep  4 00:11:51 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public class ReportingUDF extends EvalFunc<Integer> {
 
@@ -30,7 +31,8 @@ public class ReportingUDF extends EvalFu
         
         try {
             Thread.sleep(7500);
-            getReporter().progress("Progressing");
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            reporter.progress();
             Thread.sleep(7500);
         } catch (InterruptedException e) {
         }

Modified: pig/trunk/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/trunk/test/tez-tests?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/tez-tests (original)
+++ pig/trunk/test/tez-tests Thu Sep  4 00:11:51 2014
@@ -65,3 +65,8 @@
 **/TestBigTypeSort.java
 **/TestCurrentTime.java
 **/TestInvokerGenerator.java
+**/TestGroupConstParallelTez.java
+**/TestJobSubmissionTez.java
+**/TestMergeJoin.java
+**/TestNativeMapReduce.java
+**/TestPigProgressReporting.java