Posted to commits@pig.apache.org by xu...@apache.org on 2017/03/10 02:36:59 UTC

svn commit: r1786266 - in /pig/branches/spark: ./ src/docs/src/documentation/content/xdocs/ src/org/apache/pig/backend/hadoop/executionengine/spark/converter/ src/org/apache/pig/data/ src/org/apache/pig/tools/pigstats/spark/ test/org/apache/pig/spark/ ...

Author: xuefu
Date: Fri Mar 10 02:36:59 2017
New Revision: 1786266

URL: http://svn.apache.org/viewvc?rev=1786266&view=rev
Log:
PIG-5133: Commit changes from last round of review on rb (Liyun via Xuefu)

Added:
    pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java
Modified:
    pig/branches/spark/build.xml
    pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
    pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java
    pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java
    pig/branches/spark/test/org/apache/pig/test/TestCase.java
    pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
    pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
    pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
    pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
    pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
    pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java
    pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java
    pig/branches/spark/test/org/apache/pig/test/TestMergeJoinOuter.java
    pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
    pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java
    pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java
    pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
    pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java
    pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java
    pig/branches/spark/test/org/apache/pig/test/Util.java

Modified: pig/branches/spark/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/build.xml (original)
+++ pig/branches/spark/build.xml Fri Mar 10 02:36:59 2017
@@ -331,7 +331,7 @@
             <fileset dir="${ivy.lib.dir}">
                 <include name="**.*jar"/>
             </fileset>
-            <fileset dir="${build.ivy.spark.lib.dir}/${ant.project.name}">
+            <fileset dir="${build.ivy.spark.lib.dir}">
                 <include name="**.*jar"/>
             </fileset>
         </path>
@@ -364,7 +364,7 @@
     <path id="classpath">
         <fileset file="${ivy.lib.dir}/${zookeeper.jarfile}"/>
         <fileset dir="${ivy.lib.dir}" includes="*.jar"/>
-        <fileset dir="${build.ivy.spark.lib.dir}/${ant.project.name}" includes="*.jar"/>
+        <fileset dir="${build.ivy.spark.lib.dir}" includes="*.jar"/>
     </path>
 
     <!-- javadoc-classpath -->
@@ -734,7 +734,7 @@
     <target name="copySparkDependencies">
         <mkdir dir="${spark.lib.dir}" />
         <copy todir="${spark.lib.dir}">
-            <fileset dir="${build.ivy.spark.lib.dir}/${ant.project.name}" includes="*.jar"/>
+            <fileset dir="${build.ivy.spark.lib.dir}" includes="*.jar"/>
         </copy>
     </target>
     
@@ -1654,7 +1654,7 @@
        <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
                  pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="compile"/>
        <ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
-                 pattern="${build.ivy.spark.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="spark"/>
+                 pattern="${build.ivy.spark.lib.dir}/[artifact]-[revision](-[classifier]).[ext]" conf="spark"/>
        <ivy:cachepath pathid="compile.classpath" conf="compile"/>
      </target>
 

Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml Fri Mar 10 02:36:59 2017
@@ -95,48 +95,44 @@ Test the Pig installation with this simp
 	<table>
 	<tr>
 	<td></td>
-    <td><strong>Interactive Mode </strong></td>
-    <td><strong>Batch Mode</strong> </td>
-	</tr>
-	<tr>
     <td><strong>Local Mode</strong></td>
-    <td>yes</td>
-    <td>yes</td>
-	</tr>
-  <tr>
-     <td><strong>Mapreduce Mode</strong></td>
-     <td>yes</td>
-     <td>yes</td>
-  </tr>
-	<tr>
     <td><strong>Tez Local Mode</strong></td>
-    <td>experimental</td>
-    <td>experimental</td>
+    <td><strong>Spark Local Mode</strong></td>
+    <td><strong>Mapreduce Mode</strong></td>
+    <td><strong>Tez Mode</strong></td>
+    <td><strong>Spark Mode</strong></td>
 	</tr>
 	<tr>
-    <td><strong>Tez Mode</strong></td>
+	<td><strong>Interactive Mode </strong></td>
+    <td>yes</td>
+    <td>experimental</td>
     <td>yes</td>
     <td>yes</td>
 	</tr>
 	<tr>
-    <td><strong>Spark Mode</strong></td>
-    <td>experimental</td>
+	<td><strong>Batch Mode</strong> </td>
+    <td>yes</td>
     <td>experimental</td>
+    <td>yes</td>
+    <td>yes</td>
 	</tr>
 	</table>
 
 	<!-- ++++++++++++++++++++++++++++++++++ -->
 	   <section id="execution-modes">
 	<title>Execution Modes</title>
-<p>Pig has five execution modes or exectypes: </p>
+<p>Pig has six execution modes or exectypes: </p>
 <ul>
 <li><strong>Local Mode</strong> - To run Pig in local mode, you need access to a single machine; all files are installed and run using your local host and file system. Specify local mode using the -x flag (pig -x local).
 </li>
-<li><strong>Mapreduce Mode</strong> - To run Pig in mapreduce mode, you need access to a Hadoop cluster and HDFS installation. Mapreduce mode is the default mode; you can, <em>but don't need to</em>, specify it using the -x flag (pig OR pig -x mapreduce).
-</li>
 <li><strong>Tez Local Mode</strong> - To run Pig in tez local mode. It is similar to local mode, except internally Pig will invoke tez runtime engine. Specify Tez local mode using the -x flag (pig -x tez_local).
 <p><strong>Note:</strong> Tez local mode is experimental. There are some queries which just error out on bigger data in local mode.</p>
 </li>
+<li><strong>Spark Local Mode</strong> - To run Pig in spark local mode. It is similar to local mode, except internally Pig will invoke spark runtime engine. Specify Spark local mode using the -x flag (pig -x spark_local).
+<p><strong>Note:</strong> Spark local mode is experimental. There are some queries which just error out on bigger data in local mode.</p>
+</li>
+<li><strong>Mapreduce Mode</strong> - To run Pig in mapreduce mode, you need access to a Hadoop cluster and HDFS installation. Mapreduce mode is the default mode; you can, <em>but don't need to</em>, specify it using the -x flag (pig OR pig -x mapreduce).
+</li>
 <li><strong>Tez Mode</strong> - To run Pig in Tez mode, you need access to a Hadoop cluster and HDFS installation. Specify Tez mode using the -x flag (-x tez).
 </li>
 <li><strong>Spark Mode</strong> - To run Pig in Spark mode, you need access to a Spark, Yarn or Mesos cluster and HDFS installation. Specify Spark mode using the -x flag (-x spark). In Spark execution mode, it is necessary to set env::SPARK_MASTER to an appropriate value (local - local mode, yarn-client - yarn-client mode, mesos://host:port - spark on mesos or spark://host:port - spark cluster. For more information refer to spark documentation on Master Urls, <em>yarn-cluster mode is currently not supported</em>). Pig scripts run on Spark can take advantage of the <a href="http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation">dynamic allocation</a> feature. The feature can be enabled by simply enabling <em>spark.dynamicAllocation.enabled</em>. Refer to spark <a href="http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation">configuration</a> for additional configuration details. In general all properties in the pig script prefixed with
 <em>spark.</em> are copied to the Spark Application Configuration. Please note that the Yarn auxiliary service needs to be enabled for Spark for this to work. See Spark documentation for additional details.
@@ -159,6 +155,9 @@ $ pig -x local ...
 /* Tez local mode */
 $ pig -x tez_local ...
  
+/* Spark local mode */
+$ pig -x spark_local ...
+
 /* mapreduce mode */
 $ pig ...
 or
@@ -203,6 +202,13 @@ $ pig -x tez_local
 grunt> 
 </source>
 
+<p><strong>Spark Local Mode</strong></p>
+<source>
+$ pig -x spark_local
+... - Connecting to ...
+grunt> 
+</source>
+
 <p><strong>Mapreduce Mode</strong> </p>
 <source>
 $ pig -x mapreduce
@@ -259,6 +265,10 @@ $ pig -x local id.pig
 <source>
 $ pig -x tez_local id.pig
 </source>
+<p><strong>Spark Local Mode</strong></p>
+<source>
+$ pig -x spark_local id.pig
+</source>
 <p><strong>Mapreduce Mode</strong> </p>
 <source>
 $ pig id.pig
@@ -549,7 +559,7 @@ However, in a production environment you
   <section id="tutorial">
 <title>Pig Tutorial </title>
 
-<p>The Pig tutorial shows you how to run Pig scripts using Pig's local mode, mapreduce mode and Tez mode (see <a href="#execution-modes">Execution Modes</a>).</p>
+<p>The Pig tutorial shows you how to run Pig scripts using Pig's local mode, mapreduce mode, Tez mode and Spark mode (see <a href="#execution-modes">Execution Modes</a>).</p>
 
 <p>To get started, do the following preliminary tasks:</p>
 
@@ -600,6 +610,10 @@ Or if you are using Tez local mode:
 <source>
 $ pig -x tez_local script1-local.pig
 </source>
+Or if you are using Spark local mode:
+<source>
+$ pig -x spark_local script1-local.pig
+</source>
 </li>
 <li>Review the result files, located in the script1-local-results.txt directory.
 <p>The output may contain a few Hadoop warnings which can be ignored:</p>
@@ -613,7 +627,7 @@ $ pig -x tez_local script1-local.pig
 
  <!-- ++++++++++++++++++++++++++++++++++ --> 
 <section>
-<title> Running the Pig Scripts in Mapreduce Mode or Tez Mode</title>
+<title> Running the Pig Scripts in Mapreduce Mode, Tez Mode or Spark Mode</title>
 
 <p>To run the Pig scripts in mapreduce mode, do the following: </p>
 <ol>
@@ -632,6 +646,8 @@ export PIG_CLASSPATH=/mycluster/conf
 <source>
 export PIG_CLASSPATH=/mycluster/conf:/tez/conf
 </source>
+<p>If you are using Spark, you will also need to specify SPARK_HOME and SPARK_JAR, which is the HDFS location to which you upload $SPARK_HOME/lib/spark-assembly*.jar:</p>
+<source>export SPARK_HOME=/mysparkhome/; export SPARK_JAR=hdfs://example.com:8020/spark-assembly*.jar</source>
 <p><strong>Note:</strong> The PIG_CLASSPATH can also be used to add any other 3rd party dependencies or resource files a pig script may require. If there is also a need to make the added entries take the highest precedence in the Pig JVM's classpath order, one may also set the env-var PIG_USER_CLASSPATH_FIRST to any value, such as 'true' (and unset the env-var to disable).</p></li>
 <li>Set the HADOOP_CONF_DIR environment variable to the location of the cluster configuration directory:
 <source>
@@ -646,6 +662,10 @@ Or if you are using Tez:
 <source>
 $ pig -x tez script1-hadoop.pig
 </source>
+Or if you are using Spark:
+<source>
+$ pig -x spark script1-hadoop.pig
+</source>
 </li>
 
 <li>Review the result files, located in the script1-hadoop-results or script2-hadoop-results HDFS directory:

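A note on the docs changes above: they add the spark_local and spark exectypes and state that script properties prefixed with spark. are copied to the Spark application configuration. As a rough, non-authoritative sketch (assuming the Spark exectypes are registered on the classpath; the input path, aliases and property value are made up), the same exectype can also be driven from Java through PigServer:

    import java.util.Iterator;

    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    // Hedged sketch: drives the spark_local exectype documented above via PigServer.
    public class SparkLocalExample {
        public static void main(String[] args) throws Exception {
            PigServer pigServer = new PigServer("spark_local");

            // Per the docs above, "spark."-prefixed properties are forwarded to the
            // Spark application configuration when the script runs on Spark.
            pigServer.getPigContext().getProperties()
                    .setProperty("spark.dynamicAllocation.enabled", "true");

            pigServer.registerQuery("A = LOAD 'input.txt' AS (name:chararray, cnt:int);");
            pigServer.registerQuery("B = GROUP A BY name;");

            Iterator<Tuple> it = pigServer.openIterator("B");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
            pigServer.shutdown();
        }
    }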
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Fri Mar 10 02:36:59 2017
@@ -54,7 +54,6 @@ public class SortConverter implements RD
                 SparkUtil.getManifest(Tuple.class),
                 SparkUtil.getManifest(Object.class));
 
-
         JavaPairRDD<Tuple, Object> sorted = r.sortByKey(
                 sortOperator.getMComparator(), true, parallelism);
         JavaRDD<Tuple> mapped = sorted.mapPartitions(TO_VALUE_FUNCTION);

Modified: pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java Fri Mar 10 02:36:59 2017
@@ -29,6 +29,7 @@ import org.apache.pig.classification.Int
 @InterfaceStability.Evolving
 public abstract class SelfSpillBag extends DefaultAbstractBag {
     private static final long serialVersionUID = 1L;
+    // SelfSpillBag$MemoryLimits is not serializable
 //in spark mode, if we don't set memLimit transient, it will throw NotSerializableException (See PIG-4611)
     protected transient MemoryLimits memLimit;
 

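The comment added above records why memLimit is transient: SelfSpillBag$MemoryLimits is not serializable, so shipping the bag to Spark executors would otherwise fail with NotSerializableException (PIG-4611). A minimal, self-contained sketch of the same pattern, using hypothetical names rather than Pig's real classes:

    import java.io.Serializable;

    // Transient-field pattern noted above, with made-up names. The holder keeps its
    // non-serializable helper in a transient field and re-creates it on first use
    // after deserialization, so serializing the holder never touches the helper.
    public class SpillableHolder implements Serializable {
        private static final long serialVersionUID = 1L;

        // Deliberately not Serializable, standing in for SelfSpillBag$MemoryLimits.
        static class MemoryBudget {
            private final long limitBytes;
            MemoryBudget(long limitBytes) { this.limitBytes = limitBytes; }
            long getLimitBytes() { return limitBytes; }
        }

        // transient: skipped by Java serialization, null after deserialization.
        private transient MemoryBudget budget;

        private MemoryBudget budget() {
            if (budget == null) {
                budget = new MemoryBudget(64L * 1024 * 1024); // re-derive from config in real code
            }
            return budget;
        }

        public boolean overLimit(long usedBytes) {
            return usedBytes > budget().getLimitBytes();
        }
    }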
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Fri Mar 10 02:36:59 2017
@@ -40,7 +40,6 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.ScriptState;
 import org.apache.spark.api.java.JavaSparkContext;

Modified: pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java (original)
+++ pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java Fri Mar 10 02:36:59 2017
@@ -1,14 +1,19 @@
 /*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
- * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
- * http://www.apache.org/licenses/LICENSE-2.0
+ *     http://www.apache.org/licenses/LICENSE-2.0
  *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is
- * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
 package org.apache.pig.spark;
 

Modified: pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java Fri Mar 10 02:36:59 2017
@@ -35,117 +35,17 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
 import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
 
-public class SparkMiniCluster extends MiniGenericCluster {
-    private static final File CONF_DIR = new File("build/classes");
-    private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
-    private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
-    private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
-    private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
-
-    private Configuration m_dfs_conf = null;
-    protected MiniMRYarnCluster m_mr = null;
-    private Configuration m_mr_conf = null;
+public class SparkMiniCluster extends YarnMiniCluster {
 
     private static final Log LOG = LogFactory
             .getLog(SparkMiniCluster.class);
     private ExecType spark = new SparkExecType();
-    SparkMiniCluster() {
-
-    }
 
     @Override
     public ExecType getExecType() {
         return spark;
     }
 
-    @Override
-    protected void setupMiniDfsAndMrClusters() {
-        try {
-            deleteConfFiles();
-            CONF_DIR.mkdirs();
-
-            // Build mini DFS cluster
-            Configuration hdfsConf = new Configuration();
-            m_dfs = new MiniDFSCluster.Builder(hdfsConf)
-                    .numDataNodes(2)
-                    .format(true)
-                    .racks(null)
-                    .build();
-            m_fileSys = m_dfs.getFileSystem();
-            m_dfs_conf = m_dfs.getConfiguration(0);
-
-            //Create user home directory
-            m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
-            // Write core-site.xml
-            Configuration core_site = new Configuration(false);
-            core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
-            core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
-
-            Configuration hdfs_site = new Configuration(false);
-            for (Map.Entry<String, String> conf : m_dfs_conf) {
-                if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
-                    hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
-                }
-            }
-            hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
-
-            // Build mini YARN cluster
-            m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
-            m_mr.init(m_dfs_conf);
-            m_mr.start();
-            m_mr_conf = m_mr.getConfig();
-            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
-                    System.getProperty("java.class.path"));
-
-            Configuration mapred_site = new Configuration(false);
-            Configuration yarn_site = new Configuration(false);
-            for (Map.Entry<String, String> conf : m_mr_conf) {
-                if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
-                    if (conf.getKey().contains("yarn")) {
-                        yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
-                    } else if (!conf.getKey().startsWith("dfs")){
-                        mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
-                    }
-                }
-            }
-
-            mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
-            yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
-
-            m_conf = m_mr_conf;
-            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
-            System.setProperty("hadoop.log.dir", "build/test/logs");
-        } catch (IOException e) {
-            throw new RuntimeException(e);
-
-        }
-    }
-
-    @Override
-    protected void shutdownMiniMrClusters() {
-        deleteConfFiles();
-        if (m_mr != null) {
-            m_mr.stop();
-            m_mr = null;
-        }
-    }
-
-    private void deleteConfFiles() {
-
-        if(CORE_CONF_FILE.exists()) {
-            CORE_CONF_FILE.delete();
-        }
-        if(HDFS_CONF_FILE.exists()) {
-            HDFS_CONF_FILE.delete();
-        }
-        if(MAPRED_CONF_FILE.exists()) {
-            MAPRED_CONF_FILE.delete();
-        }
-        if(YARN_CONF_FILE.exists()) {
-            YARN_CONF_FILE.delete();
-        }
-    }
-
     static public Launcher getLauncher() {
         return new SparkLauncher();
     }

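The refactoring above removes the mini-DFS/mini-YARN bootstrap that SparkMiniCluster used to carry and inherits it from the new YarnMiniCluster base class added in this commit (TezMiniCluster below gets the same treatment); the subclass keeps only its exec type and launcher. YarnMiniCluster's body is not shown in this excerpt, so the following is only a shape sketch with hypothetical names:

    // Shape sketch only; the real YarnMiniCluster is not shown in this excerpt.
    abstract class BaseYarnMiniCluster {
        // shared: start mini DFS and mini YARN, write core/hdfs/mapred/yarn-site.xml
        protected void setupMiniDfsAndMrClusters() { /* common bootstrap */ }
        // shared: delete the generated conf files and stop the YARN cluster
        protected void shutdownMiniMrClusters() { /* common teardown */ }
        public abstract String getExecTypeName();
    }

    class SparkFlavoredMiniCluster extends BaseYarnMiniCluster {
        @Override
        public String getExecTypeName() { return "spark"; } // only the Spark-specific piece remains
    }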
Modified: pig/branches/spark/test/org/apache/pig/test/TestCase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCase.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCase.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCase.java Fri Mar 10 02:36:59 2017
@@ -275,7 +275,8 @@ public class TestCase {
                 "(3,3n,{(c,x),(c,y)})"
         };
         Schema s = pigServer.dumpSchema("C");
-        Util.checkQueryOutputsAfterSortRecursive(out.iterator(), expected, org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(out.iterator(), expected, org.apache.pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(Util.getLocalTestMode()));
     }
 
     /**

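Here and in several tests below, Util.checkQueryOutputsAfterSortRecursive is replaced by a Util.checkQueryOutputs overload taking a boolean flag that is true only on Spark, where result order is not guaranteed. Util itself is outside this excerpt; a rough, assumed illustration of what the flag means:

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;

    import org.junit.Assert;

    // Assumed illustration only; the real helper is org.apache.pig.test.Util.checkQueryOutputs.
    public class OutputCheckSketch {
        static void check(List<String> actual, List<String> expected, boolean sortBeforeCompare) {
            List<String> a = new ArrayList<>(actual);
            List<String> e = new ArrayList<>(expected);
            if (sortBeforeCompare) {   // true on Spark: compare as unordered sets of rows
                Collections.sort(a);
                Collections.sort(e);
            }
            Assert.assertEquals(e, a); // on MR/Tez the arrival order itself is part of the check
        }
    }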
Modified: pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCombiner.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCombiner.java Fri Mar 10 02:36:59 2017
@@ -406,12 +406,12 @@ public class TestCombiner {
         pigServer.shutdown();
     }
 
-    private void checkCombinerUsed(PigServer pigServer, String variable, boolean combineExpected)
+    private void checkCombinerUsed(PigServer pigServer, String alias, boolean combineExpected)
             throws IOException {
         // make sure there is a combine plan in the explain output
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(baos);
-        pigServer.explain(variable, ps);
+        pigServer.explain(alias, ps);
         boolean combinerFound;
         if (pigServer.getPigContext().getExecType().name().equalsIgnoreCase("spark")) {
             combinerFound = baos.toString().contains("Reduce By");

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Fri Mar 10 02:36:59 2017
@@ -428,12 +428,7 @@ public class TestEvalPipeline {
             actualResList.add(iter.next());
         }
 
-        if (Util.isSparkExecType(cluster.getExecType())) {
-            for (Tuple t : actualResList) {
-                Util.convertBagToSortedBag(t);
-            }
-            Collections.sort(actualResList);
-        }
+        Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType()));
 
         int numIdentity = 0;
         for (Tuple t : actualResList) {
@@ -484,12 +479,7 @@ public class TestEvalPipeline {
             actualResList.add(iter.next());
         }
 
-        if (Util.isSparkExecType(cluster.getExecType())) {
-            for (Tuple t : actualResList) {
-                Util.convertBagToSortedBag(t);
-            }
-            Collections.sort(actualResList);
-        }
+        Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType()));
 
         int numIdentity = 0;
         for (Tuple t : actualResList) {
@@ -870,12 +860,7 @@ public class TestEvalPipeline {
             actualResList.add(iter.next());
         }
 
-        if (Util.isSparkExecType(cluster.getExecType())) {
-            for (Tuple t : actualResList) {
-                Util.convertBagToSortedBag(t);
-            }
-            Collections.sort(actualResList);
-        }
+        Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType()));
 
         int numIdentity = 0;
         for (Tuple t : actualResList) {

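The inline blocks deleted above show what the new Util.sortQueryOutputsIfNeed helper factors out: when running on Spark, each result tuple's bags are made order-insensitive and the result list is sorted; otherwise the results are left alone. A hedged reconstruction based only on the deleted code (the real method lives in org.apache.pig.test.Util):

    import java.util.Collections;
    import java.util.List;

    import org.apache.pig.data.Tuple;

    // Reconstruction inferred from the deleted blocks above, not the actual Util source.
    final class SortIfNeededSketch {
        static void sortQueryOutputsIfNeed(List<Tuple> actualResList, boolean isSpark) {
            if (!isSpark) {
                return; // MR/Tez results are checked in the order they arrive
            }
            // The real helper first calls Util.convertBagToSortedBag on every tuple so that
            // bag contents compare deterministically, then sorts the tuple list itself.
            Collections.sort(actualResList);
        }
    }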
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java Fri Mar 10 02:36:59 2017
@@ -143,12 +143,7 @@ public class TestEvalPipeline2 {
             actualResList.add(iter.next());
         }
 
-        if (Util.isSparkExecType(cluster.getExecType())) {
-            for (Tuple t : actualResList) {
-                Util.convertBagToSortedBag(t);
-            }
-            Collections.sort(actualResList);
-        }
+        Util.sortQueryOutputsIfNeed(actualResList,Util.isSparkExecType(cluster.getExecType()));
 
         int numIdentity = 0;
         for (Tuple tuple : actualResList) {
@@ -490,8 +485,8 @@ public class TestEvalPipeline2 {
         if (Util.isSparkExecType(cluster.getExecType())) {
             String[] expectedResults =
                 new String[] {"(2,{(2,2)},{(2,5,2)})", "(1,{(1,1)},{(1,2,3)})" };
-            Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults,
-                org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+            Util.checkQueryOutputs(iter, expectedResults,
+                org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), Util.isSparkExecType(cluster.getExecType()));
         } else {
             Assert.assertTrue(iter.hasNext());
             Tuple t = iter.next();
@@ -760,15 +755,7 @@ public class TestEvalPipeline2 {
         List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
             new String[]{"(60000L,2L,3L)", "(120000L,2L,2L)", "(240000L,1L,1L)"});
 
-        if (Util.isSparkExecType(cluster.getExecType())) {
-            Util.checkQueryOutputsAfterSort(iter, expectedResults);
-        } else {
-            // Even though GROUP BY does not return results sorted by key, that
-            // is the current behavior for MR, which some users rely on. Let's
-            // not sort the results for MR and Tez so that we can catch it when
-            // current MR/Tez behavior changes.
-            Util.checkQueryOutputs(iter, expectedResults);
-        }
+        Util.checkQueryOutputs(iter, expectedResults, Util.isSparkExecType(cluster.getExecType()));
     }
 
     // See PIG-1729
@@ -1610,8 +1597,9 @@ public class TestEvalPipeline2 {
 
         String[] expected = new String[] {"(1,A)", "(1,B)", "(2,C)"};
 
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected,
-            org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")));
+        Util.checkQueryOutputs(iter, expected,
+            org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")), 
+            Util.isSparkExecType(cluster.getExecType()));
     }
 
     // See PIG-2237

Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Fri Mar 10 02:36:59 2017
@@ -897,7 +897,7 @@ public class TestEvalPipelineLocal {
         }
 
         numIdentity = resList.size();
-        Collections.sort(resList);
+        Util.sortQueryOutputsIfNeed(resList, Util.isSparkExecType(Util.getLocalTestMode()));
         Assert.assertEquals(LOOP_COUNT, numIdentity);
         // Since delta differences in some cases are allowed, utility function 
         // to compare tuple-lists cannot be used here.
@@ -937,7 +937,7 @@ public class TestEvalPipelineLocal {
                 expectedList.add(t);
             }
         }
-        Collections.sort(expectedList);  
+        Util.sortQueryOutputsIfNeed(expectedList, Util.isSparkExecType(Util.getLocalTestMode()));
         ps.close();
 
         pigServer.registerQuery("A = LOAD '"
@@ -961,7 +961,7 @@ public class TestEvalPipelineLocal {
             resList.add(iter.next());
         }
 
-        Collections.sort(resList);
+        Util.sortQueryOutputsIfNeed(resList, Util.isSparkExecType(Util.getLocalTestMode()));
         Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, resList.size());
 
         // Since delta difference in some cases is allowed, utility function 
@@ -1014,7 +1014,7 @@ public class TestEvalPipelineLocal {
         if(!iter.hasNext()) Assert.fail("No output found");
         // When running with spark, output can be in a different order than that
         // when running in mr mode.
-        Util.checkQueryOutputsAfterSort(iter, expectedList);
+        Util.checkQueryOutputs(iter, expectedList, Util.isSparkExecType(Util.getLocalTestMode()));
     }
 
     @Test
@@ -1141,7 +1141,7 @@ public class TestEvalPipelineLocal {
                                 "((1,2),{((1,2),3)})",
                                 "((4,5),{((4,5),6)})"
                         });
-        Util.checkQueryOutputsAfterSort(iter, expectedRes);
+        Util.checkQueryOutputs(iter, expectedRes, Util.isSparkExecType(Util.getLocalTestMode()));
     }
     
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri Mar 10 02:36:59 2017
@@ -935,14 +935,14 @@ public class TestGrunt {
         // in spark mode, the output file 'baz' will not be automatically deleted even if the job fails (see SPARK-7953)
         // when "cat baz;" is executed, it does not throw an exception and the variable "caught" is false
         // TODO: Enable this for Spark when SPARK-7953 is resolved
-        if(!Util.isSparkExecType(cluster.getExecType())) {
-            try {
-                grunt.exec();
-            } catch (Exception e) {
-                caught = true;
-                assertTrue(e.getMessage().contains("baz does not exist"));
-            }
-            assertTrue(caught);
+        Assume.assumeTrue(
+                "Skip this test for Spark until SPARK-7953 is resolved!",
+                !Util.isSparkExecType(cluster.getExecType()));
+        try {
+            grunt.exec();
+        } catch (Exception e) {
+            caught = true;
+            assertTrue(e.getMessage().contains("baz does not exist"));
         }
     }
 

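Replacing the if-return guard with org.junit.Assume here (and in TestSkewedJoin below) makes the skipped case show up as an ignored test rather than a silently passing one. A standalone illustration of the same JUnit idiom, with a hypothetical condition:

    import static org.junit.Assert.assertTrue;

    import org.junit.Assume;
    import org.junit.Test;

    // Standalone illustration of the Assume idiom adopted above; the flag is hypothetical.
    public class AssumeIdiomExample {
        private final boolean runningOnSpark = false; // stand-in for Util.isSparkExecType(...)

        @Test
        public void skippedOnSpark() {
            // If the assumption fails, JUnit marks the test as skipped instead of passed.
            Assume.assumeTrue("Skip this test for Spark until SPARK-7953 is resolved!",
                    !runningOnSpark);
            assertTrue("assertions run only when the assumption holds", true);
        }
    }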
Modified: pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java Fri Mar 10 02:36:59 2017
@@ -259,8 +259,10 @@ public class TestLineageFindRelVisitor {
         pig.registerQuery("E = FOREACH D GENERATE (chararray) tupleD.a1;\n");
         Iterator<Tuple> iter  = pig.openIterator("E");
 
-        Util.checkQueryOutputsAfterSortRecursive(iter, new String[]{"(123)","(456)","(789)"},
-                org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("E")));
+        Util.checkQueryOutputs(iter,
+                new String[]{"(123)", "(456)", "(789)"},
+                org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("E")), Util.isSparkExecType(Util
+                .getLocalTestMode()));
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java Fri Mar 10 02:36:59 2017
@@ -634,13 +634,6 @@ public class TestMergeJoin {
     
     @Test
     public void testExpressionFail() throws IOException{
-        // This test validates that join keys are not expressions.
-        // Expressions cannot be handled when the storage function
-        // implements IndexableLoadFunc.
-        // TODO: Enable this test when Spark engine implements Merge Join algorithm.
-        if (Util.isSparkExecType(cluster.getExecType()))
-            return;
-
         pigServer.registerQuery("A = LOAD 'leftinput' as (a:int);");
         pigServer.registerQuery("B = LOAD 'temp_file*' using " +
                 DummyIndexableLoader.class.getName() + "() as (a:int);");
@@ -685,11 +678,6 @@ public class TestMergeJoin {
     @Test
     public void testMergeJoinWithCommaSeparatedFilePaths() throws IOException{
 
-        // Spark engine currently implements merge join as regular join
-        // TODO: Enable this test when Spark engine implements Merge Join algorithm.
-        if (Util.isSparkExecType(cluster.getExecType()))
-            return;
-
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
         pigServer.registerQuery("B = LOAD 'temp_file,righinput_file' using " +
                 DummyIndexableLoader.class.getName() + "();");

Modified: pig/branches/spark/test/org/apache/pig/test/TestMergeJoinOuter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMergeJoinOuter.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMergeJoinOuter.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMergeJoinOuter.java Fri Mar 10 02:36:59 2017
@@ -169,10 +169,6 @@ public class TestMergeJoinOuter {
     @Test
     public void testLeftOuter() throws IOException {
 
-        // TODO: Enable this test when Spark engine implements Merge Join algorithm.
-        if (Util.isSparkExecType(cluster.getExecType()))
-            return;
-
         pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
         pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
 
@@ -202,10 +198,6 @@ public class TestMergeJoinOuter {
     @Test
     public void testRightOuter() throws IOException{
 
-        // TODO: Enable this test when Spark engine implements Merge Join algorithm.
-        if (Util.isSparkExecType(cluster.getExecType()))
-            return;
-
         pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
         pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
         pigServer.registerQuery("C = join A by c1 right, B by c1 using 'merge';");
@@ -233,10 +225,6 @@ public class TestMergeJoinOuter {
     @Test
     public void testFullOuter() throws IOException{
 
-        // TODO: Enable this test when Spark engine implements Merge Join algorithm.
-        if (Util.isSparkExecType(cluster.getExecType()))
-            return;
-
         pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
         pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
         pigServer.registerQuery("C = join A by c1 full, B by c1 using 'merge';");

Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Fri Mar 10 02:36:59 2017
@@ -114,7 +114,8 @@ public class TestMultiQuery {
                         "(2,3)"
         };
         Schema s = myPig.dumpSchema("E");
-        Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+                .isSparkExecType(Util.getLocalTestMode()));
 
 
         myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
@@ -125,7 +126,8 @@ public class TestMultiQuery {
                         "(3,4)"
         };
         s = myPig.dumpSchema("E");
-        Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+                .isSparkExecType(Util.getLocalTestMode()));
     }
 
     @Test
@@ -165,7 +167,8 @@ public class TestMultiQuery {
                         "(5,6)"
         };
         Schema s = myPig.dumpSchema("F");
-        Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+                .isSparkExecType(Util.getLocalTestMode()));
     }
 
     @Test
@@ -292,7 +295,8 @@ public class TestMultiQuery {
                 "(3L,persimmon,5,3L,persimmon,3L,{(3L)})"
         };
         Schema s = myPig.dumpSchema("E");
-        Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+                .isSparkExecType(Util.getLocalTestMode()));
     }
 
     @Test
@@ -332,7 +336,8 @@ public class TestMultiQuery {
                 "(strawberry,{(30,strawberry,quit,bot)},{})"};
 
         Schema s = myPig.dumpSchema("joined_session_info");
-        Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+                .isSparkExecType(Util.getLocalTestMode()));
     }
 
     @Test
@@ -930,20 +935,24 @@ public class TestMultiQuery {
 
         List<Tuple> actualResults = data.get("output1");
         String[] expectedResults = new String[]{"(12, 1)"};
-        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("B1")));
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan
+                .logical.Util.translateSchema(myPig.dumpSchema("B1")), Util.isSparkExecType(Util.getLocalTestMode()));
 
 
         actualResults = data.get("output2");
         expectedResults = new String[]{"(c,1)"};
-        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("B2")));
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util
+                .translateSchema(myPig.dumpSchema("B2")), Util.isSparkExecType(Util.getLocalTestMode()));
 
         actualResults = data.get("output3");
         expectedResults = new String[]{"(-12, 1)"};
-        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("C1")));
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util
+                .translateSchema(myPig.dumpSchema("C1")), Util.isSparkExecType(Util.getLocalTestMode()));
 
         actualResults = data.get("output4");
         expectedResults = new String[]{"(d,1)"};
-        Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("C2")));
+        Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util
+                .translateSchema(myPig.dumpSchema("C2")), Util.isSparkExecType(Util.getLocalTestMode()));
     }
 
     // --------------------------------------------------------------------------

Modified: pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java Fri Mar 10 02:36:59 2017
@@ -650,7 +650,8 @@ public class TestProjectRange  {
                         "(11,{(11,21,31,41,51)})",
                 };
         Schema s = pigServer.dumpSchema("f");
-        Util.checkQueryOutputsAfterSortRecursive(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+        		Util.isSparkExecType(cluster.getExecType()));
     }
 
     /**
@@ -737,7 +738,8 @@ public class TestProjectRange  {
                         "(1,{(11,21,31,41,51),(10,20,30,40,50)})",
                 };
         Schema s = pigServer.dumpSchema("f");
-        Util.checkQueryOutputsAfterSortRecursive(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), 
+        		Util.isSparkExecType(cluster.getExecType()));
 
     }
 
@@ -946,7 +948,8 @@ public class TestProjectRange  {
                 };
         Iterator<Tuple> it = pigServer.openIterator("g");
         Schema s = pigServer.dumpSchema("g");
-        Util.checkQueryOutputsAfterSortRecursive(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), 
+        		Util.isSparkExecType(cluster.getExecType()));
     }
 
     /**
@@ -1013,7 +1016,8 @@ public class TestProjectRange  {
                 };
         Iterator<Tuple> it = pigServer.openIterator("g");
         Schema s = pigServer.dumpSchema("g");
-        Util.checkQueryOutputsAfterSortRecursive(it,expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(it,expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+        		Util.isSparkExecType(cluster.getExecType()));
     }
 
     @Test
@@ -1064,7 +1068,8 @@ public class TestProjectRange  {
                 };
         Iterator<Tuple> it = pigServer.openIterator("lim");
         Schema s = pigServer.dumpSchema("lim");
-        Util.checkQueryOutputsAfterSortRecursive(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), 
+        		Util.isSparkExecType(cluster.getExecType()));
     }
 
 
@@ -1126,7 +1131,8 @@ public class TestProjectRange  {
                 };
         Iterator<Tuple> it = pigServer.openIterator("g");
         Schema s = pigServer.dumpSchema("g");
-        Util.checkQueryOutputsAfterSortRecursive(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+        		Util.isSparkExecType(cluster.getExecType()));
     }
 
     private void setAliasesToNull(Schema schema) {
@@ -1165,7 +1171,8 @@ public class TestProjectRange  {
                 };
         Iterator<Tuple> it = pigServer.openIterator("j");
         Schema s = pigServer.dumpSchema("j");
-        Util.checkQueryOutputsAfterSortRecursive(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s), 
+        		Util.isSparkExecType(cluster.getExecType()));
     }
 
     @Test
@@ -1193,7 +1200,8 @@ public class TestProjectRange  {
                 };
         Iterator<Tuple> it = pigServer.openIterator("j");
         Schema s = pigServer.dumpSchema("j");
-        Util.checkQueryOutputsAfterSortRecursive(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s), 
+        		Util.isSparkExecType(cluster.getExecType()));
     }
 
     @Test

Modified: pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java Fri Mar 10 02:36:59 2017
@@ -589,7 +589,8 @@ public class TestPruneColumn {
                 "({(1,2,3)},1)",
                 "({(2,5,2)},1)"
         };
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
+                Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -609,7 +610,8 @@ public class TestPruneColumn {
                 "({(2)},{(2)})",
                 "({(5)},{})"
         };
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
+        		Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -739,7 +741,8 @@ public class TestPruneColumn {
                 "(1,2,3,{(2)})",
                 "(2,5,2,{})"
         };
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
+            Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -843,7 +846,8 @@ public class TestPruneColumn {
                 "(1,1)",
                 "(2,2)"
         };
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), 
+             Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
             "Columns pruned for B: $1"}));
@@ -958,7 +962,8 @@ public class TestPruneColumn {
                 "((1,2,3))",
                 "((2,5,2))"
         };
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
+        		Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(emptyLogFileMessage());
     }
@@ -1507,7 +1512,8 @@ public class TestPruneColumn {
                 "({(2),(2)},2)",
                 "({(3),(3),(3)},3)"
         };
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("G")));
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("G")),
+        		Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
         pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
@@ -1528,7 +1534,8 @@ public class TestPruneColumn {
                 "(2,{(1,2,3)})",
                 "(5,{(2,5,2)})"
         };
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D"))
+        		,Util.isSparkExecType(Util.getLocalTestMode()));
 
         assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"}));
     }
@@ -1857,7 +1864,8 @@ public class TestPruneColumn {
                 "(1,2,3,1)",
                 "(2,5,2,2)"
         };
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
+        Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
+        		Util.isSparkExecType(Util.getLocalTestMode()));
 
 
         assertTrue(emptyLogFileMessage());

Modified: pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java Fri Mar 10 02:36:59 2017
@@ -238,8 +238,9 @@ public abstract class TestSecondarySort
         Iterator<Tuple> iter = pigServer.openIterator("D");
         String[] expectedRes = new String[]{"(2,1)","(1,2)"};
         Schema s = pigServer.dumpSchema("D");
-        Util.checkQueryOutputsAfterSortRecursive(iter,expectedRes,org.apache
-                .pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(iter,expectedRes,org.apache
+                .pig.newplan.logical.Util.translateSchema(s),
+                Util.isSparkExecType(cluster.getExecType()));
         Util.deleteFile(cluster, file1ClusterPath);
         Util.deleteFile(cluster, file2ClusterPath);
     }
@@ -268,8 +269,9 @@ public abstract class TestSecondarySort
                 "(2,{(2,3,4)})",
                 "(1,{(1,2,3),(1,2,4),(1,3,4)})"
         };
-        Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache
-                .pig.newplan.logical.Util.translateSchema(s));
+        Util.checkQueryOutputs(iter, expected, org.apache
+                .pig.newplan.logical.Util.translateSchema(s), 
+                Util.isSparkExecType(Util.getLocalTestMode()));
         Util.deleteFile(cluster, clusterPath);
     }
 
@@ -361,7 +363,8 @@ public abstract class TestSecondarySort
         Iterator<Tuple> iter = pigServer.openIterator("E");
         Schema s = pigServer.dumpSchema("E");
         String[] expectedRes = new String[]{"((1,2),4)","((1,3),1)","((1,4),0)","((2,3),1)"};
-        Util.checkQueryOutputsAfterSortRecursive(iter, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s) );
+        Util.checkQueryOutputs(iter, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+        		Util.isSparkExecType(cluster.getExecType()));
         Util.deleteFile(cluster, clusterPath1);
         Util.deleteFile(cluster, clusterPath2);
     }

Modified: pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java Fri Mar 10 02:36:59 2017
@@ -53,6 +53,7 @@ import org.apache.pig.impl.builtin.Parti
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.test.utils.TestHelper;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -309,8 +310,7 @@ public class TestSkewedJoin {
         // Spark engine currently implements skew join as regular join, and hence does
         // not control key distribution.
         // TODO: Enable this test when Spark engine implements Skew Join algorithm.
-        if (Util.isSparkExecType(cluster.getExecType()))
-            return;
+        Assume.assumeTrue("Skip this test for Spark until PIG-4858 is resolved!",!Util.isSparkExecType(cluster.getExecType()));
 
         String outputDir = "testSkewedJoinKeyPartition";
         try{

Modified: pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java Fri Mar 10 02:36:59 2017
@@ -41,19 +41,12 @@ import org.apache.tez.dag.api.TezConfigu
 import org.apache.tez.mapreduce.hadoop.MRJobConfig;
 import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
 
-public class TezMiniCluster extends MiniGenericCluster {
-    private static final File CONF_DIR = new File("build/classes");
+public class TezMiniCluster extends YarnMiniCluster {
+
     private static final File TEZ_LIB_DIR = new File("build/ivy/lib/Pig");
     private static final File TEZ_CONF_FILE = new File(CONF_DIR, "tez-site.xml");
-    private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
-    private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
-    private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
-    private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
-    private static final ExecType TEZ = new TezExecType();
 
-    protected MiniMRYarnCluster m_mr = null;
-    private Configuration m_dfs_conf = null;
-    private Configuration m_mr_conf = null;
+    private static final ExecType TEZ = new TezExecType();
 
     @Override
     public ExecType getExecType() {
@@ -61,66 +54,9 @@ public class TezMiniCluster extends Mini
     }
 
     @Override
-    public void setupMiniDfsAndMrClusters() {
+    protected void setupMiniDfsAndMrClusters() {
+        super.setupMiniDfsAndMrClusters();
         try {
-            deleteConfFiles();
-            CONF_DIR.mkdirs();
-
-            // Build mini DFS cluster
-            Configuration hdfsConf = new Configuration();
-            m_dfs = new MiniDFSCluster.Builder(hdfsConf)
-                    .numDataNodes(2)
-                    .format(true)
-                    .racks(null)
-                    .build();
-            m_fileSys = m_dfs.getFileSystem();
-            m_dfs_conf = m_dfs.getConfiguration(0);
-            //Create user home directory
-            m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
-
-            // Write core-site.xml
-            Configuration core_site = new Configuration(false);
-            core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
-            core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
-
-            Configuration hdfs_site = new Configuration(false);
-            for (Entry<String, String> conf : m_dfs_conf) {
-                if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
-                    hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
-                }
-            }
-            hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
-
-            // Build mini YARN cluster
-            m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
-            m_mr.init(m_dfs_conf);
-            m_mr.start();
-            m_mr_conf = m_mr.getConfig();
-            File libDir = new File(System.getProperty("ivy.lib.dir", "build/ivy/lib/Pig"));
-            File classesDir = new File(System.getProperty("build.classes", "build/classes"));
-            File testClassesDir = new File(System.getProperty("test.build.classes", "test/build/classes"));
-            String classpath = libDir.getAbsolutePath() + "/*"
-                    + File.pathSeparator + classesDir.getAbsolutePath()
-                    + File.pathSeparator + testClassesDir.getAbsolutePath();
-            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, classpath);
-            m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m");
-            m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx512m");
-
-            Configuration mapred_site = new Configuration(false);
-            Configuration yarn_site = new Configuration(false);
-            for (Entry<String, String> conf : m_mr_conf) {
-                if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
-                    if (conf.getKey().contains("yarn")) {
-                        yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
-                    } else if (!conf.getKey().startsWith("dfs")){
-                        mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
-                    }
-                }
-            }
-
-            mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
-            yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
-
             // Write tez-site.xml
             Configuration tez_conf = new Configuration(false);
             // TODO PIG-3659 - Remove this once memory management is fixed
@@ -150,12 +86,9 @@ public class TezMiniCluster extends Mini
                 }
             }
 
-            m_conf = m_mr_conf;
             // Turn FetchOptimizer off so that we can actually test Tez
             m_conf.set(PigConfiguration.PIG_OPT_FETCH, System.getProperty("test.opt.fetch", "false"));
 
-            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
-            System.setProperty("hadoop.log.dir", "build/test/logs");
         } catch (IOException e) {
             throw new RuntimeException(e);
         }
@@ -169,29 +102,15 @@ public class TezMiniCluster extends Mini
 
     @Override
     protected void shutdownMiniMrClusters() {
-        deleteConfFiles();
-        if (m_mr != null) {
-            m_mr.stop();
-            m_mr = null;
-        }
+        super.shutdownMiniMrClusters();
     }
 
-    private void deleteConfFiles() {
+    @Override
+    protected void deleteConfFiles() {
+        super.deleteConfFiles();
         if(TEZ_CONF_FILE.exists()) {
             TEZ_CONF_FILE.delete();
         }
-        if(CORE_CONF_FILE.exists()) {
-            CORE_CONF_FILE.delete();
-        }
-        if(HDFS_CONF_FILE.exists()) {
-            HDFS_CONF_FILE.delete();
-        }
-        if(MAPRED_CONF_FILE.exists()) {
-            MAPRED_CONF_FILE.delete();
-        }
-        if(YARN_CONF_FILE.exists()) {
-            YARN_CONF_FILE.delete();
-        }
     }
 
     static public Launcher getLauncher() {

Modified: pig/branches/spark/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/Util.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/Util.java Fri Mar 10 02:36:59 2017
@@ -1342,6 +1342,64 @@ public class Util {
         return false;
     }
 
+    public static void sortQueryOutputsIfNeed(List<Tuple> actualResList, boolean toSort) {
+        if (toSort) {
+            for (Tuple t : actualResList) {
+                Util.convertBagToSortedBag(t);
+            }
+            Collections.sort(actualResList);
+        }
+    }
+
+    public static void checkQueryOutputs(Iterator<Tuple> actualResults, List<Tuple> expectedResults, boolean checkAfterSort) {
+        if (checkAfterSort) {
+            checkQueryOutputsAfterSort(actualResults, expectedResults);
+        } else {
+            checkQueryOutputs(actualResults, expectedResults);
+        }
+    }
+
+    static public void checkQueryOutputs(Iterator<Tuple> actualResultsIt,
+                                         String[] expectedResArray, LogicalSchema schema,
+                                         boolean checkAfterSort) throws IOException {
+        if (checkAfterSort) {
+            checkQueryOutputsAfterSortRecursive(actualResultsIt,
+                    expectedResArray, schema);
+        } else {
+            checkQueryOutputs(actualResultsIt,
+                    expectedResArray, schema);
+        }
+    }
+
+    static void checkQueryOutputs(Iterator<Tuple> actualResultsIt,
+                                         String[] expectedResArray, LogicalSchema schema) throws IOException {
+        LogicalFieldSchema fs = new LogicalFieldSchema("tuple", schema, DataType.TUPLE);
+        ResourceFieldSchema rfs = new ResourceFieldSchema(fs);
+
+        LoadCaster caster = new Utf8StorageConverter();
+        List<Tuple> actualResList = new ArrayList<Tuple>();
+        while (actualResultsIt.hasNext()) {
+            actualResList.add(actualResultsIt.next());
+        }
+
+        List<Tuple> expectedResList = new ArrayList<Tuple>();
+        for (String str : expectedResArray) {
+            Tuple newTuple = caster.bytesToTuple(str.getBytes(), rfs);
+            expectedResList.add(newTuple);
+        }
+
+        for (Tuple t : actualResList) {
+            convertBagToSortedBag(t);
+        }
+
+        for (Tuple t : expectedResList) {
+            convertBagToSortedBag(t);
+        }
+
+        Assert.assertEquals("Comparing actual and expected results. ",
+                expectedResList, actualResList);
+    }
+
     public static void assertParallelValues(long defaultParallel,
                                              long requestedParallel,
                                              long estimatedParallel,

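The new Util helpers let a caller pick order-sensitive or order-insensitive comparison with a single boolean instead of choosing between two differently named methods. A hedged sketch of the List<Tuple> overload, with illustrative expected values, inside a test that already has pigServer and cluster:

    TupleFactory tf = TupleFactory.getInstance();
    List<Tuple> expected = new ArrayList<Tuple>();
    expected.add(tf.newTuple(Arrays.asList(1, 2)));
    expected.add(tf.newTuple(Arrays.asList(2, 1)));
    // true on Spark: sort both sides first; false elsewhere: compare in order.
    Util.checkQueryOutputs(pigServer.openIterator("D"), expected,
            Util.isSparkExecType(cluster.getExecType()));
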
Added: pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java?rev=1786266&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java Fri Mar 10 02:36:59 2017
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+
+public abstract class YarnMiniCluster extends MiniGenericCluster {
+    protected static final File CONF_DIR = new File("build/classes");
+    protected static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
+    protected static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
+    protected static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
+    protected static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
+
+
+    protected Configuration m_dfs_conf = null;
+    protected MiniMRYarnCluster m_mr = null;
+    protected Configuration m_mr_conf = null;
+
+
+    @Override
+    protected void setupMiniDfsAndMrClusters() {
+        try {
+            deleteConfFiles();
+            CONF_DIR.mkdirs();
+
+            // Build mini DFS cluster
+            Configuration hdfsConf = new Configuration();
+            m_dfs = new MiniDFSCluster.Builder(hdfsConf)
+                    .numDataNodes(2)
+                    .format(true)
+                    .racks(null)
+                    .build();
+            m_fileSys = m_dfs.getFileSystem();
+            m_dfs_conf = m_dfs.getConfiguration(0);
+
+            //Create user home directory
+            m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
+            // Write core-site.xml
+            Configuration core_site = new Configuration(false);
+            core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+            core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
+
+            Configuration hdfs_site = new Configuration(false);
+            for (Map.Entry<String, String> conf : m_dfs_conf) {
+                if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
+                    hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
+                }
+            }
+            hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
+
+            // Build mini YARN cluster
+            m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
+            m_mr.init(m_dfs_conf);
+            m_mr.start();
+            m_mr_conf = m_mr.getConfig();
+            m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+                    System.getProperty("java.class.path"));
+
+            Configuration mapred_site = new Configuration(false);
+            Configuration yarn_site = new Configuration(false);
+            for (Map.Entry<String, String> conf : m_mr_conf) {
+                if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
+                    if (conf.getKey().contains("yarn")) {
+                        yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    } else if (!conf.getKey().startsWith("dfs")) {
+                        mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+                    }
+                }
+            }
+
+            mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
+            yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
+
+            m_conf = m_mr_conf;
+            System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+            System.setProperty("hadoop.log.dir", "build/test/logs");
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+
+        }
+    }
+
+    protected void deleteConfFiles() {
+        if(CORE_CONF_FILE.exists()) {
+            CORE_CONF_FILE.delete();
+        }
+        if(HDFS_CONF_FILE.exists()) {
+            HDFS_CONF_FILE.delete();
+        }
+        if(MAPRED_CONF_FILE.exists()) {
+            MAPRED_CONF_FILE.delete();
+        }
+        if(YARN_CONF_FILE.exists()) {
+            YARN_CONF_FILE.delete();
+        }
+    }
+
+    @Override
+    protected void shutdownMiniMrClusters() {
+        deleteConfFiles();
+        if (m_mr != null) {
+            m_mr.stop();
+            m_mr = null;
+        }
+    }
+}
\ No newline at end of file
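
YarnMiniCluster now owns the mini DFS/YARN bring-up and the shared core/hdfs/mapred/yarn *-site.xml handling, so engine-specific clusters only layer their own configuration on top, as TezMiniCluster does above. A hedged sketch of such a subclass; ExampleEngineMiniCluster and engine-site.xml are illustrative and not part of this commit:

    package org.apache.pig.test;

    import java.io.File;

    import org.apache.pig.ExecType;

    public class ExampleEngineMiniCluster extends YarnMiniCluster {
        private static final File ENGINE_CONF_FILE = new File(CONF_DIR, "engine-site.xml");

        @Override
        public ExecType getExecType() {
            // Placeholder; a real engine returns its own ExecType, e.g. new TezExecType() above.
            return ExecType.MAPREDUCE;
        }

        @Override
        protected void setupMiniDfsAndMrClusters() {
            // The base class starts mini DFS + YARN and writes the common *-site.xml files.
            super.setupMiniDfsAndMrClusters();
            // Engine-specific keys would be set on m_conf and written to ENGINE_CONF_FILE here.
        }

        @Override
        protected void deleteConfFiles() {
            super.deleteConfFiles();
            if (ENGINE_CONF_FILE.exists()) {
                ENGINE_CONF_FILE.delete();
            }
        }
    }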