You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by rk...@apache.org on 2016/02/25 00:00:38 UTC

hadoop git commit: YARN-4697. NM aggregation thread pool is not bound by limits (haibochen via rkanter)

Repository: hadoop
Updated Branches:
  refs/heads/trunk efdc0070d -> 954dd5704


YARN-4697. NM aggregation thread pool is not bound by limits (haibochen via rkanter)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/954dd570
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/954dd570
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/954dd570

Branch: refs/heads/trunk
Commit: 954dd57043d2de4f962876c1b89753bfc7e4ce55
Parents: efdc007
Author: Robert Kanter <rk...@apache.org>
Authored: Wed Feb 24 15:00:24 2016 -0800
Committer: Robert Kanter <rk...@apache.org>
Committed: Wed Feb 24 15:00:24 2016 -0800

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../hadoop/yarn/conf/YarnConfiguration.java     |   5 +
 .../src/main/resources/yarn-default.xml         |   8 ++
 .../logaggregation/LogAggregationService.java   |  35 ++++-
 .../TestLogAggregationService.java              | 143 +++++++++++++++++++
 5 files changed, 188 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dd570/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9aa1037..7327192 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -143,6 +143,9 @@ Release 2.9.0 - UNRELEASED
     YARN-4648. Move preemption related tests from TestFairScheduler to
     TestFairSchedulerPreemption. (Kai Sasaki via ozawa)
 
+    YARN-4697. NM aggregation thread pool is not bound by
+    limits (haibochen via rkanter)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dd570/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index f0c7e6d..69e5ba5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -759,6 +759,11 @@ public class YarnConfiguration extends Configuration {
   public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
   public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
 
+  /** The number of threads to handle log aggregation in node manager. */
+  public static final String NM_LOG_AGGREGATION_THREAD_POOL_SIZE =
+      NM_PREFIX + "logaggregation.threadpool-size-max";
+  public static final int DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100;
+
   public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION =
       NM_PREFIX + "resourcemanager.minimum.version";
   public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dd570/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index cd4074a..72c89c9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1227,6 +1227,14 @@
   </property>
 
   <property>
+    <description>
+    Thread pool size for LogAggregationService in Node Manager.
+    </description>
+    <name>yarn.nodemanager.logaggregation.threadpool-size-max</name>
+    <value>100</value>
+  </property>
+
+  <property>
     <description>Percentage of CPU that can be allocated
     for containers. This setting allows users to limit the amount of
     CPU that YARN containers use. Currently functional only

http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dd570/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 6411535..2d6b900 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -102,7 +102,8 @@ public class LogAggregationService extends AbstractService implements
 
   private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
 
-  private final ExecutorService threadPool;
+  @VisibleForTesting
+  ExecutorService threadPool;
   
   public LogAggregationService(Dispatcher dispatcher, Context context,
       DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
@@ -113,10 +114,6 @@ public class LogAggregationService extends AbstractService implements
     this.dirsHandler = dirsHandler;
     this.appLogAggregators =
         new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
-    this.threadPool = HadoopExecutors.newCachedThreadPool(
-        new ThreadFactoryBuilder()
-          .setNameFormat("LogAggregationService #%d")
-          .build());
   }
 
   protected void serviceInit(Configuration conf) throws Exception {
@@ -126,7 +123,11 @@ public class LogAggregationService extends AbstractService implements
     this.remoteRootLogDirSuffix =
         conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
-
+    int threadPoolSize = getAggregatorThreadPoolSize(conf);
+    this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
+        new ThreadFactoryBuilder()
+            .setNameFormat("LogAggregationService #%d")
+            .build());
     super.serviceInit(conf);
   }
 
@@ -487,4 +488,26 @@ public class LogAggregationService extends AbstractService implements
   public NodeId getNodeId() {
     return this.nodeId;
   }
+
+
+  private int getAggregatorThreadPoolSize(Configuration conf) {
+    int threadPoolSize;
+    try {
+      threadPoolSize = conf.getInt(YarnConfiguration
+          .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+          YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+    } catch (NumberFormatException ex) {
+      LOG.warn("Invalid thread pool size. Setting it to the default value " +
+          "in YarnConfiguration");
+      threadPoolSize = YarnConfiguration.
+          DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
+    }
+    if(threadPoolSize <= 0) {
+      LOG.warn("Invalid thread pool size. Setting it to the default value " +
+          "in YarnConfiguration");
+      threadPoolSize = YarnConfiguration.
+          DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
+    }
+    return threadPoolSize;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dd570/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 101fef0..87c3f27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -55,6 +55,12 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.LogFactory;
@@ -1040,6 +1046,143 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
     return appAcls;
   }
 
+  @Test (timeout = 30000)
+  public void testFixedSizeThreadPool() throws Exception {
+    // store configured thread pool size temporarily for restoration
+    int initThreadPoolSize = conf.getInt(YarnConfiguration
+        .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+
+    int threadPoolSize = 3;
+    conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        threadPoolSize);
+
+    DeletionService delSrvc = mock(DeletionService.class);
+
+    LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class);
+    when(dirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+    LogAggregationService logAggregationService =
+      new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc);
+
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ExecutorService executorService = logAggregationService.threadPool;
+
+    // used to block the pool's threads: the main thread always acquires the
+    // write lock first, so every submitted task waits on the read lock.
+    final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+    final Lock rLock = rwLock.readLock();
+    final Lock wLock = rwLock.writeLock();
+
+    try {
+      wLock.lock();
+      Runnable runnable = new Runnable() {
+        @Override
+        public void run() {
+          try {
+            // threads in the thread pool running this will be blocked
+            rLock.tryLock(35000, TimeUnit.MILLISECONDS);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          } finally {
+            rLock.unlock();
+          }
+        }
+      };
+
+      // submit $(threadPoolSize + 1) runnables to the thread pool. If the thread
+      // pool size is set properly, only $(threadPoolSize) threads will be
+      // created in the thread pool, each of which is blocked on the read lock.
+      for(int i = 0; i < threadPoolSize + 1; i++)  {
+        executorService.submit(runnable);
+      }
+
+      // count the number of current running LogAggregationService threads
+      int runningThread = ((ThreadPoolExecutor) executorService).getActiveCount();
+      assertEquals(threadPoolSize, runningThread);
+    }
+    finally {
+      wLock.unlock();
+    }
+
+    logAggregationService.stop();
+    logAggregationService.close();
+
+    // restore the original configurations to avoid side effects
+    conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        initThreadPoolSize);
+  }
+
+  @Test
+  public void testInvalidThreadPoolSizeNaN() throws IOException {
+      testInvalidThreadPoolSizeValue("NaN");
+  }
+
+  @Test
+  public void testInvalidThreadPoolSizeNegative() throws IOException {
+      testInvalidThreadPoolSizeValue("-100");
+  }
+
+  @Test
+  public void testInvalidThreadPoolSizeXLarge() throws  IOException {
+      testInvalidThreadPoolSizeValue("11111111111");
+  }
+
+  private void testInvalidThreadPoolSizeValue(final String threadPoolSize)
+      throws IOException {
+    Supplier<Boolean> isInputInvalid = new Supplier<Boolean>() {
+        @Override
+        public Boolean get() {
+          try {
+            int value = Integer.parseInt(threadPoolSize);
+            return value <= 0;
+          } catch (NumberFormatException ex) {
+            return true;
+          }
+        }
+    };
+
+    assertTrue("The thread pool size must be invalid to use with this " +
+        "method", isInputInvalid.get());
+
+
+    // store configured thread pool size temporarily for restoration
+    int initThreadPoolSize = conf.getInt(YarnConfiguration
+        .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+
+    conf.set(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+         threadPoolSize);
+
+    DeletionService delSrvc = mock(DeletionService.class);
+
+    LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class);
+    when(dirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+    LogAggregationService logAggregationService =
+         new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc);
+
+    logAggregationService.init(this.conf);
+    logAggregationService.start();
+
+    ThreadPoolExecutor executorService = (ThreadPoolExecutor)
+        logAggregationService.threadPool;
+    assertEquals("The thread pool size should be set to the value of YARN" +
+        ".DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE because the configured "
+         + " thread pool size is " + "invalid.",
+        YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+        executorService.getMaximumPoolSize());
+
+    logAggregationService.stop();
+    logAggregationService.close();
+
+     // restore original configuration to avoid side effects
+     conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+         initThreadPoolSize);
+  }
+
   @Test(timeout=20000)
   public void testStopAfterError() throws Exception {
     DeletionService delSrvc = mock(DeletionService.class);