You are viewing a plain-text version of this content. The canonical link to the original was a hyperlink that is not preserved in this copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/11/01 00:20:56 UTC
svn commit: r1635881 - in /pig/branches/branch-0.14: ./
shims/test/hadoop23/org/apache/pig/test/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
src/o...
Author: rohini
Date: Fri Oct 31 23:20:55 2014
New Revision: 1635881
URL: http://svn.apache.org/r1635881
Log:
PIG-4259: Fix a few issues related to Union, CROSS and auto parallelism in Tez (rohini)
Modified:
pig/branches/branch-0.14/CHANGES.txt
pig/branches/branch-0.14/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java
pig/branches/branch-0.14/src/org/apache/pig/impl/util/Utils.java
pig/branches/branch-0.14/test/e2e/pig/tests/nightly.conf
pig/branches/branch-0.14/test/org/apache/pig/test/Util.java
pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
pig/branches/branch-0.14/test/org/apache/pig/tez/TestTezAutoParallelism.java
Modified: pig/branches/branch-0.14/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/CHANGES.txt?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/CHANGES.txt (original)
+++ pig/branches/branch-0.14/CHANGES.txt Fri Oct 31 23:20:55 2014
@@ -97,6 +97,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4259: Fix few issues related to Union, CROSS and auto parallelism in Tez (rohini)
+
PIG-4250: Fix Security Risks found by Coverity (daijy)
PIG-4258: Fix several e2e tests on Windows (daijy)
Modified: pig/branches/branch-0.14/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java (original)
+++ pig/branches/branch-0.14/shims/test/hadoop23/org/apache/pig/test/TezMiniCluster.java Fri Oct 31 23:20:55 2014
@@ -96,8 +96,8 @@ public class TezMiniCluster extends Mini
m_mr_conf = m_mr.getConfig();
m_mr_conf.set(YarnConfiguration.YARN_APPLICATION_CLASSPATH,
System.getProperty("java.class.path"));
- m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx2048m");
- m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx2048m");
+ m_mr_conf.set(MRJobConfig.MAP_JAVA_OPTS, "-Xmx512m");
+ m_mr_conf.set(MRJobConfig.REDUCE_JAVA_OPTS, "-Xmx512m");
Configuration mapred_site = new Configuration(false);
Configuration yarn_site = new Configuration(false);
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/PhysicalOperator.java Fri Oct 31 23:20:55 2014
@@ -408,9 +408,9 @@ public abstract class PhysicalOperator e
}
public Result getNextDataBag() throws ExecException {
- Result ret = null;
+ Result val = new Result();
DataBag tmpBag = BagFactory.getInstance().newDefaultBag();
- for (ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
+ for (Result ret = getNextTuple(); ret.returnStatus != POStatus.STATUS_EOP; ret = getNextTuple()) {
if (ret.returnStatus == POStatus.STATUS_ERR) {
return ret;
} else if (ret.returnStatus == POStatus.STATUS_NULL) {
@@ -419,9 +419,9 @@ public abstract class PhysicalOperator e
tmpBag.add((Tuple) ret.result);
}
}
- ret.result = tmpBag;
- ret.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
- return ret;
+ val.result = tmpBag;
+ val.returnStatus = (tmpBag.size() == 0)? POStatus.STATUS_EOP : POStatus.STATUS_OK;
+ return val;
}
public Result getNextBigInteger() throws ExecException {
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Fri Oct 31 23:20:55 2014
@@ -114,14 +114,16 @@ public class POLimit extends PhysicalOpe
}
Result inp = null;
while (true) {
+ // illustrator ignore LIMIT before the post processing
+ if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar >= mLimit) {
+ inp = RESULT_EOP;
+ break;
+ }
inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP || inp.returnStatus == POStatus.STATUS_ERR)
break;
illustratorMarkup(inp.result, null, 0);
- // illustrator ignore LIMIT before the post processing
- if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar>=mLimit)
- inp.returnStatus = POStatus.STATUS_EOP;
soFar++;
break;
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Fri Oct 31 23:20:55 2014
@@ -68,15 +68,36 @@ public class TezJob implements Runnable
private Map<String, Map<String, Map<String, Long>>> vertexCounters;
// Timer for DAG status reporter
private Timer timer;
+ private TezJobConfig tezJobConf;
- public TezJob(TezConfiguration conf, DAG dag, Map<String, LocalResource> requestAMResources)
- throws IOException {
+ public TezJob(TezConfiguration conf, DAG dag,
+ Map<String, LocalResource> requestAMResources,
+ int estimatedTotalParallelism) throws IOException {
this.conf = conf;
this.dag = dag;
this.requestAMResources = requestAMResources;
this.reuseSession = conf.getBoolean(PigConfiguration.TEZ_SESSION_REUSE, true);
this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
this.vertexCounters = Maps.newHashMap();
+ tezJobConf = new TezJobConfig(estimatedTotalParallelism);
+ }
+
+ static class TezJobConfig {
+
+ private int estimatedTotalParallelism = -1;
+
+ public TezJobConfig(int estimatedTotalParallelism) {
+ this.estimatedTotalParallelism = estimatedTotalParallelism;
+ }
+
+ public int getEstimatedTotalParallelism() {
+ return estimatedTotalParallelism;
+ }
+
+ public void setEstimatedTotalParallelism(int estimatedTotalParallelism) {
+ this.estimatedTotalParallelism = estimatedTotalParallelism;
+ }
+
}
public DAG getDAG() {
@@ -129,7 +150,8 @@ public class TezJob implements Runnable
@Override
public void run() {
try {
- tezClient = TezSessionManager.getClient(conf, requestAMResources, dag.getCredentials());
+ tezClient = TezSessionManager.getClient(conf, requestAMResources,
+ dag.getCredentials(), tezJobConf);
log.info("Submitting DAG " + dag.getName());
dagClient = tezClient.submitDAG(dag);
appId = tezClient.getAppMasterApplicationId();
@@ -145,7 +167,7 @@ public class TezJob implements Runnable
timer = new Timer();
timer.schedule(new DAGStatusReporter(), 1000, conf.getLong(
- PigConfiguration.TEZ_DAG_STATUS_REPORT_INTERVAL, 10) * 1000);
+ PigConfiguration.TEZ_DAG_STATUS_REPORT_INTERVAL, 20) * 1000);
while (true) {
try {
@@ -156,6 +178,7 @@ public class TezJob implements Runnable
}
if (dagStatus.isCompleted()) {
+ log.info("DAG Status: " + dagStatus);
dagCounters = dagStatus.getDAGCounters();
collectVertexCounters();
TezSessionManager.freeSession(tezClient);
@@ -182,9 +205,16 @@ public class TezJob implements Runnable
}
private class DAGStatusReporter extends TimerTask {
+
+ private final String LINE_SEPARATOR = System.getProperty("line.separator");
+
@Override
public void run() {
- log.info("DAG Status: " + dagStatus);
+ String msg = "status=" + dagStatus.getState()
+ + ", progress=" + dagStatus.getDAGProgress()
+ + ", diagnostics="
+ + StringUtils.join(dagStatus.getDiagnostics(), LINE_SEPARATOR);
+ log.info("DAG Status: " + msg);
}
}
@@ -193,6 +223,10 @@ public class TezJob implements Runnable
String name = v.getName();
try {
VertexStatus s = dagClient.getVertexStatus(name, statusGetOpts);
+ if (s == null) {
+ log.info("Cannot retrieve counters for vertex " + name);
+ continue;
+ }
TezCounters counters = s.getVertexCounters();
Map<String, Map<String, Long>> grpCounters = Maps.newHashMap();
Iterator<CounterGroup> grpIt = counters.iterator();
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Fri Oct 31 23:20:55 2014
@@ -107,7 +107,7 @@ public class TezJobCompiler {
log.info("Local resource: " + entry.getKey());
}
DAG tezDag = buildDAG(tezPlan, localResources);
- return new TezJob(tezConf, tezDag, localResources);
+ return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
} catch (Exception e) {
int errCode = 2017;
String msg = "Internal error creating job configuration.";
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezLauncher.java Fri Oct 31 23:20:55 2014
@@ -367,6 +367,7 @@ public class TezLauncher extends Launche
ParallelismSetter parallelismSetter = new ParallelismSetter(tezPlan, pc);
parallelismSetter.visit();
+ tezPlan.setEstimatedParallelism(parallelismSetter.getEstimatedTotalParallelism());
}
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Fri Oct 31 23:20:55 2014
@@ -29,8 +29,10 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.pig.backend.hadoop.executionengine.tez.TezJob.TezJobConfig;
import org.apache.pig.backend.hadoop.executionengine.tez.util.MRToTezHelper;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.util.Utils;
import org.apache.tez.client.TezAppMasterStatus;
import org.apache.tez.client.TezClient;
import org.apache.tez.dag.api.TezConfiguration;
@@ -80,9 +82,11 @@ public class TezSessionManager {
private static List<SessionInfo> sessionPool = new ArrayList<SessionInfo>();
private static SessionInfo createSession(Configuration conf,
- Map<String, LocalResource> requestedAMResources, Credentials creds)
- throws TezException, IOException, InterruptedException {
+ Map<String, LocalResource> requestedAMResources, Credentials creds,
+ TezJobConfig tezJobConf) throws TezException, IOException,
+ InterruptedException {
TezConfiguration amConf = MRToTezHelper.getDAGAMConfFromMRConf(conf);
+ adjustAMConfig(amConf, tezJobConf);
String jobName = conf.get(PigContext.JOB_NAME, "pig");
TezClient tezClient = TezClient.create(jobName, amConf, true, requestedAMResources, creds);
tezClient.start();
@@ -94,6 +98,56 @@ public class TezSessionManager {
return new SessionInfo(tezClient, requestedAMResources);
}
+ private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
+ int requiredAMMaxHeap = -1;
+ int requiredAMResourceMB = -1;
+ int configuredAMMaxHeap = Utils.extractHeapSizeInMB(amConf.get(
+ TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
+ TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT));
+ int configuredAMResourceMB = amConf.getInt(
+ TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB,
+ TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB_DEFAULT);
+
+ if (tezJobConf.getEstimatedTotalParallelism() > 0) {
+
+ int minAMMaxHeap = 3584;
+ int minAMResourceMB = 4096;
+
+ // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
+ // Increment by 512 mb for every additional 5K tasks.
+ for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) {
+ if (tezJobConf.getEstimatedTotalParallelism() > taskCount) {
+ requiredAMMaxHeap = minAMMaxHeap;
+ requiredAMResourceMB = minAMResourceMB;
+ break;
+ }
+ minAMMaxHeap = minAMMaxHeap - 512;
+ minAMResourceMB = minAMResourceMB - 512;
+ }
+
+ if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) {
+ amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB);
+ log.info("Increasing "
+ + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from "
+ + configuredAMResourceMB + " to "
+ + requiredAMResourceMB
+ + " as the number of total estimated tasks is "
+ + tezJobConf.getEstimatedTotalParallelism());
+
+ if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) {
+ amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
+ amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS)
+ + " -Xmx" + requiredAMMaxHeap + "M");
+ log.info("Increasing Tez AM Heap Size from "
+ + configuredAMMaxHeap + "M to "
+ + requiredAMMaxHeap
+ + "M as the number of total estimated tasks is "
+ + tezJobConf.getEstimatedTotalParallelism());
+ }
+ }
+ }
+ }
+
private static boolean validateSessionResources(SessionInfo currentSession,
Map<String, LocalResource> requestedAMResources)
throws TezException, IOException {
@@ -106,7 +160,7 @@ public class TezSessionManager {
}
static TezClient getClient(Configuration conf, Map<String, LocalResource> requestedAMResources,
- Credentials creds) throws TezException, IOException, InterruptedException {
+ Credentials creds, TezJobConfig tezJobConf) throws TezException, IOException, InterruptedException {
List<SessionInfo> sessionsToRemove = new ArrayList<SessionInfo>();
SessionInfo newSession = null;
sessionPoolLock.readLock().lock();
@@ -135,7 +189,7 @@ public class TezSessionManager {
// We cannot find available AM, create new one
// Create session outside of locks so that getClient/freeSession is not
// blocked for parallel embedded pig runs
- newSession = createSession(conf, requestedAMResources, creds);
+ newSession = createSession(conf, requestedAMResources, creds, tezJobConf);
newSession.inUse = true;
sessionPoolLock.writeLock().lock();
try {
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezCompiler.java Fri Oct 31 23:20:55 2014
@@ -802,7 +802,7 @@ public class TezCompiler extends PhyPlan
// If the parallelism of the current vertex is one and it doesn't do a LOAD (whose
// parallelism is determined by the InputFormat), we don't need another vertex.
- if (curTezOp.getRequestedParallelism() == 1) {
+ if (curTezOp.getRequestedParallelism() == 1 || curTezOp.isUnion()) {
boolean canStop = true;
for (PhysicalOperator planOp : curTezOp.plan.getRoots()) {
if (planOp instanceof POLoad) {
@@ -811,6 +811,12 @@ public class TezCompiler extends PhyPlan
}
}
if (canStop) {
+ // Let's piggyback on the Union operator. UnionOptimizer
+ // will skip this union operator as it is a waste to
+ // do a vertex group followed by another limit in this case
+ if (curTezOp.isUnion()) {
+ curTezOp.setRequestedParallelism(1);
+ }
curTezOp.setDontEstimateParallelism(true);
if (limitAfterSort) {
curTezOp.markLimitAfterSort();
@@ -916,7 +922,7 @@ public class TezCompiler extends PhyPlan
TezCompilerUtil.setCustomPartitioner(op.getCustomPartitioner(), curTezOp);
curTezOp.setRequestedParallelism(op.getRequestedParallelism());
if (op.isCross()) {
- curTezOp.setCrossKey(op.getOperatorKey().toString());
+ curTezOp.addCrossKey(op.getOperatorKey().toString());
}
phyToTezOpMap.put(op, curTezOp);
} catch (Exception e) {
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperPlan.java Fri Oct 31 23:20:55 2014
@@ -52,9 +52,19 @@ public class TezOperPlan extends Operato
private Map<String, Path> extraResources = new HashMap<String, Path>();
+ private int estimatedTotalParallelism = -1;
+
public TezOperPlan() {
}
+ public int getEstimatedTotalParallelism() {
+ return estimatedTotalParallelism;
+ }
+
+ public void setEstimatedParallelism(int estimatedTotalParallelism) {
+ this.estimatedTotalParallelism = estimatedTotalParallelism;
+ }
+
@Override
public String toString() {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/TezOperator.java Fri Oct 31 23:20:55 2014
@@ -74,6 +74,12 @@ public class TezOperator extends Operato
// etc which should always be one
private boolean dontEstimateParallelism = false;
+ // Override user specified intermediate parallelism for cases
+ // like skewed join followed by group by + combiner if estimation is higher
+ // In mapreduce group by + combiner runs in map phase and uses more maps (default 128MB per map)
+ // while skewed join reducers process more (default 1G per reducer) which makes MRR a disadvantage
+ private boolean overrideIntermediateParallelism = false;
+
// This is the parallelism of the vertex, it take account of:
// 1. default_parallel
// 2. -1 parallelism for one_to_one edge
@@ -123,7 +129,7 @@ public class TezOperator extends Operato
// If true, we will use secondary key sort in the job
private boolean useSecondaryKey = false;
- private String crossKey = null;
+ private List<String> crossKeys = null;
private boolean useMRMapSettings = false;
@@ -270,6 +276,15 @@ public class TezOperator extends Operato
this.dontEstimateParallelism = dontEstimateParallelism;
}
+ public boolean isOverrideIntermediateParallelism() {
+ return overrideIntermediateParallelism;
+ }
+
+ public void setOverrideIntermediateParallelism(
+ boolean overrideIntermediateParallelism) {
+ this.overrideIntermediateParallelism = overrideIntermediateParallelism;
+ }
+
public OperatorKey getSplitParent() {
return splitParent;
}
@@ -555,12 +570,15 @@ public class TezOperator extends Operato
return combineSmallSplits;
}
- public void setCrossKey(String key) {
- crossKey = key;
+ public void addCrossKey(String key) {
+ if (crossKeys == null) {
+ crossKeys = new ArrayList<String>();
+ }
+ crossKeys.add(key);
}
- public String getCrossKey() {
- return crossKey;
+ public List<String> getCrossKeys() {
+ return crossKeys;
}
public boolean isUseMRMapSettings() {
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/CombinerOptimizer.java Fri Oct 31 23:20:55 2014
@@ -93,6 +93,12 @@ public class CombinerOptimizer extends T
PhysicalPlan combinePlan = to.inEdges.get(from.getOperatorKey()).combinePlan;
CombinerOptimizerUtil.addCombiner(rearrangePlan, to.plan, combinePlan, messageCollector, doMapAgg);
+ if(!combinePlan.isEmpty()) {
+ // Override the requested parallelism for intermediate reducers
+ // when combiners are involved so that there are more tasks doing the combine
+ from.setOverrideIntermediateParallelism(true);
+ }
+
}
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/MultiQueryOptimizerTez.java Fri Oct 31 23:20:55 2014
@@ -178,6 +178,15 @@ public class MultiQueryOptimizerTez exte
}
static public void addSubPlanPropertiesToParent(TezOperator parentOper, TezOperator subPlanOper) {
+ // Copy only map side properties. For eg: crossKeys.
+ // Do not copy reduce side specific properties. For eg: useSecondaryKey, segmentBelow, sortOrder, etc
+ if (subPlanOper.getCrossKeys() != null) {
+ for (String key : subPlanOper.getCrossKeys()) {
+ parentOper.addCrossKey(key);
+ }
+ }
+ parentOper.copyFeatures(subPlanOper, null);
+
if (subPlanOper.getRequestedParallelism() > parentOper.getRequestedParallelism()) {
parentOper.setRequestedParallelism(subPlanOper.getRequestedParallelism());
}
@@ -189,6 +198,5 @@ public class MultiQueryOptimizerTez exte
parentOper.outEdges.put(entry.getKey(), entry.getValue());
}
}
- parentOper.copyFeatures(subPlanOper, null);
}
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/ParallelismSetter.java Fri Oct 31 23:20:55 2014
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
import java.util.LinkedList;
import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.PigConfiguration;
import org.apache.pig.backend.executionengine.ExecException;
@@ -41,10 +43,13 @@ import org.apache.pig.impl.plan.VisitorE
import org.apache.tez.dag.api.EdgeProperty.DataMovementType;
public class ParallelismSetter extends TezOpPlanVisitor {
+
+ private static final Log LOG = LogFactory.getLog(ParallelismSetter.class);
private Configuration conf;
private PigContext pc;
private TezParallelismEstimator estimator;
private boolean autoParallelismEnabled;
+ private int estimatedTotalParallelism = 0;
public ParallelismSetter(TezOperPlan plan, PigContext pigContext) {
super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
@@ -63,6 +68,10 @@ public class ParallelismSetter extends T
}
}
+ public int getEstimatedTotalParallelism() {
+ return estimatedTotalParallelism;
+ }
+
@Override
public void visitTezOp(TezOperator tezOp) throws VisitorException {
if (tezOp instanceof NativeTezOper) {
@@ -100,6 +109,7 @@ public class ParallelismSetter extends T
tezOp.setRequestedParallelism(pred.getRequestedParallelism());
tezOp.setEstimatedParallelism(pred.getEstimatedParallelism());
isOneToOneParallelism = true;
+ incrementTotalParallelism(tezOp, parallelism);
parallelism = -1;
}
}
@@ -109,66 +119,84 @@ public class ParallelismSetter extends T
} else if (pc.defaultParallel != -1) {
parallelism = pc.defaultParallel;
}
- if (autoParallelismEnabled &&
- ((parallelism == -1 || intermediateReducer) && !tezOp.isDontEstimateParallelism())) {
+ boolean overrideRequestedParallelism = false;
+ if (parallelism != -1
+ && autoParallelismEnabled
+ && intermediateReducer
+ && !tezOp.isDontEstimateParallelism()
+ && tezOp.isOverrideIntermediateParallelism()) {
+ overrideRequestedParallelism = true;
+ }
+ if (parallelism == -1 || overrideRequestedParallelism) {
if (tezOp.getEstimatedParallelism() == -1) {
// Override user specified parallelism with the estimated value
// if it is intermediate reducer
parallelism = estimator.estimateParallelism(mPlan, tezOp, conf);
- tezOp.setEstimatedParallelism(parallelism);
+ if (overrideRequestedParallelism) {
+ tezOp.setRequestedParallelism(parallelism);
+ } else {
+ tezOp.setEstimatedParallelism(parallelism);
+ }
} else {
parallelism = tezOp.getEstimatedParallelism();
}
if (tezOp.isGlobalSort() || tezOp.isSkewedJoin()) {
- // Vertex manager will set parallelism
- parallelism = -1;
+ if (!overrideRequestedParallelism) {
+ incrementTotalParallelism(tezOp, parallelism);
+ // PartitionerDefinedVertexManager will determine parallelism.
+ // So call setVertexParallelism with -1
+ // setEstimatedParallelism still needs to have some positive value
+ // so that TezDAGBuilder sets the PartitionerDefinedVertexManager
+ parallelism = -1;
+ } else {
+ // We are overriding the parallelism. We need to update the
+ // Constant value in sampleAggregator to same parallelism
+ // Currently will happen when you have orderby or
+ // skewed join followed by group by with combiner
+ for (TezOperator pred : mPlan.getPredecessors(tezOp)) {
+ if (pred.isSampleBasedPartitioner()) {
+ for (TezOperator partitionerPred : mPlan.getPredecessors(pred)) {
+ if (partitionerPred.isSampleAggregation()) {
+ LOG.debug("Updating constant value to " + parallelism + " in " + partitionerPred.plan);
+ LOG.info("Increased requested parallelism of " + partitionerPred.getOperatorKey() + " to " + parallelism);
+ ParallelConstantVisitor visitor =
+ new ParallelConstantVisitor(partitionerPred.plan, parallelism);
+ visitor.visit();
+ break;
+ }
+ }
+ break;
+ }
+ }
+ }
}
}
}
}
- // Once we decide the parallelism of the sampler, propagate to
- // downstream operators if necessary
- if (tezOp.isSampler() && autoParallelismEnabled) {
- // There could be multiple sampler and share the same sample aggregation job
- // and partitioner job
- TezOperator sampleAggregationOper = null;
- TezOperator sampleBasedPartionerOper = null;
- TezOperator sortOper = null;
- for (TezOperator succ : mPlan.getSuccessors(tezOp)) {
- if (succ.isVertexGroup()) {
- succ = mPlan.getSuccessors(succ).get(0);
- }
- if (succ.isSampleAggregation()) {
- sampleAggregationOper = succ;
- } else if (succ.isSampleBasedPartitioner()) {
- sampleBasedPartionerOper = succ;
- }
- }
- sortOper = mPlan.getSuccessors(sampleBasedPartionerOper).get(0);
-
- if ((sortOper.getRequestedParallelism() == -1 && pc.defaultParallel == -1) || TezCompilerUtil.isIntermediateReducer(sortOper)) {
- // set estimate parallelism for order by/skewed join to sampler parallelism
- // that include:
- // 1. sort operator
- // 2. constant for sample aggregation oper
- sortOper.setEstimatedParallelism(parallelism);
- ParallelConstantVisitor visitor =
- new ParallelConstantVisitor(sampleAggregationOper.plan, parallelism);
- visitor.visit();
- sampleAggregationOper.setNeedEstimatedQuantile(true);
- }
- }
-
+ incrementTotalParallelism(tezOp, parallelism);
tezOp.setVertexParallelism(parallelism);
- if (tezOp.getCrossKey()!=null) {
- pc.getProperties().put(PigImplConstants.PIG_CROSS_PARALLELISM + "." + tezOp.getCrossKey(),
- Integer.toString(tezOp.getVertexParallelism()));
+ // TODO: Fix case where vertex parallelism is -1 for auto parallelism with PartitionerDefinedVertexManager.
+ // i.e order by or skewed join followed by cross
+ if (tezOp.getCrossKeys() != null) {
+ for (String key : tezOp.getCrossKeys()) {
+ pc.getProperties().put(PigImplConstants.PIG_CROSS_PARALLELISM + "." + key,
+ Integer.toString(tezOp.getVertexParallelism()));
+ }
}
} catch (Exception e) {
throw new VisitorException(e);
}
}
+ private void incrementTotalParallelism(TezOperator tezOp, int tezOpParallelism) {
+ if (tezOp.isVertexGroup()) {
+ return;
+ }
+ if (tezOpParallelism != -1) {
+ estimatedTotalParallelism += tezOpParallelism;
+ }
+ }
+
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/TezOperDependencyParallelismEstimator.java Fri Oct 31 23:20:55 2014
@@ -65,8 +65,6 @@ public class TezOperDependencyParallelis
static final double DEFAULT_FILTER_FACTOR = 0.7;
static final double DEFAULT_LIMIT_FACTOR = 0.1;
- static final int DEFAULT_MAX_INTERMEDIATE_REDUCER_COUNT_PARAM = 2999;
-
private PigContext pc;
@Override
@@ -83,6 +81,8 @@ public class TezOperDependencyParallelis
boolean intermediateReducer = TezCompilerUtil.isIntermediateReducer(tezOper);
+ // TODO: If map opts and reduce opts are same estimate higher parallelism
+ // for tasks based on the count of number of map tasks else be conservative as now
maxTaskCount = conf.getInt(PigReducerEstimator.MAX_REDUCER_COUNT_PARAM,
PigReducerEstimator.DEFAULT_MAX_REDUCER_COUNT_PARAM);
@@ -117,7 +117,10 @@ public class TezOperDependencyParallelis
+ ", effective parallelism for predecessor " + tezOper.getOperatorKey().toString()
+ " is -1");
}
- if (pred.plan!=null) { // pred.plan can be null if it is a VertexGroup
+
+ //For cases like Union we can just limit to sum of pred vertices parallelism
+ boolean applyFactor = !tezOper.isUnion();
+ if (pred.plan!=null && applyFactor) { // pred.plan can be null if it is a VertexGroup
TezParallelismFactorVisitor parallelismFactorVisitor = new TezParallelismFactorVisitor(pred.plan, tezOper.getOperatorKey().toString());
parallelismFactorVisitor.visit();
predParallelism = predParallelism * parallelismFactorVisitor.getFactor();
@@ -128,17 +131,18 @@ public class TezOperDependencyParallelis
int roundedEstimatedParallelism = (int)Math.ceil(estimatedParallelism);
- if (intermediateReducer) {
+ if (intermediateReducer && tezOper.isOverrideIntermediateParallelism()) {
// Estimated reducers should not be more than the configured limit
- roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, Math.max(DEFAULT_MAX_INTERMEDIATE_REDUCER_COUNT_PARAM, maxTaskCount));
+ roundedEstimatedParallelism = Math.min(roundedEstimatedParallelism, maxTaskCount);
int userSpecifiedParallelism = pc.defaultParallel;
if (tezOper.getRequestedParallelism() != -1) {
userSpecifiedParallelism = tezOper.getRequestedParallelism();
}
int intermediateParallelism = Math.max(userSpecifiedParallelism, roundedEstimatedParallelism);
- if (userSpecifiedParallelism != -1 && intermediateParallelism > (2 * userSpecifiedParallelism)) {
+ if (userSpecifiedParallelism != -1 &&
+ (intermediateParallelism > 200 && intermediateParallelism > (2 * userSpecifiedParallelism))) {
// Estimated reducers shall not be more than 2x of requested parallelism
- // when we are overriding user specified values
+ // if greater than 200 and we are overriding user specified values
intermediateParallelism = 2 * userSpecifiedParallelism;
}
roundedEstimatedParallelism = intermediateParallelism;
Modified: pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/optimizer/UnionOptimizer.java Fri Oct 31 23:20:55 2014
@@ -73,6 +73,10 @@ public class UnionOptimizer extends TezO
return;
}
+ if((tezOp.isLimit() || tezOp.isLimitAfterSort()) && tezOp.getRequestedParallelism() == 1) {
+ return;
+ }
+
TezOperator unionOp = tezOp;
String unionOpKey = unionOp.getOperatorKey().toString();
String scope = unionOp.getOperatorKey().scope;
@@ -264,9 +268,16 @@ public class UnionOptimizer extends TezO
}
private void copyOperatorProperties(TezOperator pred, TezOperator unionOp) {
- pred.setUseSecondaryKey(unionOp.isUseSecondaryKey());
pred.UDFs.addAll(unionOp.UDFs);
pred.scalars.addAll(unionOp.scalars);
+ // Copy only map side properties. For eg: crossKeys.
+ // Do not copy reduce side specific properties. For eg: useSecondaryKey, segmentBelow, sortOrder, etc
+ // Also ignore parallelism settings
+ if (unionOp.getCrossKeys() != null) {
+ for (String key : unionOp.getCrossKeys()) {
+ pred.addCrossKey(key);
+ }
+ }
pred.copyFeatures(unionOp, Arrays.asList(new OPER_FEATURE[]{OPER_FEATURE.UNION}));
}
Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/builtin/GFCross.java Fri Oct 31 23:20:55 2014
@@ -60,6 +60,9 @@ public class GFCross extends EvalFunc<Da
throw new IOException("Unable to get parallelism hint from job conf");
}
parallelism = Integer.valueOf(s);
+ if (parallelism < 0) {
+ throw new IOException(PigImplConstants.PIG_CROSS_PARALLELISM + "." + crossKey + " was " + parallelism);
+ }
}
numInputs = (Integer)input.get(0);
Modified: pig/branches/branch-0.14/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/src/org/apache/pig/impl/util/Utils.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/branches/branch-0.14/src/org/apache/pig/impl/util/Utils.java Fri Oct 31 23:20:55 2014
@@ -23,10 +23,8 @@ import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.io.PrintStream;
import java.io.SequenceInputStream;
-import java.net.URL;
import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
@@ -42,12 +40,10 @@ import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.BZip2Codec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobConf;
@@ -65,7 +61,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.PigImplConstants;
import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.io.ReadToEndLoader;
@@ -85,13 +80,15 @@ import com.google.common.primitives.Long
*/
public class Utils {
private static final Log log = LogFactory.getLog(Utils.class);
-
+ private static final Pattern JAVA_MAXHEAPSIZE_PATTERN = Pattern.compile("-Xmx(([0-9]+)[mMgG])");
+
+
/**
* This method checks whether JVM vendor is IBM
* @return true if IBM JVM is being used
* false otherwise
*/
- public static boolean isVendorIBM() {
+ public static boolean isVendorIBM() {
return System.getProperty("java.vendor").contains("IBM");
}
@@ -251,7 +248,7 @@ public class Utils {
}
public static LogicalSchema parseSchema(String schemaString) throws ParserException {
- QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
+ QueryParserDriver queryParser = new QueryParserDriver( new PigContext(),
"util", new HashMap<String, String>() ) ;
LogicalSchema schema = queryParser.parseSchema(schemaString);
return schema;
@@ -262,7 +259,7 @@ public class Utils {
* field. This will be called only when PigStorage is invoked with
* '-tagFile' or '-tagPath' option and the schema file is present to be
* loaded.
- *
+ *
* @param schema
* @param fieldName
* @return ResourceSchema
@@ -396,7 +393,7 @@ public class Utils {
} else if (TEMPFILE_STORAGE.TFILE.lowerName().equals(tmpFileCompressionStorage)) {
return TEMPFILE_STORAGE.TFILE;
} else {
- throw new IllegalArgumentException("Unsupported storage format " + tmpFileCompressionStorage +
+ throw new IllegalArgumentException("Unsupported storage format " + tmpFileCompressionStorage +
". Should be one of " + Arrays.toString(TEMPFILE_STORAGE.values()));
}
}
@@ -595,7 +592,7 @@ public class Utils {
// substitute
eval = eval.substring(0, match.start())+val+eval.substring(match.end());
}
- throw new IllegalStateException("Variable substitution depth too large: "
+ throw new IllegalStateException("Variable substitution depth too large: "
+ MAX_SUBST + " " + expr);
}
@@ -661,4 +658,27 @@ public class Utils {
return null;
}
+
+ public static int extractHeapSizeInMB(String input) {
+ int ret = 0;
+ if(input == null || input.equals(""))
+ return ret;
+ Matcher m = JAVA_MAXHEAPSIZE_PATTERN.matcher(input);
+ String heapStr = null;
+ String heapNum = null;
+ // Grabs the last match which takes effect (in case that multiple Xmx options specified)
+ while (m.find()) {
+ heapStr = m.group(1);
+ heapNum = m.group(2);
+ }
+ if (heapStr != null) {
+ // when Xmx specified in Gigabyte
+ if(heapStr.endsWith("g") || heapStr.endsWith("G")) {
+ ret = Integer.parseInt(heapNum) * 1024;
+ } else {
+ ret = Integer.parseInt(heapNum);
+ }
+ }
+ return ret;
+ }
}
Modified: pig/branches/branch-0.14/test/e2e/pig/tests/nightly.conf
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/e2e/pig/tests/nightly.conf?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/e2e/pig/tests/nightly.conf (original)
+++ pig/branches/branch-0.14/test/e2e/pig/tests/nightly.conf Fri Oct 31 23:20:55 2014
@@ -1414,6 +1414,7 @@ b = load ':INPATH:/singlefile/studentcol
c = union a, b;
d = order c by name PARALLEL 2;
store d into ':OUTPATH:';\,
+ 'sortArgs' => ['-t', ' ', '-k', '1,1'],
},
{
'num' => 5,
@@ -1504,6 +1505,38 @@ e = load ':INPATH:/singlefile/votertab10
f = join d by group, e by name using 'replicated';
store f into ':OUTPATH:';\,
},
+ { ## Secondary Key
+ 'num' => 14,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name:chararray, age, gpa);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name:chararray, age, gpa);
+c = group a by name;
+d = foreach c {
+ sorted = order a by name,age;
+ lmt = limit sorted 1;
+ generate lmt as c1;
+};
+e = foreach d generate flatten(c1) as (name:chararray, age, gpa);
+f = group b by name;
+g = foreach f {
+ sorted = order b by name,age;
+ lmt = limit sorted 1;
+ generate lmt as f1;
+};
+h = foreach g generate flatten(f1) as (name:chararray, age, gpa);
+i = union e, h;
+j = order i by name parallel 1;
+store j into ':OUTPATH:';\,
+ 'sortArgs' => ['-t', ' ', '-k', '1,1'],
+ },
+ {
+ 'num' => 15,
+ 'pig' => q\a = load ':INPATH:/singlefile/studenttab10k' as (name, age, gpa:float);
+b = load ':INPATH:/singlefile/studentcolon10k' using PigStorage(':') as (name, age, gpa:float);
+c = filter a by gpa >= 4;
+d = cross a, c;
+e = union b, d;
+store e into ':OUTPATH:';\,
+ },
]
},
{
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/Util.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/Util.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/Util.java Fri Oct 31 23:20:55 2014
@@ -37,6 +37,7 @@ import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.io.PrintWriter;
import java.io.StringReader;
+import java.io.Writer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -53,10 +54,13 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.log4j.Appender;
import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
import org.apache.log4j.SimpleLayout;
+import org.apache.log4j.WriterAppender;
import org.apache.pig.ExecType;
import org.apache.pig.ExecTypeProvider;
import org.apache.pig.LoadCaster;
@@ -1367,4 +1371,18 @@ public class Util {
return ExecTypeProvider.fromString("local");
}
}
+
+ public static void createLogAppender(Class clazz, String appenderName, Writer writer) {
+ Logger logger = Logger.getLogger(clazz);
+ WriterAppender writerAppender = new WriterAppender(new PatternLayout("%d [%t] %-5p %c %x - %m%n"), writer);
+ writerAppender.setName(appenderName);
+ logger.addAppender(writerAppender);
+ }
+
+ public static void removeLogAppender(Class clazz, String appenderName) {
+ Logger logger = Logger.getLogger(clazz);
+ Appender appender = logger.getAppender(appenderName);
+ appender.close();
+ logger.removeAppender(appenderName);
+ }
}
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8-OPTOFF.gld Fri Oct 31 23:20:55 2014
@@ -2,78 +2,70 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-88
+# TEZ DAG plan: scope-75
#--------------------------------------------------
-Tez vertex scope-70 -> Tez vertex scope-72,
-Tez vertex scope-72 -> Tez vertex scope-80,
-Tez vertex scope-75 -> Tez vertex scope-77,
-Tez vertex scope-77 -> Tez vertex scope-80,
-Tez vertex scope-80 -> Tez vertex scope-85,
-Tez vertex scope-85
+Tez vertex scope-61 -> Tez vertex scope-63,
+Tez vertex scope-63 -> Tez vertex scope-71,
+Tez vertex scope-66 -> Tez vertex scope-68,
+Tez vertex scope-68 -> Tez vertex scope-71,
+Tez vertex scope-71
-Tez vertex scope-70
+Tez vertex scope-61
# Plan on vertex
-POValueOutputTez - scope-71 -> [scope-72]
+POValueOutputTez - scope-62 -> [scope-63]
|
-|---Limit - scope-48
+|---Limit - scope-39
|
- |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-47
-Tez vertex scope-72
+ |---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-38
+Tez vertex scope-63
# Plan on vertex
-POValueOutputTez - scope-82 -> [scope-80]
+POValueOutputTez - scope-73 -> [scope-71]
|
-|---a: New For Each(false,false)[bag] - scope-56
+|---a: New For Each(false,false)[bag] - scope-47
| |
- | Cast[int] - scope-51
+ | Cast[int] - scope-42
| |
- | |---Project[bytearray][0] - scope-50
+ | |---Project[bytearray][0] - scope-41
| |
- | Cast[chararray] - scope-54
+ | Cast[chararray] - scope-45
| |
- | |---Project[bytearray][1] - scope-53
+ | |---Project[bytearray][1] - scope-44
|
- |---Limit - scope-49
+ |---Limit - scope-40
|
- |---Limit - scope-74
+ |---Limit - scope-65
|
- |---POValueInputTez - scope-73 <- scope-70
-Tez vertex scope-75
+ |---POValueInputTez - scope-64 <- scope-61
+Tez vertex scope-66
# Plan on vertex
-POValueOutputTez - scope-76 -> [scope-77]
+POValueOutputTez - scope-67 -> [scope-68]
|
-|---Limit - scope-58
+|---Limit - scope-49
|
- |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-57
-Tez vertex scope-77
+ |---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-48
+Tez vertex scope-68
# Plan on vertex
-POValueOutputTez - scope-83 -> [scope-80]
+POValueOutputTez - scope-74 -> [scope-71]
|
-|---c: New For Each(false,false)[bag] - scope-66
+|---c: New For Each(false,false)[bag] - scope-57
| |
- | Cast[int] - scope-61
+ | Cast[int] - scope-52
| |
- | |---Project[bytearray][1] - scope-60
+ | |---Project[bytearray][1] - scope-51
| |
- | Cast[chararray] - scope-64
+ | Cast[chararray] - scope-55
| |
- | |---Project[bytearray][0] - scope-63
+ | |---Project[bytearray][0] - scope-54
|
- |---Limit - scope-59
+ |---Limit - scope-50
|
- |---Limit - scope-79
+ |---Limit - scope-70
|
- |---POValueInputTez - scope-78 <- scope-75
-Tez vertex scope-80
+ |---POValueInputTez - scope-69 <- scope-66
+Tez vertex scope-71
# Plan on vertex
-POValueOutputTez - scope-84 -> [scope-85]
+d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-60
|
-|---d: Limit - scope-68
+|---d: Limit - scope-59
|
- |---POShuffledValueInputTez - scope-81 <- [scope-77, scope-72]
-Tez vertex scope-85
-# Plan on vertex
-d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-69
-|
-|---d: Limit - scope-87
- |
- |---POValueInputTez - scope-86 <- scope-80
+ |---POShuffledValueInputTez - scope-72 <- [scope-68, scope-63]
Modified: pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/test/data/GoldenFiles/tez/TEZC-Union-8.gld Fri Oct 31 23:20:55 2014
@@ -2,14 +2,13 @@
# There are 1 DAGs in the session
#--------------------------------------------------
#--------------------------------------------------
-# TEZ DAG plan: scope-41
+# TEZ DAG plan: scope-37
#--------------------------------------------------
Tez vertex scope-23 -> Tez vertex scope-25,
-Tez vertex scope-25 -> Tez vertex group scope-42,
+Tez vertex scope-25 -> Tez vertex scope-33,
Tez vertex scope-28 -> Tez vertex scope-30,
-Tez vertex scope-30 -> Tez vertex group scope-42,
-Tez vertex group scope-42 -> Tez vertex scope-38,
-Tez vertex scope-38
+Tez vertex scope-30 -> Tez vertex scope-33,
+Tez vertex scope-33
Tez vertex scope-23
# Plan on vertex
@@ -20,25 +19,23 @@ POValueOutputTez - scope-24 -> [scope-2
|---a: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-0
Tez vertex scope-25
# Plan on vertex
-POValueOutputTez - scope-43 -> [scope-38]
+POValueOutputTez - scope-35 -> [scope-33]
|
-|---d: Limit - scope-44
+|---a: New For Each(false,false)[bag] - scope-9
+ | |
+ | Cast[int] - scope-4
+ | |
+ | |---Project[bytearray][0] - scope-3
+ | |
+ | Cast[chararray] - scope-7
+ | |
+ | |---Project[bytearray][1] - scope-6
|
- |---a: New For Each(false,false)[bag] - scope-9
- | |
- | Cast[int] - scope-4
- | |
- | |---Project[bytearray][0] - scope-3
- | |
- | Cast[chararray] - scope-7
- | |
- | |---Project[bytearray][1] - scope-6
+ |---Limit - scope-2
|
- |---Limit - scope-2
+ |---Limit - scope-27
|
- |---Limit - scope-27
- |
- |---POValueInputTez - scope-26 <- scope-23
+ |---POValueInputTez - scope-26 <- scope-23
Tez vertex scope-28
# Plan on vertex
POValueOutputTez - scope-29 -> [scope-30]
@@ -48,31 +45,27 @@ POValueOutputTez - scope-29 -> [scope-3
|---b: Load(file:///tmp/input:org.apache.pig.builtin.PigStorage) - scope-10
Tez vertex scope-30
# Plan on vertex
-POValueOutputTez - scope-45 -> [scope-38]
+POValueOutputTez - scope-36 -> [scope-33]
|
-|---d: Limit - scope-46
+|---c: New For Each(false,false)[bag] - scope-19
+ | |
+ | Cast[int] - scope-14
+ | |
+ | |---Project[bytearray][1] - scope-13
+ | |
+ | Cast[chararray] - scope-17
+ | |
+ | |---Project[bytearray][0] - scope-16
|
- |---c: New For Each(false,false)[bag] - scope-19
- | |
- | Cast[int] - scope-14
- | |
- | |---Project[bytearray][1] - scope-13
- | |
- | Cast[chararray] - scope-17
- | |
- | |---Project[bytearray][0] - scope-16
+ |---Limit - scope-12
|
- |---Limit - scope-12
+ |---Limit - scope-32
|
- |---Limit - scope-32
- |
- |---POValueInputTez - scope-31 <- scope-28
-Tez vertex group scope-42 <- [scope-25, scope-30] -> scope-38
-# No plan on vertex group
-Tez vertex scope-38
+ |---POValueInputTez - scope-31 <- scope-28
+Tez vertex scope-33
# Plan on vertex
d: Store(file:///tmp/output:org.apache.pig.builtin.PigStorage) - scope-22
|
-|---d: Limit - scope-40
+|---d: Limit - scope-21
|
- |---POValueInputTez - scope-39 <- scope-42
+ |---POShuffledValueInputTez - scope-34 <- [scope-30, scope-25]
Modified: pig/branches/branch-0.14/test/org/apache/pig/tez/TestTezAutoParallelism.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.14/test/org/apache/pig/tez/TestTezAutoParallelism.java?rev=1635881&r1=1635880&r2=1635881&view=diff
==============================================================================
--- pig/branches/branch-0.14/test/org/apache/pig/tez/TestTezAutoParallelism.java (original)
+++ pig/branches/branch-0.14/test/org/apache/pig/tez/TestTezAutoParallelism.java Fri Oct 31 23:20:55 2014
@@ -18,10 +18,15 @@
package org.apache.pig.tez;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Iterator;
+import java.util.List;
import java.util.Properties;
import java.util.Random;
@@ -33,6 +38,9 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.PigServer;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.InputSizeReducerEstimator;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.optimizer.ParallelismSetter;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.test.MiniGenericCluster;
import org.apache.pig.test.Util;
import org.junit.After;
@@ -98,7 +106,7 @@ public class TestTezAutoParallelism {
}
w.close();
Util.copyFromLocalToCluster(cluster, INPUT_DIR + "/" + INPUT_FILE1, INPUT_FILE1);
-
+
w = new PrintWriter(new FileWriter(INPUT_DIR + "/" + INPUT_FILE2));
for (String name : boyNames) {
w.println(name + "\t" + "M");
@@ -119,13 +127,14 @@ public class TestTezAutoParallelism {
// parallelism is 3 originally, reduce to 1
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = group A by name;");
pigServer.store("B", "output1");
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(new Path("output1"), new PathFilter(){
+ @Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
@@ -141,7 +150,7 @@ public class TestTezAutoParallelism {
// order by parallelism is 3 originally, reduce to 1
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = group A by name parallel 3;");
@@ -150,6 +159,7 @@ public class TestTezAutoParallelism {
pigServer.store("D", "output2");
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(new Path("output2"), new PathFilter(){
+ @Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
@@ -173,6 +183,7 @@ public class TestTezAutoParallelism {
pigServer.store("D", "output3");
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(new Path("output3"), new PathFilter(){
+ @Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
@@ -188,7 +199,7 @@ public class TestTezAutoParallelism {
// skewed join parallelism is 4 originally, reduce to 1
pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
- pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM,
Long.toString(InputSizeReducerEstimator.DEFAULT_BYTES_PER_REDUCER));
pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
@@ -196,6 +207,7 @@ public class TestTezAutoParallelism {
pigServer.store("C", "output4");
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(new Path("output4"), new PathFilter(){
+ @Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
@@ -218,6 +230,7 @@ public class TestTezAutoParallelism {
pigServer.store("C", "output5");
FileSystem fs = cluster.getFileSystem();
FileStatus[] files = fs.listStatus(new Path("output5"), new PathFilter(){
+ @Override
public boolean accept(Path path) {
if (path.getName().startsWith("part")) {
return true;
@@ -227,4 +240,38 @@ public class TestTezAutoParallelism {
});
assertEquals(files.length, 4);
}
+
+ @Test
+ public void testSkewedJoinIncreaseIntermediateParallelism() throws IOException{
+ NodeIdGenerator.reset();
+ PigServer.resetScope();
+ StringWriter writer = new StringWriter();
+ // When there is a combiner operation involved user specified parallelism is overriden
+ Util.createLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism", writer);
+ try {
+ pigServer.getPigContext().getProperties().setProperty(PigConfiguration.PIG_NO_SPLIT_COMBINATION, "true");
+ pigServer.getPigContext().getProperties().setProperty(MRConfiguration.MAX_SPLIT_SIZE, "3000");
+ pigServer.getPigContext().getProperties().setProperty(InputSizeReducerEstimator.BYTES_PER_REDUCER_PARAM, "80000");
+ pigServer.registerQuery("A = load '" + INPUT_FILE1 + "' as (name:chararray, age:int);");
+ pigServer.registerQuery("B = load '" + INPUT_FILE2 + "' as (name:chararray, gender:chararray);");
+ pigServer.registerQuery("C = join A by name, B by name using 'skewed' parallel 1;");
+ pigServer.registerQuery("D = group C by A::name;");
+ pigServer.registerQuery("E = foreach D generate group, COUNT(C.A::name);");
+ Iterator<Tuple> iter = pigServer.openIterator("E");
+ List<Tuple> expectedResults = Util
+ .getTuplesFromConstantTupleStrings(new String[] {
+ "('Abigail',56L)", "('Alexander',45L)", "('Ava',60L)",
+ "('Daniel',68L)", "('Elizabeth',42L)",
+ "('Emily',57L)", "('Emma',50L)", "('Ethan',50L)",
+ "('Isabella',43L)", "('Jacob',43L)", "('Jayden',59L)",
+ "('Liam',46L)", "('Madison',46L)", "('Mason',54L)",
+ "('Mia',51L)", "('Michael',47L)", "('Noah',38L)",
+ "('Olivia',50L)", "('Sophia',52L)", "('William',43L)" });
+
+ Util.checkQueryOutputsAfterSort(iter, expectedResults);
+ assertTrue(writer.toString().contains("Increased requested parallelism of scope-40 to 4"));
+ } finally {
+ Util.removeLogAppender(ParallelismSetter.class, "testSkewedJoinIncreaseIntermediateParallelism");
+ }
+ }
}