You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ib...@apache.org on 2020/07/07 14:57:17 UTC
[accumulo] 01/09: fixes #1621: The ClientPool thread pool allows
all core threads to time out * Added properties to allow overriding the
allowCoreThreadTimeout in various threadpools: master incoming requests,
tserver incoming requests, master bulk imports
This is an automated email from the ASF dual-hosted git repository.
ibella pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/accumulo.git
commit 352d2cd050411233148fd476531d93297b98d308
Author: Ivan Bella <iv...@bella.name>
AuthorDate: Thu Jun 4 18:17:27 2020 +0000
fixes #1621: The ClientPool thread pool allows all core threads to time out
* Added properties to allow overriding the allowCoreThreadTimeout in
various threadpools: master incoming requests, tserver incoming requests, master bulk imports
---
.../core/clientImpl/TabletServerBatchReader.java | 2 +-
.../core/clientImpl/TabletServerBatchWriter.java | 4 +-
.../org/apache/accumulo/core/conf/Property.java | 9 ++
.../accumulo/core/util/SimpleThreadPool.java | 13 ++-
.../apache/accumulo/server/rpc/TServerUtils.java | 121 ++++++++++++---------
.../accumulo/server/util/TServerUtilsTest.java | 3 +-
.../apache/accumulo/gc/SimpleGarbageCollector.java | 2 +-
.../java/org/apache/accumulo/master/Master.java | 6 +-
.../master/tableOps/bulkVer1/BulkImport.java | 2 +-
.../master/tableOps/bulkVer1/LoadFiles.java | 6 +-
.../master/tableOps/bulkVer2/BulkImportMove.java | 2 +-
.../tableOps/tableImport/MoveExportedFiles.java | 2 +-
.../org/apache/accumulo/tserver/TabletServer.java | 16 +--
.../org/apache/accumulo/tserver/log/LogSorter.java | 2 +-
.../accumulo/tserver/log/TabletServerLogger.java | 2 +-
.../accumulo/test/BalanceWithOfflineTableIT.java | 2 +-
.../test/functional/BatchWriterFlushIT.java | 2 +-
.../accumulo/test/functional/ZombieTServer.java | 2 +-
.../accumulo/test/performance/NullTserver.java | 2 +-
19 files changed, 116 insertions(+), 84 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
index 37909b8..d89b376 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchReader.java
@@ -70,7 +70,7 @@ public class TabletServerBatchReader extends ScannerOptions implements BatchScan
this.numThreads = numQueryThreads;
queryThreadPool =
- new SimpleThreadPool(numQueryThreads, "batch scanner " + batchReaderInstance + "-");
+ new SimpleThreadPool(numQueryThreads, true, "batch scanner " + batchReaderInstance + "-");
cleanable = CleanerUtil.unclosed(this, scopeClass, closed, log, queryThreadPool.asCloseable());
}
diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
index ff04bbf..1f02530 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/TabletServerBatchWriter.java
@@ -647,9 +647,9 @@ public class TabletServerBatchWriter implements AutoCloseable {
public MutationWriter(int numSendThreads) {
serversMutations = new HashMap<>();
queued = new HashSet<>();
- sendThreadPool = new SimpleThreadPool(numSendThreads, this.getClass().getName());
+ sendThreadPool = new SimpleThreadPool(numSendThreads, true, this.getClass().getName());
locators = new HashMap<>();
- binningThreadPool = new SimpleThreadPool(1, "BinMutations", new SynchronousQueue<>());
+ binningThreadPool = new SimpleThreadPool(1, true, "BinMutations", new SynchronousQueue<>());
binningThreadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
}
diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
index c93402e..585f7be 100644
--- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java
+++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java
@@ -248,6 +248,9 @@ public enum Property {
"The number of attempts to bulk import a RFile before giving up."),
MASTER_BULK_THREADPOOL_SIZE("master.bulk.threadpool.size", "5", PropertyType.COUNT,
"The number of threads to use when coordinating a bulk import."),
+ MASTER_BULK_THREADPOOL_ALLOW_TIMEOUT("master.bulk.thread.timeout.allowed", "true",
+ PropertyType.BOOLEAN,
+ "True if the bulk import threads are allowed to timeout with no work available."),
MASTER_BULK_TIMEOUT("master.bulk.timeout", "5m", PropertyType.TIMEDURATION,
"The time to wait for a tablet server to process a bulk import request"),
MASTER_RENAME_THREADS("master.rename.threadpool.size", "20", PropertyType.COUNT,
@@ -261,6 +264,9 @@ public enum Property {
"Regular expression that defines the set of Tablet Servers that will perform bulk imports"),
MASTER_MINTHREADS("master.server.threads.minimum", "20", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests."),
+ MASTER_MINTHREADS_ALLOW_TIMEOUT("master.server.thread.timeout.allowed", "true",
+ PropertyType.BOOLEAN,
+ "True if the incoming request threads are allowed to timeout with no work available."),
MASTER_THREADCHECK("master.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
"The time between adjustments of the server thread pool."),
MASTER_RECOVERY_DELAY("master.recovery.delay", "10s", PropertyType.TIMEDURATION,
@@ -497,6 +503,9 @@ public enum Property {
"The time to wait for a tablet server to process a bulk import request."),
TSERV_MINTHREADS("tserver.server.threads.minimum", "20", PropertyType.COUNT,
"The minimum number of threads to use to handle incoming requests."),
+ TSERV_MINTHREADS_ALLOW_TIMEOUT("tserver.server.thread.timeout.allowed", "true",
+ PropertyType.BOOLEAN,
+ "True if the incoming request threads are allowed to timeout with no work available."),
TSERV_THREADCHECK("tserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
"The time between adjustments of the server thread pool."),
TSERV_MAX_MESSAGE_SIZE("tserver.server.message.size.max", "1G", PropertyType.BYTES,
diff --git a/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java b/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
index 1cc96ca..34f8359 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/SimpleThreadPool.java
@@ -28,15 +28,16 @@ import java.util.concurrent.TimeUnit;
*/
public class SimpleThreadPool extends ThreadPoolExecutor {
- public SimpleThreadPool(int max, final String name) {
- super(max, max, 4L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
+ public SimpleThreadPool(int coreAndMax, boolean allowCoreThreadTimeOut, final String name) {
+ super(coreAndMax, coreAndMax, 4L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(),
new NamingThreadFactory(name));
- allowCoreThreadTimeOut(true);
+ allowCoreThreadTimeOut(allowCoreThreadTimeOut);
}
- public SimpleThreadPool(int max, final String name, BlockingQueue<Runnable> queue) {
- super(max, max, 4L, TimeUnit.SECONDS, queue, new NamingThreadFactory(name));
- allowCoreThreadTimeOut(true);
+ public SimpleThreadPool(int coreAndMax, boolean allowCoreThreadTimeOut, final String name,
+ BlockingQueue<Runnable> queue) {
+ super(coreAndMax, coreAndMax, 4L, TimeUnit.SECONDS, queue, new NamingThreadFactory(name));
+ allowCoreThreadTimeOut(allowCoreThreadTimeOut);
}
/**
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index a4a4a7b..33a3612 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -141,8 +141,8 @@ public class TServerUtils {
public static ServerAddress startServer(MetricsSystem metricsSystem, ServerContext service,
String hostname, Property portHintProperty, TProcessor processor, String serverName,
String threadName, Property portSearchProperty, Property minThreadProperty,
- Property timeBetweenThreadChecksProperty, Property maxMessageSizeProperty)
- throws UnknownHostException {
+ Property allowCoreThreadTimeOutProperty, Property timeBetweenThreadChecksProperty,
+ Property maxMessageSizeProperty) throws UnknownHostException {
final AccumuloConfiguration config = service.getConfiguration();
final int[] portHint = config.getPort(portHintProperty);
@@ -152,6 +152,11 @@ public class TServerUtils {
minThreads = config.getCount(minThreadProperty);
}
+ boolean allowCoreThreadTimeOut = true;
+ if (allowCoreThreadTimeOutProperty != null) {
+ allowCoreThreadTimeOut = config.getBoolean(allowCoreThreadTimeOutProperty);
+ }
+
long timeBetweenThreadChecks = 1000;
if (timeBetweenThreadChecksProperty != null) {
timeBetweenThreadChecks = config.getTimeInMillis(timeBetweenThreadChecksProperty);
@@ -184,9 +189,9 @@ public class TServerUtils {
HostAndPort[] addresses = getHostAndPorts(hostname, portHint);
try {
return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName,
- minThreads, simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize,
- service.getServerSslParams(), service.getSaslParams(), service.getClientTimeoutInMillis(),
- addresses);
+ minThreads, allowCoreThreadTimeOut, simpleTimerThreadpoolSize, timeBetweenThreadChecks,
+ maxMessageSize, service.getServerSslParams(), service.getSaslParams(),
+ service.getClientTimeoutInMillis(), addresses);
} catch (TTransportException e) {
if (portSearch) {
// Build a list of reserved ports - as identified by properties of type PropertyType.PORT
@@ -209,9 +214,9 @@ public class TServerUtils {
try {
HostAndPort addr = HostAndPort.fromParts(hostname, port);
return TServerUtils.startTServer(serverType, timedProcessor, serverName, threadName,
- minThreads, simpleTimerThreadpoolSize, timeBetweenThreadChecks, maxMessageSize,
- service.getServerSslParams(), service.getSaslParams(),
- service.getClientTimeoutInMillis(), addr);
+ minThreads, allowCoreThreadTimeOut, simpleTimerThreadpoolSize,
+ timeBetweenThreadChecks, maxMessageSize, service.getServerSslParams(),
+ service.getSaslParams(), service.getClientTimeoutInMillis(), addr);
} catch (TTransportException tte) {
log.info("Unable to use port {}, retrying. (Thread Name = {})", port, threadName);
}
@@ -231,8 +236,8 @@ public class TServerUtils {
*/
public static ServerAddress createThreadedSelectorServer(HostAndPort address,
TProcessor processor, TProtocolFactory protocolFactory, final String serverName,
- final int numThreads, final int numSTThreads, long timeBetweenThreadChecks,
- long maxMessageSize) throws TTransportException {
+ final int numThreads, final boolean allowCoreThreadTimeOut, final int numSTThreads,
+ long timeBetweenThreadChecks, long maxMessageSize) throws TTransportException {
final TNonblockingServerSocket transport =
new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()));
@@ -247,8 +252,8 @@ public class TServerUtils {
options.stopTimeoutVal(5);
// Create our own very special thread pool.
- ThreadPoolExecutor pool =
- createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads,
+ allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks);
options.executorService(pool);
options.processorFactory(new TProcessorFactory(processor));
@@ -266,8 +271,8 @@ public class TServerUtils {
*/
public static ServerAddress createNonBlockingServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, final String serverName, final int numThreads,
- final int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize)
- throws TTransportException {
+ final boolean allowCoreThreadTimeOut, final int numSTThreads, long timeBetweenThreadChecks,
+ long maxMessageSize) throws TTransportException {
final TNonblockingServerSocket transport =
new TNonblockingServerSocket(new InetSocketAddress(address.getHost(), address.getPort()));
@@ -279,8 +284,8 @@ public class TServerUtils {
options.stopTimeoutVal(5);
// Create our own very special thread pool.
- ThreadPoolExecutor pool =
- createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads,
+ allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks);
options.executorService(pool);
options.processorFactory(new TProcessorFactory(processor));
@@ -300,7 +305,11 @@ public class TServerUtils {
* @param serverName
* A name to describe the thrift server this executor will service
* @param executorThreads
- * The maximum number of threads for the executor
+ * The minimum number of threads for the executor
+ * @param allowCoreThreadTimeOut
+ * If true, then all threads are allowed to terminate, effectively setting the minimum to
+ * 0. Otherwise the core threads defined by executorThreads will always stay around
+ * waiting for work.
* @param simpleTimerThreads
* The numbers of threads used to get the {@link SimpleTimer} instance
* @param timeBetweenThreadChecks
@@ -308,8 +317,10 @@ public class TServerUtils {
* @return A {@link ThreadPoolExecutor} which will resize itself automatically
*/
public static ThreadPoolExecutor createSelfResizingThreadPool(final String serverName,
- final int executorThreads, int simpleTimerThreads, long timeBetweenThreadChecks) {
- final ThreadPoolExecutor pool = new SimpleThreadPool(executorThreads, "ClientPool");
+ final int executorThreads, boolean allowCoreThreadTimeOut, int simpleTimerThreads,
+ long timeBetweenThreadChecks) {
+ final ThreadPoolExecutor pool =
+ new SimpleThreadPool(executorThreads, allowCoreThreadTimeOut, "ClientPool");
// periodically adjust the number of threads we need by checking how busy our threads are
SimpleTimer.getInstance(simpleTimerThreads).schedule(() -> {
// there is a minor race condition between sampling the current state of the thread pool and
@@ -347,13 +358,14 @@ public class TServerUtils {
*/
public static ServerAddress createBlockingServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, long maxMessageSize, String serverName, int numThreads,
- int numSimpleTimerThreads, long timeBetweenThreadChecks) throws TTransportException {
+ boolean allowCoreThreadTimeOut, int numSimpleTimerThreads, long timeBetweenThreadChecks)
+ throws TTransportException {
InetSocketAddress isa = new InetSocketAddress(address.getHost(), address.getPort());
// Must use an ISA, providing only a port would ignore the hostname given
TServerSocket transport = new TServerSocket(isa);
ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads,
- numSimpleTimerThreads, timeBetweenThreadChecks);
+ allowCoreThreadTimeOut, numSimpleTimerThreads, timeBetweenThreadChecks);
TThreadPoolServer server = createTThreadPoolServer(transport, processor,
ThriftUtil.transportFactory(maxMessageSize), protocolFactory, pool);
@@ -457,8 +469,8 @@ public class TServerUtils {
*/
public static ServerAddress createSslThreadPoolServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, long socketTimeout, SslConnectionParams sslParams,
- String serverName, int numThreads, int numSimpleTimerThreads, long timeBetweenThreadChecks)
- throws TTransportException {
+ String serverName, int numThreads, boolean allowCoreThreadTimeOut, int numSimpleTimerThreads,
+ long timeBetweenThreadChecks) throws TTransportException {
TServerSocket transport;
try {
transport = getSslServerSocket(address.getPort(), (int) socketTimeout,
@@ -474,7 +486,7 @@ public class TServerUtils {
}
ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads,
- numSimpleTimerThreads, timeBetweenThreadChecks);
+ allowCoreThreadTimeOut, numSimpleTimerThreads, timeBetweenThreadChecks);
return new ServerAddress(createTThreadPoolServer(transport, processor,
ThriftUtil.transportFactory(), protocolFactory, pool), address);
@@ -482,8 +494,8 @@ public class TServerUtils {
public static ServerAddress createSaslThreadPoolServer(HostAndPort address, TProcessor processor,
TProtocolFactory protocolFactory, long socketTimeout, SaslServerConnectionParams params,
- final String serverName, final int numThreads, final int numSTThreads,
- long timeBetweenThreadChecks) throws TTransportException {
+ final String serverName, final int numThreads, final boolean allowCoreThreadTimeOut,
+ final int numSTThreads, long timeBetweenThreadChecks) throws TTransportException {
// We'd really prefer to use THsHaServer (or similar) to avoid 1 RPC == 1 Thread that the
// TThreadPoolServer does,
// but sadly this isn't the case. Because TSaslTransport needs to issue a handshake when it
@@ -564,8 +576,8 @@ public class TServerUtils {
log.info("SASL thrift server bound on {}", address);
}
- ThreadPoolExecutor pool =
- createSelfResizingThreadPool(serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+ ThreadPoolExecutor pool = createSelfResizingThreadPool(serverName, numThreads,
+ allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks);
final TThreadPoolServer server =
createTThreadPoolServer(transport, processor, ugiTransportFactory, protocolFactory, pool);
@@ -575,9 +587,10 @@ public class TServerUtils {
public static ServerAddress startTServer(MetricsSystem metricsSystem, AccumuloConfiguration conf,
ThriftServerType serverType, TProcessor processor, String serverName, String threadName,
- int numThreads, int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize,
- SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
- long serverSocketTimeout, HostAndPort... addresses) throws TTransportException {
+ int numThreads, boolean allowCoreThreadTimeOut, int numSTThreads,
+ long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
+ SaslServerConnectionParams saslParams, long serverSocketTimeout, HostAndPort... addresses)
+ throws TTransportException {
if (serverType == ThriftServerType.SASL) {
processor = updateSaslProcessor(serverType, processor);
@@ -585,22 +598,23 @@ public class TServerUtils {
return startTServer(serverType,
new TimedProcessor(metricsSystem, conf, processor, serverName, threadName), serverName,
- threadName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams,
- saslParams, serverSocketTimeout, addresses);
+ threadName, numThreads, allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks,
+ maxMessageSize, sslParams, saslParams, serverSocketTimeout, addresses);
}
/**
* @see #startTServer(ThriftServerType, TimedProcessor, TProtocolFactory, String, String, int,
- * int, long, long, SslConnectionParams, SaslServerConnectionParams, long, HostAndPort...)
+ * boolean, int, long, long, SslConnectionParams, SaslServerConnectionParams, long,
+ * HostAndPort...)
*/
public static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor,
- String serverName, String threadName, int numThreads, int numSTThreads,
- long timeBetweenThreadChecks, long maxMessageSize, SslConnectionParams sslParams,
- SaslServerConnectionParams saslParams, long serverSocketTimeout, HostAndPort... addresses)
- throws TTransportException {
+ String serverName, String threadName, int numThreads, boolean allowCoreThreadTimeOut,
+ int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize,
+ SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
+ long serverSocketTimeout, HostAndPort... addresses) throws TTransportException {
return startTServer(serverType, processor, ThriftUtil.protocolFactory(), serverName, threadName,
- numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize, sslParams, saslParams,
- serverSocketTimeout, addresses);
+ numThreads, allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks, maxMessageSize,
+ sslParams, saslParams, serverSocketTimeout, addresses);
}
/**
@@ -612,8 +626,8 @@ public class TServerUtils {
*/
public static ServerAddress startTServer(ThriftServerType serverType, TimedProcessor processor,
TProtocolFactory protocolFactory, String serverName, String threadName, int numThreads,
- int numSTThreads, long timeBetweenThreadChecks, long maxMessageSize,
- SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
+ boolean allowCoreThreadTimeOut, int numSTThreads, long timeBetweenThreadChecks,
+ long maxMessageSize, SslConnectionParams sslParams, SaslServerConnectionParams saslParams,
long serverSocketTimeout, HostAndPort... addresses) throws TTransportException {
// This is presently not supported. It's hypothetically possible, I believe, to work, but it
@@ -629,30 +643,33 @@ public class TServerUtils {
switch (serverType) {
case SSL:
log.debug("Instantiating SSL Thrift server");
- serverAddress =
- createSslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout,
- sslParams, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+ serverAddress = createSslThreadPoolServer(address, processor, protocolFactory,
+ serverSocketTimeout, sslParams, serverName, numThreads, allowCoreThreadTimeOut,
+ numSTThreads, timeBetweenThreadChecks);
break;
case SASL:
log.debug("Instantiating SASL Thrift server");
- serverAddress =
- createSaslThreadPoolServer(address, processor, protocolFactory, serverSocketTimeout,
- saslParams, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+ serverAddress = createSaslThreadPoolServer(address, processor, protocolFactory,
+ serverSocketTimeout, saslParams, serverName, numThreads, allowCoreThreadTimeOut,
+ numSTThreads, timeBetweenThreadChecks);
break;
case THREADPOOL:
log.debug("Instantiating unsecure TThreadPool Thrift server");
serverAddress = createBlockingServer(address, processor, protocolFactory,
- maxMessageSize, serverName, numThreads, numSTThreads, timeBetweenThreadChecks);
+ maxMessageSize, serverName, numThreads, allowCoreThreadTimeOut, numSTThreads,
+ timeBetweenThreadChecks);
break;
case THREADED_SELECTOR:
log.debug("Instantiating default, unsecure Threaded selector Thrift server");
serverAddress = createThreadedSelectorServer(address, processor, protocolFactory,
- serverName, numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+ serverName, numThreads, allowCoreThreadTimeOut, numSTThreads,
+ timeBetweenThreadChecks, maxMessageSize);
break;
case CUSTOM_HS_HA:
log.debug("Instantiating unsecure custom half-async Thrift server");
- serverAddress = createNonBlockingServer(address, processor, protocolFactory, serverName,
- numThreads, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
+ serverAddress =
+ createNonBlockingServer(address, processor, protocolFactory, serverName, numThreads,
+ allowCoreThreadTimeOut, numSTThreads, timeBetweenThreadChecks, maxMessageSize);
break;
default:
throw new IllegalArgumentException("Unknown server type " + serverType);
diff --git a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
index a7585ff..4429651 100644
--- a/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
+++ b/server/base/src/test/java/org/apache/accumulo/server/util/TServerUtilsTest.java
@@ -392,7 +392,8 @@ public class TServerUtilsTest {
return TServerUtils.startServer(Metrics.initSystem(getClass().getSimpleName()), ctx, hostname,
Property.TSERV_CLIENTPORT, processor, "TServerUtilsTest", "TServerUtilsTestThread",
- Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS, Property.TSERV_THREADCHECK,
+ Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS,
+ Property.TSERV_MINTHREADS_ALLOW_TIMEOUT, Property.TSERV_THREADCHECK,
Property.GENERAL_MAX_MESSAGE_SIZE);
}
diff --git a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
index 65830df..edb0803 100644
--- a/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
+++ b/server/gc/src/main/java/org/apache/accumulo/gc/SimpleGarbageCollector.java
@@ -653,7 +653,7 @@ public class SimpleGarbageCollector extends AbstractServer implements Iface {
try {
ServerAddress server = TServerUtils.startTServer(getMetricsSystem(), getConfiguration(),
getContext().getThriftServerType(), processor, this.getClass().getSimpleName(),
- "GC Monitor Service", 2,
+ "GC Monitor Service", 2, true,
getConfiguration().getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE), 1000,
maxMessageSize, getContext().getServerSslParams(), getContext().getSaslParams(), 0,
addresses);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/Master.java b/server/master/src/main/java/org/apache/accumulo/master/Master.java
index 212cda8..a76f9dd 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/Master.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/Master.java
@@ -1012,8 +1012,8 @@ public class Master extends AbstractServer
try {
sa = TServerUtils.startServer(getMetricsSystem(), context, getHostname(),
Property.MASTER_CLIENTPORT, processor, "Master", "Master Client Service Handler", null,
- Property.MASTER_MINTHREADS, Property.MASTER_THREADCHECK,
- Property.GENERAL_MAX_MESSAGE_SIZE);
+ Property.MASTER_MINTHREADS, Property.MASTER_MINTHREADS_ALLOW_TIMEOUT,
+ Property.MASTER_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
} catch (UnknownHostException e) {
throw new IllegalStateException("Unable to start server on host " + getHostname(), e);
}
@@ -1329,7 +1329,7 @@ public class Master extends AbstractServer
ServerAddress replAddress = TServerUtils.startServer(getMetricsSystem(), context, getHostname(),
Property.MASTER_REPLICATION_COORDINATOR_PORT, replicationCoordinatorProcessor,
"Master Replication Coordinator", "Replication Coordinator", null,
- Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS,
+ Property.MASTER_REPLICATION_COORDINATOR_MINTHREADS, null,
Property.MASTER_REPLICATION_COORDINATOR_THREADCHECK, Property.GENERAL_MAX_MESSAGE_SIZE);
log.info("Started replication coordinator service at " + replAddress.address);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
index 9e5b736..850c837 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/BulkImport.java
@@ -206,7 +206,7 @@ public class BulkImport extends MasterRepo {
@SuppressWarnings("deprecation")
int workerCount = serverConfig.getCount(
serverConfig.resolve(Property.MASTER_RENAME_THREADS, Property.MASTER_BULK_RENAME_THREADS));
- SimpleThreadPool workers = new SimpleThreadPool(workerCount, "bulk move");
+ SimpleThreadPool workers = new SimpleThreadPool(workerCount, true, "bulk move");
List<Future<Exception>> results = new ArrayList<>();
for (FileStatus file : mapFiles) {
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java
index ad1a155..0029779 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer1/LoadFiles.java
@@ -93,8 +93,10 @@ class LoadFiles extends MasterRepo {
private static synchronized ExecutorService getThreadPool(Master master) {
if (threadPool == null) {
int threadPoolSize = master.getConfiguration().getCount(Property.MASTER_BULK_THREADPOOL_SIZE);
- ThreadPoolExecutor pool = new SimpleThreadPool(threadPoolSize, "bulk import");
- pool.allowCoreThreadTimeOut(true);
+ boolean allowCoreThreadTimeOut =
+ master.getConfiguration().getBoolean(Property.MASTER_BULK_THREADPOOL_ALLOW_TIMEOUT);
+ ThreadPoolExecutor pool =
+ new SimpleThreadPool(threadPoolSize, allowCoreThreadTimeOut, "bulk import");
threadPool = new TraceExecutorService(pool);
}
return threadPool;
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
index 1b63213..a5e5690 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/bulkVer2/BulkImportMove.java
@@ -113,7 +113,7 @@ class BulkImportMove extends MasterRepo {
@SuppressWarnings("deprecation")
int workerCount = aConf.getCount(
aConf.resolve(Property.MASTER_RENAME_THREADS, Property.MASTER_BULK_RENAME_THREADS));
- SimpleThreadPool workers = new SimpleThreadPool(workerCount, "bulkDir move");
+ SimpleThreadPool workers = new SimpleThreadPool(workerCount, true, "bulkDir move");
List<Future<Boolean>> results = new ArrayList<>();
String fmtTid = FateTxId.formatTid(tid);
diff --git a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java
index acea637..401f0ed 100644
--- a/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java
+++ b/server/master/src/main/java/org/apache/accumulo/master/tableOps/tableImport/MoveExportedFiles.java
@@ -64,7 +64,7 @@ class MoveExportedFiles extends MasterRepo {
String fmtTid = FateTxId.formatTid(tid);
int workerCount = master.getConfiguration().getCount(Property.MASTER_RENAME_THREADS);
- SimpleThreadPool workers = new SimpleThreadPool(workerCount, "importtable rename");
+ SimpleThreadPool workers = new SimpleThreadPool(workerCount, true, "importtable rename");
List<Future<Boolean>> results = new ArrayList<>();
VolumeManager fs = master.getVolumeManager();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 8b6047f..fc3c248 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -538,7 +538,8 @@ public class TabletServer extends AbstractServer {
ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(), address,
Property.TSERV_CLIENTPORT, processor, this.getClass().getSimpleName(),
"Thrift Client Server", Property.TSERV_PORTSEARCH, Property.TSERV_MINTHREADS,
- Property.TSERV_THREADCHECK, maxMessageSizeProperty);
+ Property.TSERV_MINTHREADS_ALLOW_TIMEOUT, Property.TSERV_THREADCHECK,
+ maxMessageSizeProperty);
this.server = sp.server;
return sp.address;
}
@@ -602,10 +603,11 @@ public class TabletServer extends AbstractServer {
Property maxMessageSizeProperty =
getConfiguration().get(Property.TSERV_MAX_MESSAGE_SIZE) != null
? Property.TSERV_MAX_MESSAGE_SIZE : Property.GENERAL_MAX_MESSAGE_SIZE;
- ServerAddress sp = TServerUtils.startServer(getMetricsSystem(), getContext(),
- clientAddress.getHost(), Property.REPLICATION_RECEIPT_SERVICE_PORT, processor,
- "ReplicationServicerHandler", "Replication Servicer", Property.TSERV_PORTSEARCH,
- Property.REPLICATION_MIN_THREADS, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
+ ServerAddress sp =
+ TServerUtils.startServer(getMetricsSystem(), getContext(), clientAddress.getHost(),
+ Property.REPLICATION_RECEIPT_SERVICE_PORT, processor, "ReplicationServicerHandler",
+ "Replication Servicer", Property.TSERV_PORTSEARCH, Property.REPLICATION_MIN_THREADS,
+ null, Property.REPLICATION_THREADCHECK, maxMessageSizeProperty);
this.replServer = sp.server;
log.info("Started replication service on {}", sp.address);
@@ -748,7 +750,7 @@ public class TabletServer extends AbstractServer {
}
ThreadPoolExecutor distWorkQThreadPool = new SimpleThreadPool(
- getConfiguration().getCount(Property.TSERV_WORKQ_THREADS), "distributed work queue");
+ getConfiguration().getCount(Property.TSERV_WORKQ_THREADS), true, "distributed work queue");
bulkFailedCopyQ = new DistributedWorkQueue(
getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, getConfiguration());
@@ -891,7 +893,7 @@ public class TabletServer extends AbstractServer {
// Start the pool to handle outgoing replications
final ThreadPoolExecutor replicationThreadPool = new SimpleThreadPool(
- getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), "replication task");
+ getConfiguration().getCount(Property.REPLICATION_WORKER_THREADS), true, "replication task");
replWorker.setExecutor(replicationThreadPool);
replWorker.run();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index e0d52ce..0c6e749 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -223,7 +223,7 @@ public class LogSorter {
this.fs = fs;
this.conf = conf;
int threadPoolSize = conf.getCount(Property.TSERV_RECOVERY_MAX_CONCURRENT);
- this.threadPool = new SimpleThreadPool(threadPoolSize, this.getClass().getName());
+ this.threadPool = new SimpleThreadPool(threadPoolSize, true, this.getClass().getName());
this.walBlockSize = DfsLogger.getWalBlockSize(conf);
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
index 8777d6f..8469bb7 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/TabletServerLogger.java
@@ -264,7 +264,7 @@ public class TabletServerLogger {
if (nextLogMaker != null) {
return;
}
- nextLogMaker = new SimpleThreadPool(1, "WALog creator");
+ nextLogMaker = new SimpleThreadPool(1, true, "WALog creator");
nextLogMaker.submit(new LoggingRunnable(log, new Runnable() {
@Override
public void run() {
diff --git a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
index 9010deb..e02ebeb 100644
--- a/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/BalanceWithOfflineTableIT.java
@@ -76,7 +76,7 @@ public class BalanceWithOfflineTableIT extends ConfigurableMacBase {
log.info("Waiting for balance");
- SimpleThreadPool pool = new SimpleThreadPool(1, "waitForBalance");
+ SimpleThreadPool pool = new SimpleThreadPool(1, true, "waitForBalance");
Future<Boolean> wait = pool.submit(() -> {
c.instanceOperations().waitForBalance();
return true;
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
index 8587efa..01a49b7 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/BatchWriterFlushIT.java
@@ -209,7 +209,7 @@ public class BatchWriterFlushIT extends AccumuloClusterHarness {
allMuts.add(muts);
}
- SimpleThreadPool threads = new SimpleThreadPool(NUM_THREADS, "ClientThreads");
+ SimpleThreadPool threads = new SimpleThreadPool(NUM_THREADS, true, "ClientThreads");
threads.allowCoreThreadTimeOut(false);
threads.prestartAllCoreThreads();
diff --git a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
index 899d6b8..d50d1f1 100644
--- a/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
+++ b/test/src/main/java/org/apache/accumulo/test/functional/ZombieTServer.java
@@ -109,7 +109,7 @@ public class ZombieTServer {
Processor<Iface> processor = new Processor<>(tch);
ServerAddress serverPort = TServerUtils.startTServer(
Metrics.initSystem(ZombieTServer.class.getSimpleName()), context.getConfiguration(),
- ThriftServerType.CUSTOM_HS_HA, processor, "ZombieTServer", "walking dead", 2, 1, 1000,
+ ThriftServerType.CUSTOM_HS_HA, processor, "ZombieTServer", "walking dead", 2, true, 1, 1000,
10 * 1024 * 1024, null, null, -1, HostAndPort.fromParts("0.0.0.0", port));
String addressString = serverPort.address.toString();
diff --git a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
index 9ce7525..2e807e5 100644
--- a/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
+++ b/test/src/main/java/org/apache/accumulo/test/performance/NullTserver.java
@@ -303,7 +303,7 @@ public class NullTserver {
Processor<Iface> processor = new Processor<>(tch);
TServerUtils.startTServer(Metrics.initSystem(NullTserver.class.getSimpleName()),
context.getConfiguration(), ThriftServerType.CUSTOM_HS_HA, processor, "NullTServer",
- "null tserver", 2, 1, 1000, 10 * 1024 * 1024, null, null, -1,
+ "null tserver", 2, true, 1, 1000, 10 * 1024 * 1024, null, null, -1,
HostAndPort.fromParts("0.0.0.0", opts.port));
HostAndPort addr = HostAndPort.fromParts(InetAddress.getLocalHost().getHostName(), opts.port);