Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2016/02/18 09:49:30 UTC

[1/2] hadoop git commit: MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 d4203c9aa -> 212c519ad
  refs/heads/trunk c1afac3a9 -> 2440671a1


MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2440671a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2440671a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2440671a

Branch: refs/heads/trunk
Commit: 2440671a117f165dcda5056404bc898df3c50803
Parents: c1afac3
Author: Varun Vasudev <vv...@apache.org>
Authored: Thu Feb 18 14:15:08 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Thu Feb 18 14:18:36 2016 +0530

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../hadoop/mapred/LocalContainerLauncher.java   |  4 +--
 .../v2/app/commit/CommitterEventHandler.java    |  3 +-
 .../mapreduce/v2/app/job/impl/JobImpl.java      |  3 +-
 .../v2/app/launcher/ContainerLauncherImpl.java  |  3 +-
 .../mapred/LocalDistributedCacheManager.java    |  5 ++--
 .../apache/hadoop/mapred/LocalJobRunner.java    |  8 +++--
 .../hadoop/mapred/LocatedFileStatusFetcher.java |  4 +--
 .../java/org/apache/hadoop/mapred/TaskLog.java  | 31 ++++++++++----------
 .../mapred/lib/MultithreadedMapRunner.java      |  4 ++-
 .../lib/output/TestFileOutputCommitter.java     |  4 +--
 .../mapreduce/v2/hs/HistoryFileManager.java     |  6 ++--
 .../hadoop/mapreduce/v2/hs/JobHistory.java      |  3 +-
 .../apache/hadoop/mapred/ShuffleHandler.java    |  6 ++--
 .../org/apache/hadoop/examples/pi/Util.java     |  5 ++--
 15 files changed, 53 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
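
For context, the new org.apache.hadoop.util.concurrent classes wired in below (HadoopExecutors, HadoopThreadPoolExecutor, HadoopScheduledThreadPoolExecutor) exist so that a throwable thrown inside a pool task gets logged instead of being silently swallowed by the executor. A rough illustration of that pattern, following the afterExecute hook documented for java.util.concurrent.ThreadPoolExecutor and using commons-logging as the rest of this patch does -- this is a minimal sketch, not the actual Hadoop implementation:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/** Sketch: a ThreadPoolExecutor that logs throwables its tasks would otherwise swallow. */
public class LoggingThreadPoolExecutor extends ThreadPoolExecutor {

  private static final Log LOG =
      LogFactory.getLog(LoggingThreadPoolExecutor.class);

  public LoggingThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
      long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue,
      ThreadFactory threadFactory) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
        threadFactory);
  }

  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    super.afterExecute(r, t);
    if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
      // submit() wraps tasks in a FutureTask, so the throwable is only
      // visible by unwrapping the completed Future.
      try {
        ((Future<?>) r).get();
      } catch (CancellationException ce) {
        // cancelled tasks are expected; nothing to log
      } catch (ExecutionException ee) {
        t = ee.getCause();
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
      }
    }
    if (t != null) {
      LOG.warn("Uncaught exception in thread pool task", t);
    }
  }
}

The call sites in the diffs below then only need to swap the factory they use (Executors -> HadoopExecutors, ThreadPoolExecutor -> HadoopThreadPoolExecutor); the task code itself is unchanged.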


http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 0b8c818..da28bc6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -310,6 +310,9 @@ Release 2.9.0 - UNRELEASED
 
     MAPREDUCE-6431. JobClient should be an AutoClosable (haibochen via rkanter)
 
+    MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in
+    mapreduce. (Sidharta Seethana via vvasudev)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index 1a0d5fb..da118c5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -60,6 +59,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
@@ -138,7 +138,7 @@ public class LocalContainerLauncher extends AbstractService implements
     // make it a daemon thread so that the process can exit even if the task is
     // not interruptible
     taskRunner =
-        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
+        HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder().
             setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
     // create and start an event handling thread
     eventHandler = new Thread(new EventHandler(), "uber-EventHandler");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
index b53955f..0b1be70 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
@@ -133,7 +134,7 @@ public class CommitterEventHandler extends AbstractService
       tfBuilder.setThreadFactory(backingTf);
     }
     ThreadFactory tf = tfBuilder.build();
-    launcherPool = new ThreadPoolExecutor(5, 5, 1,
+    launcherPool = new HadoopThreadPoolExecutor(5, 5, 1,
         TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
     eventHandlingThread = new Thread(new Runnable() {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 5ed0762..c8c5ce9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -698,7 +699,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       .setNameFormat("Job Fail Wait Timeout Monitor #%d")
       .setDaemon(true)
       .build();
-    this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+    this.executor = new HadoopScheduledThreadPoolExecutor(1, threadFactory);
 
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
index a7e966c..189e2ef 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -266,7 +267,7 @@ public class ContainerLauncherImpl extends AbstractService implements
         "ContainerLauncher #%d").setDaemon(true).build();
 
     // Start with a default core-pool size of 10 and change it dynamically.
-    launcherPool = new ThreadPoolExecutor(initialPoolSize,
+    launcherPool = new HadoopThreadPoolExecutor(initialPoolSize,
         Integer.MAX_VALUE, 1, TimeUnit.HOURS,
         new LinkedBlockingQueue<Runnable>(),
         tf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
index 8606ede..3b87197 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
@@ -35,7 +35,6 @@ import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicLong;
@@ -43,7 +42,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -53,6 +51,7 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -121,7 +120,7 @@ class LocalDistributedCacheManager {
       ThreadFactory tf = new ThreadFactoryBuilder()
       .setNameFormat("LocalDistributedCacheManager Downloader #%d")
       .build();
-      exec = Executors.newCachedThreadPool(tf);
+      exec = HadoopExecutors.newCachedThreadPool(tf);
       Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
       Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
       for (LocalResource resource : localResources.values()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 45d3cc5..37c147d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -74,6 +73,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
@@ -428,7 +428,8 @@ public class LocalJobRunner implements ClientProtocol {
       ThreadFactory tf = new ThreadFactoryBuilder()
         .setNameFormat("LocalJobRunner Map Task Executor #%d")
         .build();
-      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
+      ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+          maxMapThreads, tf);
 
       return executor;
     }
@@ -454,7 +455,8 @@ public class LocalJobRunner implements ClientProtocol {
       LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
 
       // Create a new executor service to drain the work queue.
-      ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
+      ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+          maxReduceThreads);
 
       return executor;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
index 87114ad..a039bc9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
@@ -47,6 +46,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 /**
  * Utility class to fetch block locations for specified Input paths using a
@@ -92,7 +92,7 @@ public class LocatedFileStatusFetcher {
       IOException {
     int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
         FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
-    rawExec = Executors.newFixedThreadPool(
+    rawExec = HadoopExecutors.newFixedThreadPool(
         numThreads,
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("GetFileInfo #%d").build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
index e07b5be..bf838c2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
@@ -327,22 +328,22 @@ public class TaskLog {
 
   public static ScheduledExecutorService createLogSyncer() {
     final ScheduledExecutorService scheduler =
-      Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            final Thread t = Executors.defaultThreadFactory().newThread(r);
-            t.setDaemon(true);
-            t.setName("Thread for syncLogs");
-            return t;
-          }
-        });
+        HadoopExecutors.newSingleThreadScheduledExecutor(
+            new ThreadFactory() {
+              @Override
+              public Thread newThread(Runnable r) {
+                final Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setDaemon(true);
+                t.setName("Thread for syncLogs");
+                return t;
+              }
+            });
     ShutdownHookManager.get().addShutdownHook(new Runnable() {
-        @Override
-        public void run() {
-          TaskLog.syncLogsShutdown(scheduler);
-        }
-      }, 50);
+      @Override
+      public void run() {
+        TaskLog.syncLogsShutdown(scheduler);
+      }
+    }, 50);
     scheduler.scheduleWithFixedDelay(
         new Runnable() {
           @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
index 98d794b..05339bc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 
 import java.io.IOException;
 import java.util.concurrent.*;
@@ -84,7 +85,8 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
 
     // Creating a threadpool of the configured size to execute the Mapper
     // map method in parallel.
-    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
+    executorService = new HadoopThreadPoolExecutor(numberOfThreads,
+        numberOfThreads,
                                              0L, TimeUnit.MILLISECONDS,
                                              new BlockingArrayQueue
                                                (numberOfThreads));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index eba513b..20d8ab5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -25,10 +25,10 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
@@ -696,7 +696,7 @@ public class TestFileOutputCommitter extends TestCase {
       };
     }
 
-    final ExecutorService executor = Executors.newFixedThreadPool(2);
+    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
     try {
       for (int i = 0; i < taCtx.length; i++) {
         final int taskIdx = i;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index 6be0d27..677d5c2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ShutdownThreadsHelper;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -554,8 +555,9 @@ public class HistoryFileManager extends AbstractService {
         JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
     ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
         "MoveIntermediateToDone Thread #%d").build();
-    moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
-        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+    moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads,
+        numMoveThreads, 1, TimeUnit.HOURS,
+        new LinkedBlockingQueue<Runnable>(), tf);
 
     super.serviceInit(conf);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index 41bc90a..45075c9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -126,7 +127,7 @@ public class JobHistory extends AbstractService implements HistoryContext {
       ((Service) storage).start();
     }
 
-    scheduledExecutor = new ScheduledThreadPoolExecutor(2,
+    scheduledExecutor = new HadoopScheduledThreadPoolExecutor(2,
         new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
             .build());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 2fb7811..0d6e900 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -46,7 +46,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -81,6 +80,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -475,8 +475,8 @@ public class ShuffleHandler extends AuxiliaryService {
       .build();
     
     selector = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(bossFactory),
-        Executors.newCachedThreadPool(workerFactory),
+        HadoopExecutors.newCachedThreadPool(bossFactory),
+        HadoopExecutors.newCachedThreadPool(workerFactory),
         maxShuffleThreads);
     super.serviceInit(new Configuration(conf));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
index 8afc1bd..e74c091 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
@@ -35,7 +35,6 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 
@@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.base.Charsets;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 /** Utility methods */
 public class Util {
@@ -157,7 +157,8 @@ public class Util {
   /** Execute the callables by a number of threads */
   public static <T, E extends Callable<T>> void execute(int nThreads, List<E> callables
       ) throws InterruptedException, ExecutionException {
-    final ExecutorService executor = Executors.newFixedThreadPool(nThreads); 
+    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+        nThreads);
     final List<Future<T>> futures = executor.invokeAll(callables);
     for(Future<T> f : futures)
       f.get();


[2/2] hadoop git commit: MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana.

Posted by vv...@apache.org.
MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana.

(cherry picked from commit c50d1e54000c76880a041ce5959a2eb23c86bd35)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/212c519a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/212c519a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/212c519a

Branch: refs/heads/branch-2
Commit: 212c519ad3461068913c18b2ba03714934bb6200
Parents: d4203c9
Author: Varun Vasudev <vv...@apache.org>
Authored: Thu Feb 18 14:15:08 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Thu Feb 18 14:19:01 2016 +0530

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |  3 ++
 .../hadoop/mapred/LocalContainerLauncher.java   |  4 +--
 .../v2/app/commit/CommitterEventHandler.java    |  3 +-
 .../mapreduce/v2/app/job/impl/JobImpl.java      |  3 +-
 .../v2/app/launcher/ContainerLauncherImpl.java  |  3 +-
 .../mapred/LocalDistributedCacheManager.java    |  5 ++--
 .../apache/hadoop/mapred/LocalJobRunner.java    |  8 +++--
 .../hadoop/mapred/LocatedFileStatusFetcher.java |  4 +--
 .../java/org/apache/hadoop/mapred/TaskLog.java  | 31 ++++++++++----------
 .../mapred/lib/MultithreadedMapRunner.java      |  4 ++-
 .../lib/output/TestFileOutputCommitter.java     |  4 +--
 .../mapreduce/v2/hs/HistoryFileManager.java     |  6 ++--
 .../hadoop/mapreduce/v2/hs/JobHistory.java      |  3 +-
 .../apache/hadoop/mapred/ShuffleHandler.java    |  6 ++--
 .../org/apache/hadoop/examples/pi/Util.java     |  5 ++--
 15 files changed, 53 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
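
The branch-2 commit is the same change cherry-picked, so the call sites again just go through the HadoopExecutors facade. For reference, a facade of this shape mirrors the java.util.concurrent.Executors signatures and hands back the logging pool; the sketch below reuses the hypothetical LoggingThreadPoolExecutor shown above and is not the actual org.apache.hadoop.util.concurrent implementation:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;

/** Sketch: an Executors-style facade that returns exception-logging pools. */
public final class LoggingExecutors {

  private LoggingExecutors() {
  }

  public static ExecutorService newFixedThreadPool(int nThreads,
      ThreadFactory threadFactory) {
    // Same defaults as Executors.newFixedThreadPool, but the returned pool
    // logs task throwables in afterExecute (see the sketch above).
    return new LoggingThreadPoolExecutor(nThreads, nThreads, 0L,
        TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(),
        threadFactory);
  }
}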


http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c34ae70..cf3e3a3 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -10,6 +10,9 @@ Release 2.9.0 - UNRELEASED
 
     MAPREDUCE-6431. JobClient should be an AutoClosable (haibochen via rkanter)
 
+    MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in
+    mapreduce. (Sidharta Seethana via vvasudev)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index 1a0d5fb..da118c5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -26,7 +26,6 @@ import java.util.Map;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 
@@ -60,6 +59,7 @@ import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ShutdownHookManager;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
@@ -138,7 +138,7 @@ public class LocalContainerLauncher extends AbstractService implements
     // make it a daemon thread so that the process can exit even if the task is
     // not interruptible
     taskRunner =
-        Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
+        HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder().
             setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
     // create and start an event handling thread
     eventHandler = new Thread(new EventHandler(), "uber-EventHandler");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
index b53955f..0b1be70 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 import org.apache.hadoop.yarn.event.EventHandler;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
@@ -133,7 +134,7 @@ public class CommitterEventHandler extends AbstractService
       tfBuilder.setThreadFactory(backingTf);
     }
     ThreadFactory tf = tfBuilder.build();
-    launcherPool = new ThreadPoolExecutor(5, 5, 1,
+    launcherPool = new HadoopThreadPoolExecutor(5, 5, 1,
         TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
     eventHandlingThread = new Thread(new Runnable() {
       @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index b11745b..77c2b24 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -698,7 +699,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
       .setNameFormat("Job Fail Wait Timeout Monitor #%d")
       .setDaemon(true)
       .build();
-    this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+    this.executor = new HadoopScheduledThreadPoolExecutor(1, threadFactory);
 
     // This "this leak" is okay because the retained pointer is in an
     //  instance variable.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
index a7e966c..189e2ef 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -266,7 +267,7 @@ public class ContainerLauncherImpl extends AbstractService implements
         "ContainerLauncher #%d").setDaemon(true).build();
 
     // Start with a default core-pool size of 10 and change it dynamically.
-    launcherPool = new ThreadPoolExecutor(initialPoolSize,
+    launcherPool = new HadoopThreadPoolExecutor(initialPoolSize,
         Integer.MAX_VALUE, 1, TimeUnit.HOURS,
         new LinkedBlockingQueue<Runnable>(),
         tf);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
index 8606ede..3b87197 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
@@ -35,7 +35,6 @@ import java.util.Map.Entry;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicLong;
@@ -43,7 +42,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalDirAllocator;
 import org.apache.hadoop.fs.Path;
@@ -53,6 +51,7 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
 import org.apache.hadoop.mapreduce.v2.util.MRApps;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -121,7 +120,7 @@ class LocalDistributedCacheManager {
       ThreadFactory tf = new ThreadFactoryBuilder()
       .setNameFormat("LocalDistributedCacheManager Downloader #%d")
       .build();
-      exec = Executors.newCachedThreadPool(tf);
+      exec = HadoopExecutors.newCachedThreadPool(tf);
       Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
       Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
       for (LocalResource resource : localResources.values()) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 5feb2fe..c4cd5cd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Random;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -73,6 +72,7 @@ import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.ReflectionUtils;
 
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 /** Implements MapReduce locally, in-process, for debugging. */
 @InterfaceAudience.Private
@@ -427,7 +427,8 @@ public class LocalJobRunner implements ClientProtocol {
       ThreadFactory tf = new ThreadFactoryBuilder()
         .setNameFormat("LocalJobRunner Map Task Executor #%d")
         .build();
-      ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
+      ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+          maxMapThreads, tf);
 
       return executor;
     }
@@ -453,7 +454,8 @@ public class LocalJobRunner implements ClientProtocol {
       LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
 
       // Create a new executor service to drain the work queue.
-      ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
+      ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+          maxReduceThreads);
 
       return executor;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
index 87114ad..a039bc9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
@@ -24,7 +24,6 @@ import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
@@ -47,6 +46,7 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 /**
  * Utility class to fetch block locations for specified Input paths using a
@@ -92,7 +92,7 @@ public class LocatedFileStatusFetcher {
       IOException {
     int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
         FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
-    rawExec = Executors.newFixedThreadPool(
+    rawExec = HadoopExecutors.newFixedThreadPool(
         numThreads,
         new ThreadFactoryBuilder().setDaemon(true)
             .setNameFormat("GetFileInfo #%d").build());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
index e07b5be..bf838c2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.util.ProcessTree;
 import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.log4j.Appender;
 import org.apache.log4j.LogManager;
@@ -327,22 +328,22 @@ public class TaskLog {
 
   public static ScheduledExecutorService createLogSyncer() {
     final ScheduledExecutorService scheduler =
-      Executors.newSingleThreadScheduledExecutor(
-        new ThreadFactory() {
-          @Override
-          public Thread newThread(Runnable r) {
-            final Thread t = Executors.defaultThreadFactory().newThread(r);
-            t.setDaemon(true);
-            t.setName("Thread for syncLogs");
-            return t;
-          }
-        });
+        HadoopExecutors.newSingleThreadScheduledExecutor(
+            new ThreadFactory() {
+              @Override
+              public Thread newThread(Runnable r) {
+                final Thread t = Executors.defaultThreadFactory().newThread(r);
+                t.setDaemon(true);
+                t.setName("Thread for syncLogs");
+                return t;
+              }
+            });
     ShutdownHookManager.get().addShutdownHook(new Runnable() {
-        @Override
-        public void run() {
-          TaskLog.syncLogsShutdown(scheduler);
-        }
-      }, 50);
+      @Override
+      public void run() {
+        TaskLog.syncLogsShutdown(scheduler);
+      }
+    }, 50);
     scheduler.scheduleWithFixedDelay(
         new Runnable() {
           @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
index 98d794b..05339bc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.SkipBadRecords;
 import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 
 import java.io.IOException;
 import java.util.concurrent.*;
@@ -84,7 +85,8 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
 
     // Creating a threadpool of the configured size to execute the Mapper
     // map method in parallel.
-    executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads, 
+    executorService = new HadoopThreadPoolExecutor(numberOfThreads,
+        numberOfThreads,
                                              0L, TimeUnit.MILLISECONDS,
                                              new BlockingArrayQueue
                                                (numberOfThreads));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index eba513b..20d8ab5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -25,10 +25,10 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.junit.Assert;
 
 import org.apache.commons.logging.Log;
@@ -696,7 +696,7 @@ public class TestFileOutputCommitter extends TestCase {
       };
     }
 
-    final ExecutorService executor = Executors.newFixedThreadPool(2);
+    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
     try {
       for (int i = 0; i < taCtx.length; i++) {
         final int taskIdx = i;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index 92ec96a..c7eb86e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.util.ShutdownThreadsHelper;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -554,8 +555,9 @@ public class HistoryFileManager extends AbstractService {
         JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
     ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
         "MoveIntermediateToDone Thread #%d").build();
-    moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
-        1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+    moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads,
+        numMoveThreads, 1, TimeUnit.HOURS,
+        new LinkedBlockingQueue<Runnable>(), tf);
 
     super.serviceInit(conf);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index 41bc90a..45075c9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.hadoop.service.Service;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.event.EventHandler;
@@ -126,7 +127,7 @@ public class JobHistory extends AbstractService implements HistoryContext {
       ((Service) storage).start();
     }
 
-    scheduledExecutor = new ScheduledThreadPoolExecutor(2,
+    scheduledExecutor = new HadoopScheduledThreadPoolExecutor(2,
         new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
             .build());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 2fb7811..0d6e900 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -46,7 +46,6 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -81,6 +80,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
 import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -475,8 +475,8 @@ public class ShuffleHandler extends AuxiliaryService {
       .build();
     
     selector = new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(bossFactory),
-        Executors.newCachedThreadPool(workerFactory),
+        HadoopExecutors.newCachedThreadPool(bossFactory),
+        HadoopExecutors.newCachedThreadPool(workerFactory),
         maxShuffleThreads);
     super.serviceInit(new Configuration(conf));
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
index 8afc1bd..e74c091 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
@@ -35,7 +35,6 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.Semaphore;
 
@@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.util.ToolRunner;
 
 import com.google.common.base.Charsets;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
 
 /** Utility methods */
 public class Util {
@@ -157,7 +157,8 @@ public class Util {
   /** Execute the callables by a number of threads */
   public static <T, E extends Callable<T>> void execute(int nThreads, List<E> callables
       ) throws InterruptedException, ExecutionException {
-    final ExecutorService executor = Executors.newFixedThreadPool(nThreads); 
+    final ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+        nThreads);
     final List<Future<T>> futures = executor.invokeAll(callables);
     for(Future<T> f : futures)
       f.get();