You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2019/08/29 23:14:41 UTC
[hbase] branch branch-2.1 updated: HBASE-22881 Fix non-daemon threads in hbase server implementation (#512) (#558)
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new db85526 HBASE-22881 Fix non-daemon threads in hbase server implementation (#512) (#558)
db85526 is described below
commit db85526254304b8c4c595842a2c4d9101f86644b
Author: linkaline <li...@gmail.com>
AuthorDate: Fri Aug 30 07:14:34 2019 +0800
HBASE-22881 Fix non-daemon threads in hbase server implementation (#512) (#558)
Signed-off-by: stack <st...@apache.org>
---
.../hadoop/hbase/client/TestClientNoCluster.java | 3 +-
.../java/org/apache/hadoop/hbase/util/Threads.java | 2 +-
.../apache/hadoop/hbase/backup/HFileArchiver.java | 4 +-
.../hbase/master/MasterMobCompactionThread.java | 21 +++++-----
.../assignment/SplitTableRegionProcedure.java | 2 +-
.../RegionServerFlushTableProcedureManager.java | 11 +++---
.../hadoop/hbase/regionserver/CompactSplit.java | 46 ++++++++--------------
.../snapshot/RegionServerSnapshotManager.java | 8 ++--
.../hadoop/hbase/regionserver/wal/FSHLog.java | 3 +-
.../HBaseInterClusterReplicationEndpoint.java | 8 ++--
.../replication/regionserver/HFileReplicator.java | 11 ++----
.../regionserver/ReplicationSourceManager.java | 4 +-
.../hadoop/hbase/snapshot/SnapshotManifest.java | 2 +-
.../hadoop/hbase/tool/HFileContentValidator.java | 2 +-
.../hadoop/hbase/tool/LoadIncrementalHFiles.java | 3 +-
.../java/org/apache/hadoop/hbase/util/FSUtils.java | 3 +-
.../hadoop/hbase/util/ModifyRegionUtils.java | 10 +----
.../hbase/procedure/SimpleRSProcedureManager.java | 6 +--
.../hadoop/hbase/thrift/IncrementCoalescer.java | 30 ++------------
19 files changed, 64 insertions(+), 115 deletions(-)
diff --git a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
index 3cab09d..c5858bd 100644
--- a/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
+++ b/hbase-client/src/test/java/org/apache/hadoop/hbase/client/TestClientNoCluster.java
@@ -807,7 +807,8 @@ public class TestClientNoCluster extends Configured implements Tool {
// Have them all share the same connection so they all share the same instance of
// ManyServersManyRegionsConnection so I can keep an eye on how many requests by server.
- final ExecutorService pool = Executors.newCachedThreadPool(Threads.getNamedThreadFactory("p"));
+ final ExecutorService pool = Executors.newCachedThreadPool(
+ Threads.newDaemonThreadFactory("p"));
// Executors.newFixedThreadPool(servers * 10, Threads.getNamedThreadFactory("p"));
// Share a connection so I can keep counts in the 'server' on concurrency.
final Connection sharedConnection = ConnectionFactory.createConnection(getConf()/*, pool*/);
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
index 3527340..1ca6c2e 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/Threads.java
@@ -204,7 +204,7 @@ public class Threads {
* @param prefix The prefix of every created Thread's name
* @return a {@link java.util.concurrent.ThreadFactory} that names threads
*/
- public static ThreadFactory getNamedThreadFactory(final String prefix) {
+ private static ThreadFactory getNamedThreadFactory(final String prefix) {
SecurityManager s = System.getSecurityManager();
final ThreadGroup threadGroup = (s != null) ? s.getThreadGroup() : Thread.currentThread()
.getThreadGroup();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
index e6099db..f7b6b7b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/backup/HFileArchiver.java
@@ -227,7 +227,9 @@ public class HFileArchiver {
@Override
public Thread newThread(Runnable r) {
final String name = "HFileArchiver-" + threadNumber.getAndIncrement();
- return new Thread(r, name);
+ Thread t = new Thread(r, name);
+ t.setDaemon(true);
+ return t;
}
};
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java
index 9d6da0c..0779eea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/MasterMobCompactionThread.java
@@ -23,21 +23,21 @@ import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.TableName;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.master.locking.LockManager;
import org.apache.hadoop.hbase.mob.MobUtils;
import org.apache.hadoop.hbase.procedure2.LockType;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* The mob compaction thread used in {@link MasterRpcServices}
@@ -55,14 +55,11 @@ public class MasterMobCompactionThread {
this.conf = master.getConfiguration();
final String n = Thread.currentThread().getName();
// this pool is used to run the mob compaction
- this.masterMobPool = new ThreadPoolExecutor(1, 2, 60, TimeUnit.SECONDS,
- new SynchronousQueue<>(), new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- String name = n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime();
- return new Thread(r, name);
- }
- });
+ this.masterMobPool = new ThreadPoolExecutor(1, 2, 60,
+ TimeUnit.SECONDS, new SynchronousQueue<>(),
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat(n + "-MasterMobCompaction-" + EnvironmentEdgeManager.currentTime())
+ .build());
((ThreadPoolExecutor) this.masterMobPool).allowCoreThreadTimeOut(true);
// this pool is used in the mob compaction to compact the mob files by partitions
// in parallel
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
index a685173..2856ab0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java
@@ -667,7 +667,7 @@ public class SplitTableRegionProcedure
LOG.info("pid=" + getProcId() + " splitting " + nbFiles + " storefiles, region=" +
getParentRegion().getShortNameToLog() + ", threads=" + maxThreads);
final ExecutorService threadPool = Executors.newFixedThreadPool(maxThreads,
- Threads.getNamedThreadFactory("StoreFileSplitter-%1$d"));
+ Threads.newDaemonThreadFactory("StoreFileSplitter-%1$d"));
final List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
// Split each store file.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
index bf55c0c..ba69aea 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/procedure/flush/RegionServerFlushTableProcedureManager.java
@@ -25,11 +25,9 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.DaemonThreadFactory;
@@ -47,8 +45,11 @@ import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.zookeeper.KeeperException;
+
+import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -213,10 +214,8 @@ public class RegionServerFlushTableProcedureManager extends RegionServerProcedur
RegionServerFlushTableProcedureManager.FLUSH_TIMEOUT_MILLIS_DEFAULT);
int threads = conf.getInt(CONCURENT_FLUSH_TASKS_KEY, DEFAULT_CONCURRENT_FLUSH_TASKS);
this.name = name;
- executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs("
- + name + ")-flush-proc-pool"));
- executor.allowCoreThreadTimeOut(true);
+ executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
+ new DaemonThreadFactory("rs(" + name + ")-flush-proc-pool-"));
taskPool = new ExecutorCompletionService<>(executor);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
index 2f5c9d9..46b24d0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplit.java
@@ -31,7 +31,6 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -56,8 +55,10 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
/**
* Compact region on request and then run split if appropriate
@@ -120,37 +121,22 @@ public class CompactSplit implements CompactionRequester, PropagatingConfigurati
final String n = Thread.currentThread().getName();
StealJobQueue<Runnable> stealJobQueue = new StealJobQueue<Runnable>(COMPARATOR);
- this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads,
- 60, TimeUnit.SECONDS, stealJobQueue,
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- String name = n + "-longCompactions-" + System.currentTimeMillis();
- return new Thread(r, name);
- }
- });
+ this.longCompactions = new ThreadPoolExecutor(largeThreads, largeThreads, 60,
+ TimeUnit.SECONDS, stealJobQueue,
+ new ThreadFactoryBuilder()
+ .setNameFormat(n + "-longCompactions-" + System.currentTimeMillis())
+ .setDaemon(true).build());
this.longCompactions.setRejectedExecutionHandler(new Rejection());
this.longCompactions.prestartAllCoreThreads();
- this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads,
- 60, TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- String name = n + "-shortCompactions-" + System.currentTimeMillis();
- return new Thread(r, name);
- }
- });
- this.shortCompactions
- .setRejectedExecutionHandler(new Rejection());
- this.splits = (ThreadPoolExecutor)
- Executors.newFixedThreadPool(splitThreads,
- new ThreadFactory() {
- @Override
- public Thread newThread(Runnable r) {
- String name = n + "-splits-" + System.currentTimeMillis();
- return new Thread(r, name);
- }
- });
+ this.shortCompactions = new ThreadPoolExecutor(smallThreads, smallThreads, 60,
+ TimeUnit.SECONDS, stealJobQueue.getStealFromQueue(),
+ new ThreadFactoryBuilder()
+ .setNameFormat(n + "-shortCompactions-" + System.currentTimeMillis())
+ .setDaemon(true).build());
+ this.shortCompactions.setRejectedExecutionHandler(new Rejection());
+ this.splits = (ThreadPoolExecutor) Executors.newFixedThreadPool(splitThreads,
+ new ThreadFactoryBuilder().setNameFormat(n + "-splits-" + System.currentTimeMillis())
+ .setDaemon(true).build());
// compaction throughput controller
this.compactionThroughputController =
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
index 08335ab..579bb24 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/snapshot/RegionServerSnapshotManager.java
@@ -26,7 +26,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -36,6 +35,7 @@ import org.apache.hadoop.hbase.DaemonThreadFactory;
import org.apache.hadoop.hbase.DroppedSnapshotException;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
@@ -283,10 +283,8 @@ public class RegionServerSnapshotManager extends RegionServerProcedureManager {
RegionServerSnapshotManager.SNAPSHOT_TIMEOUT_MILLIS_DEFAULT);
int threads = conf.getInt(CONCURENT_SNAPSHOT_TASKS_KEY, DEFAULT_CONCURRENT_SNAPSHOT_TASKS);
this.name = name;
- executor = new ThreadPoolExecutor(threads, threads, keepAlive, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>(), new DaemonThreadFactory("rs("
- + name + ")-snapshot-pool"));
- executor.allowCoreThreadTimeOut(true);
+ executor = Threads.getBoundedCachedThreadPool(threads, keepAlive, TimeUnit.MILLISECONDS,
+ new DaemonThreadFactory("rs(" + name + ")-snapshot-pool-"));
taskPool = new ExecutorCompletionService<>(executor);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index f10b1ce..ed61784 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -222,7 +222,8 @@ public class FSHLog extends AbstractFSWAL<Writer> {
// Using BlockingWaitStrategy. Stuff that is going on here takes so long it makes no sense
// spinning as other strategies do.
this.disruptor = new Disruptor<>(RingBufferTruck::new,
- getPreallocatedEventCount(), Threads.getNamedThreadFactory(hostingThreadName + ".append"),
+ getPreallocatedEventCount(),
+ Threads.newDaemonThreadFactory(hostingThreadName + ".append"),
ProducerType.MULTI, new BlockingWaitStrategy());
// Advance the ring buffer sequence so that it starts from 1 instead of 0,
// because SyncFuture.NOT_DONE = 0.
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 0014d9d..36eea3e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -32,7 +32,6 @@ import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.regex.Matcher;
@@ -57,8 +56,10 @@ import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -141,9 +142,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// per sink thread pool
this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
- this.exec = new ThreadPoolExecutor(maxThreads, maxThreads, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>());
- this.exec.allowCoreThreadTimeOut(true);
+ this.exec = Threads.getBoundedCachedThreadPool(maxThreads, 60, TimeUnit.SECONDS,
+ new ThreadFactoryBuilder().setDaemon(true).setNameFormat("SinkThread-%d").build());
this.abortable = ctx.getAbortable();
// Set the size limit for replication RPCs to 95% of the max request size.
// We could do with less slop if we have an accurate estimate of encoded size. Being
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
index 1f44817..ab9a236 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HFileReplicator.java
@@ -27,7 +27,6 @@ import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -53,6 +52,7 @@ import org.apache.hadoop.hbase.security.token.FsDelegationToken;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.Threads;
/**
* It is used for replicating HFile entries. It will first copy parallely all the hfiles to a local
@@ -105,12 +105,9 @@ public class HFileReplicator {
this.maxCopyThreads =
this.conf.getInt(REPLICATION_BULKLOAD_COPY_MAXTHREADS_KEY,
REPLICATION_BULKLOAD_COPY_MAXTHREADS_DEFAULT);
- ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
- builder.setNameFormat("HFileReplicationCallable-%1$d");
- this.exec =
- new ThreadPoolExecutor(maxCopyThreads, maxCopyThreads, 60, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(), builder.build());
- this.exec.allowCoreThreadTimeOut(true);
+ this.exec = Threads.getBoundedCachedThreadPool(maxCopyThreads, 60, TimeUnit.SECONDS,
+ new ThreadFactoryBuilder().setDaemon(true)
+ .setNameFormat("HFileReplicationCallable-%1$d").build());
this.copiesPerThread =
conf.getInt(REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_KEY,
REPLICATION_BULKLOAD_COPY_HFILES_PERTHREAD_DEFAULT);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 5d4f034..585245e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -196,8 +196,8 @@ public class ReplicationSourceManager implements ReplicationListener {
int nbWorkers = conf.getInt("replication.executor.workers", 1);
// use a short 100ms sleep since this could be done inline with a RS startup
// even if we fail, other region servers can take care of it
- this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100, TimeUnit.MILLISECONDS,
- new LinkedBlockingQueue<>());
+ this.executor = new ThreadPoolExecutor(nbWorkers, nbWorkers, 100,
+ TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
ThreadFactoryBuilder tfb = new ThreadFactoryBuilder();
tfb.setNameFormat("ReplicationExecutor-%d");
tfb.setDaemon(true);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
index f2c686d..85360ae 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/snapshot/SnapshotManifest.java
@@ -558,7 +558,7 @@ public final class SnapshotManifest {
public static ThreadPoolExecutor createExecutor(final Configuration conf, final String name) {
int maxThreads = conf.getInt("hbase.snapshot.thread.pool.max", 8);
return Threads.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
- Threads.getNamedThreadFactory(name));
+ Threads.newDaemonThreadFactory(name));
}
/**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/HFileContentValidator.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/HFileContentValidator.java
index d60844b..dd71148 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/HFileContentValidator.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/HFileContentValidator.java
@@ -108,7 +108,7 @@ public class HFileContentValidator extends AbstractHBaseTool {
int availableProcessors = Runtime.getRuntime().availableProcessors();
int numThreads = conf.getInt("hfilevalidator.numthreads", availableProcessors);
return Executors.newFixedThreadPool(numThreads,
- Threads.getNamedThreadFactory("hfile-validator"));
+ Threads.newDaemonThreadFactory("hfile-validator"));
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
index f47c2a9..1896883 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/tool/LoadIncrementalHFiles.java
@@ -593,8 +593,7 @@ public class LoadIncrementalHFiles extends Configured implements Tool {
}
/**
- * @param conn the HBase cluster connection
- * @param tableName the table name of the table to load into
+ * @param table the table to load into
* @param pool the ExecutorService
* @param queue the queue for LoadQueueItem
* @param startEndKeys start and end keys
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
index f5b3074..913c892 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSUtils.java
@@ -1623,7 +1623,8 @@ public abstract class FSUtils extends CommonFSUtils {
// run in multiple threads
ThreadPoolExecutor tpe = new ThreadPoolExecutor(threadPoolSize,
threadPoolSize, 60, TimeUnit.SECONDS,
- new ArrayBlockingQueue<>(statusList.length));
+ new ArrayBlockingQueue<>(statusList.length),
+ Threads.newDaemonThreadFactory("FSRegionQuery"));
try {
// ignore all file status items that are not of interest
for (FileStatus regionStatus : statusList) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
index 1c860b4..79544fb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ModifyRegionUtils.java
@@ -28,7 +28,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -235,14 +234,7 @@ public abstract class ModifyRegionUtils {
"hbase.hregion.open.and.init.threads.max", 16));
ThreadPoolExecutor regionOpenAndInitThreadPool = Threads
.getBoundedCachedThreadPool(maxThreads, 30L, TimeUnit.SECONDS,
- new ThreadFactory() {
- private int count = 1;
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, threadNamePrefix + "-" + count++);
- }
- });
+ Threads.newDaemonThreadFactory(threadNamePrefix));
return regionOpenAndInitThreadPool;
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java
index f5a858a..c99fcc1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/procedure/SimpleRSProcedureManager.java
@@ -124,9 +124,9 @@ public class SimpleRSProcedureManager extends RegionServerProcedureManager {
public SimpleSubprocedurePool(String name, Configuration conf) {
this.name = name;
- executor = new ThreadPoolExecutor(1, 1, 500, TimeUnit.SECONDS,
- new LinkedBlockingQueue<>(),
- new DaemonThreadFactory("rs(" + name + ")-procedure-pool"));
+ executor = new ThreadPoolExecutor(1, 1, 500,
+ TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+ new DaemonThreadFactory("rs(" + name + ")-procedure-pool-"));
taskPool = new ExecutorCompletionService<>(executor);
}
diff --git a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
index f2abe2e..8f55dec 100644
--- a/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
+++ b/hbase-thrift/src/main/java/org/apache/hadoop/hbase/thrift/IncrementCoalescer.java
@@ -26,10 +26,8 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Table;
@@ -131,27 +129,6 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
}
- static class DaemonThreadFactory implements ThreadFactory {
- static final AtomicInteger poolNumber = new AtomicInteger(1);
- final ThreadGroup group;
- final AtomicInteger threadNumber = new AtomicInteger(1);
- final String namePrefix;
-
- DaemonThreadFactory() {
- SecurityManager s = System.getSecurityManager();
- group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
- namePrefix = "ICV-" + poolNumber.getAndIncrement() + "-thread-";
- }
-
- @Override
- public Thread newThread(Runnable r) {
- Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
- if (!t.isDaemon()) t.setDaemon(true);
- if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY);
- return t;
- }
- }
-
private final LongAdder failedIncrements = new LongAdder();
private final LongAdder successfulCoalescings = new LongAdder();
private final LongAdder totalIncrements = new LongAdder();
@@ -169,10 +146,9 @@ public class IncrementCoalescer implements IncrementCoalescerMBean {
public IncrementCoalescer(HBaseHandler hand) {
this.handler = hand;
LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
- pool =
- new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50, TimeUnit.MILLISECONDS, queue,
- Threads.newDaemonThreadFactory("IncrementCoalescer"));
-
+ pool = new ThreadPoolExecutor(CORE_POOL_SIZE, CORE_POOL_SIZE, 50,
+ TimeUnit.MILLISECONDS, queue,
+ Threads.newDaemonThreadFactory("IncrementCoalescer"));
MBeans.register("thrift", "Thrift", this);
}