You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/04/12 01:37:24 UTC

svn commit: r1738666 - in /pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/tez/ src/org/apache/pig/impl/util/

Author: rohini
Date: Mon Apr 11 23:37:24 2016
New Revision: 1738666

URL: http://svn.apache.org/viewvc?rev=1738666&view=rev
Log:
PIG-4844: Tez AM runs out of memory when vertex has high number of outputs (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
    pig/trunk/src/org/apache/pig/impl/util/Utils.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1738666&r1=1738665&r2=1738666&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 11 23:37:24 2016
@@ -107,6 +107,8 @@ PIG-4639: Add better parser for Apache H
 
 BUG FIXES
 
+PIG-4844: Tez AM runs out of memory when vertex has high number of outputs (rohini)
+
 PIG-3906: ant site errors out (nielsbasjes via daijy)
 
 PIG-4851: Null not padded when input has less fields than declared schema for some loader (rohini)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java?rev=1738666&r1=1738665&r2=1738666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJob.java Mon Apr 11 23:37:24 2016
@@ -30,6 +30,11 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.pig.PigConfiguration;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.tez.plan.TezOperator;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.UDFContext;
 import org.apache.pig.tools.pigstats.tez.TezPigScriptStats;
 import org.apache.tez.client.TezClient;
@@ -69,31 +74,71 @@ public class TezJob implements Runnable
 
     public TezJob(TezConfiguration conf, DAG dag,
             Map<String, LocalResource> requestAMResources,
-            int estimatedTotalParallelism) throws IOException {
+            TezOperPlan tezPlan) throws IOException { // the whole plan is passed instead of only its parallelism estimate
         this.conf = conf;
         this.dag = dag;
         this.requestAMResources = requestAMResources;
         this.reuseSession = conf.getBoolean(PigConfiguration.PIG_TEZ_SESSION_REUSE, true);
         this.statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-        tezJobConf = new TezJobConfig(estimatedTotalParallelism);
+        tezJobConf = new TezJobConfig(tezPlan); // derives parallelism, vertex count and max outputs from the plan
     }
 
     static class TezJobConfig {
 
         private int estimatedTotalParallelism = -1;
+        private int maxOutputsinSingleVertex; // copied from MaxOutputsFinder: largest outEdges key count on a non-vertex-group operator, never below 1
+        private int totalVertices  = 0; // copied from MaxOutputsFinder: number of operators that are not vertex groups
 
-        public TezJobConfig(int estimatedTotalParallelism) {
-            this.estimatedTotalParallelism = estimatedTotalParallelism;
+        public TezJobConfig(TezOperPlan tezPlan) throws VisitorException { // snapshots the plan statistics once, at construction
+            this.estimatedTotalParallelism = tezPlan.getEstimatedTotalParallelism();
+            MaxOutputsFinder finder = new MaxOutputsFinder(tezPlan);
+            finder.visit(); // runs the visitor; its counters are read on the next two lines
+            this.maxOutputsinSingleVertex = finder.getMaxOutputsinSingleVertex();
+            this.totalVertices = finder.getTotalVertices();
         }
 
         public int getEstimatedTotalParallelism() {
             return estimatedTotalParallelism;
         }
 
-        public void setEstimatedTotalParallelism(int estimatedTotalParallelism) {
-            this.estimatedTotalParallelism = estimatedTotalParallelism;
+        public int getMaxOutputsinSingleVertex() { // read by TezSessionManager.adjustAMConfig when sizing the AM
+            return maxOutputsinSingleVertex;
         }
 
+        public int getTotalVertices() { // read by TezSessionManager.adjustAMConfig when sizing the AM
+            return totalVertices;
+        }
+
+    }
+
+    private static class MaxOutputsFinder extends TezOpPlanVisitor { // plan visitor that counts vertices and tracks the largest per-vertex output count
+
+        private int maxOutputsinSingleVertex  = 1; // starts at 1, so the reported maximum is never below 1
+        private int totalVertices  = 0; // incremented once per visited operator that is not a vertex group
+
+        public MaxOutputsFinder(TezOperPlan plan) {
+            super(plan, new DependencyOrderWalker<TezOperator, TezOperPlan>(plan));
+        }
+
+        public int getMaxOutputsinSingleVertex() {
+            return maxOutputsinSingleVertex;
+        }
+
+        public int getTotalVertices() {
+            return totalVertices;
+        }
+
+        @Override
+        public void visitTezOp(TezOperator tezOperator) throws VisitorException {
+            if (!tezOperator.isVertexGroup()) { // vertex-group operators contribute to neither statistic
+                totalVertices++;
+                int outputs = tezOperator.outEdges.keySet().size(); // number of keys in the operator's outEdges map
+                maxOutputsinSingleVertex = maxOutputsinSingleVertex > outputs ? maxOutputsinSingleVertex : outputs; // keep the running maximum
+            }
+        }
+
+
+
     }
 
     public DAG getDAG() {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java?rev=1738666&r1=1738665&r2=1738666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezJobCompiler.java Mon Apr 11 23:37:24 2016
@@ -108,7 +108,7 @@ public class TezJobCompiler {
             DAG tezDag = buildDAG(tezPlanNode, localResources);
             tezDag.setDAGInfo(createDagInfo(TezScriptState.get().getScript()));
             log.info("Total estimated parallelism is " + tezPlan.getEstimatedTotalParallelism());
-            return new TezJob(tezConf, tezDag, localResources, tezPlan.getEstimatedTotalParallelism());
+            return new TezJob(tezConf, tezDag, localResources, tezPlan);
         } catch (Exception e) {
             int errCode = 2017;
             String msg = "Internal error creating job configuration.";

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java?rev=1738666&r1=1738665&r2=1738666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezSessionManager.java Mon Apr 11 23:37:24 2016
@@ -108,8 +108,6 @@ public class TezSessionManager {
     }
 
     private static void adjustAMConfig(TezConfiguration amConf, TezJobConfig tezJobConf) {
-        int requiredAMMaxHeap = -1;
-        int requiredAMResourceMB = -1;
         String amLaunchOpts = amConf.get(
                 TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
                 TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS_DEFAULT);
@@ -122,8 +120,10 @@ public class TezSessionManager {
 
             // Need more room for native memory/virtual address space
             // when close to 4G due to 32-bit jvm 4G limit
-            int minAMMaxHeap = 3200;
-            int minAMResourceMB = 4096;
+            int maxAMHeap = Utils.is64bitJVM() ? 3584 : 3200;
+            int maxAMResourceMB = 4096;
+            int requiredAMResourceMB = maxAMResourceMB;
+            int requiredAMMaxHeap = maxAMHeap;
 
             // Rough estimation. For 5K tasks 1G Xmx and 1.5G resource.mb
             // Increment container size by 512 mb for every additional 5K tasks.
@@ -135,22 +135,38 @@ public class TezSessionManager {
             //     5000 and above  - 1024Xmx, 1536 (512 native memory)
             for (int taskCount = 30000; taskCount >= 5000; taskCount-=5000) {
                 if (tezJobConf.getEstimatedTotalParallelism() >= taskCount) {
-                    requiredAMMaxHeap = minAMMaxHeap;
-                    requiredAMResourceMB = minAMResourceMB;
                     break;
                 }
-                minAMResourceMB = minAMResourceMB - 512;
-                minAMMaxHeap = minAMResourceMB - 512;
+                requiredAMResourceMB = requiredAMResourceMB - 512;
+                requiredAMMaxHeap = requiredAMResourceMB - 512;
             }
 
+            if (tezJobConf.getTotalVertices() > 30) {
+                //Add 512 mb per 30 vertices
+                int additionaMem = 512 * (tezJobConf.getTotalVertices() / 30);
+                requiredAMResourceMB = requiredAMResourceMB + additionaMem;
+                requiredAMMaxHeap = requiredAMResourceMB - 512;
+            }
+
+            if (tezJobConf.getMaxOutputsinSingleVertex() > 10) {
+                //Add 256 mb per 5 outputs if a vertex has more than 10 outputs
+                int additionaMem = 256 * (tezJobConf.getMaxOutputsinSingleVertex() / 5);
+                requiredAMResourceMB = requiredAMResourceMB + additionaMem;
+                requiredAMMaxHeap = requiredAMResourceMB - 512;
+            }
+
+            requiredAMResourceMB = Math.min(maxAMResourceMB, requiredAMResourceMB);
+            requiredAMMaxHeap = Math.min(maxAMHeap, requiredAMMaxHeap);
+
             if (requiredAMResourceMB > -1 && configuredAMResourceMB < requiredAMResourceMB) {
                 amConf.setInt(TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB, requiredAMResourceMB);
                 log.info("Increasing "
                         + TezConfiguration.TEZ_AM_RESOURCE_MEMORY_MB + " from "
                         + configuredAMResourceMB + " to "
                         + requiredAMResourceMB
-                        + " as the number of total estimated tasks is "
-                        + tezJobConf.getEstimatedTotalParallelism());
+                        + " as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism()
+                        + ", total vertices = " + tezJobConf.getTotalVertices()
+                        + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex());
 
                 if (requiredAMMaxHeap > -1 && configuredAMMaxHeap < requiredAMMaxHeap) {
                     amConf.set(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS,
@@ -158,8 +174,9 @@ public class TezSessionManager {
                     log.info("Increasing Tez AM Heap Size from "
                             + configuredAMMaxHeap + "M to "
                             + requiredAMMaxHeap
-                            + "M as the number of total estimated tasks is "
-                            + tezJobConf.getEstimatedTotalParallelism());
+                            + "M as total estimated tasks = " + tezJobConf.getEstimatedTotalParallelism()
+                            + ", total vertices = " + tezJobConf.getTotalVertices()
+                            + ", max outputs = " + tezJobConf.getMaxOutputsinSingleVertex());
                     log.info("Value of " + TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS + " is now "
                             + amConf.get(TezConfiguration.TEZ_AM_LAUNCH_CMD_OPTS));
                 }

Modified: pig/trunk/src/org/apache/pig/impl/util/Utils.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/util/Utils.java?rev=1738666&r1=1738665&r2=1738666&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/util/Utils.java (original)
+++ pig/trunk/src/org/apache/pig/impl/util/Utils.java Mon Apr 11 23:37:24 2016
@@ -107,6 +107,12 @@ public class Utils {
         return false;
     }
 
+    public static boolean is64bitJVM() { // true only when a data-model system property equals "64"
+        String arch = System.getProperties().getProperty("sun.arch.data.model", // property consulted first
+                System.getProperty("com.ibm.vm.bitmode")); // default used when the first property is not set; may itself be null
+        return arch != null && arch.equals("64"); // neither property set, or any other value => reported as not 64-bit
+    }
+
     /**
      * This method is a helper for classes to implement {@link java.lang.Object#equals(java.lang.Object)}
      * checks if two objects are equals - two levels of checks are