You are viewing a plain-text version of this content. The canonical link to it was a hyperlink in the original page and is not reproduced here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/09/24 19:55:35 UTC
svn commit: r1627376 - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/tez/
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/ src/org/ap...
Author: rohini
Date: Wed Sep 24 17:55:34 2014
New Revision: 1627376
URL: http://svn.apache.org/r1627376
Log:
PIG-4162: Intermediate reducer parallelism in Tez should be higher (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/Main.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezParallelismEstimator.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java
pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java
pig/trunk/test/e2e/pig/tests/bigdata.conf
pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java
pig/trunk/test/org/apache/pig/test/TestForEachNestedPlan.java
pig/trunk/test/tez-tests
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed Sep 24 17:55:34 2014
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4162: Intermediate reducer parallelism in Tez should be higher (rohini)
+
PIG-4186: Fix e2e run against new build of pig and some enhancements (rohini)
PIG-3838: Organize tez code into subpackages (rohini)
Modified: pig/trunk/src/org/apache/pig/Main.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/Main.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/Main.java (original)
+++ pig/trunk/src/org/apache/pig/Main.java Wed Sep 24 17:55:34 2014
@@ -167,26 +167,17 @@ public class Main {
* @throws IOException
*/
public static void main(String args[]) {
- DateTime startTime = new DateTime();
-
- int exitcode = run(args, null);
-
- DateTime endTime = new DateTime();
- Duration duration = new Duration(startTime, endTime);
- Period period = duration.toPeriod().normalizedStandard(PeriodType.time());
- log.info("Pig script completed in "
- + PeriodFormat.getDefault().print(period)
- + " (" + duration.getMillis() + " ms)");
-
- System.exit(exitcode);
+ System.exit(run(args, null));
}
static int run(String args[], PigProgressNotificationListener listener) {
+ DateTime startTime = new DateTime();
int rc = 1;
boolean verbose = false;
boolean gruntCalled = false;
boolean deleteTempFiles = true;
String logFileName = null;
+ boolean printScriptRunTime = true;
try {
Configuration conf = new Configuration(false);
@@ -301,6 +292,7 @@ public class Main {
return ReturnCode.SUCCESS;
case 'i':
+ printScriptRunTime = false;
System.out.println(getVersionString());
return ReturnCode.SUCCESS;
@@ -670,6 +662,9 @@ public class Main {
LogUtils.writeLog(e, logFileName, log, verbose, "Error before Pig is launched");
}
} finally {
+ if (printScriptRunTime) {
+ printScriptRunTime(startTime);
+ }
if (deleteTempFiles) {
// clear temp files
FileLocalizer.deleteTempFiles();
@@ -680,6 +675,15 @@ public class Main {
return rc;
}
+ private static void printScriptRunTime(DateTime startTime) {
+ DateTime endTime = new DateTime();
+ Duration duration = new Duration(startTime, endTime);
+ Period period = duration.toPeriod().normalizedStandard(PeriodType.time());
+ log.info("Pig script completed in "
+ + PeriodFormat.getDefault().print(period)
+ + " (" + duration.getMillis() + " ms)");
+ }
+
protected static PigProgressNotificationListener makeListener(Properties properties) {
try {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java Wed Sep 24 17:55:34 2014
@@ -1039,9 +1039,9 @@ public class JobControlCompiler{
Configuration conf = nwJob.getConfiguration();
// set various parallelism into the job conf for later analysis, PIG-2779
- conf.setInt("pig.info.reducers.default.parallel", pigContext.defaultParallel);
- conf.setInt("pig.info.reducers.requested.parallel", mro.requestedParallelism);
- conf.setInt("pig.info.reducers.estimated.parallel", mro.estimatedParallelism);
+ conf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pigContext.defaultParallel);
+ conf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, mro.requestedParallelism);
+ conf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, mro.estimatedParallelism);
// this is for backward compatibility, and we encourage to use runtimeParallelism at runtime
mro.requestedParallelism = jobParallelism;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java Wed Sep 24 17:55:34 2014
@@ -29,6 +29,7 @@ import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparable;
@@ -88,6 +89,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
+import org.apache.pig.backend.hadoop.executionengine.shims.HadoopShims;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
@@ -102,8 +104,10 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.runtime.PigProcessor;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.util.SecurityHelper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.NullablePartitionWritable;
import org.apache.pig.impl.io.NullableTuple;
@@ -156,6 +160,7 @@ public class TezDagBuilder extends TezOp
private Map<String, LocalResource> localResources;
private PigContext pc;
private Configuration globalConf;
+ private long intermediateTaskInputSize;
public TezDagBuilder(PigContext pc, TezOperPlan plan, DAG dag,
Map<String, LocalResource> localResources) {
@@ -172,6 +177,19 @@ public class TezDagBuilder extends TezOp
} catch (IOException e) {
throw new RuntimeException("Error while fetching delegation tokens", e);
}
+
+ try {
+ intermediateTaskInputSize = HadoopShims.getDefaultBlockSize(FileSystem.get(globalConf), FileLocalizer.getTemporaryResourcePath(pc));
+ } catch (Exception e) {
+ log.warn("Unable to get the block size for temporary directory, defaulting to 128MB", e);
+ intermediateTaskInputSize = 134217728L;
+ }
+ // Use at least 128MB; otherwise we would end up with too many tasks.
+ intermediateTaskInputSize = Math.max(intermediateTaskInputSize, 134217728L);
+ intermediateTaskInputSize = Math.min(intermediateTaskInputSize,
+ globalConf.getLong(
+ InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
}
@Override
@@ -410,7 +428,19 @@ public class TezDagBuilder extends TezOp
}
if (tezOp.getSortOperator() != null) {
+ // Required by Sample Aggregation job for estimating quantiles
payloadConf.set(PigProcessor.SORT_VERTEX, tezOp.getSortOperator().getOperatorKey().toString());
+ // PIG-4162: Order by/Skew Join in intermediate stage.
+ // Increasing order by parallelism may not be required, as it is
+ // usually followed by a limit rather than a store. But it would benefit
+ // cases like a skewed join followed by a group by.
+ if (tezOp.getSortOperator().getEstimatedParallelism() != -1
+ && TezCompilerUtil.isIntermediateReducer(tezOp.getSortOperator())) {
+ payloadConf.setLong(
+ InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ intermediateTaskInputSize);
+ }
+
}
payloadConf.set("pig.inputs", ObjectSerializer.serialize(tezOp.getLoaderInfo().getInp()));
@@ -453,8 +483,7 @@ public class TezDagBuilder extends TezOp
tezOp.plan.remove(pack);
payloadConf.set("pig.reduce.package", ObjectSerializer.serialize(pack));
setIntermediateOutputKeyValue(keyType, payloadConf, tezOp);
- POShuffleTezLoad newPack;
- newPack = new POShuffleTezLoad(pack);
+ POShuffleTezLoad newPack = new POShuffleTezLoad(pack);
if (tezOp.isSkewedJoin()) {
newPack.setSkewedJoins(true);
}
@@ -556,9 +585,9 @@ public class TezDagBuilder extends TezOp
}
// set various parallelism into the job conf for later analysis, PIG-2779
- payloadConf.setInt("pig.info.reducers.default.parallel", pc.defaultParallel);
- payloadConf.setInt("pig.info.reducers.requested.parallel", tezOp.getRequestedParallelism());
- payloadConf.setInt("pig.info.reducers.estimated.parallel", tezOp.getEstimatedParallelism());
+ payloadConf.setInt(PigImplConstants.REDUCER_DEFAULT_PARALLELISM, pc.defaultParallel);
+ payloadConf.setInt(PigImplConstants.REDUCER_REQUESTED_PARALLELISM, tezOp.getRequestedParallelism());
+ payloadConf.setInt(PigImplConstants.REDUCER_ESTIMATED_PARALLELISM, tezOp.getEstimatedParallelism());
// Take our assembled configuration and create a vertex
UserPayload userPayload = TezUtils.createUserPayloadFromConf(payloadConf);
@@ -670,8 +699,12 @@ public class TezDagBuilder extends TezOp
vmPluginName = ShuffleVertexManager.class.getName();
vmPluginConf = (vmPluginConf == null) ? ConfigurationUtil.toConfiguration(pc.getProperties(), false) : vmPluginConf;
vmPluginConf.setBoolean(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_ENABLE_AUTO_PARALLEL, true);
- if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
- InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER)!=
+ if (stores.size() <= 0) {
+ // Intermediate reducer (no stores): use the temp-dir block size, clamped to [128MB, bytes per reducer].
+ vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
+ intermediateTaskInputSize);
+ } else if (vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) !=
InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER) {
vmPluginConf.setLong(ShuffleVertexManager.TEZ_SHUFFLE_VERTEX_MANAGER_DESIRED_TASK_INPUT_SIZE,
vmPluginConf.getLong(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Wed Sep 24 17:55:34 2014
@@ -270,7 +270,11 @@ public class TezLauncher extends Launche
TezTaskStats tts = tezStats.getVertexStats(v.getName());
tezScriptState.emitjobFinishedNotification(tts);
Map<String, Map<String, Long>> counterGroups = runningJob.getVertexCounters(v.getName());
- computeWarningAggregate(counterGroups, warningAggMap);
+ if (counterGroups == null) {
+ log.warn("Counters are not available for vertex " + v.getName() + ". Not computing warning aggregates.");
+ } else {
+ computeWarningAggregate(counterGroups, warningAggMap);
+ }
}
if (aggregateWarning) {
CompilationMessageCollector.logAggregate(warningAggMap, MessageType.Warning, log);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezResourceManager.java Wed Sep 24 17:55:34 2014
@@ -33,7 +33,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.util.ConverterUtils;
-import org.apache.pig.backend.hadoop.datastorage.HPath;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
@@ -54,7 +53,7 @@ public class TezResourceManager {
public void init(PigContext pigContext, Configuration conf) throws IOException {
if (!inited) {
- this.stagingDir = ((HPath)FileLocalizer.getTemporaryResourcePath(pigContext)).getPath();
+ this.stagingDir = FileLocalizer.getTemporaryResourcePath(pigContext);
this.remoteFs = FileSystem.get(conf);
this.conf = conf;
this.inited = true;
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Wed Sep 24 17:55:34 2014
@@ -810,6 +810,7 @@ public class TezCompiler extends PhyPlan
}
}
if (canStop) {
+ curTezOp.setDontEstimateParallelism(true);
if (limitAfterSort) {
curTezOp.markLimitAfterSort();
} else {
@@ -830,6 +831,7 @@ public class TezCompiler extends PhyPlan
// Explicitly set the parallelism for the new vertex to 1.
curTezOp.setRequestedParallelism(1);
+ curTezOp.setDontEstimateParallelism(true);
output.addOutputKey(curTezOp.getOperatorKey().toString());
// LIMIT does not make any ordering guarantees and this is unsorted shuffle.
@@ -1094,6 +1096,7 @@ public class TezCompiler extends PhyPlan
indexAggrOper.segmentBelow = true;
indexerTezOp.setRequestedParallelism(1); // we need exactly one reducer for indexing job.
+ indexerTezOp.setDontEstimateParallelism(true);
POStore st = TezCompilerUtil.getStore(scope, nig);
FileSpec strFile = getTempFileSpec();
@@ -1261,6 +1264,7 @@ public class TezCompiler extends PhyPlan
tezPlan.add(rightTezOprAggr);
TezCompilerUtil.simpleConnectTwoVertex(tezPlan, rightTezOpr, rightTezOprAggr, scope, nig);
rightTezOprAggr.setRequestedParallelism(1); // we need exactly one task for indexing job.
+ rightTezOprAggr.setDontEstimateParallelism(true);
POStore st = TezCompilerUtil.getStore(scope, nig);
FileSpec strFile = getTempFileSpec();
@@ -1401,6 +1405,7 @@ public class TezCompiler extends PhyPlan
POCounterStatsTez counterStatsTez = new POCounterStatsTez(OperatorKey.genOpKey(scope));
statsOper.plan.addAsLeaf(counterStatsTez);
statsOper.setRequestedParallelism(1);
+ statsOper.setDontEstimateParallelism(true);
//Construct Vertex 3
TezOperator rankOper = getTezOp();
@@ -2037,6 +2042,7 @@ public class TezCompiler extends PhyPlan
oper.setClosed(true);
oper.setRequestedParallelism(1);
+ oper.setDontEstimateParallelism(true);
oper.markSampleAggregation();
return new Pair<TezOperator, Integer>(oper, rp);
}
@@ -2286,6 +2292,7 @@ public class TezCompiler extends PhyPlan
// Explicitly set the parallelism for the new vertex to 1.
limitOper.setRequestedParallelism(1);
+ limitOper.setDontEstimateParallelism(true);
limitOper.markLimitAfterSort();
edge = TezCompilerUtil.connect(tezPlan, sortOpers[1], limitOper);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Wed Sep 24 17:55:34 2014
@@ -70,6 +70,10 @@ public class TezOperator extends Operato
private int estimatedParallelism = -1;
+ // Do not estimate parallelism for specific vertices (limit, indexer,
+ // etc.) whose parallelism must always be exactly one.
+ private boolean dontEstimateParallelism = false;
+
// This is the parallelism of the vertex, it take account of:
// 1. default_parallel
// 2. -1 parallelism for one_to_one edge
@@ -251,7 +255,17 @@ public class TezOperator extends Operato
}
public int getEffectiveParallelism() {
- return getRequestedParallelism()!=-1? getRequestedParallelism() : getEstimatedParallelism();
+ // PIG-4162: Prefer the estimated parallelism, when one exists, over user-set parallelism (intermediate reducers get estimates that may override it).
+ return getEstimatedParallelism() == -1 ? getRequestedParallelism()
+ : getEstimatedParallelism();
+ }
+
+ public boolean isDontEstimateParallelism() {
+ return dontEstimateParallelism;
+ }
+
+ public void setDontEstimateParallelism(boolean dontEstimateParallelism) {
+ this.dontEstimateParallelism = dontEstimateParallelism;
}
public OperatorKey getSplitParent() {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Wed Sep 24 17:55:34 2014
@@ -17,17 +17,21 @@
*/
package org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer;
-import java.io.IOException;
+import java.util.LinkedList;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.NativeTezOper;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DependencyOrderWalker;
@@ -36,12 +40,26 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
public class ParallelismSetter extends TezOpPlanVisitor {
- Configuration conf;
- PigContext pc;
+ private Configuration conf;
+ private PigContext pc;
+ private TezParallelismEstimator estimator;
+ private boolean autoParallelismEnabled;
+
public ParallelismSetter(TezOperPlan plan, PigContext pigContext) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
this.pc = pigContext;
this.conf = ConfigurationUtil.toConfiguration(pc.getProperties());
+ this.autoParallelismEnabled = conf.getBoolean(PigConfiguration.TEZ_AUTO_PARALLELISM, true);
+ try {
+ this.estimator = conf.get(PigConfiguration.REDUCER_ESTIMATOR_KEY) == null ? new TezOperDependencyParallelismEstimator()
+ : PigContext.instantiateObjectFromParams(conf,
+ PigConfiguration.REDUCER_ESTIMATOR_KEY, PigConfiguration.REDUCER_ESTIMATOR_ARG_KEY,
+ TezParallelismEstimator.class);
+ this.estimator.setPigContext(pc);
+
+ } catch (ExecException e) {
+ throw new RuntimeException("Error instantiating TezParallelismEstimator", e);
+ }
}
@Override
@@ -53,6 +71,11 @@ public class ParallelismSetter extends T
// Can only set parallelism here if the parallelism isn't derived from
// splits
int parallelism = -1;
+ boolean intermediateReducer = false;
+ LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOp.plan, POStore.class);
+ if (stores.size() <= 0) {
+ intermediateReducer = true;
+ }
if (tezOp.getLoaderInfo().getLoads() != null && tezOp.getLoaderInfo().getLoads().size() > 0) {
// TODO: Can be set to -1 if TEZ-601 gets fixed and getting input
// splits can be moved to if(loads) block below
@@ -61,6 +84,8 @@ public class ParallelismSetter extends T
} else {
int prevParallelism = -1;
boolean isOneToOneParallelism = false;
+ intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOp);
+
for (Map.Entry<OperatorKey,TezEdgeDescriptor> entry : tezOp.inEdges.entrySet()) {
if (entry.getValue().dataMovementType == DataMovementType.ONE_TO_ONE) {
TezOperator pred = mPlan.getOperator(entry.getKey());
@@ -71,24 +96,29 @@ public class ParallelismSetter extends T
throw new VisitorException("one to one sources parallelism for vertex "
+ tezOp.getOperatorKey().toString() + " are not equal");
}
- if (pred.getRequestedParallelism()!=-1) {
- tezOp.setRequestedParallelism(pred.getRequestedParallelism());
- } else {
- tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
- }
+ tezOp.setRequestedParallelism(pred.getRequestedParallelism());
+ tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
isOneToOneParallelism = true;
parallelism = -1;
}
}
if (!isOneToOneParallelism) {
- if (tezOp.getRequestedParallelism()!=-1) {
+ if (tezOp.getRequestedParallelism() != -1) {
parallelism = tezOp.getRequestedParallelism();
- } else if (pc.defaultParallel!=-1) {
+ } else if (pc.defaultParallel != -1) {
parallelism = pc.defaultParallel;
- } else {
- parallelism = estimateParallelism(mPlan, tezOp);
- tezOp.setEstimatedParallelism(parallelism);
- if (tezOp.isGlobalSort()||tezOp.isSkewedJoin()) {
+ }
+ if (autoParallelismEnabled &&
+ ((parallelism == -1 || intermediateReducer) && !tezOp.isDontEstimateParallelism())) {
+ if (tezOp.getEstimatedParallelism() == -1) {
+ // Override user-specified parallelism with the estimated value
+ // if this is an intermediate reducer
+ parallelism = estimator.estimateParallelism(mPlan, tezOp, conf);
+ tezOp.setEstimatedParallelism(parallelism);
+ } else {
+ parallelism = tezOp.getEstimatedParallelism();
+ }
+ if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
// Vertex manager will set parallelism
parallelism = -1;
}
@@ -98,7 +128,7 @@ public class ParallelismSetter extends T
// Once we decide the parallelism of the sampler, propagate to
// downstream operators if necessary
- if (tezOp.isSampler()) {
+ if (tezOp.isSampler() && autoParallelismEnabled) {
// There could be multiple sampler and share the same sample aggregation job
// and partitioner job
TezOperator sampleAggregationOper = null;
@@ -116,7 +146,7 @@ public class ParallelismSetter extends T
}
sortOper = mPlan.getSuccessors(sampleBasedPartionerOper).get(0);
- if (sortOper.getRequestedParallelism()==-1 && pc.defaultParallel==-1) {
+ if ((sortOper.getRequestedParallelism() == -1 && pc.defaultParallel == -1) || TezCompilerUtil.isIntermediateReducer(sortOper)) {
// set estimate parallelism for order by/skewed join to sampler parallelism
// that include:
// 1. sort operator
@@ -125,6 +155,7 @@ public class ParallelismSetter extends T
ParallelConstantVisitor visitor =
new ParallelConstantVisitor(sampleAggregationOper.plan, parallelism);
visitor.visit();
+ sampleAggregationOper.setNeedEstimatedQuantile(true);
}
}
@@ -139,14 +170,4 @@ public class ParallelismSetter extends T
}
}
- private int estimateParallelism(TezOperPlan tezPlan, TezOperator tezOp) throws IOException {
-
- TezParallelismEstimator estimator = conf.get(PigConfiguration.REDUCER_ESTIMATOR_KEY) == null ? new TezOperDependencyParallelismEstimator()
- : PigContext.instantiateObjectFromParams(conf,
- PigConfiguration.REDUCER_ESTIMATOR_KEY, PigConfiguration.REDUCER_ESTIMATOR_ARG_KEY,
- TezParallelismEstimator.class);
-
- int numberOfReducers = estimator.estimateParallelism(tezPlan, tezOp, conf);
- return numberOfReducers;
- }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Wed Sep 24 17:55:34 2014
@@ -42,7 +42,8 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POLocalRearrangeTez;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.operator.POValueOutputTez;
-import org.apache.pig.backend.hadoop.executionengine.util.ParallelConstantVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.util.TezCompilerUtil;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -64,6 +65,15 @@ public class TezOperDependencyParallelis
static final double DEFAULT_FILTER_FACTOR = 0.7;
static final double DEFAULT_LIMIT_FACTOR = 0.1;
+ static final int DEFAULT_MAX_INTERMEDIATE_REDUCER_COUNT_PARAM = 2999;
+
+ private PigContext pc;
+
+ @Override
+ public void setPigContext(PigContext pc) {
+ this.pc = pc;
+ }
+
@Override
public int estimateParallelism(TezOperPlan plan, TezOperator tezOper, Configuration conf) throws IOException {
@@ -71,11 +81,13 @@ public class TezOperDependencyParallelis
return -1;
}
+ boolean intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOper);
+
maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
// If parallelism is set explicitly, respect it
- if (tezOper.getRequestedParallelism()!=-1) {
+ if (!intermediateReducer && tezOper.getRequestedParallelism()!=-1) {
return tezOper.getRequestedParallelism();
}
@@ -115,28 +127,26 @@ public class TezOperDependencyParallelis
}
int roundedEstimatedParallelism = (int)Math.ceil(estimatedParallelism);
- if (tezOper.isSampler()) {
- TezOperator sampleAggregationOper = null;
- TezOperator rangePartionerOper = null;
- TezOperator sortOper = null;
- for (TezOperator succ : plan.getSuccessors(tezOper)) {
- if (succ.isSampleAggregation()) {
- sampleAggregationOper = succ;
- } else if (succ.isSampleBasedPartitioner()) {
- rangePartionerOper = succ;
- }
- }
- sortOper = plan.getSuccessors(rangePartionerOper).get(0);
- if (sortOper.getRequestedParallelism()!=-1) {
-
- ParallelConstantVisitor visitor =
- new ParallelConstantVisitor(sampleAggregationOper.plan, roundedEstimatedParallelism);
- visitor.visit();
- }
+ if (intermediateReducer) {
+ // Cap estimated reducers at max(DEFAULT_MAX_INTERMEDIATE_REDUCER_COUNT_PARAM, configured max reducer count)
+ roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, Math.max(DEFAULT_MAX_INTERMEDIATE_REDUCER_COUNT_PARAM, maxTaskCount));
+ int userSpecifiedParallelism = pc.defaultParallel;
+ if (tezOper.getRequestedParallelism() != -1) {
+ userSpecifiedParallelism = tezOper.getRequestedParallelism();
+ }
+ int intermediateParallelism = Math.max(userSpecifiedParallelism, roundedEstimatedParallelism);
+ if (userSpecifiedParallelism != -1 && intermediateParallelism > (2 * userSpecifiedParallelism)) {
+ // Estimated reducers shall not be more than 2x of requested parallelism
+ // when we are overriding user specified values
+ intermediateParallelism = 2 * userSpecifiedParallelism;
+ }
+ roundedEstimatedParallelism = intermediateParallelism;
+ } else {
+ roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
}
- return Math.min(roundedEstimatedParallelism, maxTaskCount);
+ return roundedEstimatedParallelism;
}
private static TezOperator getPredecessorWithKey(TezOperPlan plan, TezOperator tezOper, String inputKey) {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezParallelismEstimator.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezParallelismEstimator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezParallelismEstimator.java Wed Sep 24 17:55:34 2014
@@ -22,7 +22,12 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.PigContext;
public interface TezParallelismEstimator {
- public int estimateParallelism(TezOperPlan plan, TezOperator tezOper, Configuration conf) throws IOException;
+
+ public void setPigContext(PigContext pc);
+
+ public int estimateParallelism(TezOperPlan plan, TezOperator tezOper,
+ Configuration conf) throws IOException;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/util/TezCompilerUtil.java Wed Sep 24 17:55:34 2014
@@ -2,6 +2,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
import org.apache.pig.PigException;
@@ -12,6 +13,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezEdgeDescriptor;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
@@ -25,6 +27,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
import org.apache.tez.runtime.library.input.UnorderedKVInput;
import org.apache.tez.runtime.library.output.UnorderedKVOutput;
@@ -172,4 +175,19 @@ public class TezCompilerUtil {
edge.setIntermediateOutputValueClass(TUPLE_CLASS);
}
+ /**
+ * Returns true if there are no loads or stores in a TezOperator.
+ * To be called only after LoaderProcessor is called
+ */
+ static public boolean isIntermediateReducer(TezOperator tezOper) throws VisitorException {
+ boolean intermediateReducer = false;
+ LinkedList<POStore> stores = PlanHelper.getPhysicalOperators(tezOper.plan, POStore.class);
+ // Not map and not final reducer
+ if (stores.size() <= 0 &&
+ (tezOper.getLoaderInfo().getLoads() == null || tezOper.getLoaderInfo().getLoads().size() <= 0)) {
+ intermediateReducer = true;
+ }
+ return intermediateReducer;
+ }
+
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/util/ParallelConstantVisitor.java Wed Sep 24 17:55:34 2014
@@ -38,18 +38,16 @@ public class ParallelConstantVisitor ext
@Override
public void visitConstant(ConstantExpression cnst) throws VisitorException {
- if (cnst.getRequestedParallelism() == -1) {
- Object obj = cnst.getValue();
- if (obj instanceof Integer) {
- if (replaced) {
- // sample job should have only one ConstantExpression
- throw new VisitorException("Invalid reduce plan: more " +
- "than one ConstantExpression found in sampling job");
- }
- cnst.setValue(rp);
- cnst.setRequestedParallelism(rp);
- replaced = true;
+ Object obj = cnst.getValue();
+ if (obj instanceof Integer) {
+ if (replaced) {
+ // sample job should have only one ConstantExpression
+ throw new VisitorException("Invalid reduce plan: more " +
+ "than one ConstantExpression found in sampling job");
}
+ cnst.setValue(rp);
+ cnst.setRequestedParallelism(rp);
+ replaced = true;
}
}
}
Modified: pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigImplConstants.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigImplConstants.java Wed Sep 24 17:55:34 2014
@@ -51,4 +51,11 @@ public class PigImplConstants {
* Indicate the split index of the task. Used by merge cogroup
*/
public static final String PIG_SPLIT_INDEX = "pig.split.index";
+
+ /**
+ * Parallelism for the reducer
+ */
+ public static final String REDUCER_DEFAULT_PARALLELISM = "pig.info.reducers.default.parallel";
+ public static final String REDUCER_REQUESTED_PARALLELISM = "pig.info.reducers.requested.parallel";
+ public static final String REDUCER_ESTIMATED_PARALLELISM = "pig.info.reducers.estimated.parallel";
}
Modified: pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java (original)
+++ pig/trunk/src/org/apache/pig/impl/io/FileLocalizer.java Wed Sep 24 17:55:34 2014
@@ -478,15 +478,15 @@ public class FileLocalizer {
* since resourthPath should be available in the entire session
*
* @param pigContext
- * @return
+ * @return temporary resource path
* @throws DataStorageException
*/
- public static synchronized ContainerDescriptor getTemporaryResourcePath(final PigContext pigContext)
+ public static synchronized Path getTemporaryResourcePath(final PigContext pigContext)
throws DataStorageException {
if (resourcePath == null) {
resourcePath = getTempContainer(pigContext);
}
- return resourcePath;
+ return ((HPath)resourcePath).getPath();
}
private static synchronized ContainerDescriptor getTempContainer(final PigContext pigContext)
@@ -800,7 +800,7 @@ public class FileLocalizer {
&& uri.getScheme() == null )||
// For Windows local files
(uri.getScheme() == null && uri.getPath().matches("^/[A-Za-z]:.*")) ||
- (uri.getScheme() != null && uri.getScheme().equals("local"))
+ (uri.getScheme() != null && uri.getScheme().equals("local"))
) {
srcFs = localFs;
} else {
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/tez/TezStats.java Wed Sep 24 17:55:34 2014
@@ -230,14 +230,8 @@ public class TezStats extends PigStats {
stats.setId(tezOpName);
stats.setSuccessful(succeeded);
stats.setParallelism(parallelism);
- if (map == null) {
- if (stats.hasLoadOrStore()) {
- LOG.warn("Unable to get input(s)/output(s) of the job");
- }
- } else {
- stats.addInputStatistics(map);
- stats.addOutputStatistics(map);
- }
+ stats.addInputStatistics(map);
+ stats.addOutputStatistics(map);
}
private static String getDisplayString(TezJob tezJob) {
Modified: pig/trunk/test/e2e/pig/tests/bigdata.conf
URL: http://svn.apache.org/viewvc/pig/trunk/test/e2e/pig/tests/bigdata.conf?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/test/e2e/pig/tests/bigdata.conf (original)
+++ pig/trunk/test/e2e/pig/tests/bigdata.conf Wed Sep 24 17:55:34 2014
@@ -33,6 +33,7 @@ $cfg = {
{
'num' => 1,
,'floatpostprocess' => 1
+ ,'java_params' => ['-Dpig.tez.auto.parallelism=false']
,'delimiter' => ' ',
'pig' => q\a = load ':INPATH:/singlefile/studenttab20m' as (name, age, gpa);
b = load ':INPATH:/singlefile/votertab10k' as (name, age, registration, contributions);
Modified: pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestAlgebraicEval.java Wed Sep 24 17:55:34 2014
@@ -63,7 +63,7 @@ public class TestAlgebraicEval {
public void testGroupCountWithMultipleFields() throws Throwable {
File tmpFile = File.createTempFile("test", "txt");
for (int k = 0; k < nullFlags.length; k++) {
- System.err.println("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k]);
+ System.out.println("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k]);
// flag to indicate if both the keys forming
// the group key are null
int groupKeyWithNulls = 0;
Modified: pig/trunk/test/org/apache/pig/test/TestForEachNestedPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestForEachNestedPlan.java?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestForEachNestedPlan.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestForEachNestedPlan.java Wed Sep 24 17:55:34 2014
@@ -59,7 +59,7 @@ public class TestForEachNestedPlan {
@Test
public void testInnerOrderBy() throws Exception {
for (int i = 0; i < nullFlags.length; i++) {
- System.err.println("Running testInnerOrderBy with nullFlags set to :"
+ System.out.println("Running testInnerOrderBy with nullFlags set to :"
+ nullFlags[i]);
File tmpFile = genDataSetFile1(nullFlags[i]);
pig.registerQuery("a = load '"
Modified: pig/trunk/test/tez-tests
URL: http://svn.apache.org/viewvc/pig/trunk/test/tez-tests?rev=1627376&r1=1627375&r2=1627376&view=diff
==============================================================================
--- pig/trunk/test/tez-tests (original)
+++ pig/trunk/test/tez-tests Wed Sep 24 17:55:34 2014
@@ -1,3 +1,8 @@
+**/TestAccumuloPigCluster.java
+**/TestBigTypeSort.java
+**/TestCurrentTime.java
+**/TestInvokerGenerator.java
+**/TestStreamingUDF.java
**/TestAccumulator.java
**/TestAlgebraicEval.java
**/TestBZip.java
@@ -11,11 +16,11 @@
**/TestCustomPartitioner.java
**/TestEvalPipeline.java
**/TestEvalPipeline2.java
+**/TestFRJoin.java
+**/TestFRJoinNullValue.java
**/TestFilterUDF.java
**/TestFinish.java
**/TestForEachNestedPlan.java
-**/TestFRJoin.java
-**/TestFRJoinNullValue.java
**/TestGrunt.java
**/TestImplicitSplit.java
**/TestInputOutputMiniClusterFileValidator.java
@@ -29,11 +34,14 @@
**/TestMapReduce.java
**/TestMapSideCogroup.java
**/TestMapReduce2.java
+**/TestMergeJoin.java
**/TestMergeJoinOuter.java
+**/TestNativeMapReduce.java
**/TestNestedForeach.java
**/TestNewPlanImplicitSplit.java
**/TestParser.java
**/TestPigContext.java
+**/TestPigProgressReporting.java
**/TestPigServer.java
**/TestPigServerWithMacros.java
**/TestPigSplit.java
@@ -52,22 +60,14 @@
**/TestStoreInstances.java
**/TestStoreOld.java
**/TestStreaming.java
-**/TestStreamingUDF.java
**/TestToolsPigServer.java
**/TestUDF.java
**/TestUDFContext.java
+**/TestGroupConstParallelTez.java
+**/TestJobSubmissionTez.java
+**/TestLoaderStorerShipCacheFilesTez.java
**/TestSecondarySortTez.java
**/TestTezAutoParallelism.java
**/TestTezCompiler.java
**/TestTezJobControlCompiler.java
**/TestTezLauncher.java
-**/TestAccumuloPigCluster.java
-**/TestBigTypeSort.java
-**/TestCurrentTime.java
-**/TestInvokerGenerator.java
-**/TestGroupConstParallelTez.java
-**/TestJobSubmissionTez.java
-**/TestMergeJoin.java
-**/TestNativeMapReduce.java
-**/TestPigProgressReporting.java
-**/TestLoaderStorerShipCacheFilesTez.java