You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hive.apache.org by rb...@apache.org on 2017/01/27 23:11:46 UTC

hive git commit: HIVE-15693: LLAP: cached threadpool in AMReporter creates too many threads leading to OOM (Rajesh Balamohan, reviewed by Siddharth Seth, Lefty Leverenz)

Repository: hive
Updated Branches:
  refs/heads/master aabe83dbf -> 79eb2243a


HIVE-15693: LLAP: cached threadpool in AMReporter creates too many threads leading to OOM (Rajesh Balamohan, reviewed by Siddharth Seth, Lefty Leverenz)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/79eb2243
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/79eb2243
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/79eb2243

Branch: refs/heads/master
Commit: 79eb2243aac6b5b0f5678369d579581a91a1925f
Parents: aabe83d
Author: Rajesh Balamohan <rb...@apache.org>
Authored: Sat Jan 28 04:41:35 2017 +0530
Committer: Rajesh Balamohan <rb...@apache.org>
Committed: Sat Jan 28 04:41:35 2017 +0530

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  4 ++++
 .../hive/llap/daemon/impl/AMReporter.java       | 20 ++++++++++++++------
 .../hive/llap/daemon/impl/LlapDaemon.java       |  4 +++-
 3 files changed, 21 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/79eb2243/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 1e7bfd7..4e83867 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -3081,6 +3081,10 @@ public class HiveConf extends Configuration {
     LLAP_DAEMON_NUM_EXECUTORS("hive.llap.daemon.num.executors", 4,
       "Number of executors to use in LLAP daemon; essentially, the number of tasks that can be\n" +
       "executed in parallel.", "llap.daemon.num.executors"),
+    LLAP_DAEMON_AM_REPORTER_MAX_THREADS("hive.llap.daemon.am-reporter.max.threads", 4,
+        "Maximum number of threads to be used for AM reporter. If this is lower than number of\n" +
+        "executors in llap daemon, it would be set to number of executors at runtime.",
+        "llap.daemon.am-reporter.max.threads"),
     LLAP_DAEMON_RPC_PORT("hive.llap.daemon.rpc.port", 0, "The LLAP daemon RPC port.",
       "llap.daemon.rpc.port. A value of 0 indicates a dynamic port"),
     LLAP_DAEMON_MEMORY_PER_INSTANCE_MB("hive.llap.daemon.memory.per.instance.mb", 4096,

http://git-wip-us.apache.org/repos/asf/hive/blob/79eb2243/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 027d8eb..93237e6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -40,6 +40,8 @@ import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -90,8 +92,6 @@ public class AMReporter extends AbstractService {
   Ignore exceptions when communicating with the AM.
   At a later point, report back saying the AM is dead so that tasks can be removed from the running queue.
 
-  Use a cachedThreadPool so that a few AMs going down does not affect other AppMasters.
-
   Race: When a task completes - it sends out its message via the regular TaskReporter. The AM after this may run another DAG,
   or may die. This may need to be consolidated with the LlapTaskReporter. Try ensuring there's no race between the two.
 
@@ -118,15 +118,23 @@ public class AMReporter extends AbstractService {
   volatile ListenableFuture<Void> queueLookupFuture;
   private final DaemonId daemonId;
 
-  public AMReporter(AtomicReference<InetSocketAddress> localAddress,
-      QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) {
+  public AMReporter(int numExecutors, int maxThreads, AtomicReference<InetSocketAddress>
+      localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) {
     super(AMReporter.class.getName());
     this.localAddress = localAddress;
     this.queryFailedHandler = queryFailedHandler;
     this.conf = conf;
     this.daemonId = daemonId;
-    ExecutorService rawExecutor = Executors.newCachedThreadPool(
-        new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
+    if (maxThreads < numExecutors) {
+      LOG.warn("maxThreads={} is less than numExecutors={}. Setting maxThreads=numExecutors",
+          maxThreads, numExecutors);
+      maxThreads = numExecutors; // assigned after logging so the warning shows the configured value
+    }
+    ExecutorService rawExecutor =
+        new ThreadPoolExecutor(numExecutors, maxThreads,
+            60L, TimeUnit.SECONDS,
+            new LinkedBlockingQueue<Runnable>(),
+            new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build());
     this.executor = MoreExecutors.listeningDecorator(rawExecutor);
     ExecutorService rawExecutor2 = Executors.newFixedThreadPool(1,
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build());

http://git-wip-us.apache.org/repos/asf/hive/blob/79eb2243/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 519bfbd..cca6bc6 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -254,7 +254,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla
     LOG.info("Started LlapMetricsSystem with displayName: " + displayName +
         " sessionId: " + sessionId);
 
-    this.amReporter = new AMReporter(srvAddress, new QueryFailedHandlerProxy(), daemonConf, daemonId);
+    int maxAmReporterThreads = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_AM_REPORTER_MAX_THREADS);
+    this.amReporter = new AMReporter(numExecutors, maxAmReporterThreads, srvAddress,
+        new QueryFailedHandlerProxy(), daemonConf, daemonId);
 
     SecretManager sm = null;
     if (UserGroupInformation.isSecurityEnabled()) {