You are viewing a plain text version of this content. The canonical link for it was a hyperlink in the original archive page and is not reproduced in this plain-text copy.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/04/12 01:37:24 UTC
svn commit: r1738666 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/impl/util/
Author: rohini
Date: Mon Apr 11 23:37:24 2016
New Revision: 1738666
URL: http://svn.apache.org/viewvc?rev=1738666&view=rev
Log:
PIG-4844: Tez AM runs out of memory when vertex has high number of outputs (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
pig/trunk/src/org/apache/pig/impl/util/Utils.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1738666&r1=1738665&r2=1738666&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 11 23:37:24 2016
@@ -107,6 +107,8 @@ PIG-4639: Add better parser for Apache H
BUG FIXES
+PIG-4844: Tez AM runs out of memory when vertex has high number of outputs (rohini)
+
PIG-3906: ant site errors out (nielsbasjes via daijy)
PIG-4851: Null not padded when input has less fields than declared schema for some loader (rohini)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1738666&r1=1738665&r2=1738666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Mon Apr 11 23:37:24 2016
@@ -30,6 +30,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.UDFContext;
import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
import org.apache.tez.client.TezClient;
@@ -69,31 +74,71 @@ public class TezJob implements Runnable
public TezJob(TezConfiguration conf, DAG dag,
Map<String, LocalResource> requestAMResources,
- int estimatedTotalParallelism) throws IOException {
+ TezOperPlan tezPlan) throws IOException {
this.conf = conf;
this.dag = dag;
this.requestAMResources = requestAMResources;
this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
- tezJobConf = new TezJobConfig(estimatedTotalParallelism);
+ tezJobConf = new TezJobConfig(tezPlan);
}
static class TezJobConfig {
private int estimatedTotalParallelism = -1;
+ private int maxOutputsinSingleVertex;
+ private int totalVertices = 0;
- public TezJobConfig(int estimatedTotalParallelism) {
- this.estimatedTotalParallelism = estimatedTotalParallelism;
+ public TezJobConfig(TezOperPlan tezPlan) throws VisitorException {
+ this.estimatedTotalParallelism = tezPlan.getEstimatedTotalParallelism();
+ MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan);
+ finder.visit();
+ this.maxOutputsinSingleVertex = finder.getMaxOutputsinSingleVertex();
+ this.totalVertices = finder.getTotalVertices();
}
public int getEstimatedTotalParallelism() {
return estimatedTotalParallelism;
}
- public void setEstimatedTotalParallelism(int estimatedTotalParallelism) {
- this.estimatedTotalParallelism = estimatedTotalParallelism;
+ public int getMaxOutputsinSingleVertex() {
+ return maxOutputsinSingleVertex;
}
+ public int getTotalVertices() {
+ return totalVertices;
+ }
+
+ }
+
+ private static class MaxOutputsFinder extends TezOpPlanVisitor {
+
+ private int maxOutputsinSingleVertex = 1;
+ private int totalVertices = 0;
+
+ public MaxOutputsFinder(TezOperPlan plan) {
+ super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+ }
+
+ public int getMaxOutputsinSingleVertex() {
+ return maxOutputsinSingleVertex;
+ }
+
+ public int getTotalVertices() {
+ return totalVertices;
+ }
+
+ @Override
+ public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+ if (!tezOperator.isVertexGroup()) {
+ totalVertices++;
+ int outputs = tezOperator.outEdges.keySet().size();
+ maxOutputsinSingleVertex = maxOutputsinSingleVertex > outputs ? maxOutputsinSingleVertex : outputs;
+ }
+ }
+
+
+
}
public DAG getDAG() {
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1738666&r1=1738665&r2=1738666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Mon Apr 11 23:37:24 2016
@@ -108,7 +108,7 @@ public class TezJobCompiler {
DAG tezDag = buildDAG(tezPlanNode, localResources);
tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
- return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
+ return new TezJob(tezConf, tezDag, localResources, tezPlan);
} catch (Exception e) {
int errCode = 2017;
String msg = "Internal error creating job configuration.";
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1738666&r1=1738665&r2=1738666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Mon Apr 11 23:37:24 2016
@@ -108,8 +108,6 @@ public class TezSessionManager {
}
private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
- int requiredAMMaxHeap = -1;
- int requiredAMResourceMB = -1;
String amLaunchOpts = amConf.get(
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT);
@@ -122,8 +120,10 @@ public class TezSessionManager {
// Need more room for native memory/virtual address space
// when close to 4G due to 32-bit jvm 4G limit
- int minAMMaxHeap = 3200;
- int minAMResourceMB = 4096;
+ int maxAMHeap = Utils.is64bitJVM() ? 3584 : 3200;
+ int maxAMResourceMB = 4096;
+ int requiredAMResourceMB = maxAMResourceMB;
+ int requiredAMMaxHeap = maxAMHeap;
// Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
// Increment container size by 512 mb for every additional 5K tasks.
@@ -135,22 +135,38 @@ public class TezSessionManager {
// 5000 and above - 1024Xmx, 1536 (512 native memory)
for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) {
if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) {
- requiredAMMaxHeap = minAMMaxHeap;
- requiredAMResourceMB = minAMResourceMB;
break;
}
- minAMResourceMB = minAMResourceMB - 512;
- minAMMaxHeap = minAMResourceMB - 512;
+ requiredAMResourceMB = requiredAMResourceMB - 512;
+ requiredAMMaxHeap = requiredAMResourceMB - 512;
}
+ if (tezJobConf.getTotalVertices() > 30) {
+ //Add 512 mb per 30 vertices
+ int additionaMem = 512 * (tezJobConf.getTotalVertices() / 30);
+ requiredAMResourceMB = requiredAMResourceMB + additionaMem;
+ requiredAMMaxHeap = requiredAMResourceMB - 512;
+ }
+
+ if (tezJobConf.getMaxOutputsinSingleVertex() > 10) {
+ //Add 256 mb per 5 outputs if a vertex has more than 10 outputs
+ int additionaMem = 256 * (tezJobConf.getMaxOutputsinSingleVertex() / 5);
+ requiredAMResourceMB = requiredAMResourceMB + additionaMem;
+ requiredAMMaxHeap = requiredAMResourceMB - 512;
+ }
+
+ requiredAMResourceMB = Math.min(maxAMResourceMB, requiredAMResourceMB);
+ requiredAMMaxHeap = Math.min(maxAMHeap, requiredAMMaxHeap);
+
if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) {
amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB);
log.info("Increasing "
+ TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from "
+ configuredAMResourceMB + " to "
+ requiredAMResourceMB
- + " as the number of total estimated tasks is "
- + tezJobConf.getEstimatedTotalParallelism());
+ + " as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism()
+ + ", total vertices = " + tezJobConf.getTotalVertices()
+ + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex());
if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) {
amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
@@ -158,8 +174,9 @@ public class TezSessionManager {
log.info("Increasing Tez AM Heap Size from "
+ configuredAMMaxHeap + "M to "
+ requiredAMMaxHeap
- + "M as the number of total estimated tasks is "
- + tezJobConf.getEstimatedTotalParallelism());
+ + "M as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism()
+ + ", total vertices = " + tezJobConf.getTotalVertices()
+ + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex());
log.info("Value of " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now "
+ amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS));
}
Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1738666&r1=1738665&r2=1738666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Mon Apr 11 23:37:24 2016
@@ -107,6 +107,12 @@ public class Utils {
return false;
}
+ public static boolean is64bitJVM() {
+ String arch = System.getProperties().getProperty("sun.arch.data.model",
+ System.getProperty("com.ibm.vm.bitmode"));
+ return arch != null && arch.equals("64");
+ }
+
/**
* This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
* checks if two objects are equals - two levels of checks are