You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2014/09/04 02:11:52 UTC
svn commit: r1622382 - in /pig/trunk: ./
shims/test/hadoop23/org/apache/pig/test/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/
src/org/apache/pig/backend/hadoop/executionengine...
Author: daijy
Date: Thu Sep 4 00:11:51 2014
New Revision: 1622382
URL: http://svn.apache.org/r1622382
Log:
PIG-4143: Port more mini cluster tests to Tez - part 7
Modified:
pig/trunk/CHANGES.txt
pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java
pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
pig/trunk/test/org/apache/pig/test/TestPigProgressReporting.java
pig/trunk/test/org/apache/pig/test/Util.java
pig/trunk/test/org/apache/pig/test/utils/ReportingUDF.java
pig/trunk/test/tez-tests
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Thu Sep 4 00:11:51 2014
@@ -70,6 +70,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4143: Port more mini cluster tests to Tez - part 7 (daijy)
+
PIG-4149: Rounding issue in FindQuantiles (daijy)
PIG-4145: Port local mode tests to Tez - part1 (daijy)
Modified: pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/trunk/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Thu Sep 4 00:11:51 2014
@@ -33,7 +33,6 @@ import org.apache.hadoop.mapreduce.v2.Mi
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.tez.TezExecType;
import org.apache.pig.backend.hadoop.executionengine.tez.TezSessionManager;
import org.apache.tez.dag.api.TezConfiguration;
@@ -97,7 +96,6 @@ public class TezMiniCluster extends Mini
m_mr.init(m_dfs_conf);
m_mr.start();
m_mr_conf = m_mr.getConfig();
- m_mr_conf.set(MRConfiguration.FRAMEWORK_NAME, "yarn-tez");
m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
System.getProperty("java.class.path"));
m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx2048m");
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Thu Sep 4 00:11:51 2014
@@ -403,20 +403,6 @@ public class TezDagBuilder extends TezOp
payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.sortOperator.getOperatorKey().toString());
}
- String tmp;
- long maxCombinedSplitSize = 0;
- if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))
- payloadConf.setBoolean(PigConfiguration.PIG_NO_SPLIT_COMBINATION, true);
- else if ((tmp = pc.getProperties().getProperty(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, null)) != null) {
- try {
- maxCombinedSplitSize = Long.parseLong(tmp);
- } catch (NumberFormatException e) {
- log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
- }
- }
- if (maxCombinedSplitSize > 0)
- payloadConf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
-
payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
payloadConf.set("pig.inpSignatures", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpSignatureLists()));
payloadConf.set("pig.inpLimits", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInpLimits()));
@@ -559,6 +545,11 @@ public class TezDagBuilder extends TezOp
log.info("Estimate quantile for sample aggregation vertex " + tezOp.getOperatorKey().toString());
}
+ // set various parallelism into the job conf for later analysis, PIG-2779
+ payloadConf.setInt("pig.info.reducers.default.parallel", pc.defaultParallel);
+ payloadConf.setInt("pig.info.reducers.requested.parallel", tezOp.getRequestedParallelism());
+ payloadConf.setInt("pig.info.reducers.estimated.parallel", tezOp.getEstimatedParallelism());
+
// Take our assembled configuration and create a vertex
UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
procDesc.setUserPayload(userPayload);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezOperPlan.java Thu Sep 4 00:11:51 2014
@@ -24,15 +24,21 @@ import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
+import java.util.ArrayList;
import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.OperatorPlan;
+import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
/**
* A Plan used to create the plan of Tez operators which can be converted into
@@ -156,5 +162,65 @@ public class TezOperPlan extends Operato
return super.disconnect(from, to);
}
+
+ /**
+ * Move everything below a given operator to the new operator plan. The specified operator will
+ * be moved and will be the root of the new operator plan
+ * @param root Operator to move everything under including the root operator
+ * @param newPlan new operator plan to move things into
+ * @throws PlanException
+ */
+ public void moveTree(TezOperator root, TezOperPlan newPlan) throws PlanException {
+ List<TezOperator> list = new ArrayList<TezOperator>();
+ list.add(root);
+ int prevSize = 0;
+ int pos = 0;
+ while (list.size() > prevSize) {
+ prevSize = list.size();
+ TezOperator node = list.get(pos);
+ if (getSuccessors(node)!=null) {
+ for (TezOperator succ : getSuccessors(node)) {
+ if (!list.contains(succ)) {
+ list.add(succ);
+ }
+ }
+ }
+ if (getPredecessors(node)!=null) {
+ for (TezOperator pred : getPredecessors(node)) {
+ if (!list.contains(pred)) {
+ list.add(pred);
+ }
+ }
+ }
+ pos++;
+ }
+
+ for (TezOperator node: list) {
+ newPlan.add(node);
+ }
+
+ Set<Pair<TezOperator, TezOperator>> toReconnect = new HashSet<Pair<TezOperator, TezOperator>>();
+ for (TezOperator from : mFromEdges.keySet()) {
+ List<TezOperator> tos = mFromEdges.get(from);
+ for (TezOperator to : tos) {
+ if (list.contains(from) || list.contains(to)) {
+ toReconnect.add(new Pair<TezOperator, TezOperator>(from, to));
+ }
+ }
+ }
+
+ for (Pair<TezOperator, TezOperator> pair : toReconnect) {
+ if (list.contains(pair.first) && list.contains(pair.second)) {
+ // Need to reconnect in newPlan
+ TezEdgeDescriptor edge = pair.second.inEdges.get(pair.first.getOperatorKey());
+ TezCompilerUtil.connect(newPlan, pair.first, pair.second, edge);
+ }
+ }
+
+ for (TezOperator node : list) {
+ // Simply remove from plan, don't deal with inEdges/outEdges
+ super.remove(node);
+ }
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/optimizers/LoaderProcessor.java Thu Sep 4 00:11:51 2014
@@ -21,10 +21,13 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.LoadFunc;
+import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigInputFormat;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -46,6 +49,7 @@ import org.apache.tez.mapreduce.hadoop.M
public class LoaderProcessor extends TezOpPlanVisitor {
private Configuration conf;
private PigContext pc;
+ private static final Log log = LogFactory.getLog(LoaderProcessor.class);
public LoaderProcessor(TezOperPlan plan, PigContext pigContext) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
this.pc = pigContext;
@@ -125,6 +129,19 @@ public class LoaderProcessor extends Tez
conf.set("pig.inpTargets", ObjectSerializer.serialize(inpTargets));
conf.set("pig.inpSignatures", ObjectSerializer.serialize(inpSignatureLists));
conf.set("pig.inpLimits", ObjectSerializer.serialize(inpLimits));
+ String tmp;
+ long maxCombinedSplitSize = 0;
+ if (!tezOp.combineSmallSplits() || pc.getProperties().getProperty(PigConfiguration.PIG_SPLIT_COMBINATION, "true").equals("false"))
+ conf.setBoolean(PigConfiguration.PIG_NO_SPLIT_COMBINATION, true);
+ else if ((tmp = pc.getProperties().getProperty(PigConfiguration.PIG_MAX_COMBINED_SPLIT_SIZE, null)) != null) {
+ try {
+ maxCombinedSplitSize = Long.parseLong(tmp);
+ } catch (NumberFormatException e) {
+ log.warn("Invalid numeric format for pig.maxCombinedSplitSize; use the default maximum combined split size");
+ }
+ }
+ if (maxCombinedSplitSize > 0)
+ conf.setLong("pig.maxCombinedSplitSize", maxCombinedSplitSize);
tezOp.getLoaderInfo().setInpSignatureLists(inpSignatureLists);
tezOp.getLoaderInfo().setInp(inp);
tezOp.getLoaderInfo().setInpLimits(inpLimits);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Thu Sep 4 00:11:51 2014
@@ -103,11 +103,13 @@ public class TezCompilerUtil {
static public void connect(TezOperPlan plan, TezOperator from, TezOperator to, TezEdgeDescriptor edge) throws PlanException {
plan.connect(from, to);
- PhysicalOperator leaf = from.plan.getLeaves().get(0);
- // It could be POStoreTez incase of sampling job in order by
- if (leaf instanceof POLocalRearrangeTez) {
- POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
- lr.setOutputKey(to.getOperatorKey().toString());
+ if (from.plan.getLeaves()!=null && !from.plan.getLeaves().isEmpty()) {
+ PhysicalOperator leaf = from.plan.getLeaves().get(0);
+ // It could be POStoreTez in case of a sampling job in order by
+ if (leaf instanceof POLocalRearrangeTez) {
+ POLocalRearrangeTez lr = (POLocalRearrangeTez) leaf;
+ lr.setOutputKey(to.getOperatorKey().toString());
+ }
}
// Add edge descriptors to old and new operators
to.inEdges.put(from.getOperatorKey(), edge);
Modified: pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java (original)
+++ pig/trunk/src/org/apache/pig/impl/plan/OperatorPlan.java Thu Sep 4 00:11:51 2014
@@ -26,6 +26,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -35,6 +36,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.pig.PigException;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
//import org.apache.commons.collections.map.MultiValueMap;
@@ -284,42 +286,6 @@ public abstract class OperatorPlan<E ext
}
}
}
-
- /**
- * Move everything below a given operator to the new operator plan. The specified operator will
- * be moved and will be the root of the new operator plan
- * @param root Operator to move everything after
- * @param newPlan new operator plan to move things into
- * @throws PlanException
- */
- public void moveTree(E root, OperatorPlan<E> newPlan) throws PlanException {
- Deque<E> queue = new ArrayDeque<E>();
- queue.addLast(root);
- while (!queue.isEmpty()) {
- E node = queue.poll();
- if (getSuccessors(node)!=null) {
- for (E succ : getSuccessors(node)) {
- if (!queue.contains(succ)) {
- queue.addLast(succ);
- }
- }
- }
- newPlan.add(node);
- }
-
- for (E from : mFromEdges.keySet()) {
- if (newPlan.mOps.containsKey(from)) {
- for (E to : mFromEdges.get(from)) {
- if (newPlan.mOps.containsKey(to)) {
- newPlan.connect(from, to);
- }
- }
- }
- }
-
- trimBelow(root);
- remove(root);
- }
/**
* Trim everything above a given operator. The specified operator will
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java Thu Sep 4 00:11:51 2014
@@ -218,7 +218,7 @@ public class TezStats extends PigStats {
if (v != null) {
UserPayload payload = v.getProcessorDescriptor().getUserPayload();
Configuration conf = TezUtils.createConfFromUserPayload(payload);
- addVertexStats(name, conf, succeeded, tezJob.getVertexCounters(name));
+ addVertexStats(name, conf, succeeded, v.getParallelism(), tezJob.getVertexCounters(name));
}
}
if (!succeeded) {
@@ -226,12 +226,13 @@ public class TezStats extends PigStats {
}
}
- private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded,
+ private void addVertexStats(String tezOpName, Configuration conf, boolean succeeded, int parallelism,
Map<String, Map<String, Long>> map) {
TezTaskStats stats = tezOpVertexMap.get(tezOpName);
stats.setConf(conf);
stats.setId(tezOpName);
stats.setSuccessful(succeeded);
+ stats.setParallelism(parallelism);
if (map == null) {
if (stats.hasLoadOrStore()) {
LOG.warn("Unable to get input(s)/output(s) of the job");
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezTaskStats.java Thu Sep 4 00:11:51 2014
@@ -29,6 +29,7 @@ public class TezTaskStats extends JobSta
private static final Log LOG = LogFactory.getLog(TezTaskStats.class);
private String vertexName;
+ private int parallelism;
private List<POStore> stores = null;
private List<FileSpec> loads = null;
@@ -40,6 +41,10 @@ public class TezTaskStats extends JobSta
this.vertexName = vertexName;
}
+ public void setParallelism(int p) {
+ this.parallelism = p;
+ }
+
@Override
public String getJobId() {
return (vertexName == null) ? "" : vertexName;
@@ -253,6 +258,10 @@ public class TezTaskStats extends JobSta
throw new UnsupportedOperationException();
}
+ public int getParallelism() {
+ return parallelism;
+ }
+
public boolean hasLoadOrStore() {
if ((loads != null && !loads.isEmpty())
|| (stores != null && !stores.isEmpty())) {
Modified: pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestGroupConstParallel.java Thu Sep 4 00:11:51 2014
@@ -17,44 +17,32 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.assertEquals;
-
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapred.jobcontrol.Job;
-import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
import org.apache.pig.newplan.logical.rules.GroupByConstParallelSetter;
import org.apache.pig.newplan.optimizer.Rule;
-import org.apache.pig.test.utils.GenPhyOp;
-import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
-public class TestGroupConstParallel {
+@Ignore
+public abstract class TestGroupConstParallel {
private static final String INPUT_FILE = "TestGroupConstParallelInp";
- private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@BeforeClass
@@ -79,7 +67,7 @@ public class TestGroupConstParallel {
*/
@Test
public void testGroupAllWithParallel() throws Exception {
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster
.getProperties());
@@ -95,42 +83,30 @@ public class TestGroupConstParallel {
Util.checkQueryOutputsAfterSort(iter, expectedRes);
JobGraph jGraph = PigStats.get().getJobGraph();
- assertEquals(1, jGraph.size());
- // find added map-only concatenate job
- MRJobStats js = (MRJobStats)jGraph.getSources().get(0);
- assertEquals(1, js.getNumberMaps());
- assertEquals(1, js.getNumberReduces());
+ checkGroupAllWithParallelGraphResult(jGraph);
}
-
}
-
-
+
+ abstract protected void checkGroupAllWithParallelGraphResult(JobGraph jGraph);
+
/**
* Test parallelism for group by constant
* @throws Throwable
*/
@Test
public void testGroupConstWithParallel() throws Throwable {
- PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext pc = new PigContext(cluster.getExecType(), cluster.getProperties());
pc.defaultParallel = 100;
pc.connect();
- String query = "a = load 'input';\n" + "b = group a by 1;" + "store b into 'output';";
- PigServer pigServer = new PigServer( ExecType.MAPREDUCE, cluster.getProperties() );
+ String query = "a = load '" + INPUT_FILE + "';\n" + "b = group a by 1;" + "store b into 'output';";
+ PigServer pigServer = new PigServer( cluster.getExecType(), cluster.getProperties() );
PhysicalPlan pp = Util.buildPp( pigServer, query );
-
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
- ConfigurationValidator.validatePigProperties(pc.getProperties());
- Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
- JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
- JobControl jobControl = jcc.compile(mrPlan, "Test");
- Job job = jobControl.getWaitingJobs().get(0);
- int parallel = job.getJobConf().getNumReduceTasks();
-
- assertEquals("parallism", 1, parallel);
+ checkGroupConstWithParallelResult(pp, pc);
}
+
+ abstract protected void checkGroupConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
/**
* Test parallelism for group by column
@@ -138,27 +114,20 @@ public class TestGroupConstParallel {
*/
@Test
public void testGroupNonConstWithParallel() throws Throwable {
- PigContext pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+ PigContext pc = new PigContext(cluster.getExecType(), cluster.getProperties());
pc.defaultParallel = 100;
pc.connect();
- PigServer pigServer = new PigServer( ExecType.MAPREDUCE, cluster.getProperties() );
- String query = "a = load 'input';\n" + "b = group a by $0;" + "store b into 'output';";
+ PigServer pigServer = new PigServer( cluster.getExecType(), cluster.getProperties() );
+ String query = "a = load '" + INPUT_FILE + "';\n" + "b = group a by $0;" + "store b into 'output';";
PhysicalPlan pp = Util.buildPp( pigServer, query );
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
- ConfigurationValidator.validatePigProperties(pc.getProperties());
- Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
- JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
- JobControl jobControl = jcc.compile(mrPlan, "Test");
- Job job = jobControl.getWaitingJobs().get(0);
- int parallel = job.getJobConf().getNumReduceTasks();
-
- assertEquals("parallism", 100, parallel);
+ checkGroupNonConstWithParallelResult(pp, pc);
}
+ abstract protected void checkGroupNonConstWithParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
public class MyPlanOptimizer extends LogicalPlanOptimizer {
protected MyPlanOptimizer(OperatorPlan p, int iterations) {
Modified: pig/trunk/test/org/apache/pig/test/TestJobSubmission.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestJobSubmission.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestJobSubmission.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestJobSubmission.java Thu Sep 4 00:11:51 2014
@@ -37,31 +37,28 @@ import org.apache.hadoop.hbase.MiniHBase
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
-import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobControlCompiler;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.util.ConfigurationValidator;
import org.apache.pig.test.utils.GenPhyOp;
import org.junit.After;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
-public class TestJobSubmission {
+@Ignore
+abstract public class TestJobSubmission {
static PigContext pc;
@@ -75,11 +72,11 @@ public class TestJobSubmission {
String curDir;
String inpDir;
String golDir;
- static MiniCluster cluster = MiniCluster.buildCluster();
+ static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@BeforeClass
public static void onetimeSetUp() throws Exception {
- pc = new PigContext(ExecType.MAPREDUCE, cluster.getProperties());
+ pc = new PigContext(cluster.getExecType(), cluster.getProperties());
try {
pc.connect();
} catch (ExecException e) {
@@ -115,63 +112,35 @@ public class TestJobSubmission {
@Test
public void testJobControlCompilerErr() throws Exception {
- String query = "a = load 'input';" + "b = order a by $0;" + "store b into 'output';";
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ String query = "a = load '/passwd' as (a1:bag{(t:chararray)});" + "b = order a by a1;" + "store b into 'output';";
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(pigServer, query);
- POStore store = GenPhyOp.dummyPigStorageOp();
- pp.addAsLeaf(store);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- for(MapReduceOper mro: mrPlan.getLeaves()) {
- if(mro.reducePlan != null) {
- PhysicalOperator po = mro.reducePlan.getRoots().get(0);
- if (po instanceof POPackage) {
- ((POPackage) po).getPkgr().setKeyType(DataType.BAG);
- mro.setGlobalSort(true);
- }
- }
- }
-
- ConfigurationValidator.validatePigProperties(pc.getProperties());
- Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
- JobControlCompiler jcc = new JobControlCompiler(pc, conf);
- try {
- jcc.compile(mrPlan, "Test");
- } catch (JobCreationException jce) {
- assertTrue(jce.getErrorCode() == 1068);
- }
+ checkJobControlCompilerErrResult(pp, pc);
}
+ abstract protected void checkJobControlCompilerErrResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
@Test
public void testDefaultParallel() throws Throwable {
pc.defaultParallel = 100;
- String query = "a = load 'input';" + "b = group a by $0;" + "store b into 'output';";
- PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ String query = "a = load '/passwd';" + "b = group a by $0;" + "store b into 'output';";
+ PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
- MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
-
- ConfigurationValidator.validatePigProperties(pc.getProperties());
- Configuration conf = ConfigurationUtil.toConfiguration(pc.getProperties());
- JobControlCompiler jcc = new JobControlCompiler(pc, conf);
-
- JobControl jobControl = jcc.compile(mrPlan, "Test");
- Job job = jobControl.getWaitingJobs().get(0);
- int parallel = job.getJobConf().getNumReduceTasks();
-
- assertEquals(100, parallel);
- Util.assertParallelValues(100, -1, -1, 100, job.getJobConf());
+ checkDefaultParallelResult(pp, pc);
pc.defaultParallel = -1;
}
+ abstract protected void checkDefaultParallelResult(PhysicalPlan pp, PigContext pc) throws Exception;
+
@Test
public void testDefaultParallelInSort() throws Throwable {
// default_parallel is considered only at runtime, so here we only test requested parallel
// more thorough tests can be found in TestNumberOfReducers.java
String query = "a = load 'input';" + "b = order a by $0 parallel 100;" + "store b into 'output';";
- PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
@@ -198,7 +167,7 @@ public class TestJobSubmission {
"b = load 'input';" +
"c = join a by $0, b by $0 using 'skewed' parallel 100;" +
"store c into 'output';";
- PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
@@ -219,6 +188,10 @@ public class TestJobSubmission {
@Test
public void testReducerNumEstimation() throws Exception{
+ // Skip the test for Tez. Tez uses a different mechanism.
+ // Equivalent test is in TestTezAutoParallelism
+ Assume.assumeTrue("Skip this test for TEZ",
+ Util.isMapredExecType(cluster.getExecType()));
// use the estimation
Configuration conf = HBaseConfiguration.create(new Configuration());
HBaseTestingUtility util = new HBaseTestingUtility(conf);
@@ -228,7 +201,7 @@ public class TestJobSubmission {
String query = "a = load '/passwd';" +
"b = group a by $0;" +
"store b into 'output';";
- PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
@@ -295,6 +268,10 @@ public class TestJobSubmission {
@Test
public void testReducerNumEstimationForOrderBy() throws Exception{
+ // Skip the test for Tez. Tez uses a different mechanism.
+ // Equivalent test is in TestTezAutoParallelism
+ Assume.assumeTrue("Skip this test for TEZ",
+ Util.isMapredExecType(cluster.getExecType()));
// use the estimation
pc.getProperties().setProperty("pig.exec.reducers.bytes.per.reducer", "100");
pc.getProperties().setProperty("pig.exec.reducers.max", "10");
@@ -302,7 +279,7 @@ public class TestJobSubmission {
String query = "a = load '/passwd';" +
"b = order a by $0;" +
"store b into 'output';";
- PigServer ps = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer ps = new PigServer(cluster.getExecType(), cluster.getProperties());
PhysicalPlan pp = Util.buildPp(ps, query);
MROperPlan mrPlan = Util.buildMRPlanWithOptimizer(pp, pc);
Modified: pig/trunk/test/org/apache/pig/test/TestMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMergeJoin.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMergeJoin.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMergeJoin.java Thu Sep 4 00:11:51 2014
@@ -53,14 +53,14 @@ public class TestMergeJoin {
private static final String INPUT_FILE = "testMergeJoinInput.txt";
private static final String INPUT_FILE2 = "testMergeJoinInput2.txt";
private PigServer pigServer;
- private static MiniCluster cluster = MiniCluster.buildCluster();
+ private static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
public TestMergeJoin() throws ExecException{
Properties props = cluster.getProperties();
props.setProperty(MRConfiguration.MAP_MAX_ATTEMPTS, "1");
props.setProperty(MRConfiguration.REDUCE_MAX_ATTEMPTS, "1");
- pigServer = new PigServer(ExecType.MAPREDUCE, props);
+ pigServer = new PigServer(cluster.getExecType(), props);
}
/**
* @throws java.lang.Exception
Modified: pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestNativeMapReduce.java Thu Sep 4 00:11:51 2014
@@ -27,10 +27,11 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
-import org.apache.pig.ExecType;
+import org.apache.hadoop.mapred.FileAlreadyExistsException;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.backend.executionengine.ExecJob.JOB_STATUS;
+import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.data.Tuple;
import org.apache.pig.tools.pigstats.PigStats;
import org.junit.AfterClass;
@@ -58,7 +59,7 @@ public class TestNativeMapReduce {
* file if specified will be skipped by the wordcount udf
*/
final static String STOPWORD_FILE = "TestNMapReduceStopwFile";
- static MiniCluster cluster = MiniCluster.buildCluster();
+ static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
private PigServer pigServer = null;
/**
@@ -97,7 +98,7 @@ public class TestNativeMapReduce {
@Before
public void setUp() throws Exception{
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
//createWordCountJar();
}
@@ -206,6 +207,9 @@ public class TestNativeMapReduce {
assertTrue("job failed", PigStats.get().getReturnCode() != 0);
+ } catch (JobCreationException e) {
+ // Running in Tez mode throws an exception
+ assertTrue(e.getCause() instanceof FileAlreadyExistsException);
}
finally{
// We have to manually delete intermediate mapreduce files
Modified: pig/trunk/test/org/apache/pig/test/TestPigProgressReporting.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPigProgressReporting.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPigProgressReporting.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPigProgressReporting.java Thu Sep 4 00:11:51 2014
@@ -32,7 +32,7 @@ import org.junit.Test;
public class TestPigProgressReporting {
- static MiniCluster cluster = MiniCluster.buildCluster();
+ static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
@Test
public void testProgressReportingWithStatusMessage() throws Exception {
@@ -46,7 +46,7 @@ public class TestPigProgressReporting {
Util.createInputFile(cluster, "a.txt", new String[] { "dummy"});
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pig = new PigServer(cluster.getExecType(), cluster.getProperties());
String filename = prepareTempFile();
filename = filename.replace("\\", "\\\\");
Modified: pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/Util.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ pig/trunk/test/org/apache/pig/test/Util.java Thu Sep 4 00:11:51 2014
@@ -1270,7 +1270,7 @@ public class Util {
assertConfLong(conf, MRConfiguration.REDUCE_TASKS, runtimeParallel);
}
- private static void assertConfLong(Configuration conf, String param, long expected) {
+ public static void assertConfLong(Configuration conf, String param, long expected) {
assertEquals("Unexpected value found in configs for " + param, expected, conf.getLong(param, -1));
}
Modified: pig/trunk/test/org/apache/pig/test/utils/ReportingUDF.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/utils/ReportingUDF.java?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/utils/ReportingUDF.java (original)
+++ pig/trunk/test/org/apache/pig/test/utils/ReportingUDF.java Thu Sep 4 00:11:51 2014
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.pig.EvalFunc;
import org.apache.pig.data.Tuple;
+import org.apache.pig.tools.pigstats.PigStatusReporter;
public class ReportingUDF extends EvalFunc<Integer> {
@@ -30,7 +31,8 @@ public class ReportingUDF extends EvalFu
try {
Thread.sleep(7500);
- getReporter().progress("Progressing");
+ PigStatusReporter reporter = PigStatusReporter.getInstance();
+ reporter.progress();
Thread.sleep(7500);
} catch (InterruptedException e) {
}
Modified: pig/trunk/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/trunk/test/tez-tests?rev=1622382&r1=1622381&r2=1622382&view=diff
==============================================================================
--- pig/trunk/test/tez-tests (original)
+++ pig/trunk/test/tez-tests Thu Sep 4 00:11:51 2014
@@ -65,3 +65,8 @@
**/TestBigTypeSort.java
**/TestCurrentTime.java
**/TestInvokerGenerator.java
+**/TestGroupConstParallelTez.java
+**/TestJobSubmissionTez.java
+**/TestMergeJoin.java
+**/TestNativeMapReduce.java
+**/TestPigProgressReporting.java