You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2017/03/10 02:36:59 UTC
svn commit: r1786266 - in /pig/branches/spark: ./
src/docs/src/documentation/content/xdocs/
src/org/apache/pig/backend/hadoop/executionengine/spark/converter/
src/org/apache/pig/data/ src/org/apache/pig/tools/pigstats/spark/
test/org/apache/pig/spark/ ...
Author: xuefu
Date: Fri Mar 10 02:36:59 2017
New Revision: 1786266
URL: http://svn.apache.org/viewvc?rev=1786266&view=rev
Log:
PIG-5133: Commit changes from last round of review on rb (Liyun via Xuefu)
Added:
pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java
Modified:
pig/branches/spark/build.xml
pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java
pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java
pig/branches/spark/test/org/apache/pig/test/TestCase.java
pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java
pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java
pig/branches/spark/test/org/apache/pig/test/TestMergeJoinOuter.java
pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java
pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java
pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java
pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java
pig/branches/spark/test/org/apache/pig/test/Util.java
Modified: pig/branches/spark/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/build.xml (original)
+++ pig/branches/spark/build.xml Fri Mar 10 02:36:59 2017
@@ -331,7 +331,7 @@
<fileset dir="${ivy.lib.dir}">
<include name="**.*jar"/>
</fileset>
- <fileset dir="${build.ivy.spark.lib.dir}/${ant.project.name}">
+ <fileset dir="${build.ivy.spark.lib.dir}">
<include name="**.*jar"/>
</fileset>
</path>
@@ -364,7 +364,7 @@
<path id="classpath">
<fileset file="${ivy.lib.dir}/${zookeeper.jarfile}"/>
<fileset dir="${ivy.lib.dir}" includes="*.jar"/>
- <fileset dir="${build.ivy.spark.lib.dir}/${ant.project.name}" includes="*.jar"/>
+ <fileset dir="${build.ivy.spark.lib.dir}" includes="*.jar"/>
</path>
<!-- javadoc-classpath -->
@@ -734,7 +734,7 @@
<target name="copySparkDependencies">
<mkdir dir="${spark.lib.dir}" />
<copy todir="${spark.lib.dir}">
- <fileset dir="${build.ivy.spark.lib.dir}/${ant.project.name}" includes="*.jar"/>
+ <fileset dir="${build.ivy.spark.lib.dir}" includes="*.jar"/>
</copy>
</target>
@@ -1654,7 +1654,7 @@
<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
pattern="${build.ivy.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="compile"/>
<ivy:retrieve settingsRef="${ant.project.name}.ivy.settings" log="${loglevel}"
- pattern="${build.ivy.spark.lib.dir}/${ivy.artifact.retrieve.pattern}" conf="spark"/>
+ pattern="${build.ivy.spark.lib.dir}/[artifact]-[revision](-[classifier]).[ext]" conf="spark"/>
<ivy:cachepath pathid="compile.classpath" conf="compile"/>
</target>
Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/start.xml Fri Mar 10 02:36:59 2017
@@ -95,48 +95,44 @@ Test the Pig installation with this simp
<table>
<tr>
<td></td>
- <td><strong>Interactive Mode </strong></td>
- <td><strong>Batch Mode</strong> </td>
- </tr>
- <tr>
<td><strong>Local Mode</strong></td>
- <td>yes</td>
- <td>yes</td>
- </tr>
- <tr>
- <td><strong>Mapreduce Mode</strong></td>
- <td>yes</td>
- <td>yes</td>
- </tr>
- <tr>
<td><strong>Tez Local Mode</strong></td>
- <td>experimental</td>
- <td>experimental</td>
+ <td><strong>Spark Local Mode</strong></td>
+ <td><strong>Mapreduce Mode</strong></td>
+ <td><strong>Tez Mode</strong></td>
+ <td><strong>Spark Mode</strong></td>
</tr>
<tr>
- <td><strong>Tez Mode</strong></td>
+ <td><strong>Interactive Mode </strong></td>
+ <td>yes</td>
+ <td>experimental</td>
<td>yes</td>
<td>yes</td>
</tr>
<tr>
- <td><strong>Spark Mode</strong></td>
- <td>experimental</td>
+ <td><strong>Batch Mode</strong> </td>
+ <td>yes</td>
<td>experimental</td>
+ <td>yes</td>
+ <td>yes</td>
</tr>
</table>
<!-- ++++++++++++++++++++++++++++++++++ -->
<section id="execution-modes">
<title>Execution Modes</title>
-<p>Pig has five execution modes or exectypes: </p>
+<p>Pig has six execution modes or exectypes: </p>
<ul>
<li><strong>Local Mode</strong> - To run Pig in local mode, you need access to a single machine; all files are installed and run using your local host and file system. Specify local mode using the -x flag (pig -x local).
</li>
-<li><strong>Mapreduce Mode</strong> - To run Pig in mapreduce mode, you need access to a Hadoop cluster and HDFS installation. Mapreduce mode is the default mode; you can, <em>but don't need to</em>, specify it using the -x flag (pig OR pig -x mapreduce).
-</li>
<li><strong>Tez Local Mode</strong> - To run Pig in tez local mode. It is similar to local mode, except internally Pig will invoke tez runtime engine. Specify Tez local mode using the -x flag (pig -x tez_local).
<p><strong>Note:</strong> Tez local mode is experimental. There are some queries which just error out on bigger data in local mode.</p>
</li>
+<li><strong>Spark Local Mode</strong> - To run Pig in spark local mode. It is similar to local mode, except internally Pig will invoke spark runtime engine. Specify Spark local mode using the -x flag (pig -x spark_local).
+<p><strong>Note:</strong> Spark local mode is experimental. There are some queries which just error out on bigger data in local mode.</p>
+</li>
+<li><strong>Mapreduce Mode</strong> - To run Pig in mapreduce mode, you need access to a Hadoop cluster and HDFS installation. Mapreduce mode is the default mode; you can, <em>but don't need to</em>, specify it using the -x flag (pig OR pig -x mapreduce).
+</li>
<li><strong>Tez Mode</strong> - To run Pig in Tez mode, you need access to a Hadoop cluster and HDFS installation. Specify Tez mode using the -x flag (-x tez).
</li>
<li><strong>Spark Mode</strong> - To run Pig in Spark mode, you need access to a Spark, Yarn or Mesos cluster and HDFS installation. Specify Spark mode using the -x flag (-x spark). In Spark execution mode, it is necessary to set env::SPARK_MASTER to an appropriate value (local - local mode, yarn-client - yarn-client mode, mesos://host:port - spark on mesos or spark://host:port - spark cluster. For more information refer to spark documentation on Master Urls, <em>yarn-cluster mode is currently not supported</em>). Pig scripts run on Spark can take advantage of the <a href="http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation">dynamic allocation</a> feature. The feature can be enabled by simply enabling <em>spark.dynamicAllocation.enabled</em>. Refer to spark <a href="http://spark.apache.org/docs/latest/configuration.html#dynamic-allocation">configuration</a> for additional configuration details. In general all properties in the pig script prefixed with
<em>spark.</em> are copied to the Spark Application Configuration. Please note that the Yarn auxiliary service needs to be enabled on Spark for this to work. See Spark documentation for additional details.
@@ -159,6 +155,9 @@ $ pig -x local ...
/* Tez local mode */
$ pig -x tez_local ...
+/* Spark local mode */
+$ pig -x spark_local ...
+
/* mapreduce mode */
$ pig ...
or
@@ -203,6 +202,13 @@ $ pig -x tez_local
grunt>
</source>
+<p><strong>Spark Local Mode</strong></p>
+<source>
+$ pig -x spark_local
+... - Connecting to ...
+grunt>
+</source>
+
<p><strong>Mapreduce Mode</strong> </p>
<source>
$ pig -x mapreduce
@@ -259,6 +265,10 @@ $ pig -x local id.pig
<source>
$ pig -x tez_local id.pig
</source>
+<p><strong>Spark Local Mode</strong></p>
+<source>
+$ pig -x spark_local id.pig
+</source>
<p><strong>Mapreduce Mode</strong> </p>
<source>
$ pig id.pig
@@ -549,7 +559,7 @@ However, in a production environment you
<section id="tutorial">
<title>Pig Tutorial </title>
-<p>The Pig tutorial shows you how to run Pig scripts using Pig's local mode, mapreduce mode and Tez mode (see <a href="#execution-modes">Execution Modes</a>).</p>
+<p>The Pig tutorial shows you how to run Pig scripts using Pig's local mode, mapreduce mode, Tez mode and Spark mode (see <a href="#execution-modes">Execution Modes</a>).</p>
<p>To get started, do the following preliminary tasks:</p>
@@ -600,6 +610,10 @@ Or if you are using Tez local mode:
<source>
$ pig -x tez_local script1-local.pig
</source>
+Or if you are using Spark local mode:
+<source>
+$ pig -x spark_local script1-local.pig
+</source>
</li>
<li>Review the result files, located in the script1-local-results.txt directory.
<p>The output may contain a few Hadoop warnings which can be ignored:</p>
@@ -613,7 +627,7 @@ $ pig -x tez_local script1-local.pig
<!-- ++++++++++++++++++++++++++++++++++ -->
<section>
-<title> Running the Pig Scripts in Mapreduce Mode or Tez Mode</title>
+<title> Running the Pig Scripts in Mapreduce Mode, Tez Mode or Spark Mode</title>
<p>To run the Pig scripts in mapreduce mode, do the following: </p>
<ol>
@@ -632,6 +646,8 @@ export PIG_CLASSPATH=/mycluster/conf
<source>
export PIG_CLASSPATH=/mycluster/conf:/tez/conf
</source>
+<p>If you are using Spark, you will also need to specify SPARK_HOME and specify SPARK_JAR which is the hdfs location where you upload $SPARK_HOME/lib/spark-assembly*.jar:</p>
+<source>export SPARK_HOME=/mysparkhome/; export SPARK_JAR=hdfs://example.com:8020/spark-assembly*.jar</source>
<p><strong>Note:</strong> The PIG_CLASSPATH can also be used to add any other 3rd party dependencies or resource files a pig script may require. If there is also a need to make the added entries take the highest precedence in the Pig JVM's classpath order, one may also set the env-var PIG_USER_CLASSPATH_FIRST to any value, such as 'true' (and unset the env-var to disable).</p></li>
<li>Set the HADOOP_CONF_DIR environment variable to the location of the cluster configuration directory:
<source>
@@ -646,6 +662,10 @@ Or if you are using Tez:
<source>
$ pig -x tez script1-hadoop.pig
</source>
+Or if you are using Spark:
+<source>
+$ pig -x spark script1-hadoop.pig
+</source>
</li>
<li>Review the result files, located in the script1-hadoop-results or script2-hadoop-results HDFS directory:
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/SortConverter.java Fri Mar 10 02:36:59 2017
@@ -54,7 +54,6 @@ public class SortConverter implements RD
SparkUtil.getManifest(Tuple.class),
SparkUtil.getManifest(Object.class));
-
JavaPairRDD<Tuple, Object> sorted = r.sortByKey(
sortOperator.getMComparator(), true, parallelism);
JavaRDD<Tuple> mapped = sorted.mapPartitions(TO_VALUE_FUNCTION);
Modified: pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java (original)
+++ pig/branches/spark/src/org/apache/pig/data/SelfSpillBag.java Fri Mar 10 02:36:59 2017
@@ -29,6 +29,7 @@ import org.apache.pig.classification.Int
@InterfaceStability.Evolving
public abstract class SelfSpillBag extends DefaultAbstractBag {
private static final long serialVersionUID = 1L;
+ // SelfSpillBag$MemoryLimits is not serializable
//in spark mode, if we don't set memLimit transient, it will throw NotSerializableException (See PIG-4611)
protected transient MemoryLimits memLimit;
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Fri Mar 10 02:36:59 2017
@@ -40,7 +40,6 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
-import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.ScriptState;
import org.apache.spark.api.java.JavaSparkContext;
Modified: pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java (original)
+++ pig/branches/spark/test/org/apache/pig/spark/TestIndexedKey.java Fri Mar 10 02:36:59 2017
@@ -1,14 +1,19 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the
- * NOTICE file distributed with this work for additional information regarding copyright ownership. The ASF
- * licenses this file to you under the Apache License, Version 2.0 (the "License"); you may not use this file
- * except in compliance with the License. You may obtain a copy of the License at
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
*
- * http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
- * Unless required by applicable law or agreed to in writing, software distributed under the License is
- * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and limitations under the License.
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.pig.spark;
Modified: pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/SparkMiniCluster.java Fri Mar 10 02:36:59 2017
@@ -35,117 +35,17 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.SparkExecType;
import org.apache.pig.backend.hadoop.executionengine.spark.SparkLauncher;
-public class SparkMiniCluster extends MiniGenericCluster {
- private static final File CONF_DIR = new File("build/classes");
- private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
- private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
- private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
- private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
-
- private Configuration m_dfs_conf = null;
- protected MiniMRYarnCluster m_mr = null;
- private Configuration m_mr_conf = null;
+public class SparkMiniCluster extends YarnMiniCluster {
private static final Log LOG = LogFactory
.getLog(SparkMiniCluster.class);
private ExecType spark = new SparkExecType();
- SparkMiniCluster() {
-
- }
@Override
public ExecType getExecType() {
return spark;
}
- @Override
- protected void setupMiniDfsAndMrClusters() {
- try {
- deleteConfFiles();
- CONF_DIR.mkdirs();
-
- // Build mini DFS cluster
- Configuration hdfsConf = new Configuration();
- m_dfs = new MiniDFSCluster.Builder(hdfsConf)
- .numDataNodes(2)
- .format(true)
- .racks(null)
- .build();
- m_fileSys = m_dfs.getFileSystem();
- m_dfs_conf = m_dfs.getConfiguration(0);
-
- //Create user home directory
- m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
- // Write core-site.xml
- Configuration core_site = new Configuration(false);
- core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
- core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
-
- Configuration hdfs_site = new Configuration(false);
- for (Map.Entry<String, String> conf : m_dfs_conf) {
- if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
- hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
- }
- }
- hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
-
- // Build mini YARN cluster
- m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
- m_mr.init(m_dfs_conf);
- m_mr.start();
- m_mr_conf = m_mr.getConfig();
- m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
- System.getProperty("java.class.path"));
-
- Configuration mapred_site = new Configuration(false);
- Configuration yarn_site = new Configuration(false);
- for (Map.Entry<String, String> conf : m_mr_conf) {
- if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
- if (conf.getKey().contains("yarn")) {
- yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
- } else if (!conf.getKey().startsWith("dfs")){
- mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
- }
- }
- }
-
- mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
- yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
-
- m_conf = m_mr_conf;
- System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
- System.setProperty("hadoop.log.dir", "build/test/logs");
- } catch (IOException e) {
- throw new RuntimeException(e);
-
- }
- }
-
- @Override
- protected void shutdownMiniMrClusters() {
- deleteConfFiles();
- if (m_mr != null) {
- m_mr.stop();
- m_mr = null;
- }
- }
-
- private void deleteConfFiles() {
-
- if(CORE_CONF_FILE.exists()) {
- CORE_CONF_FILE.delete();
- }
- if(HDFS_CONF_FILE.exists()) {
- HDFS_CONF_FILE.delete();
- }
- if(MAPRED_CONF_FILE.exists()) {
- MAPRED_CONF_FILE.delete();
- }
- if(YARN_CONF_FILE.exists()) {
- YARN_CONF_FILE.delete();
- }
- }
-
static public Launcher getLauncher() {
return new SparkLauncher();
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestCase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCase.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCase.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCase.java Fri Mar 10 02:36:59 2017
@@ -275,7 +275,8 @@ public class TestCase {
"(3,3n,{(c,x),(c,y)})"
};
Schema s = pigServer.dumpSchema("C");
- Util.checkQueryOutputsAfterSortRecursive(out.iterator(), expected, org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(out.iterator(), expected, org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(Util.getLocalTestMode()));
}
/**
Modified: pig/branches/spark/test/org/apache/pig/test/TestCombiner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestCombiner.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestCombiner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestCombiner.java Fri Mar 10 02:36:59 2017
@@ -406,12 +406,12 @@ public class TestCombiner {
pigServer.shutdown();
}
- private void checkCombinerUsed(PigServer pigServer, String variable, boolean combineExpected)
+ private void checkCombinerUsed(PigServer pigServer, String alias, boolean combineExpected)
throws IOException {
// make sure there is a combine plan in the explain output
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintStream ps = new PrintStream(baos);
- pigServer.explain(variable, ps);
+ pigServer.explain(alias, ps);
boolean combinerFound;
if (pigServer.getPigContext().getExecType().name().equalsIgnoreCase("spark")) {
combinerFound = baos.toString().contains("Reduce By");
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline.java Fri Mar 10 02:36:59 2017
@@ -428,12 +428,7 @@ public class TestEvalPipeline {
actualResList.add(iter.next());
}
- if (Util.isSparkExecType(cluster.getExecType())) {
- for (Tuple t : actualResList) {
- Util.convertBagToSortedBag(t);
- }
- Collections.sort(actualResList);
- }
+ Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType()));
int numIdentity = 0;
for (Tuple t : actualResList) {
@@ -484,12 +479,7 @@ public class TestEvalPipeline {
actualResList.add(iter.next());
}
- if (Util.isSparkExecType(cluster.getExecType())) {
- for (Tuple t : actualResList) {
- Util.convertBagToSortedBag(t);
- }
- Collections.sort(actualResList);
- }
+ Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType()));
int numIdentity = 0;
for (Tuple t : actualResList) {
@@ -870,12 +860,7 @@ public class TestEvalPipeline {
actualResList.add(iter.next());
}
- if (Util.isSparkExecType(cluster.getExecType())) {
- for (Tuple t : actualResList) {
- Util.convertBagToSortedBag(t);
- }
- Collections.sort(actualResList);
- }
+ Util.sortQueryOutputsIfNeed(actualResList, Util.isSparkExecType(cluster.getExecType()));
int numIdentity = 0;
for (Tuple t : actualResList) {
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipeline2.java Fri Mar 10 02:36:59 2017
@@ -143,12 +143,7 @@ public class TestEvalPipeline2 {
actualResList.add(iter.next());
}
- if (Util.isSparkExecType(cluster.getExecType())) {
- for (Tuple t : actualResList) {
- Util.convertBagToSortedBag(t);
- }
- Collections.sort(actualResList);
- }
+ Util.sortQueryOutputsIfNeed(actualResList,Util.isSparkExecType(cluster.getExecType()));
int numIdentity = 0;
for (Tuple tuple : actualResList) {
@@ -490,8 +485,8 @@ public class TestEvalPipeline2 {
if (Util.isSparkExecType(cluster.getExecType())) {
String[] expectedResults =
new String[] {"(2,{(2,2)},{(2,5,2)})", "(1,{(1,1)},{(1,2,3)})" };
- Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults,
- org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+ Util.checkQueryOutputs(iter, expectedResults,
+ org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")), Util.isSparkExecType(cluster.getExecType()));
} else {
Assert.assertTrue(iter.hasNext());
Tuple t = iter.next();
@@ -760,15 +755,7 @@ public class TestEvalPipeline2 {
List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
new String[]{"(60000L,2L,3L)", "(120000L,2L,2L)", "(240000L,1L,1L)"});
- if (Util.isSparkExecType(cluster.getExecType())) {
- Util.checkQueryOutputsAfterSort(iter, expectedResults);
- } else {
- // Even though GROUP BY does not return results sorted by key, that
- // is the current behavior for MR, which some users rely on. Let's
- // not sort the results for MR and Tez so that we can catch it when
- // current MR/Tez behavior changes.
- Util.checkQueryOutputs(iter, expectedResults);
- }
+ Util.checkQueryOutputs(iter, expectedResults, Util.isSparkExecType(cluster.getExecType()));
}
// See PIG-1729
@@ -1610,8 +1597,9 @@ public class TestEvalPipeline2 {
String[] expected = new String[] {"(1,A)", "(1,B)", "(2,C)"};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected,
- org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")));
+ Util.checkQueryOutputs(iter, expected,
+ org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("flattened")),
+ Util.isSparkExecType(cluster.getExecType()));
}
// See PIG-2237
Modified: pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEvalPipelineLocal.java Fri Mar 10 02:36:59 2017
@@ -897,7 +897,7 @@ public class TestEvalPipelineLocal {
}
numIdentity = resList.size();
- Collections.sort(resList);
+ Util.sortQueryOutputsIfNeed(resList, Util.isSparkExecType(Util.getLocalTestMode()));
Assert.assertEquals(LOOP_COUNT, numIdentity);
// Since delta differences in some cases are allowed, utility function
// to compare tuple-lists cannot be used here.
@@ -937,7 +937,7 @@ public class TestEvalPipelineLocal {
expectedList.add(t);
}
}
- Collections.sort(expectedList);
+ Util.sortQueryOutputsIfNeed(expectedList, Util.isSparkExecType(Util.getLocalTestMode()));
ps.close();
pigServer.registerQuery("A = LOAD '"
@@ -961,7 +961,7 @@ public class TestEvalPipelineLocal {
resList.add(iter.next());
}
- Collections.sort(resList);
+ Util.sortQueryOutputsIfNeed(resList, Util.isSparkExecType(Util.getLocalTestMode()));
Assert.assertEquals((LOOP_COUNT * LOOP_COUNT)/2, resList.size());
// Since delta difference in some cases is allowed, utility function
@@ -1014,7 +1014,7 @@ public class TestEvalPipelineLocal {
if(!iter.hasNext()) Assert.fail("No output found");
// When running with spark, output can be in a different order than that
// when running in mr mode.
- Util.checkQueryOutputsAfterSort(iter, expectedList);
+ Util.checkQueryOutputs(iter, expectedList, Util.isSparkExecType(Util.getLocalTestMode()));
}
@Test
@@ -1141,7 +1141,7 @@ public class TestEvalPipelineLocal {
"((1,2),{((1,2),3)})",
"((4,5),{((4,5),6)})"
});
- Util.checkQueryOutputsAfterSort(iter, expectedRes);
+ Util.checkQueryOutputs(iter, expectedRes, Util.isSparkExecType(Util.getLocalTestMode()));
}
@Test
Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Fri Mar 10 02:36:59 2017
@@ -935,14 +935,14 @@ public class TestGrunt {
// in spark mode, the output file 'baz' will not be automatically deleted even if the job fails (see SPARK-7953)
// when "cat baz;" is executed, it does not throw exception and the variable "caught" is false
// TODO: Enable this for Spark when SPARK-7953 is resolved
- if(!Util.isSparkExecType(cluster.getExecType())) {
- try {
- grunt.exec();
- } catch (Exception e) {
- caught = true;
- assertTrue(e.getMessage().contains("baz does not exist"));
- }
- assertTrue(caught);
+ Assume.assumeTrue(
+ "Skip this test for Spark until SPARK-7953 is resolved!",
+ !Util.isSparkExecType(cluster.getExecType()));
+ try {
+ grunt.exec();
+ } catch (Exception e) {
+ caught = true;
+ assertTrue(e.getMessage().contains("baz does not exist"));
}
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestLineageFindRelVisitor.java Fri Mar 10 02:36:59 2017
@@ -259,8 +259,10 @@ public class TestLineageFindRelVisitor {
pig.registerQuery("E = FOREACH D GENERATE (chararray) tupleD.a1;\n");
Iterator<Tuple> iter = pig.openIterator("E");
- Util.checkQueryOutputsAfterSortRecursive(iter, new String[]{"(123)","(456)","(789)"},
- org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("E")));
+ Util.checkQueryOutputs(iter,
+ new String[]{"(123)", "(456)", "(789)"},
+ org.apache.pig.newplan.logical.Util.translateSchema(pig.dumpSchema("E")), Util.isSparkExecType(Util
+ .getLocalTestMode()));
}
@Test
Modified: pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java Fri Mar 10 02:36:59 2017
@@ -634,13 +634,6 @@ public class TestMergeJoin {
@Test
public void testExpressionFail() throws IOException{
- // This test validates that join keys are not expressions.
- // Expressions cannot be handled when the storage function
- // implements IndexableLoadFunc.
- // TODO: Enable this test when Spark engine implements Merge Join algorithm.
- if (Util.isSparkExecType(cluster.getExecType()))
- return;
-
pigServer.registerQuery("A = LOAD 'leftinput' as (a:int);");
pigServer.registerQuery("B = LOAD 'temp_file*' using " +
DummyIndexableLoader.class.getName() + "() as (a:int);");
@@ -685,11 +678,6 @@ public class TestMergeJoin {
@Test
public void testMergeJoinWithCommaSeparatedFilePaths() throws IOException{
- // Spark engine currently implements merge join as regular join
- // TODO: Enable this test when Spark engine implements Merge Join algorithm.
- if (Util.isSparkExecType(cluster.getExecType()))
- return;
-
pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "';");
pigServer.registerQuery("B = LOAD 'temp_file,righinput_file' using " +
DummyIndexableLoader.class.getName() + "();");
Modified: pig/branches/spark/test/org/apache/pig/test/TestMergeJoinOuter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMergeJoinOuter.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMergeJoinOuter.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMergeJoinOuter.java Fri Mar 10 02:36:59 2017
@@ -169,10 +169,6 @@ public class TestMergeJoinOuter {
@Test
public void testLeftOuter() throws IOException {
- // TODO: Enable this test when Spark engine implements Merge Join algorithm.
- if (Util.isSparkExecType(cluster.getExecType()))
- return;
-
pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
@@ -202,10 +198,6 @@ public class TestMergeJoinOuter {
@Test
public void testRightOuter() throws IOException{
- // TODO: Enable this test when Spark engine implements Merge Join algorithm.
- if (Util.isSparkExecType(cluster.getExecType()))
- return;
-
pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
pigServer.registerQuery("C = join A by c1 right, B by c1 using 'merge';");
@@ -233,10 +225,6 @@ public class TestMergeJoinOuter {
@Test
public void testFullOuter() throws IOException{
- // TODO: Enable this test when Spark engine implements Merge Join algorithm.
- if (Util.isSparkExecType(cluster.getExecType()))
- return;
-
pigServer.registerQuery("A = LOAD '"+INPUT_FILE1+"' using "+ DummyCollectableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
pigServer.registerQuery("B = LOAD '"+INPUT_FILE2+"' using "+ DummyIndexableLoader.class.getName() +"() as (c1:chararray, c2:chararray);");
pigServer.registerQuery("C = join A by c1 full, B by c1 using 'merge';");
Modified: pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMultiQuery.java Fri Mar 10 02:36:59 2017
@@ -114,7 +114,8 @@ public class TestMultiQuery {
"(2,3)"
};
Schema s = myPig.dumpSchema("E");
- Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+ .isSparkExecType(Util.getLocalTestMode()));
myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
@@ -125,7 +126,8 @@ public class TestMultiQuery {
"(3,4)"
};
s = myPig.dumpSchema("E");
- Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+ .isSparkExecType(Util.getLocalTestMode()));
}
@Test
@@ -165,7 +167,8 @@ public class TestMultiQuery {
"(5,6)"
};
Schema s = myPig.dumpSchema("F");
- Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+ .isSparkExecType(Util.getLocalTestMode()));
}
@Test
@@ -292,7 +295,8 @@ public class TestMultiQuery {
"(3L,persimmon,5,3L,persimmon,3L,{(3L)})"
};
Schema s = myPig.dumpSchema("E");
- Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+ .isSparkExecType(Util.getLocalTestMode()));
}
@Test
@@ -332,7 +336,8 @@ public class TestMultiQuery {
"(strawberry,{(30,strawberry,quit,bot)},{})"};
Schema s = myPig.dumpSchema("joined_session_info");
- Util.checkQueryOutputsAfterSortRecursive(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(iter, expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(s), Util
+ .isSparkExecType(Util.getLocalTestMode()));
}
@Test
@@ -930,20 +935,24 @@ public class TestMultiQuery {
List<Tuple> actualResults = data.get("output1");
String[] expectedResults = new String[]{"(12, 1)"};
- Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("B1")));
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan
+ .logical.Util.translateSchema(myPig.dumpSchema("B1")), Util.isSparkExecType(Util.getLocalTestMode()));
actualResults = data.get("output2");
expectedResults = new String[]{"(c,1)"};
- Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("B2")));
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util
+ .translateSchema(myPig.dumpSchema("B2")), Util.isSparkExecType(Util.getLocalTestMode()));
actualResults = data.get("output3");
expectedResults = new String[]{"(-12, 1)"};
- Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("C1")));
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util
+ .translateSchema(myPig.dumpSchema("C1")), Util.isSparkExecType(Util.getLocalTestMode()));
actualResults = data.get("output4");
expectedResults = new String[]{"(d,1)"};
- Util.checkQueryOutputsAfterSortRecursive(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util.translateSchema(myPig.dumpSchema("C2")));
+ Util.checkQueryOutputs(actualResults.iterator(), expectedResults, org.apache.pig.newplan.logical.Util
+ .translateSchema(myPig.dumpSchema("C2")), Util.isSparkExecType(Util.getLocalTestMode()));
}
// --------------------------------------------------------------------------
Modified: pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestProjectRange.java Fri Mar 10 02:36:59 2017
@@ -650,7 +650,8 @@ public class TestProjectRange {
"(11,{(11,21,31,41,51)})",
};
Schema s = pigServer.dumpSchema("f");
- Util.checkQueryOutputsAfterSortRecursive(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
/**
@@ -737,7 +738,8 @@ public class TestProjectRange {
"(1,{(11,21,31,41,51),(10,20,30,40,50)})",
};
Schema s = pigServer.dumpSchema("f");
- Util.checkQueryOutputsAfterSortRecursive(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
@@ -946,7 +948,8 @@ public class TestProjectRange {
};
Iterator<Tuple> it = pigServer.openIterator("g");
Schema s = pigServer.dumpSchema("g");
- Util.checkQueryOutputsAfterSortRecursive(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
/**
@@ -1013,7 +1016,8 @@ public class TestProjectRange {
};
Iterator<Tuple> it = pigServer.openIterator("g");
Schema s = pigServer.dumpSchema("g");
- Util.checkQueryOutputsAfterSortRecursive(it,expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(it,expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
@Test
@@ -1064,7 +1068,8 @@ public class TestProjectRange {
};
Iterator<Tuple> it = pigServer.openIterator("lim");
Schema s = pigServer.dumpSchema("lim");
- Util.checkQueryOutputsAfterSortRecursive(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
@@ -1126,7 +1131,8 @@ public class TestProjectRange {
};
Iterator<Tuple> it = pigServer.openIterator("g");
Schema s = pigServer.dumpSchema("g");
- Util.checkQueryOutputsAfterSortRecursive(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
private void setAliasesToNull(Schema schema) {
@@ -1165,7 +1171,8 @@ public class TestProjectRange {
};
Iterator<Tuple> it = pigServer.openIterator("j");
Schema s = pigServer.dumpSchema("j");
- Util.checkQueryOutputsAfterSortRecursive(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(it, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
@Test
@@ -1193,7 +1200,8 @@ public class TestProjectRange {
};
Iterator<Tuple> it = pigServer.openIterator("j");
Schema s = pigServer.dumpSchema("j");
- Util.checkQueryOutputsAfterSortRecursive(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(it, expectedRes, org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
}
@Test
Modified: pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPruneColumn.java Fri Mar 10 02:36:59 2017
@@ -589,7 +589,8 @@ public class TestPruneColumn {
"({(1,2,3)},1)",
"({(2,5,2)},1)"
};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
+ Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@@ -609,7 +610,8 @@ public class TestPruneColumn {
"({(2)},{(2)})",
"({(5)},{})"
};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+ Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@@ -739,7 +741,8 @@ public class TestPruneColumn {
"(1,2,3,{(2)})",
"(2,5,2,{})"
};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+ Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@@ -843,7 +846,8 @@ public class TestPruneColumn {
"(1,1)",
"(2,2)"
};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+ Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")),
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
"Columns pruned for B: $1"}));
@@ -958,7 +962,8 @@ public class TestPruneColumn {
"((1,2,3))",
"((2,5,2))"
};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
+ Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
}
@@ -1507,7 +1512,8 @@ public class TestPruneColumn {
"({(2),(2)},2)",
"({(3),(3),(3)},3)"
};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("G")));
+ Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("G")),
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
pigServer.getPigContext().getProperties().remove(PigImplConstants.PIG_OPTIMIZER_RULES_KEY);
@@ -1528,7 +1534,8 @@ public class TestPruneColumn {
"(2,{(1,2,3)})",
"(5,{(2,5,2)})"
};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
+ Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D"))
+ ,Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"}));
}
@@ -1857,7 +1864,8 @@ public class TestPruneColumn {
"(1,2,3,1)",
"(2,5,2,2)"
};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
+ Util.checkQueryOutputs(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")),
+ Util.isSparkExecType(Util.getLocalTestMode()));
assertTrue(emptyLogFileMessage());
Modified: pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSecondarySort.java Fri Mar 10 02:36:59 2017
@@ -238,8 +238,9 @@ public abstract class TestSecondarySort
Iterator<Tuple> iter = pigServer.openIterator("D");
String[] expectedRes = new String[]{"(2,1)","(1,2)"};
Schema s = pigServer.dumpSchema("D");
- Util.checkQueryOutputsAfterSortRecursive(iter,expectedRes,org.apache
- .pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(iter,expectedRes,org.apache
+ .pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
Util.deleteFile(cluster, file1ClusterPath);
Util.deleteFile(cluster, file2ClusterPath);
}
@@ -268,8 +269,9 @@ public abstract class TestSecondarySort
"(2,{(2,3,4)})",
"(1,{(1,2,3),(1,2,4),(1,3,4)})"
};
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache
- .pig.newplan.logical.Util.translateSchema(s));
+ Util.checkQueryOutputs(iter, expected, org.apache
+ .pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(Util.getLocalTestMode()));
Util.deleteFile(cluster, clusterPath);
}
@@ -361,7 +363,8 @@ public abstract class TestSecondarySort
Iterator<Tuple> iter = pigServer.openIterator("E");
Schema s = pigServer.dumpSchema("E");
String[] expectedRes = new String[]{"((1,2),4)","((1,3),1)","((1,4),0)","((2,3),1)"};
- Util.checkQueryOutputsAfterSortRecursive(iter, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s) );
+ Util.checkQueryOutputs(iter, expectedRes,org.apache.pig.newplan.logical.Util.translateSchema(s),
+ Util.isSparkExecType(cluster.getExecType()));
Util.deleteFile(cluster, clusterPath1);
Util.deleteFile(cluster, clusterPath2);
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestSkewedJoin.java Fri Mar 10 02:36:59 2017
@@ -53,6 +53,7 @@ import org.apache.pig.impl.builtin.Parti
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.test.utils.TestHelper;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -309,8 +310,7 @@ public class TestSkewedJoin {
// Spark engine currently implements skew join as regular join, and hence does
// not control key distribution.
// TODO: Enable this test when Spark engine implements Skew Join algorithm.
- if (Util.isSparkExecType(cluster.getExecType()))
- return;
+ Assume.assumeTrue("Skip this test for Spark until PIG-4858 is resolved!",!Util.isSparkExecType(cluster.getExecType()));
String outputDir = "testSkewedJoinKeyPartition";
try{
Modified: pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TezMiniCluster.java Fri Mar 10 02:36:59 2017
@@ -41,19 +41,12 @@ import org.apache.tez.dag.api.TezConfigu
import org.apache.tez.mapreduce.hadoop.MRJobConfig;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
-public class TezMiniCluster extends MiniGenericCluster {
- private static final File CONF_DIR = new File("build/classes");
+public class TezMiniCluster extends YarnMiniCluster {
+
private static final File TEZ_LIB_DIR = new File("build/ivy/lib/Pig");
private static final File TEZ_CONF_FILE = new File(CONF_DIR, "tez-site.xml");
- private static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
- private static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
- private static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
- private static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
- private static final ExecType TEZ = new TezExecType();
- protected MiniMRYarnCluster m_mr = null;
- private Configuration m_dfs_conf = null;
- private Configuration m_mr_conf = null;
+ private static final ExecType TEZ = new TezExecType();
@Override
public ExecType getExecType() {
@@ -61,66 +54,9 @@ public class TezMiniCluster extends Mini
}
@Override
- public void setupMiniDfsAndMrClusters() {
+ protected void setupMiniDfsAndMrClusters() {
+ super.setupMiniDfsAndMrClusters();
try {
- deleteConfFiles();
- CONF_DIR.mkdirs();
-
- // Build mini DFS cluster
- Configuration hdfsConf = new Configuration();
- m_dfs = new MiniDFSCluster.Builder(hdfsConf)
- .numDataNodes(2)
- .format(true)
- .racks(null)
- .build();
- m_fileSys = m_dfs.getFileSystem();
- m_dfs_conf = m_dfs.getConfiguration(0);
- //Create user home directory
- m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
-
- // Write core-site.xml
- Configuration core_site = new Configuration(false);
- core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
- core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
-
- Configuration hdfs_site = new Configuration(false);
- for (Entry<String, String> conf : m_dfs_conf) {
- if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
- hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
- }
- }
- hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
-
- // Build mini YARN cluster
- m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
- m_mr.init(m_dfs_conf);
- m_mr.start();
- m_mr_conf = m_mr.getConfig();
- File libDir = new File(System.getProperty("ivy.lib.dir", "build/ivy/lib/Pig"));
- File classesDir = new File(System.getProperty("build.classes", "build/classes"));
- File testClassesDir = new File(System.getProperty("test.build.classes", "test/build/classes"));
- String classpath = libDir.getAbsolutePath() + "/*"
- + File.pathSeparator + classesDir.getAbsolutePath()
- + File.pathSeparator + testClassesDir.getAbsolutePath();
- m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH, classpath);
- m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m");
- m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx512m");
-
- Configuration mapred_site = new Configuration(false);
- Configuration yarn_site = new Configuration(false);
- for (Entry<String, String> conf : m_mr_conf) {
- if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
- if (conf.getKey().contains("yarn")) {
- yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
- } else if (!conf.getKey().startsWith("dfs")){
- mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
- }
- }
- }
-
- mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
- yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
-
// Write tez-site.xml
Configuration tez_conf = new Configuration(false);
// TODO PIG-3659 - Remove this once memory management is fixed
@@ -150,12 +86,9 @@ public class TezMiniCluster extends Mini
}
}
- m_conf = m_mr_conf;
// Turn FetchOptimizer off so that we can actually test Tez
m_conf.set(PigConfiguration.PIG_OPT_FETCH, System.getProperty("test.opt.fetch", "false"));
- System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
- System.setProperty("hadoop.log.dir", "build/test/logs");
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -169,29 +102,15 @@ public class TezMiniCluster extends Mini
@Override
protected void shutdownMiniMrClusters() {
- deleteConfFiles();
- if (m_mr != null) {
- m_mr.stop();
- m_mr = null;
- }
+ super.shutdownMiniMrClusters();
}
- private void deleteConfFiles() {
+ @Override
+ protected void deleteConfFiles() {
+ super.deleteConfFiles();
if(TEZ_CONF_FILE.exists()) {
TEZ_CONF_FILE.delete();
}
- if(CORE_CONF_FILE.exists()) {
- CORE_CONF_FILE.delete();
- }
- if(HDFS_CONF_FILE.exists()) {
- HDFS_CONF_FILE.delete();
- }
- if(MAPRED_CONF_FILE.exists()) {
- MAPRED_CONF_FILE.delete();
- }
- if(YARN_CONF_FILE.exists()) {
- YARN_CONF_FILE.delete();
- }
}
static public Launcher getLauncher() {
Modified: pig/branches/spark/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/Util.java?rev=1786266&r1=1786265&r2=1786266&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/Util.java Fri Mar 10 02:36:59 2017
@@ -1342,6 +1342,64 @@ public class Util {
return false;
}
+ public static void sortQueryOutputsIfNeed(List<Tuple> actualResList, boolean toSort){
+ if( toSort == true) {
+ for (Tuple t : actualResList) {
+ Util.convertBagToSortedBag(t);
+ }
+ Collections.sort(actualResList);
+ }
+ }
+
+ public static void checkQueryOutputs(Iterator<Tuple> actualResults, List<Tuple> expectedResults, boolean checkAfterSort) {
+ if (checkAfterSort) {
+ checkQueryOutputsAfterSort(actualResults, expectedResults);
+ } else {
+ checkQueryOutputs(actualResults, expectedResults);
+ }
+ }
+
+ static public void checkQueryOutputs(Iterator<Tuple> actualResultsIt,
+ String[] expectedResArray, LogicalSchema schema, boolean
+ checkAfterSort) throws IOException {
+ if (checkAfterSort) {
+ checkQueryOutputsAfterSortRecursive(actualResultsIt,
+ expectedResArray, schema);
+ } else {
+ checkQueryOutputs(actualResultsIt,
+ expectedResArray, schema);
+ }
+ }
+
+ static void checkQueryOutputs(Iterator<Tuple> actualResultsIt,
+ String[] expectedResArray, LogicalSchema schema) throws IOException {
+ LogicalFieldSchema fs = new LogicalFieldSchema("tuple", schema, DataType.TUPLE);
+ ResourceFieldSchema rfs = new ResourceFieldSchema(fs);
+
+ LoadCaster caster = new Utf8StorageConverter();
+ List<Tuple> actualResList = new ArrayList<Tuple>();
+ while (actualResultsIt.hasNext()) {
+ actualResList.add(actualResultsIt.next());
+ }
+
+ List<Tuple> expectedResList = new ArrayList<Tuple>();
+ for (String str : expectedResArray) {
+ Tuple newTuple = caster.bytesToTuple(str.getBytes(), rfs);
+ expectedResList.add(newTuple);
+ }
+
+ for (Tuple t : actualResList) {
+ convertBagToSortedBag(t);
+ }
+
+ for (Tuple t : expectedResList) {
+ convertBagToSortedBag(t);
+ }
+
+ Assert.assertEquals("Comparing actual and expected results. ",
+ expectedResList, actualResList);
+ }
+
public static void assertParallelValues(long defaultParallel,
long requestedParallel,
long estimatedParallel,
Added: pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java?rev=1786266&view=auto
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java (added)
+++ pig/branches/spark/test/org/apache/pig/test/YarnMiniCluster.java Fri Mar 10 02:36:59 2017
@@ -0,0 +1,132 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.commons.lang.ArrayUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapreduce.v2.MiniMRYarnCluster;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.pig.ExecType;
+
+public abstract class YarnMiniCluster extends MiniGenericCluster {
+ protected static final File CONF_DIR = new File("build/classes");
+ protected static final File CORE_CONF_FILE = new File(CONF_DIR, "core-site.xml");
+ protected static final File HDFS_CONF_FILE = new File(CONF_DIR, "hdfs-site.xml");
+ protected static final File MAPRED_CONF_FILE = new File(CONF_DIR, "mapred-site.xml");
+ protected static final File YARN_CONF_FILE = new File(CONF_DIR, "yarn-site.xml");
+
+
+ protected Configuration m_dfs_conf = null;
+ protected MiniMRYarnCluster m_mr = null;
+ protected Configuration m_mr_conf = null;
+
+
+ @Override
+ protected void setupMiniDfsAndMrClusters() {
+ try {
+ deleteConfFiles();
+ CONF_DIR.mkdirs();
+
+ // Build mini DFS cluster
+ Configuration hdfsConf = new Configuration();
+ m_dfs = new MiniDFSCluster.Builder(hdfsConf)
+ .numDataNodes(2)
+ .format(true)
+ .racks(null)
+ .build();
+ m_fileSys = m_dfs.getFileSystem();
+ m_dfs_conf = m_dfs.getConfiguration(0);
+
+ //Create user home directory
+ m_fileSys.mkdirs(m_fileSys.getWorkingDirectory());
+ // Write core-site.xml
+ Configuration core_site = new Configuration(false);
+ core_site.set(FileSystem.FS_DEFAULT_NAME_KEY, m_dfs_conf.get(FileSystem.FS_DEFAULT_NAME_KEY));
+ core_site.writeXml(new FileOutputStream(CORE_CONF_FILE));
+
+ Configuration hdfs_site = new Configuration(false);
+ for (Map.Entry<String, String> conf : m_dfs_conf) {
+ if (ArrayUtils.contains(m_dfs_conf.getPropertySources(conf.getKey()), "programatically")) {
+ hdfs_site.set(conf.getKey(), m_dfs_conf.getRaw(conf.getKey()));
+ }
+ }
+ hdfs_site.writeXml(new FileOutputStream(HDFS_CONF_FILE));
+
+ // Build mini YARN cluster
+ m_mr = new MiniMRYarnCluster("PigMiniCluster", 2);
+ m_mr.init(m_dfs_conf);
+ m_mr.start();
+ m_mr_conf = m_mr.getConfig();
+ m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
+ System.getProperty("java.class.path"));
+
+ Configuration mapred_site = new Configuration(false);
+ Configuration yarn_site = new Configuration(false);
+ for (Map.Entry<String, String> conf : m_mr_conf) {
+ if (ArrayUtils.contains(m_mr_conf.getPropertySources(conf.getKey()), "programatically")) {
+ if (conf.getKey().contains("yarn")) {
+ yarn_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+ } else if (!conf.getKey().startsWith("dfs")) {
+ mapred_site.set(conf.getKey(), m_mr_conf.getRaw(conf.getKey()));
+ }
+ }
+ }
+
+ mapred_site.writeXml(new FileOutputStream(MAPRED_CONF_FILE));
+ yarn_site.writeXml(new FileOutputStream(YARN_CONF_FILE));
+
+ m_conf = m_mr_conf;
+ System.setProperty("junit.hadoop.conf", CONF_DIR.getPath());
+ System.setProperty("hadoop.log.dir", "build/test/logs");
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+
+ }
+ }
+
+ protected void deleteConfFiles() {
+ if(CORE_CONF_FILE.exists()) {
+ CORE_CONF_FILE.delete();
+ }
+ if(HDFS_CONF_FILE.exists()) {
+ HDFS_CONF_FILE.delete();
+ }
+ if(MAPRED_CONF_FILE.exists()) {
+ MAPRED_CONF_FILE.delete();
+ }
+ if(YARN_CONF_FILE.exists()) {
+ YARN_CONF_FILE.delete();
+ }
+ }
+
+ @Override
+ protected void shutdownMiniMrClusters() {
+ deleteConfFiles();
+ if (m_mr != null) {
+ m_mr.stop();
+ m_mr = null;
+ }
+ }
+}
\ No newline at end of file