You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by rk...@apache.org on 2016/02/25 00:00:38 UTC
hadoop git commit: YARN-4697. NM aggregation thread pool is not bound
by limits (haibochen via rkanter)
Repository: hadoop
Updated Branches:
refs/heads/trunk efdc0070d -> 954dd5704
YARN-4697. NM aggregation thread pool is not bound by limits (haibochen via rkanter)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/954dd570
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/954dd570
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/954dd570
Branch: refs/heads/trunk
Commit: 954dd57043d2de4f962876c1b89753bfc7e4ce55
Parents: efdc007
Author: Robert Kanter <rk...@apache.org>
Authored: Wed Feb 24 15:00:24 2016 -0800
Committer: Robert Kanter <rk...@apache.org>
Committed: Wed Feb 24 15:00:24 2016 -0800
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/conf/YarnConfiguration.java | 5 +
.../src/main/resources/yarn-default.xml | 8 ++
.../logaggregation/LogAggregationService.java | 35 ++++-
.../TestLogAggregationService.java | 143 +++++++++++++++++++
5 files changed, 188 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dd570/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 9aa1037..7327192 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -143,6 +143,9 @@ Release 2.9.0 - UNRELEASED
YARN-4648. Move preemption related tests from TestFairScheduler to
TestFairSchedulerPreemption. (Kai Sasaki via ozawa)
+ YARN-4697. NM aggregation thread pool is not bound by
+ limits (haibochen via rkanter)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dd570/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
index f0c7e6d..69e5ba5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java
@@ -759,6 +759,11 @@ public class YarnConfiguration extends Configuration {
public static final String NM_LOG_DIRS = NM_PREFIX + "log-dirs";
public static final String DEFAULT_NM_LOG_DIRS = "/tmp/logs";
+ /** The number of threads to handle log aggregation in node manager. */
+ public static final String NM_LOG_AGGREGATION_THREAD_POOL_SIZE =
+ NM_PREFIX + "logaggregation.threadpool-size-max";
+ public static final int DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE = 100;
+
public static final String NM_RESOURCEMANAGER_MINIMUM_VERSION =
NM_PREFIX + "resourcemanager.minimum.version";
public static final String DEFAULT_NM_RESOURCEMANAGER_MINIMUM_VERSION = "NONE";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dd570/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
index cd4074a..72c89c9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml
@@ -1227,6 +1227,14 @@
</property>
<property>
+ <description>
+ Thread pool size for LogAggregationService in Node Manager. A value that is not a positive integer is ignored and the default is used instead.
+ </description>
+ <name>yarn.nodemanager.logaggregation.threadpool-size-max</name>
+ <value>100</value>
+ </property>
+
+ <property>
<description>Percentage of CPU that can be allocated
for containers. This setting allows users to limit the amount of
CPU that YARN containers use. Currently functional only
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dd570/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 6411535..2d6b900 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -102,7 +102,8 @@ public class LogAggregationService extends AbstractService implements
private final ConcurrentMap<ApplicationId, AppLogAggregator> appLogAggregators;
- private final ExecutorService threadPool;
+ @VisibleForTesting
+ ExecutorService threadPool;
public LogAggregationService(Dispatcher dispatcher, Context context,
DeletionService deletionService, LocalDirsHandlerService dirsHandler) {
@@ -113,10 +114,6 @@ public class LogAggregationService extends AbstractService implements
this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
- this.threadPool = HadoopExecutors.newCachedThreadPool(
- new ThreadFactoryBuilder()
- .setNameFormat("LogAggregationService #%d")
- .build());
}
protected void serviceInit(Configuration conf) throws Exception {
@@ -126,7 +123,11 @@ public class LogAggregationService extends AbstractService implements
this.remoteRootLogDirSuffix =
conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR_SUFFIX,
YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR_SUFFIX);
-
+ int threadPoolSize = getAggregatorThreadPoolSize(conf);
+ this.threadPool = HadoopExecutors.newFixedThreadPool(threadPoolSize,
+ new ThreadFactoryBuilder()
+ .setNameFormat("LogAggregationService #%d")
+ .build());
super.serviceInit(conf);
}
@@ -487,4 +488,26 @@ public class LogAggregationService extends AbstractService implements
/** Returns the value of this service's {@code nodeId} field. */
public NodeId getNodeId() {
return this.nodeId;
}
+
+
+ private int getAggregatorThreadPoolSize(Configuration conf) {
+ int threadPoolSize;
+ try {
+ threadPoolSize = conf.getInt(YarnConfiguration
+ .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+ } catch (NumberFormatException ex) {
+ LOG.warn("Invalid thread pool size. Setting it to the default value " +
+ "in YarnConfiguration");
+ threadPoolSize = YarnConfiguration.
+ DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
+ }
+ if(threadPoolSize <= 0) {
+ LOG.warn("Invalid thread pool size. Setting it to the default value " +
+ "in YarnConfiguration");
+ threadPoolSize = YarnConfiguration.
+ DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE;
+ }
+ return threadPoolSize;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/954dd570/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 101fef0..87c3f27 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -55,6 +55,12 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.LogFactory;
@@ -1040,6 +1046,143 @@ public class TestLogAggregationService extends BaseContainerManagerTest {
return appAcls;
}
+ @Test (timeout = 30000)
+ public void testFixedSizeThreadPool() throws Exception {
+ // store configured thread pool size temporarily for restoration
+ int initThreadPoolSize = conf.getInt(YarnConfiguration
+ .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+
+ int threadPoolSize = 3;
+ conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ threadPoolSize);
+
+ DeletionService delSrvc = mock(DeletionService.class);
+
+ LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class);
+ when(dirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+ LogAggregationService logAggregationService =
+ new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc);
+
+ logAggregationService.init(this.conf);
+ logAggregationService.start();
+
+ ExecutorService executorService = logAggregationService.threadPool;
+
+ // used to block threads in the thread pool because main thread always
+ // acquires the write lock first.
+ final ReadWriteLock rwLock = new ReentrantReadWriteLock();
+ final Lock rLock = rwLock.readLock();
+ final Lock wLock = rwLock.writeLock();
+
+ try {
+ wLock.lock();
+ Runnable runnable = new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // threads in the thread pool running this will be blocked
+ rLock.tryLock(35000, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ } finally {
+ rLock.unlock();
+ }
+ }
+ };
+
+ // submit $(threadPoolSize + 1) runnables to the thread pool. If the thread
+ // pool size is set properly, only $(threadPoolSize) threads will be
+ // created in the thread pool, each of which is blocked on the read lock.
+ for(int i = 0; i < threadPoolSize + 1; i++) {
+ executorService.submit(runnable);
+ }
+
+ // count the number of current running LogAggregationService threads
+ int runningThread = ((ThreadPoolExecutor) executorService).getActiveCount();
+ assertEquals(threadPoolSize, runningThread);
+ }
+ finally {
+ wLock.unlock();
+ }
+
+ logAggregationService.stop();
+ logAggregationService.close();
+
+ // restore the original configurations to avoid side effects
+ conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+ initThreadPoolSize);
+ }
+
+ /** A non-numeric pool size must fall back to the default. */
+ @Test
+ public void testInvalidThreadPoolSizeNaN() throws IOException {
+   testInvalidThreadPoolSizeValue("NaN");
+ }
+
+ /** A negative pool size must fall back to the default. */
+ @Test
+ public void testInvalidThreadPoolSizeNegative() throws IOException {
+   testInvalidThreadPoolSizeValue("-100");
+ }
+
+ /** A pool size too large to parse as an int must fall back to the default. */
+ @Test
+ public void testInvalidThreadPoolSizeXLarge() throws IOException {
+   testInvalidThreadPoolSizeValue("11111111111");
+ }
+
+ /**
+  * Configures the given invalid thread pool size, starts a
+  * LogAggregationService and checks that its pool was sized with
+  * {@link YarnConfiguration#DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE}.
+  *
+  * @param threadPoolSize raw configuration value; must be unparsable as an
+  *     int or not positive
+  * @throws IOException if closing the service fails
+  */
+ private void testInvalidThreadPoolSizeValue(final String threadPoolSize)
+     throws IOException {
+   // guard against misuse of this helper with a valid value
+   boolean inputInvalid;
+   try {
+     inputInvalid = Integer.parseInt(threadPoolSize) <= 0;
+   } catch (NumberFormatException ex) {
+     inputInvalid = true;
+   }
+   assertTrue("The thread pool size must be invalid to use with this " +
+       "method", inputInvalid);
+
+   // store configured thread pool size temporarily for restoration
+   int initThreadPoolSize = conf.getInt(YarnConfiguration
+       .NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+       YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE);
+
+   conf.set(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+       threadPoolSize);
+
+   try {
+     DeletionService delSrvc = mock(DeletionService.class);
+
+     LocalDirsHandlerService dirSvc = mock(LocalDirsHandlerService.class);
+     when(dirSvc.getLogDirs()).thenThrow(new RuntimeException());
+
+     LogAggregationService logAggregationService =
+         new LogAggregationService(dispatcher, this.context, delSrvc, dirSvc);
+
+     logAggregationService.init(this.conf);
+     logAggregationService.start();
+
+     try {
+       ThreadPoolExecutor executorService = (ThreadPoolExecutor)
+           logAggregationService.threadPool;
+       assertEquals("The thread pool size should be set to the value of YARN" +
+           ".DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE because the configured "
+           + " thread pool size is " + "invalid.",
+           YarnConfiguration.DEFAULT_NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+           executorService.getMaximumPoolSize());
+     } finally {
+       // always shut the service down, even when the assertion failed
+       logAggregationService.stop();
+       logAggregationService.close();
+     }
+   } finally {
+     // restore original configuration to avoid side effects
+     conf.setInt(YarnConfiguration.NM_LOG_AGGREGATION_THREAD_POOL_SIZE,
+         initThreadPoolSize);
+   }
+ }
+
@Test(timeout=20000)
public void testStopAfterError() throws Exception {
DeletionService delSrvc = mock(DeletionService.class);