Posted to commits@pig.apache.org by ch...@apache.org on 2014/09/04 20:10:43 UTC

svn commit: r1622521 - in /pig/branches/spark: ./ ivy/ shims/test/hadoop23/org/apache/pig/test/ src/ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/ src/org/apache/pig/backend/had...

Author: cheolsoo
Date: Thu Sep  4 18:10:42 2014
New Revision: 1622521

URL: http://svn.apache.org/r1622521
Log:
Merge latest trunk changes

Added:
    pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallelMR.java
      - copied unchanged from r1622520, pig/trunk/test/org/apache/pig/test/TestGroupConstParallelMR.java
    pig/branches/spark/test/org/apache/pig/test/TestJobSubmissionMR.java
      - copied unchanged from r1622520, pig/trunk/test/org/apache/pig/test/TestJobSubmissionMR.java
    pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java
      - copied unchanged from r1622520, pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java
    pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java
      - copied unchanged from r1622520, pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java
Modified:
    pig/branches/spark/   (props changed)
    pig/branches/spark/CHANGES.txt
    pig/branches/spark/build.xml
    pig/branches/spark/ivy.xml
    pig/branches/spark/ivy/libraries.properties
    pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
    pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
    pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java
    pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
    pig/branches/spark/src/pig-default.properties   (props changed)
    pig/branches/spark/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java
    pig/branches/spark/test/org/apache/pig/builtin/TestBigTypeSort.java
    pig/branches/spark/test/org/apache/pig/builtin/TestCurrentTime.java
    pig/branches/spark/test/org/apache/pig/builtin/TestInvokerGenerator.java
    pig/branches/spark/test/org/apache/pig/pigunit/MiniClusterRunner.java
    pig/branches/spark/test/org/apache/pig/pigunit/PigTest.java
    pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java
    pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallel.java
    pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java
    pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java
    pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java
    pig/branches/spark/test/org/apache/pig/test/TestPigProgressReporting.java
    pig/branches/spark/test/org/apache/pig/test/Util.java
    pig/branches/spark/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2   (props changed)
    pig/branches/spark/test/org/apache/pig/test/pigunit/TestPigTest.java
    pig/branches/spark/test/org/apache/pig/test/pigunit/pig/TestGruntParser.java
    pig/branches/spark/test/org/apache/pig/test/utils/ReportingUDF.java
    pig/branches/spark/test/tez-tests

Propchange: pig/branches/spark/
------------------------------------------------------------------------------
  Merged /pig/trunk:r1621676-1622520

Modified: pig/branches/spark/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/spark/CHANGES.txt?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/CHANGES.txt (original)
+++ pig/branches/spark/CHANGES.txt Thu Sep  4 18:10:42 2014
@@ -24,6 +24,10 @@ INCOMPATIBLE CHANGES
  
 IMPROVEMENTS
 
+PIG-4146: Create a target to run mr and tez unit test in one shot (daijy)
+
+PIG-4144: Make pigunit.PigTest work in tez mode (daijy)
+
 PIG-4128: New logical optimizer rule: ConstantCalculator (daijy)
 
 PIG-4124: Command for Python streaming udf should be configurable (cheolsoo)
@@ -66,6 +70,12 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4143: Port more mini cluster tests to Tez - part 7 (daijy)
+
+PIG-4149: Rounding issue in FindQuantiles (daijy)
+
+PIG-4145: Port local mode tests to Tez - part1 (daijy)
+
 PIG-4076: Fix pom file (daijy)
 
 PIG-4140: VertexManagerEvent.getUserPayload returns ReadOnlyBuffer after TEZ-1449 (daijy)

Modified: pig/branches/spark/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/build.xml (original)
+++ pig/branches/spark/build.xml Thu Sep  4 18:10:42 2014
@@ -117,7 +117,6 @@
     <!-- test configuration, use ${user.home}/build.properties to configure values  -->
     <property name="ssh.gateway" value="" />
     <property name="hod.server" value="" />
-    <property name="test.log.dir" value="${basedir}/test/logs"/>
     <property name="test.output" value="no"/>
 
     <!-- e2e test properties -->
@@ -867,23 +866,28 @@
     <!-- Run unit tests                                                     -->
     <!-- ================================================================== -->
     <target name="test-core" depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check,jackson-pig-3039-test-download" description="Run full set of unit tests">
-        <macro-test-runner test.file="${test.all.file}" />
+        <macro-test-runner test.file="${test.all.file}" tests.failed="test-core.failed" />
+        <fail if="test-core.failed">Tests failed!</fail>
     </target>
 
     <target name="test-commit" depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check" description="Run approximate 10-minute set of unit tests prior to commiting">
-        <macro-test-runner test.file="${test.commit.file}" />
+        <macro-test-runner test.file="${test.commit.file}" tests.failed="test-commit.failed"/>
+        <fail if="test-commit.failed">Tests failed!</fail>
     </target>
 
     <target name="test-unit" depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check" description="Run all true unit tests">
-        <macro-test-runner test.file="${test.unit.file}" />
+        <macro-test-runner test.file="${test.unit.file}" tests.failed="test-unit.failed"/>
+        <fail if="test-unit.failed">Tests failed!</fail>
     </target>
 
     <target name="test-smoke" depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check" description="Run 30 min smoke tests">
-        <macro-test-runner test.file="${test.smoke.file}" />
+        <macro-test-runner test.file="${test.smoke.file}" tests.failed="test-smoke.failed"/>
+        <fail if="test-smoke.failed">Tests failed!</fail>
     </target>
 
     <target name="test-tez" depends="setTezEnv,setWindowsPath,setLinuxPath,compile-test,jar,debugger.check,jackson-pig-3039-test-download" description="Run tez unit tests">
-        <macro-test-runner test.file="${test.tez.file}" />
+        <macro-test-runner test.file="${test.tez.file}" tests.failed="test-tez.failed"/>
+        <fail if="test-tez.failed">Tests failed!</fail>
     </target>
 	
     <target name="debugger.check" depends="debugger.set,debugger.unset"/>
@@ -897,12 +901,13 @@
 
     <macrodef name="macro-test-runner">
       <attribute name="test.file" />
+      <attribute name="tests.failed" />
       <sequential>
         <delete dir="${test.log.dir}"/>
         <mkdir dir="${test.log.dir}"/>
         <tempfile property="junit.tmp.dir" prefix="pig_junit_tmp" destDir="${java.io.tmpdir}" />
         <mkdir dir="${junit.tmp.dir}/"/>
-        <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" fork="yes" maxmemory="2048m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">
+        <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" fork="yes" maxmemory="2048m" dir="${basedir}" timeout="${test.timeout}" errorProperty="@{tests.failed}" failureProperty="@{tests.failed}">
             <sysproperty key="hadoopversion" value="${hadoopversion}" />
             <sysproperty key="test.exec.type" value="${test.exec.type}" />
             <sysproperty key="ssh.gateway" value="${ssh.gateway}" />
@@ -950,7 +955,6 @@
 
         </junit>
         <delete dir="${junit.tmp.dir}/"/>
-        <fail if="tests.failed">Tests failed!</fail>
       </sequential>
     </macrodef>
 
@@ -958,6 +962,37 @@
         <antcall target="test-core" inheritRefs="true" inheritall="true"/>
     </target>
 
+    <target name="test-core-mrtez" description="run core tests on both mr and tez mode"
+            depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check,jackson-pig-3039-test-download">
+        <fail message="hadoopversion must be set to 23 when invoking test-core-mrtez">
+          <condition>
+            <not>
+              <equals arg1="${hadoopversion}" arg2="23" />
+            </not>
+          </condition>
+        </fail>
+        <echo message="=======================" />
+        <echo message="Running MR tests" />
+        <echo message="=======================" />
+        <propertyreset name="test.exec.type" value="mr" />
+        <propertyreset name="test.log.dir" value="${test.build.dir}/logs/${test.exec.type}" />
+        <macro-test-runner test.file="${test.all.file}" tests.failed="test.mr.failed"/>
+        <echo />
+        <echo message="=======================" />
+        <echo message="Running Tez tests" />
+        <echo message="=======================" />
+        <propertyreset name="test.exec.type" value="tez" />
+        <propertyreset name="test.log.dir" value="${test.build.dir}/logs/${test.exec.type}" />
+        <macro-test-runner test.file="${test.tez.file}" tests.failed="test.tez.failed"/>
+        <condition property="any.tests.failed">
+          <or>
+            <isset property="test.mr.failed"/>
+            <isset property="test.tez.failed"/>
+          </or>
+        </condition>
+        <fail if="any.tests.failed">Tests failed!</fail>
+    </target>
+
     <!-- ================================================================== -->
     <!-- End to end tests                                                   -->
     <!-- ================================================================== -->

Modified: pig/branches/spark/ivy.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy.xml?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/ivy.xml (original)
+++ pig/branches/spark/ivy.xml Thu Sep  4 18:10:42 2014
@@ -433,19 +433,19 @@
 
     <!-- for Tez integration -->
     <dependency org="org.apache.tez" name="tez" rev="${tez.version}"
-       conf="hadoop23->master" changing="true"/>
+       conf="hadoop23->master"/>
     <dependency org="org.apache.tez" name="tez-common" rev="${tez.version}"
-       conf="hadoop23->master" changing="true"/>
+       conf="hadoop23->master"/>
     <dependency org="org.apache.tez" name="tez-api" rev="${tez.version}"
-       conf="hadoop23->master" changing="true"/>
+       conf="hadoop23->master"/>
     <dependency org="org.apache.tez" name="tez-dag" rev="${tez.version}"
-       conf="hadoop23->master" changing="true"/>
+       conf="hadoop23->master"/>
     <dependency org="org.apache.tez" name="tez-runtime-internals" rev="${tez.version}"
-       conf="hadoop23->master" changing="true"/>
+       conf="hadoop23->master"/>
     <dependency org="org.apache.tez" name="tez-runtime-library" rev="${tez.version}"
-       conf="hadoop23->master" changing="true"/>
+       conf="hadoop23->master"/>
     <dependency org="org.apache.tez" name="tez-mapreduce" rev="${tez.version}"
-       conf="hadoop23->master" changing="true"/>
+       conf="hadoop23->master"/>
     <dependency org="org.apache.commons" name="commons-collections4" rev="${commons-collections4.version}"
       conf="hadoop23->master"/>
     <dependency org="org.codehaus.jettison" name="jettison" rev="${jettison.version}"

Modified: pig/branches/spark/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy/libraries.properties?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/ivy/libraries.properties (original)
+++ pig/branches/spark/ivy/libraries.properties Thu Sep  4 18:10:42 2014
@@ -93,6 +93,6 @@ mockito.version=1.8.4
 jansi.version=1.9
 asm.version=3.3.1
 snappy.version=1.1.0.1
-tez.version=0.5.0-SNAPSHOT
+tez.version=0.5.0
 parquet-pig-bundle.version=1.2.3
 snappy.version=0.2

Modified: pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Thu Sep  4 18:10:42 2014
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.v2.Mi
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
 import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager;
 import org.apache.tez.dag.api.TezConfiguration;
@@ -97,7 +96,6 @@ public class TezMiniCluster extends Mini
             m_mr.init(m_dfs_conf);
             m_mr.start();
             m_mr_conf = m_mr.getConfig();
-            m_mr_conf.set(MRConfiguration.FRAMEWORK_NAME, "yarn-tez");
             m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
                     System.getProperty("java.class.path"));
             m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx2048m");

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml Thu Sep  4 18:10:42 2014
@@ -829,8 +829,8 @@ $pig_trunk ant pigunit-jar   
 <!-- +++++++++++++++++++++++++++++++++++++++ -->
     <section>
       <title>Mapreduce Mode</title>
-      <p>PigUnit also runs in Pig's mapreduce mode. Mapreduce mode requires you to use a Hadoop cluster and HDFS installation.
-        It is enabled when the Java system property pigunit.exectype.cluster is set to any value: e.g. -Dpigunit.exectype.cluster=true or System.getProperties().setProperty("pigunit.exectype.cluster", "true"). The cluster you select must be specified in the CLASSPATH (similar to the HADOOP_CONF_DIR variable). 
+      <p>PigUnit also runs in Pig's mapreduce/tez/tez_local mode. Mapreduce/Tez mode requires you to use a Hadoop cluster and HDFS installation.
+        It is enabled when the Java system property pigunit.exectype is set to specific values (mr/tez/tez_local): e.g. -Dpigunit.exectype=mr or System.getProperties().setProperty("pigunit.exectype", "mr"), which means PigUnit will run in mr mode. The cluster you select to run mr/tez test must be specified in the CLASSPATH (similar to the HADOOP_CONF_DIR variable). 
       </p>
     </section>
     </section>

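For illustration, a minimal PigUnit test exercising the pigunit.exectype switch described in the documentation change above might look like the sketch below. The script name, parameters, and expected rows are hypothetical; only the property name and the PigTest/assertOutput API come from PigUnit itself, and running in mr/tez mode still requires the cluster configuration on the CLASSPATH as noted above.

    import org.apache.pig.pigunit.PigTest;

    public class WordCountPigUnitIT {
        public static void main(String[] args) throws Exception {
            // Assumption: pick the execution engine before PigTest lazily
            // creates its PigServer; "mr", "tez" and "tez_local" are the
            // values recognized by the patched PigTest.getCluster().
            System.setProperty("pigunit.exectype", "tez");

            // Hypothetical script and parameter substitutions.
            String[] params = { "input=words.txt", "output=word_counts" };
            PigTest test = new PigTest("wordcount.pig", params);

            // Check the rows produced by the final alias of the script.
            test.assertOutput("counts", new String[] { "(hello,2)", "(world,1)" });
        }
    }
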
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Thu Sep  4 18:10:42 2014
@@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFac
 public class DiscreteProbabilitySampleGenerator {
     Random rGen;
     float[] probVec;
-    float epsilon = 0.00001f;
+    float epsilon = 0.0001f;
         
     private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class);
     

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Sep  4 18:10:42 2014
@@ -403,20 +403,6 @@ public class TezDagBuilder extends TezOp
             payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.sortOperator.getOperatorKey().toString());
         }
 
-        String tmp;
-        long maxCombinedSplitSize = 0;
-        if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))
-            payloadConf.setBoolean(PigConfiguration.PIG_NO_SPLIT_COMBINATION, true);
-        else if ((tmp = pc.getProperties().getProperty(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, null)) != null) {
-            try {
-                maxCombinedSplitSize = Long.parseLong(tmp);
-            } catch (NumberFormatException e) {
-                log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
-            }
-        }
-        if (maxCombinedSplitSize > 0)
-            payloadConf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
-
         payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
         payloadConf.set("pig.inpSignatures", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
         payloadConf.set("pig.inpLimits", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
@@ -559,6 +545,11 @@ public class TezDagBuilder extends TezOp
             log.info("Estimate quantile for sample aggregation vertex " + tezOp.getOperatorKey().toString());
         }
 
+        // set various parallelism into the job conf for later analysis, PIG-2779
+        payloadConf.setInt("pig.info.reducers.default.parallel", pc.defaultParallel);
+        payloadConf.setInt("pig.info.reducers.requested.parallel", tezOp.getRequestedParallelism());
+        payloadConf.setInt("pig.info.reducers.estimated.parallel", tezOp.getEstimatedParallelism());
+
         // Take our assembled configuration and create a vertex
         UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
         procDesc.setUserPayload(userPayload);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Thu Sep  4 18:10:42 2014
@@ -56,7 +56,7 @@ public class TezJobCompiler {
     public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
             throws IOException, YarnException {
         String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
-        DAG tezDag = new DAG(jobName + "-" + dagIdentifier);
+        DAG tezDag = DAG.create(jobName + "-" + dagIdentifier);
         dagIdentifier++;
         tezDag.setCredentials(new Credentials());
         TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Thu Sep  4 18:10:42 2014
@@ -24,15 +24,21 @@ import java.io.PrintStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.URL;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
 
 /**
  * A Plan used to create the plan of Tez operators which can be converted into
@@ -156,5 +162,65 @@ public class TezOperPlan extends Operato
         return super.disconnect(from, to);
     }
 
+    
+    /**
+     * Move everything below a given operator to the new operator plan.  The specified operator will
+     * be moved and will be the root of the new operator plan
+     * @param root Operator to move everything under including the root operator
+     * @param newPlan new operator plan to move things into
+     * @throws PlanException 
+     */
+    public void moveTree(TezOperator root, TezOperPlan newPlan) throws PlanException {
+        List<TezOperator> list = new ArrayList<TezOperator>();
+        list.add(root);
+        int prevSize = 0;
+        int pos = 0;
+        while (list.size() > prevSize) {
+            prevSize = list.size();
+            TezOperator node = list.get(pos);
+            if (getSuccessors(node)!=null) {
+                for (TezOperator succ : getSuccessors(node)) {
+                    if (!list.contains(succ)) {
+                        list.add(succ);
+                    }
+                }
+            }
+            if (getPredecessors(node)!=null) {
+                for (TezOperator pred : getPredecessors(node)) {
+                    if (!list.contains(pred)) {
+                        list.add(pred);
+                    }
+                }
+            }
+            pos++;
+        }
+
+        for (TezOperator node: list) {
+            newPlan.add(node);
+        }
+
+        Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>();
+        for (TezOperator from : mFromEdges.keySet()) {
+            List<TezOperator> tos = mFromEdges.get(from);
+            for (TezOperator to : tos) {
+                if (list.contains(from) || list.contains(to)) {
+                    toReconnect.add(new Pair<TezOperator, TezOperator>(from, to));
+                }
+            }
+        }
+
+        for (Pair<TezOperator, TezOperator> pair : toReconnect) {
+            if (list.contains(pair.first) && list.contains(pair.second)) {
+                // Need to reconnect in newPlan
+                TezEdgeDescriptor edge = pair.second.inEdges.get(pair.first.getOperatorKey());
+                TezCompilerUtil.connect(newPlan, pair.first, pair.second, edge);
+            }
+        }
+
+        for (TezOperator node : list) {
+            // Simply remove from plan, don't deal with inEdges/outEdges
+            super.remove(node);
+        }
+    }
 }
 

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java Thu Sep  4 18:10:42 2014
@@ -21,10 +21,13 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -46,6 +49,7 @@ import org.apache.tez.mapreduce.hadoop.M
 public class LoaderProcessor extends TezOpPlanVisitor {
     private Configuration conf;
     private PigContext pc;
+    private static final Log log = LogFactory.getLog(LoaderProcessor.class);
     public LoaderProcessor(TezOperPlan plan, PigContext pigContext) {
         super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
         this.pc = pigContext;
@@ -125,6 +129,19 @@ public class LoaderProcessor extends Tez
             conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
             conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
             conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+            String tmp;
+            long maxCombinedSplitSize = 0;
+            if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))
+                conf.setBoolean(PigConfiguration.PIG_NO_SPLIT_COMBINATION, true);
+            else if ((tmp = pc.getProperties().getProperty(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, null)) != null) {
+                try {
+                    maxCombinedSplitSize = Long.parseLong(tmp);
+                } catch (NumberFormatException e) {
+                    log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
+                }
+            }
+            if (maxCombinedSplitSize > 0)
+                conf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
             tezOp.getLoaderInfo().setInpSignatureLists(inpSignatureLists);
             tezOp.getLoaderInfo().setInp(inp);
             tezOp.getLoaderInfo().setInpLimits(inpLimits);

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Thu Sep  4 18:10:42 2014
@@ -103,11 +103,13 @@ public class TezCompilerUtil {
 
     static public void connect(TezOperPlan plan, TezOperator from, TezOperator to, TezEdgeDescriptor edge) throws PlanException {
         plan.connect(from, to);
-        PhysicalOperator leaf = from.plan.getLeaves().get(0);
-        // It could be POStoreTez incase of sampling job in order by
-        if (leaf instanceof POLocalRearrangeTez) {
-            POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
-            lr.setOutputKey(to.getOperatorKey().toString());
+        if (from.plan.getLeaves()!=null && !from.plan.getLeaves().isEmpty()) {
+            PhysicalOperator leaf = from.plan.getLeaves().get(0);
+            // It could be POStoreTez incase of sampling job in order by
+            if (leaf instanceof POLocalRearrangeTez) {
+                POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
+                lr.setOutputKey(to.getOperatorKey().toString());
+            }
         }
         // Add edge descriptors to old and new operators
         to.inEdges.put(from.getOperatorKey(), edge);

Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java Thu Sep  4 18:10:42 2014
@@ -172,7 +172,7 @@ public class FindQuantiles extends EvalF
                 samples = (DataBag)in.get(1);
             }
             long numSamples = samples.size();
-            long toSkip = numSamples / numQuantiles;
+            double toSkip = (double)numSamples / numQuantiles;
             if(toSkip == 0) {
                 // numSamples is < numQuantiles;
                 // set numQuantiles to numSamples
@@ -180,9 +180,10 @@ public class FindQuantiles extends EvalF
                 toSkip = 1;
             }
             
-            long ind=0, j=-1, nextQuantile = toSkip-1;
+            long ind=0, j=-1;
+            double nextQuantile = toSkip-1;
             for (Tuple it : samples) {
-                if (ind==nextQuantile){
+                if (ind==(long)nextQuantile){
                     ++j;
                     quantilesList.add(it);
                     nextQuantile+=toSkip;

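To see why the switch from integer to floating-point arithmetic above matters (PIG-4149), consider the 1900-sample / 300-partition case added to TestFindQuantiles below. This standalone sketch only reproduces the index arithmetic of the old and new code; it is not Pig code, and the exact counts are approximate.

    public class QuantileStepDemo {
        public static void main(String[] args) {
            final long numSamples = 1900;
            final long numQuantiles = 300;

            // Old code: integer division truncates 1900/300 = 6.33 to 6, so the
            // last of the 300 boundaries lands at index 5 + 6*299 = 1799 and
            // roughly 100 trailing samples (~5% of the data) end up in the
            // final partition instead of roughly 1/300 of them.
            long toSkipOld = numSamples / numQuantiles;                             // 6
            long lastBoundaryOld = (toSkipOld - 1) + toSkipOld * (numQuantiles - 1); // 1799

            // New code: a double step of ~6.33 spreads the boundaries evenly,
            // so the last boundary falls near the end of the sample list.
            double toSkipNew = (double) numSamples / numQuantiles;                   // 6.333...
            double lastBoundaryNew = (toSkipNew - 1) + toSkipNew * (numQuantiles - 1); // ~1899

            System.out.println("old last boundary: " + lastBoundaryOld);
            System.out.println("new last boundary: " + (long) lastBoundaryNew);
        }
    }
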
Modified: pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java Thu Sep  4 18:10:42 2014
@@ -26,6 +26,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Deque;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.PigException;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
 
 
 //import org.apache.commons.collections.map.MultiValueMap;
@@ -284,42 +286,6 @@ public abstract class OperatorPlan<E ext
             }
         }
     }
-    
-    /**
-     * Move everything below a given operator to the new operator plan.  The specified operator will
-     * be moved and will be the root of the new operator plan
-     * @param root Operator to move everything after
-     * @param newPlan new operator plan to move things into
-     * @throws PlanException 
-     */
-    public void moveTree(E root, OperatorPlan<E> newPlan) throws PlanException {
-        Deque<E> queue = new ArrayDeque<E>();
-        queue.addLast(root);
-        while (!queue.isEmpty()) {
-            E node = queue.poll();
-            if (getSuccessors(node)!=null) {
-                for (E succ : getSuccessors(node)) {
-                    if (!queue.contains(succ)) {
-                        queue.addLast(succ);
-                    }
-                }
-            }
-            newPlan.add(node);
-        }
-
-        for (E from : mFromEdges.keySet()) {
-            if (newPlan.mOps.containsKey(from)) {
-                for (E to : mFromEdges.get(from)) {
-                    if (newPlan.mOps.containsKey(to)) {
-                        newPlan.connect(from, to);
-                    }
-                }
-            }
-        }
-            
-        trimBelow(root);
-        remove(root);
-    }
 
     /**
      * Trim everything above a given operator.  The specified operator will

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezStats.java Thu Sep  4 18:10:42 2014
@@ -218,7 +218,7 @@ public class TezStats extends PigStats {
             if (v != null) {
                 UserPayload payload = v.getProcessorDescriptor().getUserPayload();
                 Configuration conf = TezUtils.createConfFromUserPayload(payload);
-                addVertexStats(name, conf, succeeded, tezJob.getVertexCounters(name));
+                addVertexStats(name, conf, succeeded, v.getParallelism(), tezJob.getVertexCounters(name));
             }
         }
         if (!succeeded) {
@@ -226,12 +226,13 @@ public class TezStats extends PigStats {
         }
     }
 
-    private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded,
+    private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded, int parallelism,
             Map<String, Map<String, Long>> map) {
         TezTaskStats stats = tezOpVertexMap.get(tezOpName);
         stats.setConf(conf);
         stats.setId(tezOpName);
         stats.setSuccessful(succeeded);
+        stats.setParallelism(parallelism);
         if (map == null) {
             if (stats.hasLoadOrStore()) {
                 LOG.warn("Unable to get input(s)/output(s) of the job");

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Thu Sep  4 18:10:42 2014
@@ -29,6 +29,7 @@ public class TezTaskStats extends JobSta
     private static final Log LOG = LogFactory.getLog(TezTaskStats.class);
 
     private String vertexName;
+    private int parallelism;
     private List<POStore> stores = null;
     private List<FileSpec> loads = null;
 
@@ -40,6 +41,10 @@ public class TezTaskStats extends JobSta
         this.vertexName = vertexName;
     }
 
+    public void setParallelism(int p) {
+        this.parallelism = p;
+    }
+
     @Override
     public String getJobId() {
         return (vertexName == null) ? "" : vertexName;
@@ -253,6 +258,10 @@ public class TezTaskStats extends JobSta
         throw new UnsupportedOperationException();
     }
 
+    public int getParallelism() {
+        return parallelism;
+    }
+
     public boolean hasLoadOrStore() {
         if ((loads != null && !loads.isEmpty())
                 || (stores != null && !stores.isEmpty())) {

Propchange: pig/branches/spark/src/pig-default.properties
------------------------------------------------------------------------------
  Merged /pig/trunk/src/pig-default.properties:r1621676-1622520

Modified: pig/branches/spark/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java (original)
+++ pig/branches/spark/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java Thu Sep  4 18:10:42 2014
@@ -33,12 +33,10 @@ import org.apache.accumulo.core.data.Mut
 import org.apache.accumulo.minicluster.MiniAccumuloCluster;
 import org.apache.accumulo.minicluster.MiniAccumuloConfig;
 import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.Logger;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.test.MiniCluster;
+import org.apache.pig.test.Util;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
@@ -89,7 +87,7 @@ public class TestAccumuloPigCluster {
 
     @Before
     public void beforeTest() throws Exception {
-        pig = new PigServer(ExecType.LOCAL);
+        pig = new PigServer(Util.getLocalTestMode());
     }
 
     @AfterClass

Modified: pig/branches/spark/test/org/apache/pig/builtin/TestBigTypeSort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestBigTypeSort.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestBigTypeSort.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestBigTypeSort.java Thu Sep  4 18:10:42 2014
@@ -28,10 +28,10 @@ import java.math.BigInteger;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.test.Util;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -42,7 +42,7 @@ public class TestBigTypeSort {
 
 	@Before
 	public void setUp() throws Exception {
-		pigServer = new PigServer(ExecType.LOCAL);
+		pigServer = new PigServer(Util.getLocalTestMode());
 	}
 
 	@Test

Modified: pig/branches/spark/test/org/apache/pig/builtin/TestCurrentTime.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestCurrentTime.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestCurrentTime.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestCurrentTime.java Thu Sep  4 18:10:42 2014
@@ -26,12 +26,10 @@ import static org.junit.Assert.assertTru
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.test.Util;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -42,7 +40,7 @@ public class TestCurrentTime {
 
     @Before
     public void setUp() throws Exception {
-        pigServer = new PigServer(ExecType.LOCAL);
+        pigServer = new PigServer(Util.getLocalTestMode());
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/builtin/TestInvokerGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestInvokerGenerator.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestInvokerGenerator.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestInvokerGenerator.java Thu Sep  4 18:10:42 2014
@@ -26,11 +26,11 @@ import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.builtin.mock.Storage.Data;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.test.Util;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -43,7 +43,7 @@ public class TestInvokerGenerator {
 
     @Before
     public void setUp() throws Exception {
-        pigServer = new PigServer(ExecType.LOCAL);
+        pigServer = new PigServer(Util.getLocalTestMode());
         r = new Random(42L);
     }
 

Modified: pig/branches/spark/test/org/apache/pig/pigunit/MiniClusterRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/pigunit/MiniClusterRunner.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/pigunit/MiniClusterRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/pigunit/MiniClusterRunner.java Thu Sep  4 18:10:42 2014
@@ -12,7 +12,7 @@
  */
 package org.apache.pig.pigunit;
 
-import org.apache.pig.test.MiniCluster;
+import org.apache.pig.test.MiniGenericCluster;
 
 
 /**
@@ -41,6 +41,6 @@ import org.apache.pig.test.MiniCluster;
 public class MiniClusterRunner {
   public static void main(String[] args) {
     System.setProperty("hadoop.log.dir", "/tmp/pigunit");
-    MiniCluster.buildCluster();
+    MiniGenericCluster.buildCluster();
   }
 }

Modified: pig/branches/spark/test/org/apache/pig/pigunit/PigTest.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/pigunit/PigTest.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/pigunit/PigTest.java (original)
+++ pig/branches/spark/test/org/apache/pig/pigunit/PigTest.java Thu Sep  4 18:10:42 2014
@@ -31,6 +31,8 @@ import junit.framework.Assert;
 import org.apache.commons.lang.StringUtils;
 import org.apache.log4j.Logger;
 import org.apache.pig.ExecType;
+import org.apache.pig.ExecTypeProvider;
+import org.apache.pig.PigException;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -62,7 +64,7 @@ public class PigTest {
   private static ThreadLocal<Cluster> cluster = new ThreadLocal<Cluster>();
 
   private static final Logger LOG = Logger.getLogger(PigTest.class);
-  private static final String EXEC_CLUSTER = "pigunit.exectype.cluster";
+  private static final String EXEC_CLUSTER = "pigunit.exectype";
 
   /**
    * Initializes the Pig test.
@@ -120,16 +122,30 @@ public class PigTest {
    * @throws ExecException If the PigServer can't be started.
    */
   public static Cluster getCluster() throws ExecException {
-    if (cluster.get() == null) {
-      if (System.getProperties().containsKey(EXEC_CLUSTER)) {
-        LOG.info("Using cluster mode");
-        pig.set(new PigServer(ExecType.MAPREDUCE));
-      } else {
-        LOG.info("Using default local mode");
-        pig.set(new PigServer(ExecType.LOCAL));
+    try {
+      if (cluster.get() == null) {
+        ExecType execType = ExecType.LOCAL;
+        if (System.getProperties().containsKey(EXEC_CLUSTER)) {
+          if (System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("mr")) {
+            LOG.info("Using mr cluster mode");
+            execType = ExecType.MAPREDUCE;
+          } else if (System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("tez")) {
+            LOG.info("Using tez cluster mode");
+            execType = ExecTypeProvider.fromString("tez");
+          } else if (System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("tez_local")) {
+            LOG.info("Using tez local mode");
+            execType = ExecTypeProvider.fromString("tez_local");
+          } else {
+            LOG.info("Using default local mode");
+          }
+        } else {
+          LOG.info("Using default local mode");
+        }
+        pig.set(new PigServer(execType));
+        cluster.set(new Cluster(pig.get().getPigContext()));
       }
-
-      cluster.set(new Cluster(pig.get().getPigContext()));
+    } catch (PigException e) {
+      throw new ExecException(e);
     }
 
     return cluster.get();

Modified: pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java Thu Sep  4 18:10:42 2014
@@ -20,13 +20,18 @@ package org.apache.pig.test;
 
 import static org.junit.Assert.assertTrue;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Random;
 
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.InternalMap;
+import org.apache.pig.data.NonSpillableDataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.builtin.FindQuantiles;
@@ -35,7 +40,7 @@ import org.junit.Test;
 public class TestFindQuantiles {
     
     private static TupleFactory tFact = TupleFactory.getInstance();
-    private static final float epsilon = 0.00001f;
+    private static final float epsilon = 0.0001f;
     
     @Test
     public void testFindQuantiles() throws Exception {
@@ -43,7 +48,7 @@ public class TestFindQuantiles {
        final int numReducers = 1009;
        float sum = getProbVecSum(numSamples, numReducers);
        System.out.println("sum: " + sum);
-       assertTrue(sum > (1+epsilon));
+       assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
     }
     
     @Test
@@ -52,9 +57,34 @@ public class TestFindQuantiles {
        final int numReducers = 3000;
        float sum = getProbVecSum(numSamples, numReducers);
        System.out.println("sum: " + sum);
-       assertTrue(sum < (1-epsilon));
+       assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
     }
-    
+
+    @Test
+    public void testFindQuantilesRemainder() throws Exception {
+       final int numSamples = 1900;
+       final int numReducers = 300;
+       DataBag samples = generateRandomSortedSamples(numSamples, 365);
+       Map<String, Object> findQuantilesResult = getFindQuantilesResult(samples, numReducers);
+       DataBag quantilesBag = (DataBag)findQuantilesResult.get(FindQuantiles.QUANTILES_LIST);
+       Iterator<Tuple> iter = quantilesBag.iterator();
+       Tuple lastQuantile = null;
+       while (iter.hasNext()) {
+           lastQuantile = iter.next();
+       }
+       int lastQuantileNum = (Integer)lastQuantile.get(0);
+       int count = 0;
+       iter = samples.iterator();
+       while (iter.hasNext()) {
+           Tuple t = iter.next();
+           int num = (Integer)t.get(0);
+           if (num >= lastQuantileNum) {
+               count++;
+           }
+       }
+       assertTrue((double)count/numSamples <= 1.0/365 + 0.001);
+    }
+
     private float[] getProbVec(Tuple values) throws Exception {
         float[] probVec = new float[values.size()];        
         for(int i = 0; i < values.size(); i++) {
@@ -62,22 +92,46 @@ public class TestFindQuantiles {
         }
         return probVec;
     }
-    
-    private float getProbVecSum(int numSamples, int numReduceres) throws Exception {
-        Tuple in = tFact.newTuple(2);
+
+    private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception {
+        Random rand = new Random(1000);
+        List<Tuple> samples = new ArrayList<Tuple>(); 
+        for (int i=0; i<numSamples; i++) {
+            Tuple t = tFact.newTuple(1);
+            t.set(0, rand.nextInt(max));
+            samples.add(t);
+        }
+        Collections.sort(samples);
+        return new NonSpillableDataBag(samples);
+    }
+
+    private DataBag generateUniqueSamples(int numSamples) throws Exception {
         DataBag samples = BagFactory.getInstance().newDefaultBag(); 
         for (int i=0; i<numSamples; i++) {
             Tuple t = tFact.newTuple(1);
             t.set(0, new Integer(23));
             samples.add(t);
         }
+        return samples;
+    }
+
+    private Map<String, Object> getFindQuantilesResult(DataBag samples,
+            int numReduceres) throws Exception {
+        Tuple in = tFact.newTuple(2);
+
         in.set(0, new Integer(numReduceres));
         in.set(1, samples);
         
         FindQuantiles fq = new FindQuantiles();
         
         Map<String, Object> res = fq.exec(in);
-        
+        return res;
+    }
+
+    private float getProbVecSum(int numSamples, int numReduceres) throws Exception {
+        DataBag samples = generateUniqueSamples(numSamples);
+        Map<String, Object> res = getFindQuantilesResult(samples, numReduceres);
+
         InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS);
         Iterator<Object> it = weightedPartsData.values().iterator();
         float[] probVec = getProbVec((Tuple)it.next());

Modified: pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallel.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallel.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallel.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallel.java Thu Sep  4 18:10:42 2014
@@ -17,44 +17,32 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ConfigurationValidator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
 import org.apache.pig.newplan.optimizer.Rule;
-import org.apache.pig.test.utils.GenPhyOp;
-import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
-public class TestGroupConstParallel {
+@Ignore
+public abstract class TestGroupConstParallel {
 
     private static final String INPUT_FILE = "TestGroupConstParallelInp";
-    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     
     @BeforeClass
@@ -79,7 +67,7 @@ public class TestGroupConstParallel {
      */
     @Test
     public void testGroupAllWithParallel() throws Exception {
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster
                 .getProperties());
         
         
@@ -95,42 +83,30 @@ public class TestGroupConstParallel {
             Util.checkQueryOutputsAfterSort(iter, expectedRes);
             
             JobGraph jGraph = PigStats.get().getJobGraph();
-            assertEquals(1, jGraph.size());
-            // find added map-only concatenate job 
-            MRJobStats js = (MRJobStats)jGraph.getSources().get(0);
-            assertEquals(1, js.getNumberMaps());   
-            assertEquals(1, js.getNumberReduces()); 
+            checkGroupAllWithParallelGraphResult(jGraph);
         }
-
     }
-    
-    
+
+    abstract protected void checkGroupAllWithParallelGraphResult(JobGraph jGraph);
+
     /**
      * Test parallelism for group by constant
      * @throws Throwable
      */
     @Test
     public void testGroupConstWithParallel() throws Throwable {
-        PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+        PigContext pc = new PigContext(cluster.getExecType(), cluster.getProperties());
         pc.defaultParallel = 100;
         pc.connect();
         
-        String query = "a = load 'input';\n" + "b = group a by 1;" + "store b into 'output';";
-        PigServer pigServer = new PigServer( ExecType.MAPREDUCE, cluster.getProperties() );
+        String query = "a = load '" + INPUT_FILE + "';\n" + "b = group a by 1;" + "store b into 'output';";
+        PigServer pigServer = new PigServer( cluster.getExecType(), cluster.getProperties() );
         PhysicalPlan pp = Util.buildPp( pigServer, query );
-        
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
-        ConfigurationValidator.validatePigProperties(pc.getProperties());
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        
-        JobControl jobControl = jcc.compile(mrPlan, "Test");
-        Job job = jobControl.getWaitingJobs().get(0);
-        int parallel = job.getJobConf().getNumReduceTasks();
-
-        assertEquals("parallism", 1, parallel);
+        checkGroupConstWithParallelResult(pp, pc);
     }
+
+    abstract protected void checkGroupConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
     
     /**
      *  Test parallelism for group by column
@@ -138,27 +114,20 @@ public class TestGroupConstParallel {
      */
     @Test
     public void testGroupNonConstWithParallel() throws Throwable {
-        PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+        PigContext pc = new PigContext(cluster.getExecType(), cluster.getProperties());
         pc.defaultParallel = 100;
         pc.connect();
         
-        PigServer pigServer = new PigServer( ExecType.MAPREDUCE, cluster.getProperties() );
-        String query =  "a = load 'input';\n" + "b = group a by $0;" + "store b into 'output';";
+        PigServer pigServer = new PigServer( cluster.getExecType(), cluster.getProperties() );
+        String query =  "a = load '" + INPUT_FILE + "';\n" + "b = group a by $0;" + "store b into 'output';";
         
         PhysicalPlan pp = Util.buildPp( pigServer, query );
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
-        ConfigurationValidator.validatePigProperties(pc.getProperties());
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        
-        JobControl jobControl = jcc.compile(mrPlan, "Test");
-        Job job = jobControl.getWaitingJobs().get(0);
-        int parallel = job.getJobConf().getNumReduceTasks();
-        
-        assertEquals("parallism", 100, parallel);
+        checkGroupNonConstWithParallelResult(pp, pc);
     }
 
+    abstract protected void checkGroupNonConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
 
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {

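The refactoring above turns TestGroupConstParallel into an engine-neutral base class: the queries run against MiniGenericCluster, and the engine-specific assertions move behind the three abstract check* hooks (the Tez subclass, TestGroupConstParallelTez, is registered in test/tez-tests below). The MR-side subclass is copied from trunk and not part of this diff, so the following is only a sketch of what such a subclass could look like, assuming it reuses the assertions just removed from the base class. The class name and the getNumReduceTasks helper are illustrative, only the hooks visible in this diff are shown, and the import paths are inferred from the classes the removed code referenced.

    package org.apache.pig.test;

    import static org.junit.Assert.assertEquals;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.jobcontrol.Job;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;
    import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.util.ConfigurationValidator;
    import org.apache.pig.tools.pigstats.PigStats.JobGraph;
    import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;

    public class TestGroupConstParallelMRSketch extends TestGroupConstParallel {

        @Override
        protected void checkGroupAllWithParallelGraphResult(JobGraph jGraph) {
            // GROUP ALL collapses to a single MR job with one map and one reduce.
            assertEquals(1, jGraph.size());
            MRJobStats js = (MRJobStats) jGraph.getSources().get(0);
            assertEquals(1, js.getNumberMaps());
            assertEquals(1, js.getNumberReduces());
        }

        @Override
        protected void checkGroupConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
            // Grouping on a constant key ignores defaultParallel and uses one reducer.
            assertEquals("parallelism", 1, getNumReduceTasks(pp, pc));
        }

        @Override
        protected void checkGroupNonConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
            // Grouping on a real column honors pc.defaultParallel = 100.
            assertEquals("parallelism", 100, getNumReduceTasks(pp, pc));
        }

        // Compile the physical plan down to an MR job and read its reducer count,
        // the same steps that used to live inline in the base-class tests.
        private int getNumReduceTasks(PhysicalPlan pp, PigContext pc) throws Exception {
            MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
            ConfigurationValidator.validatePigProperties(pc.getProperties());
            Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
            JobControlCompiler jcc = new JobControlCompiler(pc, conf);
            JobControl jobControl = jcc.compile(mrPlan, "Test");
            Job job = jobControl.getWaitingJobs().get(0);
            return job.getJobConf().getNumReduceTasks();
        }
    }
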
Modified: pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java Thu Sep  4 18:10:42 2014
@@ -37,31 +37,28 @@ import org.apache.hadoop.hbase.MiniHBase
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.util.ConfigurationValidator;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
+import org.junit.Ignore;
 import org.junit.Test;
 
-public class TestJobSubmission {
+@Ignore
+abstract public class TestJobSubmission {
 
 
     static PigContext pc;
@@ -75,11 +72,11 @@ public class TestJobSubmission {
     String curDir;
     String inpDir;
     String golDir;
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     @BeforeClass
     public static void onetimeSetUp() throws Exception {
-        pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+        pc = new PigContext(cluster.getExecType(), cluster.getProperties());
         try {
             pc.connect();
         } catch (ExecException e) {
@@ -115,63 +112,35 @@ public class TestJobSubmission {
 
     @Test
     public void testJobControlCompilerErr() throws Exception {
-        String query = "a = load 'input';" + "b = order a by $0;" + "store b into 'output';";
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String query = "a = load '/passwd' as (a1:bag{(t:chararray)});" + "b = order a by a1;" + "store b into 'output';";
+        PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(pigServer, query);
-        POStore store = GenPhyOp.dummyPigStorageOp();
-        pp.addAsLeaf(store);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
-        for(MapReduceOper mro: mrPlan.getLeaves()) {
-            if(mro.reducePlan != null) {
-                PhysicalOperator po = mro.reducePlan.getRoots().get(0);
-                if (po instanceof POPackage) {
-                    ((POPackage) po).getPkgr().setKeyType(DataType.BAG);
-                    mro.setGlobalSort(true);
-                }
-            }
-        }
-
-        ConfigurationValidator.validatePigProperties(pc.getProperties());
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-        try {
-            jcc.compile(mrPlan, "Test");
-        } catch (JobCreationException jce) {
-            assertTrue(jce.getErrorCode() == 1068);
-        }
+        checkJobControlCompilerErrResult(pp, pc);
     }
 
+    abstract protected void checkJobControlCompilerErrResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
     @Test
     public void testDefaultParallel() throws Throwable {
         pc.defaultParallel = 100;
 
-        String query = "a = load 'input';" + "b = group a by $0;" + "store b into 'output';";
-        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String query = "a = load '/passwd';" + "b = group a by $0;" + "store b into 'output';";
+        PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
-        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
-        ConfigurationValidator.validatePigProperties(pc.getProperties());
-        Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
-        JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
-        JobControl jobControl = jcc.compile(mrPlan, "Test");
-        Job job = jobControl.getWaitingJobs().get(0);
-        int parallel = job.getJobConf().getNumReduceTasks();
-
-        assertEquals(100, parallel);
-        Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());
+        checkDefaultParallelResult(pp, pc);
 
         pc.defaultParallel = -1;
     }
 
+    abstract protected void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
     @Test
     public void testDefaultParallelInSort() throws Throwable {
         // default_parallel is considered only at runtime, so here we only test requested parallel
         // more thorough tests can be found in TestNumberOfReducers.java
 
         String query = "a = load 'input';" + "b = order a by $0 parallel 100;" + "store b into 'output';";
-        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
@@ -198,7 +167,7 @@ public class TestJobSubmission {
                 "b = load 'input';" +
                 "c = join a by $0, b by $0 using 'skewed' parallel 100;" +
                 "store c into 'output';";
-        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
@@ -219,6 +188,10 @@ public class TestJobSubmission {
 
     @Test
     public void testReducerNumEstimation() throws Exception{
+        // Skip this test for Tez, which uses a different mechanism;
+        // the equivalent test is in TestTezAutoParallelism.
+        Assume.assumeTrue("Skip this test for TEZ",
+                Util.isMapredExecType(cluster.getExecType()));
         // use the estimation
         Configuration conf = HBaseConfiguration.create(new Configuration());
         HBaseTestingUtility util = new HBaseTestingUtility(conf);
@@ -228,7 +201,7 @@ public class TestJobSubmission {
         String query = "a = load '/passwd';" +
                 "b = group a by $0;" +
                 "store b into 'output';";
-        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
         MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
 
@@ -295,6 +268,10 @@ public class TestJobSubmission {
 
     @Test
     public void testReducerNumEstimationForOrderBy() throws Exception{
+        // Skip this test for Tez, which uses a different mechanism;
+        // the equivalent test is in TestTezAutoParallelism.
+        Assume.assumeTrue("Skip this test for TEZ",
+                Util.isMapredExecType(cluster.getExecType()));
         // use the estimation
         pc.getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
         pc.getProperties().setProperty("pig.exec.reducers.max", "10");
@@ -302,7 +279,7 @@ public class TestJobSubmission {
         String query = "a = load '/passwd';" +
                 "b = order a by $0;" +
                 "store b into 'output';";
-        PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
         PhysicalPlan pp = Util.buildPp(ps, query);
 
         MROperPlan mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);

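TestJobSubmission follows the same pattern: it becomes an @Ignore'd abstract base class, the compile-and-inspect code moves behind abstract hooks, and TestJobSubmissionTez (registered in test/tez-tests below) supplies the Tez side. The MR subclass is again copied from trunk and not shown here, so the following is only a sketch of two of those hooks, assuming they reuse the removed MR code; the class name, the shared compile() helper, and the fail() message are illustrative.

    package org.apache.pig.test;

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.fail;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.jobcontrol.Job;
    import org.apache.hadoop.mapred.jobcontrol.JobControl;
    import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
    import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
    import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.impl.PigContext;
    import org.apache.pig.impl.util.ConfigurationValidator;

    public class TestJobSubmissionMRSketch extends TestJobSubmission {

        @Override
        protected void checkJobControlCompilerErrResult(PhysicalPlan pp, PigContext pc) throws Exception {
            // Ordering on a bag-typed key cannot be compiled into an MR job; expect error 1068.
            try {
                compile(pp, pc);
                fail("expected JobCreationException with error code 1068");
            } catch (JobCreationException jce) {
                assertEquals(1068, jce.getErrorCode());
            }
        }

        @Override
        protected void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception {
            // pc.defaultParallel = 100 should flow through to the reduce task count.
            Job job = compile(pp, pc).getWaitingJobs().get(0);
            assertEquals(100, job.getJobConf().getNumReduceTasks());
            Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());
        }

        // Build the MR plan and compile it, as the old inline test code did.
        private JobControl compile(PhysicalPlan pp, PigContext pc) throws Exception {
            MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
            ConfigurationValidator.validatePigProperties(pc.getProperties());
            Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
            JobControlCompiler jcc = new JobControlCompiler(pc, conf);
            return jcc.compile(mrPlan, "Test");
        }
    }
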
Modified: pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java Thu Sep  4 18:10:42 2014
@@ -53,14 +53,14 @@ public class TestMergeJoin {
     private static final String INPUT_FILE = "testMergeJoinInput.txt";
     private static final String INPUT_FILE2 = "testMergeJoinInput2.txt";
     private PigServer pigServer;
-    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     public TestMergeJoin() throws ExecException{
 
         Properties props = cluster.getProperties();
         props.setProperty(MRConfiguration.MAP_MAX_ATTEMPTS, "1");
         props.setProperty(MRConfiguration.REDUCE_MAX_ATTEMPTS, "1");
-        pigServer = new PigServer(ExecType.MAPREDUCE, props);
+        pigServer = new PigServer(cluster.getExecType(), props);
     }
     /**
      * @throws java.lang.Exception

Modified: pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java Thu Sep  4 18:10:42 2014
@@ -27,10 +27,11 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.pig.ExecType;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.junit.AfterClass;
@@ -58,7 +59,7 @@ public class TestNativeMapReduce  {
      * file if specified will be skipped by the wordcount udf
      */
     final static String STOPWORD_FILE = "TestNMapReduceStopwFile";
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
     private PigServer pigServer = null;
 
     /**
@@ -97,7 +98,7 @@ public class TestNativeMapReduce  {
 
     @Before
     public void setUp() throws Exception{
-        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
 
         //createWordCountJar();
     }
@@ -206,6 +207,9 @@ public class TestNativeMapReduce  {
 
             assertTrue("job failed", PigStats.get().getReturnCode() != 0);
 
+        } catch (JobCreationException e) {
+            // Running in Tez mode throws an exception instead
+            assertTrue(e.getCause() instanceof FileAlreadyExistsException);
         }
         finally{
             // We have to manually delete intermediate mapreduce files

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigProgressReporting.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigProgressReporting.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigProgressReporting.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigProgressReporting.java Thu Sep  4 18:10:42 2014
@@ -32,7 +32,7 @@ import org.junit.Test;
 
 public class TestPigProgressReporting {
 
-    static MiniCluster cluster = MiniCluster.buildCluster();
+    static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
 
     @Test
     public void testProgressReportingWithStatusMessage() throws Exception {
@@ -46,7 +46,7 @@ public class TestPigProgressReporting {
 
             Util.createInputFile(cluster, "a.txt", new String[] { "dummy"});
 
-            PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+            PigServer pig = new PigServer(cluster.getExecType(), cluster.getProperties());
 
             String filename = prepareTempFile();
             filename = filename.replace("\\", "\\\\");

Modified: pig/branches/spark/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/Util.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/Util.java Thu Sep  4 18:10:42 2014
@@ -58,6 +58,7 @@ import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.apache.log4j.SimpleLayout;
 import org.apache.pig.ExecType;
+import org.apache.pig.ExecTypeProvider;
 import org.apache.pig.LoadCaster;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
@@ -735,7 +736,7 @@ public class Util {
         }
         if (context.getExecType() == ExecType.MAPREDUCE || context.getExecType().name().equals("TEZ")) {
             return FileLocalizer.hadoopify(filename, context);
-        } else if (context.getExecType() == ExecType.LOCAL) {
+        } else if (context.getExecType().isLocal()) {
             return filename;
         } else {
             throw new IllegalStateException("ExecType: " + context.getExecType());
@@ -1269,7 +1270,7 @@ public class Util {
         assertConfLong(conf, MRConfiguration.REDUCE_TASKS, runtimeParallel);
     }
 
-    private static void assertConfLong(Configuration conf, String param, long expected) {
+    public static void assertConfLong(Configuration conf, String param, long expected) {
         assertEquals("Unexpected value found in configs for " + param, expected, conf.getLong(param, -1));
     }
 
@@ -1346,4 +1347,13 @@ public class Util {
         }
         return jarNames[0];
     }
+
+    public static ExecType getLocalTestMode() throws Exception {
+        String execType = System.getProperty("test.exec.type");
+        if (execType!=null && execType.equals("tez")) {
+            return ExecTypeProvider.fromString("tez_local");
+        } else {
+            return ExecTypeProvider.fromString("local");
+        }
+    }
 }

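Together with Util.isMapredExecType(...) used in TestJobSubmission above, the new Util.getLocalTestMode() helper is what lets one test source tree run against either engine: isMapredExecType() guards MR-only assertions, and getLocalTestMode() maps -Dtest.exec.type=tez to tez_local for tests that run without a cluster. A usage sketch, illustrative only and mirroring the patterns in the diffs above (the class and test method names are made up):

    package org.apache.pig.test;

    import org.apache.pig.PigServer;
    import org.junit.Assume;
    import org.junit.Test;

    public class ExecTypeSelectionSketch {

        private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();

        @Test
        public void mrSpecificBehaviour() throws Exception {
            // Skip cleanly when the suite is running against Tez.
            Assume.assumeTrue("MR-only assertion", Util.isMapredExecType(cluster.getExecType()));
            PigServer pig = new PigServer(cluster.getExecType(), cluster.getProperties());
            // ... MR-specific assertions ...
        }

        @Test
        public void localBehaviour() throws Exception {
            // 'local' normally, 'tez_local' when -Dtest.exec.type=tez is set.
            PigServer pig = new PigServer(Util.getLocalTestMode());
            // ... engine-agnostic assertions ...
        }
    }
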
Propchange: pig/branches/spark/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
  Merged /pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2:r1621676-1622520

Modified: pig/branches/spark/test/org/apache/pig/test/pigunit/TestPigTest.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/pigunit/TestPigTest.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/pigunit/TestPigTest.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/pigunit/TestPigTest.java Thu Sep  4 18:10:42 2014
@@ -27,10 +27,10 @@ import org.apache.commons.lang.StringUti
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
-import org.apache.pig.ExecType;
 import org.apache.pig.pigunit.Cluster;
 import org.apache.pig.pigunit.PigTest;
 import org.apache.pig.pigunit.pig.PigServer;
+import org.apache.pig.test.Util;
 import org.apache.pig.tools.parameters.ParseException;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
@@ -56,7 +56,8 @@ public class TestPigTest {
     private static final Log LOG = LogFactory.getLog(TestPigTest.class);
 
     @BeforeClass
-    public static void setUpOnce() throws IOException {
+    public static void setUpOnce() throws Exception {
+        System.getProperties().setProperty("pigunit.exectype", Util.getLocalTestMode().toString());
         cluster = PigTest.getCluster();
 
         cluster.update(
@@ -366,12 +367,11 @@ public class TestPigTest {
 
     /**
      * This is a test for default bootup. PIG-2456
-     *
-     * @throws IOException
+     * @throws Exception 
      */
 
     @Test
-    public void testDefaultBootup() throws ParseException, IOException {
+    public void testDefaultBootup() throws Exception {
         // Test with properties file
         String pigProps = "pig.properties";
         String bootupPath = "/tmp/.temppigbootup";
@@ -414,13 +414,7 @@ public class TestPigTest {
 
         // Create a pigunit.pig.PigServer and Cluster to run this test.
         PigServer pig = null;
-        if (System.getProperties().containsKey("pigunit.exectype.cluster")) {
-            LOG.info("Using cluster mode");
-            pig = new PigServer(ExecType.MAPREDUCE);
-        } else {
-            LOG.info("Using default local mode");
-            pig = new PigServer(ExecType.LOCAL);
-        }
+        pig = new PigServer(Util.getLocalTestMode());
 
         final Cluster cluster = new Cluster(pig.getPigContext());
 

Modified: pig/branches/spark/test/org/apache/pig/test/pigunit/pig/TestGruntParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/pigunit/pig/TestGruntParser.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/pigunit/pig/TestGruntParser.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/pigunit/pig/TestGruntParser.java Thu Sep  4 18:10:42 2014
@@ -19,10 +19,10 @@ import java.util.Map;
 
 import junit.framework.Assert;
 
-import org.apache.pig.ExecType;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.pigunit.pig.GruntParser;
 import org.apache.pig.pigunit.pig.PigServer;
+import org.apache.pig.test.Util;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -33,12 +33,12 @@ public class TestGruntParser {
 
   @SuppressWarnings("serial")
   @Before
-  public void setUp() throws ExecException {
+  public void setUp() throws Exception {
     override = new HashMap<String, String>() {{
       put("STORE", "");
       put("DUMP", "");
     }};
-    PigServer pigServer = new PigServer(ExecType.LOCAL);
+    PigServer pigServer = new PigServer(Util.getLocalTestMode());
     parser = new GruntParser(new StringReader(""), pigServer, override);
   }
 

Modified: pig/branches/spark/test/org/apache/pig/test/utils/ReportingUDF.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/utils/ReportingUDF.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/utils/ReportingUDF.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/utils/ReportingUDF.java Thu Sep  4 18:10:42 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.pig.EvalFunc;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
 
 public class ReportingUDF extends EvalFunc<Integer> {
 
@@ -30,7 +31,8 @@ public class ReportingUDF extends EvalFu
         
         try {
             Thread.sleep(7500);
-            getReporter().progress("Progressing");
+            PigStatusReporter reporter = PigStatusReporter.getInstance();
+            reporter.progress();
             Thread.sleep(7500);
         } catch (InterruptedException e) {
         }

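ReportingUDF now obtains its progress hook from the PigStatusReporter singleton rather than from EvalFunc.getReporter(). A minimal standalone sketch of that pattern follows; the null check is a defensive assumption and not something the committed UDF does, and the class name and loop bounds are illustrative.

    package org.apache.pig.test.utils;

    import java.io.IOException;

    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.tools.pigstats.PigStatusReporter;

    public class LongRunningUDFSketch extends EvalFunc<Integer> {

        @Override
        public Integer exec(Tuple input) throws IOException {
            for (int i = 0; i < 4; i++) {
                slowStep();
                PigStatusReporter reporter = PigStatusReporter.getInstance();
                if (reporter != null) {
                    // Heartbeat so the framework does not kill the task as unresponsive.
                    reporter.progress();
                }
            }
            return 100;
        }

        private void slowStep() {
            try {
                Thread.sleep(7500);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
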
Modified: pig/branches/spark/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/tez-tests?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/tez-tests (original)
+++ pig/branches/spark/test/tez-tests Thu Sep  4 18:10:42 2014
@@ -61,3 +61,12 @@
 **/TestTezCompiler.java
 **/TestTezJobControlCompiler.java
 **/TestTezLauncher.java
+**/TestAccumuloPigCluster.java
+**/TestBigTypeSort.java
+**/TestCurrentTime.java
+**/TestInvokerGenerator.java
+**/TestGroupConstParallelTez.java
+**/TestJobSubmissionTez.java
+**/TestMergeJoin.java
+**/TestNativeMapReduce.java
+**/TestPigProgressReporting.java