You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2016/02/11 07:41:26 UTC
[1/2] hadoop git commit: YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN. Contributed by Sidharta Seethana.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 05b57c87f -> 3a5b39e93
refs/heads/trunk 663a80031 -> fa00d3e20
YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN. Contributed by Sidharta Seethana.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fa00d3e2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fa00d3e2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fa00d3e2
Branch: refs/heads/trunk
Commit: fa00d3e20560bee412b49e5792595749a247a8ab
Parents: 663a800
Author: Varun Vasudev <vv...@apache.org>
Authored: Thu Feb 11 12:06:42 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Thu Feb 11 12:06:42 2016 +0530
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 ++
.../RequestHedgingRMFailoverProxyProvider.java | 4 +--
.../apache/hadoop/yarn/util/TestFSDownload.java | 15 +++++----
.../server/services/RegistryAdminService.java | 4 +--
.../server/nodemanager/DeletionService.java | 35 ++------------------
.../launcher/ContainersLauncher.java | 4 +--
.../localizer/ContainerLocalizer.java | 4 +--
.../localizer/ResourceLocalizationService.java | 8 ++---
.../sharedcache/SharedCacheUploadService.java | 4 +--
.../logaggregation/LogAggregationService.java | 4 +--
.../loghandler/NonAggregatingLogHandler.java | 3 +-
.../nodemanager/TestNodeStatusUpdater.java | 3 +-
.../scheduler/fair/TestFSLeafQueue.java | 4 +--
.../sharedcachemanager/CleanerService.java | 4 +--
.../store/InMemorySCMStore.java | 4 +--
.../store/TestInMemorySCMStore.java | 7 ++--
16 files changed, 44 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 571549f..313a29c 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -128,6 +128,9 @@ Release 2.9.0 - UNRELEASED
YARN-4628. Display application priority in yarn top.
(Bibin A Chundatt via vvasudev)
+ YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN.
+ (Sidharta Seethana via vvasudev)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
index dc8d19b..d076599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -40,7 +41,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
@@ -137,7 +137,7 @@ public class RequestHedgingRMFailoverProxyProvider<T>
try {
Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
int numAttempts = 0;
- executor = Executors.newFixedThreadPool(allProxies.size());
+ executor = HadoopExecutors.newFixedThreadPool(allProxies.size());
completionService = new ExecutorCompletionService<>(executor);
for (final ProxyInfo<T> pInfo : allProxies.values()) {
Callable<Object> c = new Callable<Object>() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
index 3597b31..a2efb6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +52,7 @@ import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.Assert;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -273,7 +273,7 @@ public class TestFSDownload {
Map<LocalResource,Future<Path>> pending =
new HashMap<LocalResource,Future<Path>>();
- ExecutorService exec = Executors.newSingleThreadExecutor();
+ ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName());
int size = 512;
@@ -362,7 +362,7 @@ public class TestFSDownload {
});
}
- ExecutorService exec = Executors.newFixedThreadPool(fileCount);
+ ExecutorService exec = HadoopExecutors.newFixedThreadPool(fileCount);
try {
List<Future<Boolean>> futures = exec.invokeAll(tasks);
// files should be public
@@ -399,7 +399,7 @@ public class TestFSDownload {
Map<LocalResource,Future<Path>> pending =
new HashMap<LocalResource,Future<Path>>();
- ExecutorService exec = Executors.newSingleThreadExecutor();
+ ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName());
int[] sizes = new int[10];
@@ -468,7 +468,7 @@ public class TestFSDownload {
System.out.println("SEED: " + sharedSeed);
Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
- ExecutorService exec = Executors.newSingleThreadExecutor();
+ ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs = new LocalDirAllocator(
TestFSDownload.class.getName());
@@ -619,7 +619,7 @@ public class TestFSDownload {
Map<LocalResource,Future<Path>> pending =
new HashMap<LocalResource,Future<Path>>();
- ExecutorService exec = Executors.newSingleThreadExecutor();
+ ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName());
for (int i = 0; i < 5; ++i) {
@@ -674,7 +674,8 @@ public class TestFSDownload {
files.mkdir(basedir, null, true);
conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
- ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor();
+ ExecutorService singleThreadedExec = HadoopExecutors
+ .newSingleThreadExecutor();
LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
index 513d7ac..7a20c24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
@@ -53,7 +54,6 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@@ -109,7 +109,7 @@ public class RegistryAdminService extends RegistryOperationsService {
public RegistryAdminService(String name,
RegistryBindingSource bindingSource) {
super(name, bindingSource);
- executor = Executors.newCachedThreadPool(
+ executor = HadoopExecutors.newCachedThreadPool(
new ThreadFactory() {
private AtomicInteger counter = new AtomicInteger(1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
index 2e0cbbf..db834b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
@@ -29,8 +29,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -44,6 +42,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@@ -116,12 +115,12 @@ public class DeletionService extends AbstractService {
.setNameFormat("DeletionService #%d")
.build();
if (conf != null) {
- sched = new DelServiceSchedThreadPoolExecutor(
+ sched = new HadoopScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
} else {
- sched = new DelServiceSchedThreadPoolExecutor(
+ sched = new HadoopScheduledThreadPoolExecutor(
YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
}
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
@@ -158,34 +157,6 @@ public class DeletionService extends AbstractService {
return getServiceState() == STATE.STOPPED && sched.isTerminated();
}
- private static class DelServiceSchedThreadPoolExecutor extends
- ScheduledThreadPoolExecutor {
- public DelServiceSchedThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory) {
- super(corePoolSize, threadFactory);
- }
-
- @Override
- protected void afterExecute(Runnable task, Throwable exception) {
- if (task instanceof FutureTask<?>) {
- FutureTask<?> futureTask = (FutureTask<?>) task;
- if (!futureTask.isCancelled()) {
- try {
- futureTask.get();
- } catch (ExecutionException ee) {
- exception = ee.getCause();
- } catch (InterruptedException ie) {
- exception = ie;
- }
- }
- }
- if (exception != null) {
- LOG.error("Exception during execution of task in DeletionService",
- exception);
- }
- }
- }
-
public static class FileDeletionTask implements Runnable {
public static final int INVALID_TASK_ID = -1;
private int taskId;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 3a2649e..a34051c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -65,7 +65,7 @@ public class ContainersLauncher extends AbstractService
private LocalDirsHandlerService dirsHandler;
@VisibleForTesting
public ExecutorService containerLauncher =
- Executors.newCachedThreadPool(
+ HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("ContainersLauncher #%d")
.build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
index f82f894..927699e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -52,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.SerializedException;
@@ -187,7 +187,7 @@ public class ContainerLocalizer {
}
ExecutorService createDownloadThreadPool() {
- return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ return HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("ContainerLocalizer Downloader").build());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index c0c2e8e..b2413ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -43,11 +43,9 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -75,6 +73,8 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -194,7 +194,7 @@ public class ResourceLocalizationService extends CompositeService
this.delService = delService;
this.dirsHandler = dirsHandler;
- this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
+ this.cacheCleanup = new HadoopScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
.build());
@@ -784,7 +784,7 @@ public class ResourceLocalizationService extends CompositeService
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("PublicLocalizer #%d")
.build();
- return Executors.newFixedThreadPool(nThreads, tf);
+ return HadoopExecutors.newFixedThreadPool(nThreads, tf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
index cb11f99..16c36eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -71,7 +71,7 @@ public class SharedCacheUploadService extends AbstractService implements
int threadCount =
conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT,
YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT);
- uploaderPool = Executors.newFixedThreadPool(threadCount,
+ uploaderPool = HadoopExecutors.newFixedThreadPool(threadCount,
new ThreadFactoryBuilder().
setNameFormat("Shared cache uploader #%d").
build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index f64685d..6411535 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
@@ -40,6 +39,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -113,7 +113,7 @@ public class LogAggregationService extends AbstractService implements
this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
- this.threadPool = Executors.newCachedThreadPool(
+ this.threadPool = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("LogAggregationService #%d")
.build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index 471e994..d42a4e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -203,7 +204,7 @@ public class NonAggregatingLogHandler extends AbstractService implements
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build();
sched =
- new ScheduledThreadPoolExecutor(conf.getInt(
+ new HadoopScheduledThreadPoolExecutor(conf.getInt(
YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT,
YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf);
return sched;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 9e6868d..0d85057 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -1758,7 +1759,7 @@ public class TestNodeStatusUpdater {
final int NUM_THREADS = 10;
final CountDownLatch allDone = new CountDownLatch(NUM_THREADS);
- final ExecutorService threadPool = Executors.newFixedThreadPool(
+ final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(
NUM_THREADS);
final AtomicBoolean stop = new AtomicBoolean(false);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 7637410..7daccad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -34,9 +34,9 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -339,7 +339,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
final List<Throwable> exceptions = Collections.synchronizedList(
new ArrayList<Throwable>());
- final ExecutorService threadPool = Executors.newFixedThreadPool(
+ final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(
testThreads);
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
index 6748387..60fc3e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -37,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
@@ -80,7 +80,7 @@ public class CleanerService extends CompositeService {
// back-to-back runs
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build();
- scheduledExecutor = Executors.newScheduledThreadPool(2, tf);
+ scheduledExecutor = HadoopExecutors.newScheduledThreadPool(2, tf);
super.serviceInit(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
index d2efb6a..54d736f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -43,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -116,7 +116,7 @@ public class InMemorySCMStore extends SCMStore {
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore")
.build();
- scheduler = Executors.newSingleThreadScheduledExecutor(tf);
+ scheduler = HadoopExecutors.newSingleThreadScheduledExecutor(tf);
super.serviceInit(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fa00d3e2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
index f934dbf..6d67ad3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
import org.apache.hadoop.yarn.server.sharedcachemanager.DummyAppChecker;
@@ -121,7 +122,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
startEmptyStore();
final String key = "key1";
int count = 5;
- ExecutorService exec = Executors.newFixedThreadPool(count);
+ ExecutorService exec = HadoopExecutors.newFixedThreadPool(count);
List<Future<String>> futures = new ArrayList<Future<String>>(count);
final CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < count; i++) {
@@ -197,7 +198,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
// make concurrent addResourceRef calls (clients)
int count = 5;
- ExecutorService exec = Executors.newFixedThreadPool(count);
+ ExecutorService exec = HadoopExecutors.newFixedThreadPool(count);
List<Future<String>> futures = new ArrayList<Future<String>>(count);
final CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < count; i++) {
@@ -235,7 +236,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
final String user = "user";
final ApplicationId id = createAppId(1, 1L);
// add the resource and add the resource ref at the same time
- ExecutorService exec = Executors.newFixedThreadPool(2);
+ ExecutorService exec = HadoopExecutors.newFixedThreadPool(2);
final CountDownLatch start = new CountDownLatch(1);
Callable<String> addKeyTask = new Callable<String>() {
public String call() throws Exception {
[2/2] hadoop git commit: YARN-4655. Log uncaught exceptions/errors in
various thread pools in YARN. Contributed by Sidharta Seethana.
Posted by vv...@apache.org.
YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN. Contributed by Sidharta Seethana.
(cherry picked from commit fa00d3e20560bee412b49e5792595749a247a8ab)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3a5b39e9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3a5b39e9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3a5b39e9
Branch: refs/heads/branch-2
Commit: 3a5b39e931f460ceec45a80cfe0036eefa6c3096
Parents: 05b57c8
Author: Varun Vasudev <vv...@apache.org>
Authored: Thu Feb 11 12:06:42 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Thu Feb 11 12:09:29 2016 +0530
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 ++
.../RequestHedgingRMFailoverProxyProvider.java | 4 +--
.../apache/hadoop/yarn/util/TestFSDownload.java | 15 +++++----
.../server/services/RegistryAdminService.java | 4 +--
.../server/nodemanager/DeletionService.java | 35 ++------------------
.../launcher/ContainersLauncher.java | 4 +--
.../localizer/ContainerLocalizer.java | 4 +--
.../localizer/ResourceLocalizationService.java | 8 ++---
.../sharedcache/SharedCacheUploadService.java | 4 +--
.../logaggregation/LogAggregationService.java | 4 +--
.../loghandler/NonAggregatingLogHandler.java | 3 +-
.../nodemanager/TestNodeStatusUpdater.java | 3 +-
.../scheduler/fair/TestFSLeafQueue.java | 4 +--
.../sharedcachemanager/CleanerService.java | 4 +--
.../store/InMemorySCMStore.java | 4 +--
.../store/TestInMemorySCMStore.java | 7 ++--
16 files changed, 44 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index ed9665e..472c493 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -70,6 +70,9 @@ Release 2.9.0 - UNRELEASED
YARN-4628. Display application priority in yarn top.
(Bibin A Chundatt via vvasudev)
+ YARN-4655. Log uncaught exceptions/errors in various thread pools in YARN.
+ (Sidharta Seethana via vvasudev)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
index dc8d19b..d076599 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/RequestHedgingRMFailoverProxyProvider.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.retry.MultiException;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -40,7 +41,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
/**
@@ -137,7 +137,7 @@ public class RequestHedgingRMFailoverProxyProvider<T>
try {
Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
int numAttempts = 0;
- executor = Executors.newFixedThreadPool(allProxies.size());
+ executor = HadoopExecutors.newFixedThreadPool(allProxies.size());
completionService = new ExecutorCompletionService<>(executor);
for (final ProxyInfo<T> pInfo : allProxies.values()) {
Callable<Object> c = new Callable<Object>() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
index 3597b31..a2efb6b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestFSDownload.java
@@ -41,7 +41,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,6 +52,7 @@ import java.util.zip.GZIPOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.Assert;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
@@ -273,7 +273,7 @@ public class TestFSDownload {
Map<LocalResource,Future<Path>> pending =
new HashMap<LocalResource,Future<Path>>();
- ExecutorService exec = Executors.newSingleThreadExecutor();
+ ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName());
int size = 512;
@@ -362,7 +362,7 @@ public class TestFSDownload {
});
}
- ExecutorService exec = Executors.newFixedThreadPool(fileCount);
+ ExecutorService exec = HadoopExecutors.newFixedThreadPool(fileCount);
try {
List<Future<Boolean>> futures = exec.invokeAll(tasks);
// files should be public
@@ -399,7 +399,7 @@ public class TestFSDownload {
Map<LocalResource,Future<Path>> pending =
new HashMap<LocalResource,Future<Path>>();
- ExecutorService exec = Executors.newSingleThreadExecutor();
+ ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName());
int[] sizes = new int[10];
@@ -468,7 +468,7 @@ public class TestFSDownload {
System.out.println("SEED: " + sharedSeed);
Map<LocalResource, Future<Path>> pending = new HashMap<LocalResource, Future<Path>>();
- ExecutorService exec = Executors.newSingleThreadExecutor();
+ ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs = new LocalDirAllocator(
TestFSDownload.class.getName());
@@ -619,7 +619,7 @@ public class TestFSDownload {
Map<LocalResource,Future<Path>> pending =
new HashMap<LocalResource,Future<Path>>();
- ExecutorService exec = Executors.newSingleThreadExecutor();
+ ExecutorService exec = HadoopExecutors.newSingleThreadExecutor();
LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName());
for (int i = 0; i < 5; ++i) {
@@ -674,7 +674,8 @@ public class TestFSDownload {
files.mkdir(basedir, null, true);
conf.setStrings(TestFSDownload.class.getName(), basedir.toString());
- ExecutorService singleThreadedExec = Executors.newSingleThreadExecutor();
+ ExecutorService singleThreadedExec = HadoopExecutors
+ .newSingleThreadExecutor();
LocalDirAllocator dirs =
new LocalDirAllocator(TestFSDownload.class.getName());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
index 513d7ac..7a20c24 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService;
import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity;
import org.apache.hadoop.registry.client.types.RegistryPathStatus;
import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.data.ACL;
@@ -53,7 +54,6 @@ import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
@@ -109,7 +109,7 @@ public class RegistryAdminService extends RegistryOperationsService {
public RegistryAdminService(String name,
RegistryBindingSource bindingSource) {
super(name, bindingSource);
- executor = Executors.newCachedThreadPool(
+ executor = HadoopExecutors.newCachedThreadPool(
new ThreadFactory() {
private AtomicInteger counter = new AtomicInteger(1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
index 2e0cbbf..db834b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/DeletionService.java
@@ -29,8 +29,6 @@ import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.FutureTask;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -44,6 +42,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.DeletionServiceDeleteTaskProto;
import org.apache.hadoop.yarn.server.nodemanager.executor.DeletionAsUserContext;
@@ -116,12 +115,12 @@ public class DeletionService extends AbstractService {
.setNameFormat("DeletionService #%d")
.build();
if (conf != null) {
- sched = new DelServiceSchedThreadPoolExecutor(
+ sched = new HadoopScheduledThreadPoolExecutor(
conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
} else {
- sched = new DelServiceSchedThreadPoolExecutor(
+ sched = new HadoopScheduledThreadPoolExecutor(
YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
}
sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
@@ -158,34 +157,6 @@ public class DeletionService extends AbstractService {
return getServiceState() == STATE.STOPPED && sched.isTerminated();
}
- private static class DelServiceSchedThreadPoolExecutor extends
- ScheduledThreadPoolExecutor {
- public DelServiceSchedThreadPoolExecutor(int corePoolSize,
- ThreadFactory threadFactory) {
- super(corePoolSize, threadFactory);
- }
-
- @Override
- protected void afterExecute(Runnable task, Throwable exception) {
- if (task instanceof FutureTask<?>) {
- FutureTask<?> futureTask = (FutureTask<?>) task;
- if (!futureTask.isCancelled()) {
- try {
- futureTask.get();
- } catch (ExecutionException ee) {
- exception = ee.getCause();
- } catch (InterruptedException ie) {
- exception = ie;
- }
- }
- }
- if (exception != null) {
- LOG.error("Exception during execution of task in DeletionService",
- exception);
- }
- }
- }
-
public static class FileDeletionTask implements Runnable {
public static final int INVALID_TASK_ID = -1;
private int taskId;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
index 3a2649e..a34051c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/launcher/ContainersLauncher.java
@@ -23,7 +23,6 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.event.Dispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -65,7 +65,7 @@ public class ContainersLauncher extends AbstractService
private LocalDirsHandlerService dirsHandler;
@VisibleForTesting
public ExecutorService containerLauncher =
- Executors.newCachedThreadPool(
+ HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("ContainersLauncher #%d")
.build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
index f82f894..927699e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ContainerLocalizer.java
@@ -34,7 +34,6 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -52,6 +51,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.SerializedException;
@@ -187,7 +187,7 @@ public class ContainerLocalizer {
}
ExecutorService createDownloadThreadPool() {
- return Executors.newSingleThreadExecutor(new ThreadFactoryBuilder()
+ return HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setNameFormat("ContainerLocalizer Downloader").build());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
index c0c2e8e..b2413ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/ResourceLocalizationService.java
@@ -43,11 +43,9 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -75,6 +73,8 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
@@ -194,7 +194,7 @@ public class ResourceLocalizationService extends CompositeService
this.delService = delService;
this.dirsHandler = dirsHandler;
- this.cacheCleanup = new ScheduledThreadPoolExecutor(1,
+ this.cacheCleanup = new HadoopScheduledThreadPoolExecutor(1,
new ThreadFactoryBuilder()
.setNameFormat("ResourceLocalizationService Cache Cleanup")
.build());
@@ -784,7 +784,7 @@ public class ResourceLocalizationService extends CompositeService
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("PublicLocalizer #%d")
.build();
- return Executors.newFixedThreadPool(nThreads, tf);
+ return HadoopExecutors.newFixedThreadPool(nThreads, tf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
index cb11f99..16c36eb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/localizer/sharedcache/SharedCacheUploadService.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Map;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -33,6 +32,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.ipc.YarnRPC;
@@ -71,7 +71,7 @@ public class SharedCacheUploadService extends AbstractService implements
int threadCount =
conf.getInt(YarnConfiguration.SHARED_CACHE_NM_UPLOADER_THREAD_COUNT,
YarnConfiguration.DEFAULT_SHARED_CACHE_NM_UPLOADER_THREAD_COUNT);
- uploaderPool = Executors.newFixedThreadPool(threadCount,
+ uploaderPool = HadoopExecutors.newFixedThreadPool(threadCount,
new ThreadFactoryBuilder().
setNameFormat("Shared cache uploader #%d").
build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index f64685d..6411535 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -25,7 +25,6 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
@@ -40,6 +39,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -113,7 +113,7 @@ public class LogAggregationService extends AbstractService implements
this.dirsHandler = dirsHandler;
this.appLogAggregators =
new ConcurrentHashMap<ApplicationId, AppLogAggregator>();
- this.threadPool = Executors.newCachedThreadPool(
+ this.threadPool = HadoopExecutors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("LogAggregationService #%d")
.build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
index 471e994..d42a4e7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/loghandler/NonAggregatingLogHandler.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.Dispatcher;
@@ -203,7 +204,7 @@ public class NonAggregatingLogHandler extends AbstractService implements
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("LogDeleter #%d").build();
sched =
- new ScheduledThreadPoolExecutor(conf.getInt(
+ new HadoopScheduledThreadPoolExecutor(conf.getInt(
YarnConfiguration.NM_LOG_DELETION_THREADS_COUNT,
YarnConfiguration.DEFAULT_NM_LOG_DELETE_THREAD_COUNT), tf);
return sched;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
index 9e6868d..0d85057 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.token.delegation.web.DelegationTokenIdentifier;
import org.apache.hadoop.service.Service.STATE;
import org.apache.hadoop.service.ServiceOperations;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -1758,7 +1759,7 @@ public class TestNodeStatusUpdater {
final int NUM_THREADS = 10;
final CountDownLatch allDone = new CountDownLatch(NUM_THREADS);
- final ExecutorService threadPool = Executors.newFixedThreadPool(
+ final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(
NUM_THREADS);
final AtomicBoolean stop = new AtomicBoolean(false);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
index 7637410..7daccad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
@@ -34,9 +34,9 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -339,7 +339,7 @@ public class TestFSLeafQueue extends FairSchedulerTestBase {
final List<Throwable> exceptions = Collections.synchronizedList(
new ArrayList<Throwable>());
- final ExecutorService threadPool = Executors.newFixedThreadPool(
+ final ExecutorService threadPool = HadoopExecutors.newFixedThreadPool(
testThreads);
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
index 6748387..60fc3e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/CleanerService.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.io.IOException;
import java.lang.management.ManagementFactory;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -37,6 +36,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.service.CompositeService;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.sharedcachemanager.metrics.CleanerMetrics;
@@ -80,7 +80,7 @@ public class CleanerService extends CompositeService {
// back-to-back runs
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("Shared cache cleaner").build();
- scheduledExecutor = Executors.newScheduledThreadPool(2, tf);
+ scheduledExecutor = HadoopExecutors.newScheduledThreadPool(2, tf);
super.serviceInit(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
index d2efb6a..54d736f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/main/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/InMemorySCMStore.java
@@ -28,7 +28,6 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -43,6 +42,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.StringInterner;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
@@ -116,7 +116,7 @@ public class InMemorySCMStore extends SCMStore {
ThreadFactory tf =
new ThreadFactoryBuilder().setNameFormat("InMemorySCMStore")
.build();
- scheduler = Executors.newSingleThreadScheduledExecutor(tf);
+ scheduler = HadoopExecutors.newSingleThreadScheduledExecutor(tf);
super.serviceInit(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3a5b39e9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
index f934dbf..6d67ad3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-sharedcachemanager/src/test/java/org/apache/hadoop/yarn/server/sharedcachemanager/store/TestInMemorySCMStore.java
@@ -45,6 +45,7 @@ import java.util.concurrent.Future;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
import org.apache.hadoop.yarn.server.sharedcachemanager.DummyAppChecker;
@@ -121,7 +122,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
startEmptyStore();
final String key = "key1";
int count = 5;
- ExecutorService exec = Executors.newFixedThreadPool(count);
+ ExecutorService exec = HadoopExecutors.newFixedThreadPool(count);
List<Future<String>> futures = new ArrayList<Future<String>>(count);
final CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < count; i++) {
@@ -197,7 +198,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
// make concurrent addResourceRef calls (clients)
int count = 5;
- ExecutorService exec = Executors.newFixedThreadPool(count);
+ ExecutorService exec = HadoopExecutors.newFixedThreadPool(count);
List<Future<String>> futures = new ArrayList<Future<String>>(count);
final CountDownLatch start = new CountDownLatch(1);
for (int i = 0; i < count; i++) {
@@ -235,7 +236,7 @@ public class TestInMemorySCMStore extends SCMStoreBaseTest {
final String user = "user";
final ApplicationId id = createAppId(1, 1L);
// add the resource and add the resource ref at the same time
- ExecutorService exec = Executors.newFixedThreadPool(2);
+ ExecutorService exec = HadoopExecutors.newFixedThreadPool(2);
final CountDownLatch start = new CountDownLatch(1);
Callable<String> addKeyTask = new Callable<String>() {
public String call() throws Exception {