You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by mm...@apache.org on 2022/03/25 11:35:54 UTC
[accumulo] branch main updated: Use new ThreadPools method to clean up code (#2589)
This is an automated email from the ASF dual-hosted git repository.
mmiller pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 7c3a978 Use new ThreadPools method to clean up code (#2589)
7c3a978 is described below
commit 7c3a9783ad01809f7c1bd5cf22471a1ffa421852
Author: Mike Miller <mm...@apache.org>
AuthorDate: Fri Mar 25 07:35:47 2022 -0400
Use new ThreadPools method to clean up code (#2589)
---
.../apache/accumulo/server/rpc/TServerUtils.java | 29 ++++++++++------------
.../accumulo/server/util/FileSystemMonitor.java | 26 +++++++++----------
2 files changed, 25 insertions(+), 30 deletions(-)
diff --git a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
index b176175..ba8ba2d 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/rpc/TServerUtils.java
@@ -314,22 +314,19 @@ public class TServerUtils {
final ThreadPoolExecutor pool = ThreadPools.getServerThreadPools().createFixedThreadPool(
executorThreads, threadTimeOut, TimeUnit.MILLISECONDS, serverName + "-ClientPool", true);
// periodically adjust the number of threads we need by checking how busy our threads are
- ThreadPools.watchCriticalScheduledTask(ThreadPools.getServerThreadPools()
- .createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(() -> {
- // there is a minor race condition between sampling the current state of the thread pool
- // and
- // adjusting it
- // however, this isn't really an issue, since it adjusts periodically anyway
- if (pool.getCorePoolSize() <= pool.getActiveCount()) {
- int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
- ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
- } else {
- if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
- int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
- ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool");
- }
- }
- }, timeBetweenThreadChecks, timeBetweenThreadChecks, TimeUnit.MILLISECONDS));
+ ThreadPools.watchCriticalFixedDelay(conf, timeBetweenThreadChecks, () -> {
+ // there is a minor race condition between sampling the current state of the thread pool
+ // and adjusting it; however, this isn't really an issue, since it adjusts periodically
+ if (pool.getCorePoolSize() <= pool.getActiveCount()) {
+ int larger = pool.getCorePoolSize() + Math.min(pool.getQueue().size(), 2);
+ ThreadPools.resizePool(pool, () -> larger, serverName + "-ClientPool");
+ } else {
+ if (pool.getCorePoolSize() > pool.getActiveCount() + 3) {
+ int smaller = Math.max(executorThreads, pool.getCorePoolSize() - 1);
+ ThreadPools.resizePool(pool, () -> smaller, serverName + "-ClientPool");
+ }
+ }
+ });
return pool;
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
index 255f453..1bcc77c 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/util/FileSystemMonitor.java
@@ -29,7 +29,6 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
@@ -117,20 +116,19 @@ public class FileSystemMonitor {
// Create a task to check each mount periodically to see if its state has changed.
for (Mount mount : mounts) {
- ThreadPools.watchCriticalScheduledTask(ThreadPools.getServerThreadPools()
- .createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(
- Threads.createNamedRunnable(mount.mountPoint + "filesystem monitor", () -> {
- try {
- checkMount(mount);
- } catch (final Exception e) {
- Halt.halt(-42, new Runnable() {
- @Override
- public void run() {
- log.error("Exception while checking mount points, halting process", e);
- }
- });
+ ThreadPools.watchCriticalFixedDelay(conf, period,
+ Threads.createNamedRunnable(mount.mountPoint + "filesystem monitor", () -> {
+ try {
+ checkMount(mount);
+ } catch (final Exception e) {
+ Halt.halt(-42, new Runnable() {
+ @Override
+ public void run() {
+ log.error("Exception while checking mount points, halting process", e);
}
- }), period, period, TimeUnit.MILLISECONDS));
+ });
+ }
+ }));
}
}