You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this conversion.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/09/04 20:10:43 UTC
svn commit: r1622521 - in /pig/branches/spark: ./ ivy/
shims/test/hadoop23/org/apache/pig/test/ src/
src/docs/src/documentation/content/xdocs/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/
src/org/apache/pig/backend/had...
Author: cheolsoo
Date: Thu Sep 4 18:10:42 2014
New Revision: 1622521
URL: http://svn.apache.org/r1622521
Log:
Merge latest trunk changes
Added:
pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallelMR.java
- copied unchanged from r1622520, pig/trunk/test/org/apache/pig/test/TestGroupConstParallelMR.java
pig/branches/spark/test/org/apache/pig/test/TestJobSubmissionMR.java
- copied unchanged from r1622520, pig/trunk/test/org/apache/pig/test/TestJobSubmissionMR.java
pig/branches/spark/test/org/apache/pig/tez/TestGroupConstParallelTez.java
- copied unchanged from r1622520, pig/trunk/test/org/apache/pig/tez/TestGroupConstParallelTez.java
pig/branches/spark/test/org/apache/pig/tez/TestJobSubmissionTez.java
- copied unchanged from r1622520, pig/trunk/test/org/apache/pig/tez/TestJobSubmissionTez.java
Modified:
pig/branches/spark/ (props changed)
pig/branches/spark/CHANGES.txt
pig/branches/spark/build.xml
pig/branches/spark/ivy.xml
pig/branches/spark/ivy/libraries.properties
pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java
pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
pig/branches/spark/src/pig-default.properties (props changed)
pig/branches/spark/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java
pig/branches/spark/test/org/apache/pig/builtin/TestBigTypeSort.java
pig/branches/spark/test/org/apache/pig/builtin/TestCurrentTime.java
pig/branches/spark/test/org/apache/pig/builtin/TestInvokerGenerator.java
pig/branches/spark/test/org/apache/pig/pigunit/MiniClusterRunner.java
pig/branches/spark/test/org/apache/pig/pigunit/PigTest.java
pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java
pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallel.java
pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java
pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java
pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java
pig/branches/spark/test/org/apache/pig/test/TestPigProgressReporting.java
pig/branches/spark/test/org/apache/pig/test/Util.java
pig/branches/spark/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2 (props changed)
pig/branches/spark/test/org/apache/pig/test/pigunit/TestPigTest.java
pig/branches/spark/test/org/apache/pig/test/pigunit/pig/TestGruntParser.java
pig/branches/spark/test/org/apache/pig/test/utils/ReportingUDF.java
pig/branches/spark/test/tez-tests
Propchange: pig/branches/spark/
------------------------------------------------------------------------------
Merged /pig/trunk:r1621676-1622520
Modified: pig/branches/spark/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/spark/CHANGES.txt?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/CHANGES.txt (original)
+++ pig/branches/spark/CHANGES.txt Thu Sep 4 18:10:42 2014
@@ -24,6 +24,10 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4146: Create a target to run mr and tez unit test in one shot (daijy)
+
+PIG-4144: Make pigunit.PigTest work in tez mode (daijy)
+
PIG-4128: New logical optimizer rule: ConstantCalculator (daijy)
PIG-4124: Command for Python streaming udf should be configurable (cheolsoo)
@@ -66,6 +70,12 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4143: Port more mini cluster tests to Tez - part 7 (daijy)
+
+PIG-4149: Rounding issue in FindQuantiles (daijy)
+
+PIG-4145: Port local mode tests to Tez - part1 (daijy)
+
PIG-4076: Fix pom file (daijy)
PIG-4140: VertexManagerEvent.getUserPayload returns ReadOnlyBuffer after TEZ-1449 (daijy)
Modified: pig/branches/spark/build.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/build.xml?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/build.xml (original)
+++ pig/branches/spark/build.xml Thu Sep 4 18:10:42 2014
@@ -117,7 +117,6 @@
<!-- test configuration, use ${user.home}/build.properties to configure values -->
<property name="ssh.gateway" value="" />
<property name="hod.server" value="" />
- <property name="test.log.dir" value="${basedir}/test/logs"/>
<property name="test.output" value="no"/>
<!-- e2e test properties -->
@@ -867,23 +866,28 @@
<!-- Run unit tests -->
<!-- ================================================================== -->
<target name="test-core" depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check,jackson-pig-3039-test-download" description="Run full set of unit tests">
- <macro-test-runner test.file="${test.all.file}" />
+ <macro-test-runner test.file="${test.all.file}" tests.failed="test-core.failed" />
+ <fail if="test-core.failed">Tests failed!</fail>
</target>
<target name="test-commit" depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check" description="Run approximate 10-minute set of unit tests prior to commiting">
- <macro-test-runner test.file="${test.commit.file}" />
+ <macro-test-runner test.file="${test.commit.file}" tests.failed="test-commit.failed"/>
+ <fail if="test-commit.failed">Tests failed!</fail>
</target>
<target name="test-unit" depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check" description="Run all true unit tests">
- <macro-test-runner test.file="${test.unit.file}" />
+ <macro-test-runner test.file="${test.unit.file}" tests.failed="test-unit.failed"/>
+ <fail if="test-unit.failed">Tests failed!</fail>
</target>
<target name="test-smoke" depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check" description="Run 30 min smoke tests">
- <macro-test-runner test.file="${test.smoke.file}" />
+ <macro-test-runner test.file="${test.smoke.file}" tests.failed="test-smoke.failed"/>
+ <fail if="test-smoke.failed">Tests failed!</fail>
</target>
<target name="test-tez" depends="setTezEnv,setWindowsPath,setLinuxPath,compile-test,jar,debugger.check,jackson-pig-3039-test-download" description="Run tez unit tests">
- <macro-test-runner test.file="${test.tez.file}" />
+ <macro-test-runner test.file="${test.tez.file}" tests.failed="test-tez.failed"/>
+ <fail if="test-tez.failed">Tests failed!</fail>
</target>
<target name="debugger.check" depends="debugger.set,debugger.unset"/>
@@ -897,12 +901,13 @@
<macrodef name="macro-test-runner">
<attribute name="test.file" />
+ <attribute name="tests.failed" />
<sequential>
<delete dir="${test.log.dir}"/>
<mkdir dir="${test.log.dir}"/>
<tempfile property="junit.tmp.dir" prefix="pig_junit_tmp" destDir="${java.io.tmpdir}" />
<mkdir dir="${junit.tmp.dir}/"/>
- <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" fork="yes" maxmemory="2048m" dir="${basedir}" timeout="${test.timeout}" errorProperty="tests.failed" failureProperty="tests.failed">
+ <junit showoutput="${test.output}" printsummary="yes" haltonfailure="no" fork="yes" maxmemory="2048m" dir="${basedir}" timeout="${test.timeout}" errorProperty="@{tests.failed}" failureProperty="@{tests.failed}">
<sysproperty key="hadoopversion" value="${hadoopversion}" />
<sysproperty key="test.exec.type" value="${test.exec.type}" />
<sysproperty key="ssh.gateway" value="${ssh.gateway}" />
@@ -950,7 +955,6 @@
</junit>
<delete dir="${junit.tmp.dir}/"/>
- <fail if="tests.failed">Tests failed!</fail>
</sequential>
</macrodef>
@@ -958,6 +962,37 @@
<antcall target="test-core" inheritRefs="true" inheritall="true"/>
</target>
+ <target name="test-core-mrtez" description="run core tests on both mr and tez mode"
+ depends="setWindowsPath,setLinuxPath,compile-test,jar,debugger.check,jackson-pig-3039-test-download">
+ <fail message="hadoopversion must be set to 23 when invoking test-core-mrtez">
+ <condition>
+ <not>
+ <equals arg1="${hadoopversion}" arg2="23" />
+ </not>
+ </condition>
+ </fail>
+ <echo message="=======================" />
+ <echo message="Running MR tests" />
+ <echo message="=======================" />
+ <propertyreset name="test.exec.type" value="mr" />
+ <propertyreset name="test.log.dir" value="${test.build.dir}/logs/${test.exec.type}" />
+ <macro-test-runner test.file="${test.all.file}" tests.failed="test.mr.failed"/>
+ <echo />
+ <echo message="=======================" />
+ <echo message="Running Tez tests" />
+ <echo message="=======================" />
+ <propertyreset name="test.exec.type" value="tez" />
+ <propertyreset name="test.log.dir" value="${test.build.dir}/logs/${test.exec.type}" />
+ <macro-test-runner test.file="${test.tez.file}" tests.failed="test.tez.failed"/>
+ <condition property="any.tests.failed">
+ <or>
+ <isset property="test.mr.failed"/>
+ <isset property="test.tez.failed"/>
+ </or>
+ </condition>
+ <fail if="any.tests.failed">Tests failed!</fail>
+ </target>
+
<!-- ================================================================== -->
<!-- End to end tests -->
<!-- ================================================================== -->
Modified: pig/branches/spark/ivy.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy.xml?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/ivy.xml (original)
+++ pig/branches/spark/ivy.xml Thu Sep 4 18:10:42 2014
@@ -433,19 +433,19 @@
<!-- for Tez integration -->
<dependency org="org.apache.tez" name="tez" rev="${tez.version}"
- conf="hadoop23->master" changing="true"/>
+ conf="hadoop23->master"/>
<dependency org="org.apache.tez" name="tez-common" rev="${tez.version}"
- conf="hadoop23->master" changing="true"/>
+ conf="hadoop23->master"/>
<dependency org="org.apache.tez" name="tez-api" rev="${tez.version}"
- conf="hadoop23->master" changing="true"/>
+ conf="hadoop23->master"/>
<dependency org="org.apache.tez" name="tez-dag" rev="${tez.version}"
- conf="hadoop23->master" changing="true"/>
+ conf="hadoop23->master"/>
<dependency org="org.apache.tez" name="tez-runtime-internals" rev="${tez.version}"
- conf="hadoop23->master" changing="true"/>
+ conf="hadoop23->master"/>
<dependency org="org.apache.tez" name="tez-runtime-library" rev="${tez.version}"
- conf="hadoop23->master" changing="true"/>
+ conf="hadoop23->master"/>
<dependency org="org.apache.tez" name="tez-mapreduce" rev="${tez.version}"
- conf="hadoop23->master" changing="true"/>
+ conf="hadoop23->master"/>
<dependency org="org.apache.commons" name="commons-collections4" rev="${commons-collections4.version}"
conf="hadoop23->master"/>
<dependency org="org.codehaus.jettison" name="jettison" rev="${jettison.version}"
Modified: pig/branches/spark/ivy/libraries.properties
URL: http://svn.apache.org/viewvc/pig/branches/spark/ivy/libraries.properties?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/ivy/libraries.properties (original)
+++ pig/branches/spark/ivy/libraries.properties Thu Sep 4 18:10:42 2014
@@ -93,6 +93,6 @@ mockito.version=1.8.4
jansi.version=1.9
asm.version=3.3.1
snappy.version=1.1.0.1
-tez.version=0.5.0-SNAPSHOT
+tez.version=0.5.0
parquet-pig-bundle.version=1.2.3
snappy.version=0.2
Modified: pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/spark/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Thu Sep 4 18:10:42 2014
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.v2.Mi
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager;
import org.apache.tez.dag.api.TezConfiguration;
@@ -97,7 +96,6 @@ public class TezMiniCluster extends Mini
m_mr.init(m_dfs_conf);
m_mr.start();
m_mr_conf = m_mr.getConfig();
- m_mr_conf.set(MRConfiguration.FRAMEWORK_NAME, "yarn-tez");
m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
System.getProperty("java.class.path"));
m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx2048m");
Modified: pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml (original)
+++ pig/branches/spark/src/docs/src/documentation/content/xdocs/test.xml Thu Sep 4 18:10:42 2014
@@ -829,8 +829,8 @@ $pig_trunk ant pigunit-jar
<!-- +++++++++++++++++++++++++++++++++++++++ -->
<section>
<title>Mapreduce Mode</title>
- <p>PigUnit also runs in Pig's mapreduce mode. Mapreduce mode requires you to use a Hadoop cluster and HDFS installation.
- It is enabled when the Java system property pigunit.exectype.cluster is set to any value: e.g. -Dpigunit.exectype.cluster=true or System.getProperties().setProperty("pigunit.exectype.cluster", "true"). The cluster you select must be specified in the CLASSPATH (similar to the HADOOP_CONF_DIR variable).
+ <p>PigUnit also runs in Pig's mapreduce/tez/tez_local mode. Mapreduce/Tez mode requires you to use a Hadoop cluster and HDFS installation.
+ It is enabled when the Java system property pigunit.exectype is set to specific values (mr/tez/tez_local): e.g. -Dpigunit.exectype=mr or System.getProperties().setProperty("pigunit.exectype", "mr"), which means PigUnit will run in mr mode. The cluster you select to run mr/tez test must be specified in the CLASSPATH (similar to the HADOOP_CONF_DIR variable).
</p>
</section>
</section>
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/partitioners/DiscreteProbabilitySampleGenerator.java Thu Sep 4 18:10:42 2014
@@ -25,7 +25,7 @@ import org.apache.commons.logging.LogFac
public class DiscreteProbabilitySampleGenerator {
Random rGen;
float[] probVec;
- float epsilon = 0.00001f;
+ float epsilon = 0.0001f;
private static final Log LOG = LogFactory.getLog(DiscreteProbabilitySampleGenerator.class);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Sep 4 18:10:42 2014
@@ -403,20 +403,6 @@ public class TezDagBuilder extends TezOp
payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.sortOperator.getOperatorKey().toString());
}
- String tmp;
- long maxCombinedSplitSize = 0;
- if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))
- payloadConf.setBoolean(PigConfiguration.PIG_NO_SPLIT_COMBINATION, true);
- else if ((tmp = pc.getProperties().getProperty(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, null)) != null) {
- try {
- maxCombinedSplitSize = Long.parseLong(tmp);
- } catch (NumberFormatException e) {
- log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
- }
- }
- if (maxCombinedSplitSize > 0)
- payloadConf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
-
payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
payloadConf.set("pig.inpSignatures", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
payloadConf.set("pig.inpLimits", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
@@ -559,6 +545,11 @@ public class TezDagBuilder extends TezOp
log.info("Estimate quantile for sample aggregation vertex " + tezOp.getOperatorKey().toString());
}
+ // set various parallelism into the job conf for later analysis, PIG-2779
+ payloadConf.setInt("pig.info.reducers.default.parallel", pc.defaultParallel);
+ payloadConf.setInt("pig.info.reducers.requested.parallel", tezOp.getRequestedParallelism());
+ payloadConf.setInt("pig.info.reducers.estimated.parallel", tezOp.getEstimatedParallelism());
+
// Take our assembled configuration and create a vertex
UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
procDesc.setUserPayload(userPayload);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Thu Sep 4 18:10:42 2014
@@ -56,7 +56,7 @@ public class TezJobCompiler {
public DAG buildDAG(TezOperPlan tezPlan, Map<String, LocalResource> localResources)
throws IOException, YarnException {
String jobName = pigContext.getProperties().getProperty(PigContext.JOB_NAME, "pig");
- DAG tezDag = new DAG(jobName + "-" + dagIdentifier);
+ DAG tezDag = DAG.create(jobName + "-" + dagIdentifier);
dagIdentifier++;
tezDag.setCredentials(new Credentials());
TezDagBuilder dagBuilder = new TezDagBuilder(pigContext, tezPlan, tezDag, localResources);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Thu Sep 4 18:10:42 2014
@@ -24,15 +24,21 @@ import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
/**
* A Plan used to create the plan of Tez operators which can be converted into
@@ -156,5 +162,65 @@ public class TezOperPlan extends Operato
return super.disconnect(from, to);
}
+
+ /**
+ * Move everything below a given operator to the new operator plan. The specified operator will
+ * be moved and will be the root of the new operator plan
+ * @param root Operator to move everything under including the root operator
+ * @param newPlan new operator plan to move things into
+ * @throws PlanException
+ */
+ public void moveTree(TezOperator root, TezOperPlan newPlan) throws PlanException {
+ List<TezOperator> list = new ArrayList<TezOperator>();
+ list.add(root);
+ int prevSize = 0;
+ int pos = 0;
+ while (list.size() > prevSize) {
+ prevSize = list.size();
+ TezOperator node = list.get(pos);
+ if (getSuccessors(node)!=null) {
+ for (TezOperator succ : getSuccessors(node)) {
+ if (!list.contains(succ)) {
+ list.add(succ);
+ }
+ }
+ }
+ if (getPredecessors(node)!=null) {
+ for (TezOperator pred : getPredecessors(node)) {
+ if (!list.contains(pred)) {
+ list.add(pred);
+ }
+ }
+ }
+ pos++;
+ }
+
+ for (TezOperator node: list) {
+ newPlan.add(node);
+ }
+
+ Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>();
+ for (TezOperator from : mFromEdges.keySet()) {
+ List<TezOperator> tos = mFromEdges.get(from);
+ for (TezOperator to : tos) {
+ if (list.contains(from) || list.contains(to)) {
+ toReconnect.add(new Pair<TezOperator, TezOperator>(from, to));
+ }
+ }
+ }
+
+ for (Pair<TezOperator, TezOperator> pair : toReconnect) {
+ if (list.contains(pair.first) && list.contains(pair.second)) {
+ // Need to reconnect in newPlan
+ TezEdgeDescriptor edge = pair.second.inEdges.get(pair.first.getOperatorKey());
+ TezCompilerUtil.connect(newPlan, pair.first, pair.second, edge);
+ }
+ }
+
+ for (TezOperator node : list) {
+ // Simply remove from plan, don't deal with inEdges/outEdges
+ super.remove(node);
+ }
+ }
}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java Thu Sep 4 18:10:42 2014
@@ -21,10 +21,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -46,6 +49,7 @@ import org.apache.tez.mapreduce.hadoop.M
public class LoaderProcessor extends TezOpPlanVisitor {
private Configuration conf;
private PigContext pc;
+ private static final Log log = LogFactory.getLog(LoaderProcessor.class);
public LoaderProcessor(TezOperPlan plan, PigContext pigContext) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
this.pc = pigContext;
@@ -125,6 +129,19 @@ public class LoaderProcessor extends Tez
conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+ String tmp;
+ long maxCombinedSplitSize = 0;
+ if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))
+ conf.setBoolean(PigConfiguration.PIG_NO_SPLIT_COMBINATION, true);
+ else if ((tmp = pc.getProperties().getProperty(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, null)) != null) {
+ try {
+ maxCombinedSplitSize = Long.parseLong(tmp);
+ } catch (NumberFormatException e) {
+ log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
+ }
+ }
+ if (maxCombinedSplitSize > 0)
+ conf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
tezOp.getLoaderInfo().setInpSignatureLists(inpSignatureLists);
tezOp.getLoaderInfo().setInp(inp);
tezOp.getLoaderInfo().setInpLimits(inpLimits);
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Thu Sep 4 18:10:42 2014
@@ -103,11 +103,13 @@ public class TezCompilerUtil {
static public void connect(TezOperPlan plan, TezOperator from, TezOperator to, TezEdgeDescriptor edge) throws PlanException {
plan.connect(from, to);
- PhysicalOperator leaf = from.plan.getLeaves().get(0);
- // It could be POStoreTez incase of sampling job in order by
- if (leaf instanceof POLocalRearrangeTez) {
- POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
- lr.setOutputKey(to.getOperatorKey().toString());
+ if (from.plan.getLeaves()!=null && !from.plan.getLeaves().isEmpty()) {
+ PhysicalOperator leaf = from.plan.getLeaves().get(0);
+ // It could be POStoreTez incase of sampling job in order by
+ if (leaf instanceof POLocalRearrangeTez) {
+ POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
+ lr.setOutputKey(to.getOperatorKey().toString());
+ }
}
// Add edge descriptors to old and new operators
to.inEdges.put(from.getOperatorKey(), edge);
Modified: pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/builtin/FindQuantiles.java Thu Sep 4 18:10:42 2014
@@ -172,7 +172,7 @@ public class FindQuantiles extends EvalF
samples = (DataBag)in.get(1);
}
long numSamples = samples.size();
- long toSkip = numSamples / numQuantiles;
+ double toSkip = (double)numSamples / numQuantiles;
if(toSkip == 0) {
// numSamples is < numQuantiles;
// set numQuantiles to numSamples
@@ -180,9 +180,10 @@ public class FindQuantiles extends EvalF
toSkip = 1;
}
- long ind=0, j=-1, nextQuantile = toSkip-1;
+ long ind=0, j=-1;
+ double nextQuantile = toSkip-1;
for (Tuple it : samples) {
- if (ind==nextQuantile){
+ if (ind==(long)nextQuantile){
++j;
quantilesList.add(it);
nextQuantile+=toSkip;
Modified: pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/branches/spark/src/org/apache/pig/impl/plan/OperatorPlan.java Thu Sep 4 18:10:42 2014
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
//import org.apache.commons.collections.map.MultiValueMap;
@@ -284,42 +286,6 @@ public abstract class OperatorPlan<E ext
}
}
}
-
- /**
- * Move everything below a given operator to the new operator plan. The specified operator will
- * be moved and will be the root of the new operator plan
- * @param root Operator to move everything after
- * @param newPlan new operator plan to move things into
- * @throws PlanException
- */
- public void moveTree(E root, OperatorPlan<E> newPlan) throws PlanException {
- Deque<E> queue = new ArrayDeque<E>();
- queue.addLast(root);
- while (!queue.isEmpty()) {
- E node = queue.poll();
- if (getSuccessors(node)!=null) {
- for (E succ : getSuccessors(node)) {
- if (!queue.contains(succ)) {
- queue.addLast(succ);
- }
- }
- }
- newPlan.add(node);
- }
-
- for (E from : mFromEdges.keySet()) {
- if (newPlan.mOps.containsKey(from)) {
- for (E to : mFromEdges.get(from)) {
- if (newPlan.mOps.containsKey(to)) {
- newPlan.connect(from, to);
- }
- }
- }
- }
-
- trimBelow(root);
- remove(root);
- }
/**
* Trim everything above a given operator. The specified operator will
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezStats.java Thu Sep 4 18:10:42 2014
@@ -218,7 +218,7 @@ public class TezStats extends PigStats {
if (v != null) {
UserPayload payload = v.getProcessorDescriptor().getUserPayload();
Configuration conf = TezUtils.createConfFromUserPayload(payload);
- addVertexStats(name, conf, succeeded, tezJob.getVertexCounters(name));
+ addVertexStats(name, conf, succeeded, v.getParallelism(), tezJob.getVertexCounters(name));
}
}
if (!succeeded) {
@@ -226,12 +226,13 @@ public class TezStats extends PigStats {
}
}
- private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded,
+ private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded, int parallelism,
Map<String, Map<String, Long>> map) {
TezTaskStats stats = tezOpVertexMap.get(tezOpName);
stats.setConf(conf);
stats.setId(tezOpName);
stats.setSuccessful(succeeded);
+ stats.setParallelism(parallelism);
if (map == null) {
if (stats.hasLoadOrStore()) {
LOG.warn("Unable to get input(s)/output(s) of the job");
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Thu Sep 4 18:10:42 2014
@@ -29,6 +29,7 @@ public class TezTaskStats extends JobSta
private static final Log LOG = LogFactory.getLog(TezTaskStats.class);
private String vertexName;
+ private int parallelism;
private List<POStore> stores = null;
private List<FileSpec> loads = null;
@@ -40,6 +41,10 @@ public class TezTaskStats extends JobSta
this.vertexName = vertexName;
}
+ public void setParallelism(int p) {
+ this.parallelism = p;
+ }
+
@Override
public String getJobId() {
return (vertexName == null) ? "" : vertexName;
@@ -253,6 +258,10 @@ public class TezTaskStats extends JobSta
throw new UnsupportedOperationException();
}
+ public int getParallelism() {
+ return parallelism;
+ }
+
public boolean hasLoadOrStore() {
if ((loads != null && !loads.isEmpty())
|| (stores != null && !stores.isEmpty())) {
Propchange: pig/branches/spark/src/pig-default.properties
------------------------------------------------------------------------------
Merged /pig/trunk/src/pig-default.properties:r1621676-1622520
Modified: pig/branches/spark/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java (original)
+++ pig/branches/spark/test/org/apache/pig/backend/hadoop/accumulo/TestAccumuloPigCluster.java Thu Sep 4 18:10:42 2014
@@ -33,12 +33,10 @@ import org.apache.accumulo.core.data.Mut
import org.apache.accumulo.minicluster.MiniAccumuloCluster;
import org.apache.accumulo.minicluster.MiniAccumuloConfig;
import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.data.Tuple;
-import org.apache.pig.test.MiniCluster;
+import org.apache.pig.test.Util;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
@@ -89,7 +87,7 @@ public class TestAccumuloPigCluster {
@Before
public void beforeTest() throws Exception {
- pig = new PigServer(ExecType.LOCAL);
+ pig = new PigServer(Util.getLocalTestMode());
}
@AfterClass
Modified: pig/branches/spark/test/org/apache/pig/builtin/TestBigTypeSort.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestBigTypeSort.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestBigTypeSort.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestBigTypeSort.java Thu Sep 4 18:10:42 2014
@@ -28,10 +28,10 @@ import java.math.BigInteger;
import java.util.Iterator;
import java.util.List;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
+import org.apache.pig.test.Util;
import org.junit.Before;
import org.junit.Test;
@@ -42,7 +42,7 @@ public class TestBigTypeSort {
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(ExecType.LOCAL);
+ pigServer = new PigServer(Util.getLocalTestMode());
}
@Test
Modified: pig/branches/spark/test/org/apache/pig/builtin/TestCurrentTime.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestCurrentTime.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestCurrentTime.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestCurrentTime.java Thu Sep 4 18:10:42 2014
@@ -26,12 +26,10 @@ import static org.junit.Assert.assertTru
import java.util.Iterator;
import java.util.List;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.test.Util;
import org.junit.Before;
import org.junit.Test;
@@ -42,7 +40,7 @@ public class TestCurrentTime {
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(ExecType.LOCAL);
+ pigServer = new PigServer(Util.getLocalTestMode());
}
@Test
Modified: pig/branches/spark/test/org/apache/pig/builtin/TestInvokerGenerator.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/builtin/TestInvokerGenerator.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/builtin/TestInvokerGenerator.java (original)
+++ pig/branches/spark/test/org/apache/pig/builtin/TestInvokerGenerator.java Thu Sep 4 18:10:42 2014
@@ -26,11 +26,11 @@ import java.util.List;
import java.util.Random;
import java.util.Set;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.builtin.mock.Storage.Data;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.test.Util;
import org.junit.Before;
import org.junit.Test;
@@ -43,7 +43,7 @@ public class TestInvokerGenerator {
@Before
public void setUp() throws Exception {
- pigServer = new PigServer(ExecType.LOCAL);
+ pigServer = new PigServer(Util.getLocalTestMode());
r = new Random(42L);
}
Modified: pig/branches/spark/test/org/apache/pig/pigunit/MiniClusterRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/pigunit/MiniClusterRunner.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/pigunit/MiniClusterRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/pigunit/MiniClusterRunner.java Thu Sep 4 18:10:42 2014
@@ -12,7 +12,7 @@
*/
package org.apache.pig.pigunit;
-import org.apache.pig.test.MiniCluster;
+import org.apache.pig.test.MiniGenericCluster;
/**
@@ -41,6 +41,6 @@ import org.apache.pig.test.MiniCluster;
public class MiniClusterRunner {
public static void main(String[] args) {
System.setProperty("hadoop.log.dir", "/tmp/pigunit");
- MiniCluster.buildCluster();
+ MiniGenericCluster.buildCluster();
}
}
Modified: pig/branches/spark/test/org/apache/pig/pigunit/PigTest.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/pigunit/PigTest.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/pigunit/PigTest.java (original)
+++ pig/branches/spark/test/org/apache/pig/pigunit/PigTest.java Thu Sep 4 18:10:42 2014
@@ -31,6 +31,8 @@ import junit.framework.Assert;
import org.apache.commons.lang.StringUtils;
import org.apache.log4j.Logger;
import org.apache.pig.ExecType;
+import org.apache.pig.ExecTypeProvider;
+import org.apache.pig.PigException;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -62,7 +64,7 @@ public class PigTest {
private static ThreadLocal<Cluster> cluster = new ThreadLocal<Cluster>();
private static final Logger LOG = Logger.getLogger(PigTest.class);
- private static final String EXEC_CLUSTER = "pigunit.exectype.cluster";
+ private static final String EXEC_CLUSTER = "pigunit.exectype";
/**
* Initializes the Pig test.
@@ -120,16 +122,30 @@ public class PigTest {
* @throws ExecException If the PigServer can't be started.
*/
public static Cluster getCluster() throws ExecException {
- if (cluster.get() == null) {
- if (System.getProperties().containsKey(EXEC_CLUSTER)) {
- LOG.info("Using cluster mode");
- pig.set(new PigServer(ExecType.MAPREDUCE));
- } else {
- LOG.info("Using default local mode");
- pig.set(new PigServer(ExecType.LOCAL));
+ try {
+ if (cluster.get() == null) {
+ ExecType execType = ExecType.LOCAL;
+ if (System.getProperties().containsKey(EXEC_CLUSTER)) {
+ if (System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("mr")) {
+ LOG.info("Using mr cluster mode");
+ execType = ExecType.MAPREDUCE;
+ } else if (System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("tez")) {
+ LOG.info("Using tez cluster mode");
+ execType = ExecTypeProvider.fromString("tez");
+ } else if (System.getProperties().getProperty(EXEC_CLUSTER).equalsIgnoreCase("tez_local")) {
+ LOG.info("Using tez local mode");
+ execType = ExecTypeProvider.fromString("tez_local");
+ } else {
+ LOG.info("Using default local mode");
+ }
+ } else {
+ LOG.info("Using default local mode");
+ }
+ pig.set(new PigServer(execType));
+ cluster.set(new Cluster(pig.get().getPigContext()));
}
-
- cluster.set(new Cluster(pig.get().getPigContext()));
+ } catch (PigException e) {
+ throw new ExecException(e);
}
return cluster.get();
Modified: pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestFindQuantiles.java Thu Sep 4 18:10:42 2014
@@ -20,13 +20,18 @@ package org.apache.pig.test;
import static org.junit.Assert.assertTrue;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Random;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.partitioners.DiscreteProbabilitySampleGenerator;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.InternalMap;
+import org.apache.pig.data.NonSpillableDataBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.builtin.FindQuantiles;
@@ -35,7 +40,7 @@ import org.junit.Test;
public class TestFindQuantiles {
private static TupleFactory tFact = TupleFactory.getInstance();
- private static final float epsilon = 0.00001f;
+ private static final float epsilon = 0.0001f;
@Test
public void testFindQuantiles() throws Exception {
@@ -43,7 +48,7 @@ public class TestFindQuantiles {
final int numReducers = 1009;
float sum = getProbVecSum(numSamples, numReducers);
System.out.println("sum: " + sum);
- assertTrue(sum > (1+epsilon));
+ assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
}
@Test
@@ -52,9 +57,34 @@ public class TestFindQuantiles {
final int numReducers = 3000;
float sum = getProbVecSum(numSamples, numReducers);
System.out.println("sum: " + sum);
- assertTrue(sum < (1-epsilon));
+ assertTrue(sum > (1-epsilon) && sum < (1+epsilon));
}
-
+
+ @Test
+ public void testFindQuantilesRemainder() throws Exception {
+ final int numSamples = 1900;
+ final int numReducers = 300;
+ DataBag samples = generateRandomSortedSamples(numSamples, 365);
+ Map<String, Object> findQuantilesResult = getFindQuantilesResult(samples, numReducers);
+ DataBag quantilesBag = (DataBag)findQuantilesResult.get(FindQuantiles.QUANTILES_LIST);
+ Iterator<Tuple> iter = quantilesBag.iterator();
+ Tuple lastQuantile = null;
+ while (iter.hasNext()) {
+ lastQuantile = iter.next();
+ }
+ int lastQuantileNum = (Integer)lastQuantile.get(0);
+ int count = 0;
+ iter = samples.iterator();
+ while (iter.hasNext()) {
+ Tuple t = iter.next();
+ int num = (Integer)t.get(0);
+ if (num >= lastQuantileNum) {
+ count++;
+ }
+ }
+ assertTrue((double)count/numSamples <= 1.0/365 + 0.001);
+ }
+
private float[] getProbVec(Tuple values) throws Exception {
float[] probVec = new float[values.size()];
for(int i = 0; i < values.size(); i++) {
@@ -62,22 +92,46 @@ public class TestFindQuantiles {
}
return probVec;
}
-
- private float getProbVecSum(int numSamples, int numReduceres) throws Exception {
- Tuple in = tFact.newTuple(2);
+
+ private DataBag generateRandomSortedSamples(int numSamples, int max) throws Exception {
+ Random rand = new Random(1000);
+ List<Tuple> samples = new ArrayList<Tuple>();
+ for (int i=0; i<numSamples; i++) {
+ Tuple t = tFact.newTuple(1);
+ t.set(0, rand.nextInt(max));
+ samples.add(t);
+ }
+ Collections.sort(samples);
+ return new NonSpillableDataBag(samples);
+ }
+
+ private DataBag generateUniqueSamples(int numSamples) throws Exception {
DataBag samples = BagFactory.getInstance().newDefaultBag();
for (int i=0; i<numSamples; i++) {
Tuple t = tFact.newTuple(1);
t.set(0, new Integer(23));
samples.add(t);
}
+ return samples;
+ }
+
+ private Map<String, Object> getFindQuantilesResult(DataBag samples,
+ int numReduceres) throws Exception {
+ Tuple in = tFact.newTuple(2);
+
in.set(0, new Integer(numReduceres));
in.set(1, samples);
FindQuantiles fq = new FindQuantiles();
Map<String, Object> res = fq.exec(in);
-
+ return res;
+ }
+
+ private float getProbVecSum(int numSamples, int numReduceres) throws Exception {
+ DataBag samples = generateUniqueSamples(numSamples);
+ Map<String, Object> res = getFindQuantilesResult(samples, numReduceres);
+
InternalMap weightedPartsData = (InternalMap) res.get(FindQuantiles.WEIGHTED_PARTS);
Iterator<Object> it = weightedPartsData.values().iterator();
float[] probVec = getProbVec((Tuple)it.next());
Modified: pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallel.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallel.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallel.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGroupConstParallel.java Thu Sep 4 18:10:42 2014
@@ -17,44 +17,32 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.assertEquals;
-
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
import org.apache.pig.newplan.optimizer.Rule;
-import org.apache.pig.test.utils.GenPhyOp;
-import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
-public class TestGroupConstParallel {
+@Ignore
+public abstract class TestGroupConstParallel {
private static final String INPUT_FILE = "TestGroupConstParallelInp";
- private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@BeforeClass
@@ -79,7 +67,7 @@ public class TestGroupConstParallel {
*/
@Test
public void testGroupAllWithParallel() throws Exception {
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster
.getProperties());
@@ -95,42 +83,30 @@ public class TestGroupConstParallel {
Util.checkQueryOutputsAfterSort(iter, expectedRes);
JobGraph jGraph = PigStats.get().getJobGraph();
- assertEquals(1, jGraph.size());
- // find added map-only concatenate job
- MRJobStats js = (MRJobStats)jGraph.getSources().get(0);
- assertEquals(1, js.getNumberMaps());
- assertEquals(1, js.getNumberReduces());
+ checkGroupAllWithParallelGraphResult(jGraph);
}
-
}
-
-
+
+ abstract protected void checkGroupAllWithParallelGraphResult(JobGraph jGraph);
+
/**
* Test parallelism for group by constant
* @throws Throwable
*/
@Test
public void testGroupConstWithParallel() throws Throwable {
- PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext pc = new PigContext(cluster.getExecType(), cluster.getProperties());
pc.defaultParallel = 100;
pc.connect();
- String query = "a = load 'input';\n" + "b = group a by 1;" + "store b into 'output';";
- PigServer pigServer = new PigServer( ExecType.MAPREDUCE, cluster.getProperties() );
+ String query = "a = load '" + INPUT_FILE + "';\n" + "b = group a by 1;" + "store b into 'output';";
+ PigServer pigServer = new PigServer( cluster.getExecType(), cluster.getProperties() );
PhysicalPlan pp = Util.buildPp( pigServer, query );
-
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
- ConfigurationValidator.validatePigProperties(pc.getProperties());
- Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
- JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
- JobControl jobControl = jcc.compile(mrPlan, "Test");
- Job job = jobControl.getWaitingJobs().get(0);
- int parallel = job.getJobConf().getNumReduceTasks();
-
- assertEquals("parallism", 1, parallel);
+ checkGroupConstWithParallelResult(pp, pc);
}
+
+ abstract protected void checkGroupConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
/**
* Test parallelism for group by column
@@ -138,27 +114,20 @@ public class TestGroupConstParallel {
*/
@Test
public void testGroupNonConstWithParallel() throws Throwable {
- PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext pc = new PigContext(cluster.getExecType(), cluster.getProperties());
pc.defaultParallel = 100;
pc.connect();
- PigServer pigServer = new PigServer( ExecType.MAPREDUCE, cluster.getProperties() );
- String query = "a = load 'input';\n" + "b = group a by $0;" + "store b into 'output';";
+ PigServer pigServer = new PigServer( cluster.getExecType(), cluster.getProperties() );
+ String query = "a = load '" + INPUT_FILE + "';\n" + "b = group a by $0;" + "store b into 'output';";
PhysicalPlan pp = Util.buildPp( pigServer, query );
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
- ConfigurationValidator.validatePigProperties(pc.getProperties());
- Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
- JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
- JobControl jobControl = jcc.compile(mrPlan, "Test");
- Job job = jobControl.getWaitingJobs().get(0);
- int parallel = job.getJobConf().getNumReduceTasks();
-
- assertEquals("parallism", 100, parallel);
+ checkGroupNonConstWithParallelResult(pp, pc);
}
+ abstract protected void checkGroupNonConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
public class MyPlanOptimizer extends LogicalPlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, int iterations) {
Modified: pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestJobSubmission.java Thu Sep 4 18:10:42 2014
@@ -37,31 +37,28 @@ import org.apache.hadoop.hbase.MiniHBase
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.test.utils.GenPhyOp;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
-public class TestJobSubmission {
+@Ignore
+abstract public class TestJobSubmission {
static PigContext pc;
@@ -75,11 +72,11 @@ public class TestJobSubmission {
String curDir;
String inpDir;
String golDir;
- static MiniCluster cluster = MiniCluster.buildCluster();
+ static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@BeforeClass
public static void onetimeSetUp() throws Exception {
- pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+ pc = new PigContext(cluster.getExecType(), cluster.getProperties());
try {
pc.connect();
} catch (ExecException e) {
@@ -115,63 +112,35 @@ public class TestJobSubmission {
@Test
public void testJobControlCompilerErr() throws Exception {
- String query = "a = load 'input';" + "b = order a by $0;" + "store b into 'output';";
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ String query = "a = load '/passwd' as (a1:bag{(t:chararray)});" + "b = order a by a1;" + "store b into 'output';";
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(pigServer, query);
- POStore store = GenPhyOp.dummyPigStorageOp();
- pp.addAsLeaf(store);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- for(MapReduceOper mro: mrPlan.getLeaves()) {
- if(mro.reducePlan != null) {
- PhysicalOperator po = mro.reducePlan.getRoots().get(0);
- if (po instanceof POPackage) {
- ((POPackage) po).getPkgr().setKeyType(DataType.BAG);
- mro.setGlobalSort(true);
- }
- }
- }
-
- ConfigurationValidator.validatePigProperties(pc.getProperties());
- Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
- JobControlCompiler jcc = new JobControlCompiler(pc, conf);
- try {
- jcc.compile(mrPlan, "Test");
- } catch (JobCreationException jce) {
- assertTrue(jce.getErrorCode() == 1068);
- }
+ checkJobControlCompilerErrResult(pp, pc);
}
+ abstract protected void checkJobControlCompilerErrResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
@Test
public void testDefaultParallel() throws Throwable {
pc.defaultParallel = 100;
- String query = "a = load 'input';" + "b = group a by $0;" + "store b into 'output';";
- PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ String query = "a = load '/passwd';" + "b = group a by $0;" + "store b into 'output';";
+ PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- ConfigurationValidator.validatePigProperties(pc.getProperties());
- Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
- JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
- JobControl jobControl = jcc.compile(mrPlan, "Test");
- Job job = jobControl.getWaitingJobs().get(0);
- int parallel = job.getJobConf().getNumReduceTasks();
-
- assertEquals(100, parallel);
- Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());
+ checkDefaultParallelResult(pp, pc);
pc.defaultParallel = -1;
}
+ abstract protected void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
@Test
public void testDefaultParallelInSort() throws Throwable {
// default_parallel is considered only at runtime, so here we only test requested parallel
// more thorough tests can be found in TestNumberOfReducers.java
String query = "a = load 'input';" + "b = order a by $0 parallel 100;" + "store b into 'output';";
- PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
@@ -198,7 +167,7 @@ public class TestJobSubmission {
"b = load 'input';" +
"c = join a by $0, b by $0 using 'skewed' parallel 100;" +
"store c into 'output';";
- PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
@@ -219,6 +188,10 @@ public class TestJobSubmission {
@Test
public void testReducerNumEstimation() throws Exception{
+ // Skip the test for Tez. Tez use a different mechanism.
+ // Equivalent test is in TestTezAutoParallelism
+ Assume.assumeTrue("Skip this test for TEZ",
+ Util.isMapredExecType(cluster.getExecType()));
// use the estimation
Configuration conf = HBaseConfiguration.create(new Configuration());
HBaseTestingUtility util = new HBaseTestingUtility(conf);
@@ -228,7 +201,7 @@ public class TestJobSubmission {
String query = "a = load '/passwd';" +
"b = group a by $0;" +
"store b into 'output';";
- PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
@@ -295,6 +268,10 @@ public class TestJobSubmission {
@Test
public void testReducerNumEstimationForOrderBy() throws Exception{
+ // Skip the test for Tez. Tez use a different mechanism.
+ // Equivalent test is in TestTezAutoParallelism
+ Assume.assumeTrue("Skip this test for TEZ",
+ Util.isMapredExecType(cluster.getExecType()));
// use the estimation
pc.getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
pc.getProperties().setProperty("pig.exec.reducers.max", "10");
@@ -302,7 +279,7 @@ public class TestJobSubmission {
String query = "a = load '/passwd';" +
"b = order a by $0;" +
"store b into 'output';";
- PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
Modified: pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestMergeJoin.java Thu Sep 4 18:10:42 2014
@@ -53,14 +53,14 @@ public class TestMergeJoin {
private static final String INPUT_FILE = "testMergeJoinInput.txt";
private static final String INPUT_FILE2 = "testMergeJoinInput2.txt";
private PigServer pigServer;
- private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
public TestMergeJoin() throws ExecException{
Properties props = cluster.getProperties();
props.setProperty(MRConfiguration.MAP_MAX_ATTEMPTS, "1");
props.setProperty(MRConfiguration.REDUCE_MAX_ATTEMPTS, "1");
- pigServer = new PigServer(ExecType.MAPREDUCE, props);
+ pigServer = new PigServer(cluster.getExecType(), props);
}
/**
* @throws java.lang.Exception
Modified: pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestNativeMapReduce.java Thu Sep 4 18:10:42 2014
@@ -27,10 +27,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import org.apache.pig.ExecType;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.data.Tuple;
import org.apache.pig.tools.pigstats.PigStats;
import org.junit.AfterClass;
@@ -58,7 +59,7 @@ public class TestNativeMapReduce {
* file if specified will be skipped by the wordcount udf
*/
final static String STOPWORD_FILE = "TestNMapReduceStopwFile";
- static MiniCluster cluster = MiniCluster.buildCluster();
+ static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
private PigServer pigServer = null;
/**
@@ -97,7 +98,7 @@ public class TestNativeMapReduce {
@Before
public void setUp() throws Exception{
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
//createWordCountJar();
}
@@ -206,6 +207,9 @@ public class TestNativeMapReduce {
assertTrue("job failed", PigStats.get().getReturnCode() != 0);
+ } catch (JobCreationException e) {
+ // Running in Tez mode throw exception
+ assertTrue(e.getCause() instanceof FileAlreadyExistsException);
}
finally{
// We have to manually delete intermediate mapreduce files
Modified: pig/branches/spark/test/org/apache/pig/test/TestPigProgressReporting.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigProgressReporting.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigProgressReporting.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigProgressReporting.java Thu Sep 4 18:10:42 2014
@@ -32,7 +32,7 @@ import org.junit.Test;
public class TestPigProgressReporting {
- static MiniCluster cluster = MiniCluster.buildCluster();
+ static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@Test
public void testProgressReportingWithStatusMessage() throws Exception {
@@ -46,7 +46,7 @@ public class TestPigProgressReporting {
Util.createInputFile(cluster, "a.txt", new String[] { "dummy"});
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pig = new PigServer(cluster.getExecType(), cluster.getProperties());
String filename = prepareTempFile();
filename = filename.replace("\\", "\\\\");
Modified: pig/branches/spark/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/Util.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/Util.java Thu Sep 4 18:10:42 2014
@@ -58,6 +58,7 @@ import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
import org.apache.pig.ExecType;
+import org.apache.pig.ExecTypeProvider;
import org.apache.pig.LoadCaster;
import org.apache.pig.PigException;
import org.apache.pig.PigServer;
@@ -735,7 +736,7 @@ public class Util {
}
if (context.getExecType() == ExecType.MAPREDUCE || context.getExecType().name().equals("TEZ")) {
return FileLocalizer.hadoopify(filename, context);
- } else if (context.getExecType() == ExecType.LOCAL) {
+ } else if (context.getExecType().isLocal()) {
return filename;
} else {
throw new IllegalStateException("ExecType: " + context.getExecType());
@@ -1269,7 +1270,7 @@ public class Util {
assertConfLong(conf, MRConfiguration.REDUCE_TASKS, runtimeParallel);
}
- private static void assertConfLong(Configuration conf, String param, long expected) {
+ public static void assertConfLong(Configuration conf, String param, long expected) {
assertEquals("Unexpected value found in configs for " + param, expected, conf.getLong(param, -1));
}
@@ -1346,4 +1347,13 @@ public class Util {
}
return jarNames[0];
}
+
+ public static ExecType getLocalTestMode() throws Exception {
+ String execType = System.getProperty("test.exec.type");
+ if (execType!=null && execType.equals("tez")) {
+ return ExecTypeProvider.fromString("tez_local");
+ } else {
+ return ExecTypeProvider.fromString("local");
+ }
+ }
}
Propchange: pig/branches/spark/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2
------------------------------------------------------------------------------
Merged /pig/trunk/test/org/apache/pig/test/data/bzipdir1.bz2/bzipdir2.bz2/recordLossblockHeaderEndsAt136500.txt.bz2:r1621676-1622520
Modified: pig/branches/spark/test/org/apache/pig/test/pigunit/TestPigTest.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/pigunit/TestPigTest.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/pigunit/TestPigTest.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/pigunit/TestPigTest.java Thu Sep 4 18:10:42 2014
@@ -27,10 +27,10 @@ import org.apache.commons.lang.StringUti
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.pig.ExecType;
import org.apache.pig.pigunit.Cluster;
import org.apache.pig.pigunit.PigTest;
import org.apache.pig.pigunit.pig.PigServer;
+import org.apache.pig.test.Util;
import org.apache.pig.tools.parameters.ParseException;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -56,7 +56,8 @@ public class TestPigTest {
private static final Log LOG = LogFactory.getLog(TestPigTest.class);
@BeforeClass
- public static void setUpOnce() throws IOException {
+ public static void setUpOnce() throws Exception {
+ System.getProperties().setProperty("pigunit.exectype", Util.getLocalTestMode().toString());
cluster = PigTest.getCluster();
cluster.update(
@@ -366,12 +367,11 @@ public class TestPigTest {
/**
* This is a test for default bootup. PIG-2456
- *
- * @throws IOException
+ * @throws Exception
*/
@Test
- public void testDefaultBootup() throws ParseException, IOException {
+ public void testDefaultBootup() throws Exception {
// Test with properties file
String pigProps = "pig.properties";
String bootupPath = "/tmp/.temppigbootup";
@@ -414,13 +414,7 @@ public class TestPigTest {
// Create a pigunit.pig.PigServer and Cluster to run this test.
PigServer pig = null;
- if (System.getProperties().containsKey("pigunit.exectype.cluster")) {
- LOG.info("Using cluster mode");
- pig = new PigServer(ExecType.MAPREDUCE);
- } else {
- LOG.info("Using default local mode");
- pig = new PigServer(ExecType.LOCAL);
- }
+ pig = new PigServer(Util.getLocalTestMode());
final Cluster cluster = new Cluster(pig.getPigContext());
Modified: pig/branches/spark/test/org/apache/pig/test/pigunit/pig/TestGruntParser.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/pigunit/pig/TestGruntParser.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/pigunit/pig/TestGruntParser.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/pigunit/pig/TestGruntParser.java Thu Sep 4 18:10:42 2014
@@ -19,10 +19,10 @@ import java.util.Map;
import junit.framework.Assert;
-import org.apache.pig.ExecType;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.pigunit.pig.GruntParser;
import org.apache.pig.pigunit.pig.PigServer;
+import org.apache.pig.test.Util;
import org.junit.Before;
import org.junit.Test;
@@ -33,12 +33,12 @@ public class TestGruntParser {
@SuppressWarnings("serial")
@Before
- public void setUp() throws ExecException {
+ public void setUp() throws Exception {
override = new HashMap<String, String>() {{
put("STORE", "");
put("DUMP", "");
}};
- PigServer pigServer = new PigServer(ExecType.LOCAL);
+ PigServer pigServer = new PigServer(Util.getLocalTestMode());
parser = new GruntParser(new StringReader(""), pigServer, override);
}
Modified: pig/branches/spark/test/org/apache/pig/test/utils/ReportingUDF.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/utils/ReportingUDF.java?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/utils/ReportingUDF.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/utils/ReportingUDF.java Thu Sep 4 18:10:42 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
public class ReportingUDF extends EvalFunc<Integer> {
@@ -30,7 +31,8 @@ public class ReportingUDF extends EvalFu
try {
Thread.sleep(7500);
- getReporter().progress("Progressing");
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ reporter.progress();
Thread.sleep(7500);
} catch (InterruptedException e) {
}
Modified: pig/branches/spark/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/tez-tests?rev=1622521&r1=1622520&r2=1622521&view=diff
==============================================================================
--- pig/branches/spark/test/tez-tests (original)
+++ pig/branches/spark/test/tez-tests Thu Sep 4 18:10:42 2014
@@ -61,3 +61,12 @@
**/TestTezCompiler.java
**/TestTezJobControlCompiler.java
**/TestTezLauncher.java
+**/TestAccumuloPigCluster.java
+**/TestBigTypeSort.java
+**/TestCurrentTime.java
+**/TestInvokerGenerator.java
+**/TestGroupConstParallelTez.java
+**/TestJobSubmissionTez.java
+**/TestMergeJoin.java
+**/TestNativeMapReduce.java
+**/TestPigProgressReporting.java