You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vv...@apache.org on 2016/02/18 09:49:30 UTC
[1/2] hadoop git commit: MAPREDUCE-6634. Log uncaught
exceptions/errors in various thread pools in mapreduce. Contributed by
Sidharta Seethana.
Repository: hadoop
Updated Branches:
refs/heads/branch-2 d4203c9aa -> 212c519ad
refs/heads/trunk c1afac3a9 -> 2440671a1
MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2440671a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2440671a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2440671a
Branch: refs/heads/trunk
Commit: 2440671a117f165dcda5056404bc898df3c50803
Parents: c1afac3
Author: Varun Vasudev <vv...@apache.org>
Authored: Thu Feb 18 14:15:08 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Thu Feb 18 14:18:36 2016 +0530
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 ++
.../hadoop/mapred/LocalContainerLauncher.java | 4 +--
.../v2/app/commit/CommitterEventHandler.java | 3 +-
.../mapreduce/v2/app/job/impl/JobImpl.java | 3 +-
.../v2/app/launcher/ContainerLauncherImpl.java | 3 +-
.../mapred/LocalDistributedCacheManager.java | 5 ++--
.../apache/hadoop/mapred/LocalJobRunner.java | 8 +++--
.../hadoop/mapred/LocatedFileStatusFetcher.java | 4 +--
.../java/org/apache/hadoop/mapred/TaskLog.java | 31 ++++++++++----------
.../mapred/lib/MultithreadedMapRunner.java | 4 ++-
.../lib/output/TestFileOutputCommitter.java | 4 +--
.../mapreduce/v2/hs/HistoryFileManager.java | 6 ++--
.../hadoop/mapreduce/v2/hs/JobHistory.java | 3 +-
.../apache/hadoop/mapred/ShuffleHandler.java | 6 ++--
.../org/apache/hadoop/examples/pi/Util.java | 5 ++--
15 files changed, 53 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 0b8c818..da28bc6 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -310,6 +310,9 @@ Release 2.9.0 - UNRELEASED
MAPREDUCE-6431. JobClient should be an AutoClosable (haibochen via rkanter)
+ MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in
+ mapreduce. (Sidharta Seethana via vvasudev)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index 1a0d5fb..da118c5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -60,6 +59,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -138,7 +138,7 @@ public class LocalContainerLauncher extends AbstractService implements
// make it a daemon thread so that the process can exit even if the task is
// not interruptible
taskRunner =
- Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
+ HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder().
setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
// create and start an event handling thread
eventHandler = new Thread(new EventHandler(), "uber-EventHandler");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
index b53955f..0b1be70 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -133,7 +134,7 @@ public class CommitterEventHandler extends AbstractService
tfBuilder.setThreadFactory(backingTf);
}
ThreadFactory tf = tfBuilder.build();
- launcherPool = new ThreadPoolExecutor(5, 5, 1,
+ launcherPool = new HadoopThreadPoolExecutor(5, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventHandlingThread = new Thread(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index 5ed0762..c8c5ce9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -698,7 +699,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
.setNameFormat("Job Fail Wait Timeout Monitor #%d")
.setDaemon(true)
.build();
- this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+ this.executor = new HadoopScheduledThreadPoolExecutor(1, threadFactory);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
index a7e966c..189e2ef 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -266,7 +267,7 @@ public class ContainerLauncherImpl extends AbstractService implements
"ContainerLauncher #%d").setDaemon(true).build();
// Start with a default core-pool size of 10 and change it dynamically.
- launcherPool = new ThreadPoolExecutor(initialPoolSize,
+ launcherPool = new HadoopThreadPoolExecutor(initialPoolSize,
Integer.MAX_VALUE, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>(),
tf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
index 8606ede..3b87197 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
@@ -35,7 +35,6 @@ import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
@@ -43,7 +42,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -53,6 +51,7 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -121,7 +120,7 @@ class LocalDistributedCacheManager {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("LocalDistributedCacheManager Downloader #%d")
.build();
- exec = Executors.newCachedThreadPool(tf);
+ exec = HadoopExecutors.newCachedThreadPool(tf);
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
for (LocalResource resource : localResources.values()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 45d3cc5..37c147d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -74,6 +73,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
/** Implements MapReduce locally, in-process, for debugging. */
@InterfaceAudience.Private
@@ -428,7 +428,8 @@ public class LocalJobRunner implements ClientProtocol {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("LocalJobRunner Map Task Executor #%d")
.build();
- ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
+ ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+ maxMapThreads, tf);
return executor;
}
@@ -454,7 +455,8 @@ public class LocalJobRunner implements ClientProtocol {
LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
// Create a new executor service to drain the work queue.
- ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
+ ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+ maxReduceThreads);
return executor;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
index 87114ad..a039bc9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
@@ -47,6 +46,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
/**
* Utility class to fetch block locations for specified Input paths using a
@@ -92,7 +92,7 @@ public class LocatedFileStatusFetcher {
IOException {
int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
- rawExec = Executors.newFixedThreadPool(
+ rawExec = HadoopExecutors.newFixedThreadPool(
numThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("GetFileInfo #%d").build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
index e07b5be..bf838c2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
@@ -327,22 +328,22 @@ public class TaskLog {
public static ScheduledExecutorService createLogSyncer() {
final ScheduledExecutorService scheduler =
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- final Thread t = Executors.defaultThreadFactory().newThread(r);
- t.setDaemon(true);
- t.setName("Thread for syncLogs");
- return t;
- }
- });
+ HadoopExecutors.newSingleThreadScheduledExecutor(
+ new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ final Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ t.setName("Thread for syncLogs");
+ return t;
+ }
+ });
ShutdownHookManager.get().addShutdownHook(new Runnable() {
- @Override
- public void run() {
- TaskLog.syncLogsShutdown(scheduler);
- }
- }, 50);
+ @Override
+ public void run() {
+ TaskLog.syncLogsShutdown(scheduler);
+ }
+ }, 50);
scheduler.scheduleWithFixedDelay(
new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
index 98d794b..05339bc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import java.io.IOException;
import java.util.concurrent.*;
@@ -84,7 +85,8 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
// Creating a threadpool of the configured size to execute the Mapper
// map method in parallel.
- executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
+ executorService = new HadoopThreadPoolExecutor(numberOfThreads,
+ numberOfThreads,
0L, TimeUnit.MILLISECONDS,
new BlockingArrayQueue
(numberOfThreads));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index eba513b..20d8ab5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -25,10 +25,10 @@ import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@@ -696,7 +696,7 @@ public class TestFileOutputCommitter extends TestCase {
};
}
- final ExecutorService executor = Executors.newFixedThreadPool(2);
+ final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
try {
for (int i = 0; i < taCtx.length; i++) {
final int taskIdx = i;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index 6be0d27..677d5c2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownThreadsHelper;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
@@ -554,8 +555,9 @@ public class HistoryFileManager extends AbstractService {
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
"MoveIntermediateToDone Thread #%d").build();
- moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
- 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+ moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads,
+ numMoveThreads, 1, TimeUnit.HOURS,
+ new LinkedBlockingQueue<Runnable>(), tf);
super.serviceInit(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index 41bc90a..45075c9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -126,7 +127,7 @@ public class JobHistory extends AbstractService implements HistoryContext {
((Service) storage).start();
}
- scheduledExecutor = new ScheduledThreadPoolExecutor(2,
+ scheduledExecutor = new HadoopScheduledThreadPoolExecutor(2,
new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
.build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 2fb7811..0d6e900 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -46,7 +46,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -81,6 +80,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -475,8 +475,8 @@ public class ShuffleHandler extends AuxiliaryService {
.build();
selector = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory),
+ HadoopExecutors.newCachedThreadPool(bossFactory),
+ HadoopExecutors.newCachedThreadPool(workerFactory),
maxShuffleThreads);
super.serviceInit(new Configuration(conf));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2440671a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
index 8afc1bd..e74c091 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
@@ -35,7 +35,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
@@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Charsets;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
/** Utility methods */
public class Util {
@@ -157,7 +157,8 @@ public class Util {
/** Execute the callables by a number of threads */
public static <T, E extends Callable<T>> void execute(int nThreads, List<E> callables
) throws InterruptedException, ExecutionException {
- final ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+ final ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+ nThreads);
final List<Future<T>> futures = executor.invokeAll(callables);
for(Future<T> f : futures)
f.get();
[2/2] hadoop git commit: MAPREDUCE-6634. Log uncaught
exceptions/errors in various thread pools in mapreduce. Contributed by
Sidharta Seethana.
Posted by vv...@apache.org.
MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in mapreduce. Contributed by Sidharta Seethana.
(cherry picked from commit c50d1e54000c76880a041ce5959a2eb23c86bd35)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/212c519a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/212c519a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/212c519a
Branch: refs/heads/branch-2
Commit: 212c519ad3461068913c18b2ba03714934bb6200
Parents: d4203c9
Author: Varun Vasudev <vv...@apache.org>
Authored: Thu Feb 18 14:15:08 2016 +0530
Committer: Varun Vasudev <vv...@apache.org>
Committed: Thu Feb 18 14:19:01 2016 +0530
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 ++
.../hadoop/mapred/LocalContainerLauncher.java | 4 +--
.../v2/app/commit/CommitterEventHandler.java | 3 +-
.../mapreduce/v2/app/job/impl/JobImpl.java | 3 +-
.../v2/app/launcher/ContainerLauncherImpl.java | 3 +-
.../mapred/LocalDistributedCacheManager.java | 5 ++--
.../apache/hadoop/mapred/LocalJobRunner.java | 8 +++--
.../hadoop/mapred/LocatedFileStatusFetcher.java | 4 +--
.../java/org/apache/hadoop/mapred/TaskLog.java | 31 ++++++++++----------
.../mapred/lib/MultithreadedMapRunner.java | 4 ++-
.../lib/output/TestFileOutputCommitter.java | 4 +--
.../mapreduce/v2/hs/HistoryFileManager.java | 6 ++--
.../hadoop/mapreduce/v2/hs/JobHistory.java | 3 +-
.../apache/hadoop/mapred/ShuffleHandler.java | 6 ++--
.../org/apache/hadoop/examples/pi/Util.java | 5 ++--
15 files changed, 53 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index c34ae70..cf3e3a3 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -10,6 +10,9 @@ Release 2.9.0 - UNRELEASED
MAPREDUCE-6431. JobClient should be an AutoClosable (haibochen via rkanter)
+ MAPREDUCE-6634. Log uncaught exceptions/errors in various thread pools in
+ mapreduce. (Sidharta Seethana via vvasudev)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
index 1a0d5fb..da118c5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/LocalContainerLauncher.java
@@ -26,7 +26,6 @@ import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
@@ -60,6 +59,7 @@ import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ExitUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -138,7 +138,7 @@ public class LocalContainerLauncher extends AbstractService implements
// make it a daemon thread so that the process can exit even if the task is
// not interruptible
taskRunner =
- Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().
+ HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder().
setDaemon(true).setNameFormat("uber-SubtaskRunner").build());
// create and start an event handling thread
eventHandler = new Thread(new EventHandler(), "uber-EventHandler");
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
index b53955f..0b1be70 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/commit/CommitterEventHandler.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
@@ -133,7 +134,7 @@ public class CommitterEventHandler extends AbstractService
tfBuilder.setThreadFactory(backingTf);
}
ThreadFactory tf = tfBuilder.build();
- launcherPool = new ThreadPoolExecutor(5, 5, 1,
+ launcherPool = new HadoopThreadPoolExecutor(5, 5, 1,
TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
eventHandlingThread = new Thread(new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
index b11745b..77c2b24 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java
@@ -116,6 +116,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
@@ -698,7 +699,7 @@ public class JobImpl implements org.apache.hadoop.mapreduce.v2.app.job.Job,
.setNameFormat("Job Fail Wait Timeout Monitor #%d")
.setDaemon(true)
.build();
- this.executor = new ScheduledThreadPoolExecutor(1, threadFactory);
+ this.executor = new HadoopScheduledThreadPoolExecutor(1, threadFactory);
// This "this leak" is okay because the retained pointer is in an
// instance variable.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
index a7e966c..189e2ef 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/launcher/ContainerLauncherImpl.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
import org.apache.hadoop.yarn.api.protocolrecords.StartContainersResponse;
@@ -266,7 +267,7 @@ public class ContainerLauncherImpl extends AbstractService implements
"ContainerLauncher #%d").setDaemon(true).build();
// Start with a default core-pool size of 10 and change it dynamically.
- launcherPool = new ThreadPoolExecutor(initialPoolSize,
+ launcherPool = new HadoopThreadPoolExecutor(initialPoolSize,
Integer.MAX_VALUE, 1, TimeUnit.HOURS,
new LinkedBlockingQueue<Runnable>(),
tf);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
index 8606ede..3b87197 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalDistributedCacheManager.java
@@ -35,7 +35,6 @@ import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicLong;
@@ -43,7 +42,6 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileContext;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -53,6 +51,7 @@ import org.apache.hadoop.mapreduce.filecache.DistributedCache;
import org.apache.hadoop.mapreduce.v2.util.MRApps;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.util.ConverterUtils;
@@ -121,7 +120,7 @@ class LocalDistributedCacheManager {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("LocalDistributedCacheManager Downloader #%d")
.build();
- exec = Executors.newCachedThreadPool(tf);
+ exec = HadoopExecutors.newCachedThreadPool(tf);
Path destPath = localDirAllocator.getLocalPathForWrite(".", conf);
Map<LocalResource, Future<Path>> resourcesToPaths = Maps.newHashMap();
for (LocalResource resource : localResources.values()) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
index 5feb2fe..c4cd5cd 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapred/LocalJobRunner.java
@@ -32,7 +32,6 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -73,6 +72,7 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.ReflectionUtils;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
/** Implements MapReduce locally, in-process, for debugging. */
@InterfaceAudience.Private
@@ -427,7 +427,8 @@ public class LocalJobRunner implements ClientProtocol {
ThreadFactory tf = new ThreadFactoryBuilder()
.setNameFormat("LocalJobRunner Map Task Executor #%d")
.build();
- ExecutorService executor = Executors.newFixedThreadPool(maxMapThreads, tf);
+ ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+ maxMapThreads, tf);
return executor;
}
@@ -453,7 +454,8 @@ public class LocalJobRunner implements ClientProtocol {
LOG.debug("Reduce tasks to process: " + this.numReduceTasks);
// Create a new executor service to drain the work queue.
- ExecutorService executor = Executors.newFixedThreadPool(maxReduceThreads);
+ ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+ maxReduceThreads);
return executor;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
index 87114ad..a039bc9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/LocatedFileStatusFetcher.java
@@ -24,7 +24,6 @@ import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
@@ -47,6 +46,7 @@ import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
/**
* Utility class to fetch block locations for specified Input paths using a
@@ -92,7 +92,7 @@ public class LocatedFileStatusFetcher {
IOException {
int numThreads = conf.getInt(FileInputFormat.LIST_STATUS_NUM_THREADS,
FileInputFormat.DEFAULT_LIST_STATUS_NUM_THREADS);
- rawExec = Executors.newFixedThreadPool(
+ rawExec = HadoopExecutors.newFixedThreadPool(
numThreads,
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("GetFileInfo #%d").build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
index e07b5be..bf838c2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/TaskLog.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ShutdownHookManager;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.log4j.Appender;
import org.apache.log4j.LogManager;
@@ -327,22 +328,22 @@ public class TaskLog {
public static ScheduledExecutorService createLogSyncer() {
final ScheduledExecutorService scheduler =
- Executors.newSingleThreadScheduledExecutor(
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- final Thread t = Executors.defaultThreadFactory().newThread(r);
- t.setDaemon(true);
- t.setName("Thread for syncLogs");
- return t;
- }
- });
+ HadoopExecutors.newSingleThreadScheduledExecutor(
+ new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ final Thread t = Executors.defaultThreadFactory().newThread(r);
+ t.setDaemon(true);
+ t.setName("Thread for syncLogs");
+ return t;
+ }
+ });
ShutdownHookManager.get().addShutdownHook(new Runnable() {
- @Override
- public void run() {
- TaskLog.syncLogsShutdown(scheduler);
- }
- }, 50);
+ @Override
+ public void run() {
+ TaskLog.syncLogsShutdown(scheduler);
+ }
+ }, 50);
scheduler.scheduleWithFixedDelay(
new Runnable() {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
index 98d794b..05339bc 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/lib/MultithreadedMapRunner.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.mapred.SkipBadRecords;
import org.apache.hadoop.mapreduce.lib.map.MultithreadedMapper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import java.io.IOException;
import java.util.concurrent.*;
@@ -84,7 +85,8 @@ public class MultithreadedMapRunner<K1, V1, K2, V2>
// Creating a threadpool of the configured size to execute the Mapper
// map method in parallel.
- executorService = new ThreadPoolExecutor(numberOfThreads, numberOfThreads,
+ executorService = new HadoopThreadPoolExecutor(numberOfThreads,
+ numberOfThreads,
0L, TimeUnit.MILLISECONDS,
new BlockingArrayQueue
(numberOfThreads));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index eba513b..20d8ab5 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -25,10 +25,10 @@ import java.io.IOException;
import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@@ -696,7 +696,7 @@ public class TestFileOutputCommitter extends TestCase {
};
}
- final ExecutorService executor = Executors.newFixedThreadPool(2);
+ final ExecutorService executor = HadoopExecutors.newFixedThreadPool(2);
try {
for (int i = 0; i < taCtx.length; i++) {
final int taskIdx = i;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
index 92ec96a..c7eb86e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/HistoryFileManager.java
@@ -66,6 +66,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JobIndexInfo;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.util.ShutdownThreadsHelper;
+import org.apache.hadoop.util.concurrent.HadoopThreadPoolExecutor;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import com.google.common.annotations.VisibleForTesting;
@@ -554,8 +555,9 @@ public class HistoryFileManager extends AbstractService {
JHAdminConfig.DEFAULT_MR_HISTORY_MOVE_THREAD_COUNT);
ThreadFactory tf = new ThreadFactoryBuilder().setNameFormat(
"MoveIntermediateToDone Thread #%d").build();
- moveToDoneExecutor = new ThreadPoolExecutor(numMoveThreads, numMoveThreads,
- 1, TimeUnit.HOURS, new LinkedBlockingQueue<Runnable>(), tf);
+ moveToDoneExecutor = new HadoopThreadPoolExecutor(numMoveThreads,
+ numMoveThreads, 1, TimeUnit.HOURS,
+ new LinkedBlockingQueue<Runnable>(), tf);
super.serviceInit(conf);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
index 41bc90a..45075c9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistory.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.service.AbstractService;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.concurrent.HadoopScheduledThreadPoolExecutor;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.event.EventHandler;
@@ -126,7 +127,7 @@ public class JobHistory extends AbstractService implements HistoryContext {
((Service) storage).start();
}
- scheduledExecutor = new ScheduledThreadPoolExecutor(2,
+ scheduledExecutor = new HadoopScheduledThreadPoolExecutor(2,
new ThreadFactoryBuilder().setNameFormat("Log Scanner/Cleaner #%d")
.build());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 2fb7811..0d6e900 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -46,7 +46,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -81,6 +80,7 @@ import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.VersionProto;
@@ -475,8 +475,8 @@ public class ShuffleHandler extends AuxiliaryService {
.build();
selector = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory),
+ HadoopExecutors.newCachedThreadPool(bossFactory),
+ HadoopExecutors.newCachedThreadPool(workerFactory),
maxShuffleThreads);
super.serviceInit(new Configuration(conf));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/212c519a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
index 8afc1bd..e74c091 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/pi/Util.java
@@ -35,7 +35,6 @@ import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
@@ -48,6 +47,7 @@ import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import com.google.common.base.Charsets;
+import org.apache.hadoop.util.concurrent.HadoopExecutors;
/** Utility methods */
public class Util {
@@ -157,7 +157,8 @@ public class Util {
/** Execute the callables by a number of threads */
public static <T, E extends Callable<T>> void execute(int nThreads, List<E> callables
) throws InterruptedException, ExecutionException {
- final ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+ final ExecutorService executor = HadoopExecutors.newFixedThreadPool(
+ nThreads);
final List<Future<T>> futures = executor.invokeAll(callables);
for(Future<T> f : futures)
f.get();