You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/09/23 17:00:58 UTC

[accumulo] branch main updated: Recreated the SimpleTimer functionality with the new ThreadPools class (#2282)

This is an automated email from the ASF dual-hosted git repository.

dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 455d5ca  Recreated the SimpleTimer functionality with the new ThreadPools class (#2282)
455d5ca is described below

commit 455d5cadbab9618a6fe8be2abd7e4465b562bf14
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu Sep 23 13:00:51 2021 -0400

    Recreated the SimpleTimer functionality with the new ThreadPools class (#2282)
    
    Commit e62ace6a9d37572d95999f5412c6148efbba50b9 removed SimpleTimer in favor of
    a new ThreadPools class that centralized the creation of thread pools. However, I
    missed the fact that SimpleTimer used a shared thread pool. This change reintroduces
    a shared generic ScheduledThreadPoolExecutor that can be obtained by calling
    ServerContext.getSharedGenericScheduledExecutorService().
    
    Closes #2280
    
    
    Co-authored-by: Keith Turner <kt...@apache.org>
---
 .../accumulo/core/util/threads/ThreadPools.java    |  4 ++
 .../org/apache/accumulo/server/ServerContext.java  | 16 +++++++-
 .../org/apache/accumulo/server/fs/FileManager.java | 10 ++---
 .../accumulo/server/manager/LiveTServerSet.java    |  5 +--
 .../server/zookeeper/DistributedWorkQueue.java     | 45 ++++++++++++----------
 .../java/org/apache/accumulo/manager/Manager.java  | 38 +++++++++---------
 .../accumulo/manager/recovery/RecoveryManager.java |  4 +-
 .../DistributedWorkQueueWorkAssigner.java          |  2 +-
 .../manager/tableOps/bulkVer1/CopyFailed.java      |  2 +-
 .../org/apache/accumulo/tracer/TraceServer.java    |  6 +--
 .../apache/accumulo/tserver/AssignmentHandler.java | 33 ++++++++--------
 .../org/apache/accumulo/tserver/TabletServer.java  | 39 ++++++++++---------
 .../tserver/TabletServerResourceManager.java       | 20 +++++-----
 .../org/apache/accumulo/tserver/log/LogSorter.java |  4 +-
 .../tserver/replication/ReplicationWorker.java     |  5 ++-
 .../accumulo/tserver/session/SessionManager.java   | 13 ++++---
 .../accumulo/tserver/tablet/CompactionWatcher.java |  8 ++--
 .../accumulo/tserver/AssignmentWatcherTest.java    | 20 +++++++---
 18 files changed, 149 insertions(+), 125 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 92cff71..4633903 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -189,6 +189,10 @@ public class ThreadPools {
     return result;
   }
 
+  /*
+   * If you need the server-side shared ScheduledThreadPoolExecutor, then use
+   * ServerContext.getScheduledExecutor()
+   */
   public static ScheduledThreadPoolExecutor
       createGeneralScheduledExecutorService(AccumuloConfiguration conf) {
     return (ScheduledThreadPoolExecutor) createExecutorService(conf,
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index 6f1366d..c665db8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.Constants;
@@ -89,6 +90,7 @@ public class ServerContext extends ClientContext {
   private AccumuloConfiguration systemConfig = null;
   private AuthenticationTokenSecretManager secretManager;
   private CryptoService cryptoService = null;
+  private ScheduledThreadPoolExecutor sharedScheduledThreadPool = null;
 
   public ServerContext(SiteConfiguration siteConfig) {
     this(new ServerInfo(siteConfig));
@@ -410,7 +412,7 @@ public class ServerContext extends ClientContext {
   }
 
   private void monitorSwappiness(AccumuloConfiguration config) {
-    ThreadPools.createGeneralScheduledExecutorService(config).scheduleWithFixedDelay(() -> {
+    getScheduledExecutor().scheduleWithFixedDelay(() -> {
       try {
         String procFile = "/proc/sys/vm/swappiness";
         File swappiness = new File(procFile);
@@ -433,4 +435,16 @@ public class ServerContext extends ClientContext {
       }
     }, 1000, 10 * 60 * 1000, TimeUnit.MILLISECONDS);
   }
+
+  /**
+   * return a shared scheduled executor
+   */
+  public synchronized ScheduledThreadPoolExecutor getScheduledExecutor() {
+    if (sharedScheduledThreadPool == null) {
+      sharedScheduledThreadPool = (ScheduledThreadPoolExecutor) ThreadPools
+          .createExecutorService(getConfiguration(), Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
+    }
+    return sharedScheduledThreadPool;
+  }
+
 }
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
index 1335e44..4763f59 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
@@ -50,7 +50,6 @@ import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
 import org.apache.accumulo.core.metadata.TabletFile;
 import org.apache.accumulo.core.metadata.schema.DataFileValue;
 import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
-import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.problems.ProblemReport;
 import org.apache.accumulo.server.problems.ProblemReportingIterator;
@@ -163,13 +162,12 @@ public class FileManager {
     this.openFiles = new HashMap<>();
     this.reservedReaders = new HashMap<>();
 
-    this.maxIdleTime = context.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
-    ThreadPools.createGeneralScheduledExecutorService(context.getConfiguration())
-        .scheduleWithFixedDelay(new IdleFileCloser(), maxIdleTime, maxIdleTime / 2,
-            TimeUnit.MILLISECONDS);
+    this.maxIdleTime = this.context.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
+    this.context.getScheduledExecutor().scheduleWithFixedDelay(new IdleFileCloser(), maxIdleTime,
+        maxIdleTime / 2, TimeUnit.MILLISECONDS);
 
     this.slowFilePermitMillis =
-        context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FILEPERMIT_MILLIS);
+        this.context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FILEPERMIT_MILLIS);
   }
 
   private static int countReaders(Map<String,List<OpenReader>> files) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 7c64c0c..f2a51ee 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -44,7 +44,6 @@ import org.apache.accumulo.core.util.AddressUtil;
 import org.apache.accumulo.core.util.Halt;
 import org.apache.accumulo.core.util.HostAndPort;
 import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.fate.zookeeper.ServiceLock;
 import org.apache.accumulo.fate.zookeeper.ZooCache;
 import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat;
@@ -254,8 +253,8 @@ public class LiveTServerSet implements Watcher {
 
   public synchronized void startListeningForTabletServerChanges() {
     scanServers();
-    ThreadPools.createGeneralScheduledExecutorService(context.getConfiguration())
-        .scheduleWithFixedDelay(this::scanServers, 0, 5000, TimeUnit.MILLISECONDS);
+    this.context.getScheduledExecutor().scheduleWithFixedDelay(this::scanServers, 0, 5000,
+        TimeUnit.MILLISECONDS);
   }
 
   public synchronized void scanServers() {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index 38e4df6..55fb6eb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -31,10 +31,10 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.WatchedEvent;
@@ -59,6 +59,7 @@ public class DistributedWorkQueue {
   private ZooReaderWriter zoo;
   private String path;
   private AccumuloConfiguration config;
+  private ServerContext context;
   private long timerInitialDelay, timerPeriod;
 
   private AtomicInteger numTask = new AtomicInteger(0);
@@ -163,18 +164,23 @@ public class DistributedWorkQueue {
     void process(String workID, byte[] data);
   }
 
-  public DistributedWorkQueue(String path, AccumuloConfiguration config) {
+  public DistributedWorkQueue(String path, AccumuloConfiguration config, ServerContext context) {
     // Preserve the old delay and period
-    this(path, config, new SecureRandom().nextInt(60 * 1000), 60 * 1000);
+    this(path, config, context, new SecureRandom().nextInt(60 * 1000), 60 * 1000);
   }
 
-  public DistributedWorkQueue(String path, AccumuloConfiguration config, long timerInitialDelay,
-      long timerPeriod) {
+  public DistributedWorkQueue(String path, AccumuloConfiguration config, ServerContext context,
+      long timerInitialDelay, long timerPeriod) {
     this.path = path;
     this.config = config;
+    this.context = context;
     this.timerInitialDelay = timerInitialDelay;
     this.timerPeriod = timerPeriod;
-    zoo = new ZooReaderWriter(config);
+    zoo = new ZooReaderWriter(this.config);
+  }
+
+  public ServerContext getContext() {
+    return context;
   }
 
   public ZooReaderWriter getZooReaderWriter() {
@@ -220,20 +226,19 @@ public class DistributedWorkQueue {
     lookForWork(processor, children);
 
     // Add a little jitter to avoid all the tservers slamming zookeeper at once
-    ThreadPools.createGeneralScheduledExecutorService(config)
-        .scheduleWithFixedDelay(new Runnable() {
-          @Override
-          public void run() {
-            log.debug("Looking for work in {}", path);
-            try {
-              lookForWork(processor, zoo.getChildren(path));
-            } catch (KeeperException e) {
-              log.error("Failed to look for work", e);
-            } catch (InterruptedException e) {
-              log.info("Interrupted looking for work", e);
-            }
-          }
-        }, timerInitialDelay, timerPeriod, TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
+      @Override
+      public void run() {
+        log.debug("Looking for work in {}", path);
+        try {
+          lookForWork(processor, zoo.getChildren(path));
+        } catch (KeeperException e) {
+          log.error("Failed to look for work", e);
+        } catch (InterruptedException e) {
+          log.info("Interrupted looking for work", e);
+        }
+      }
+    }, timerInitialDelay, timerPeriod, TimeUnit.MILLISECONDS);
   }
 
   /**
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0f28945..4ce6f30 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -253,12 +253,11 @@ public class Manager extends AbstractServer
     if (newState == ManagerState.STOP) {
       // Give the server a little time before shutdown so the client
       // thread requesting the stop can return
-      ThreadPools.createGeneralScheduledExecutorService(getConfiguration())
-          .scheduleWithFixedDelay(() -> {
-            // This frees the main thread and will cause the manager to exit
-            clientService.stop();
-            Manager.this.nextEvent.event("stopped event loop");
-          }, 100L, 1000L, TimeUnit.MILLISECONDS);
+      getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> {
+        // This frees the main thread and will cause the manager to exit
+        clientService.stop();
+        Manager.this.nextEvent.event("stopped event loop");
+      }, 100L, 1000L, TimeUnit.MILLISECONDS);
     }
 
     if (oldState != newState && (newState == ManagerState.HAVE_LOCK)) {
@@ -1130,8 +1129,8 @@ public class Manager extends AbstractServer
       fate = new Fate<>(this, store, TraceRepo::toLogString);
       fate.startTransactionRunners(getConfiguration());
 
-      ThreadPools.createGeneralScheduledExecutorService(getConfiguration())
-          .scheduleWithFixedDelay(store::ageOff, 63000, 63000, TimeUnit.MILLISECONDS);
+      context.getScheduledExecutor().scheduleWithFixedDelay(store::ageOff, 63000, 63000,
+          TimeUnit.MILLISECONDS);
     } catch (KeeperException | InterruptedException e) {
       throw new IllegalStateException("Exception setting up FaTE cleanup thread", e);
     }
@@ -1182,18 +1181,17 @@ public class Manager extends AbstractServer
 
     // if the replication name is ever set, then start replication services
     final AtomicReference<TServer> replServer = new AtomicReference<>();
-    ThreadPools.createGeneralScheduledExecutorService(getConfiguration())
-        .scheduleWithFixedDelay(() -> {
-          try {
-            if ((replServer.get() == null)
-                && !getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) {
-              log.info("{} was set, starting repl services.", Property.REPLICATION_NAME.getKey());
-              replServer.set(setupReplication());
-            }
-          } catch (UnknownHostException | KeeperException | InterruptedException e) {
-            log.error("Error occurred starting replication services. ", e);
-          }
-        }, 0, 5000, TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
+      try {
+        if ((replServer.get() == null)
+            && !getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) {
+          log.info("{} was set, starting repl services.", Property.REPLICATION_NAME.getKey());
+          replServer.set(setupReplication());
+        }
+      } catch (UnknownHostException | KeeperException | InterruptedException e) {
+        log.error("Error occurred starting replication services. ", e);
+      }
+    }, 0, 5000, TimeUnit.MILLISECONDS);
 
     // Register metrics modules
     int failureCount = new ManagerMetricsFactory(getConfiguration()).register(this);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index 5d28f10..3b7b5d6 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@ -77,7 +77,7 @@ public class RecoveryManager {
     try {
       List<String> workIDs =
           new DistributedWorkQueue(manager.getZooKeeperRoot() + Constants.ZRECOVERY,
-              manager.getConfiguration()).getWorkQueued();
+              manager.getConfiguration(), manager.getContext()).getWorkQueued();
       sortsQueued.addAll(workIDs);
     } catch (Exception e) {
       log.warn("{}", e.getMessage(), e);
@@ -129,7 +129,7 @@ public class RecoveryManager {
       throws KeeperException, InterruptedException {
     String work = source + "|" + destination;
     new DistributedWorkQueue(manager.getZooKeeperRoot() + Constants.ZRECOVERY,
-        manager.getConfiguration()).addWork(sortId, work.getBytes(UTF_8));
+        manager.getConfiguration(), manager.getContext()).addWork(sortId, work.getBytes(UTF_8));
 
     synchronized (this) {
       sortsQueued.add(sortId);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/replication/DistributedWorkQueueWorkAssigner.java b/server/manager/src/main/java/org/apache/accumulo/manager/replication/DistributedWorkQueueWorkAssigner.java
index fe757f6..835cf81 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/replication/DistributedWorkQueueWorkAssigner.java
@@ -91,7 +91,7 @@ public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner {
   protected void initializeWorkQueue(AccumuloConfiguration conf) {
     workQueue =
         new DistributedWorkQueue(ZooUtil.getRoot(client.instanceOperations().getInstanceID())
-            + ReplicationConstants.ZOO_WORK_QUEUE, conf);
+            + ReplicationConstants.ZOO_WORK_QUEUE, conf, this.workQueue.getContext());
   }
 
   @Override
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CopyFailed.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CopyFailed.java
index e0b72cd..6f60f18 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CopyFailed.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CopyFailed.java
@@ -143,7 +143,7 @@ class CopyFailed extends ManagerRepo {
     if (!loadedFailures.isEmpty()) {
       DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(
           Constants.ZROOT + "/" + manager.getInstanceID() + Constants.ZBULK_FAILED_COPYQ,
-          manager.getConfiguration());
+          manager.getConfiguration(), manager.getContext());
 
       HashSet<String> workIds = new HashSet<>();
 
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 7b03a7b..439de34 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -53,7 +53,6 @@ import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.Mutation;
 import org.apache.accumulo.core.data.Value;
 import org.apache.accumulo.core.iterators.user.AgeOffFilter;
-import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
 import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
 import org.apache.accumulo.server.ServerContext;
@@ -301,9 +300,8 @@ public class TraceServer implements Watcher, AutoCloseable {
   }
 
   public void run() {
-    ThreadPools.createGeneralScheduledExecutorService(context.getConfiguration())
-        .scheduleWithFixedDelay(this::flush, SCHEDULE_DELAY, SCHEDULE_PERIOD,
-            TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(this::flush, SCHEDULE_DELAY,
+        SCHEDULE_PERIOD, TimeUnit.MILLISECONDS);
     server.serve();
   }
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
index 834d769..03d8dfc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
@@ -29,7 +29,6 @@ import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.manager.thrift.TabletLoadState;
 import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.util.threads.ThreadPools;
 import org.apache.accumulo.core.util.threads.Threads;
 import org.apache.accumulo.server.manager.state.Assignment;
 import org.apache.accumulo.server.manager.state.TabletStateStore;
@@ -216,24 +215,22 @@ class AssignmentHandler implements Runnable {
       server.enqueueManagerMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
       long reschedule = Math.min((1L << Math.min(32, retryAttempt)) * 1000, 10 * 60 * 1000L);
       log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.));
-      ThreadPools.createGeneralScheduledExecutorService(server.getConfiguration())
-          .schedule(new Runnable() {
-            @Override
-            public void run() {
-              log.info("adding tablet {} back to the assignment pool (retry {})", extent,
-                  retryAttempt);
-              AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1);
-              if (extent.isMeta()) {
-                if (extent.isRootTablet()) {
-                  Threads.createThread("Root tablet assignment retry", handler).start();
-                } else {
-                  server.resourceManager.addMetaDataAssignment(extent, log, handler);
-                }
-              } else {
-                server.resourceManager.addAssignment(extent, log, handler);
-              }
+      this.server.getContext().getScheduledExecutor().schedule(new Runnable() {
+        @Override
+        public void run() {
+          log.info("adding tablet {} back to the assignment pool (retry {})", extent, retryAttempt);
+          AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1);
+          if (extent.isMeta()) {
+            if (extent.isRootTablet()) {
+              Threads.createThread("Root tablet assignment retry", handler).start();
+            } else {
+              server.resourceManager.addMetaDataAssignment(extent, log, handler);
             }
-          }, reschedule, TimeUnit.MILLISECONDS);
+          } else {
+            server.resourceManager.addAssignment(extent, log, handler);
+          }
+        }
+      }, reschedule, TimeUnit.MILLISECONDS);
     }
   }
 }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 2160e6b..2dc9aac 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -227,6 +227,7 @@ public class TabletServer extends AbstractServer {
 
   private final ZooAuthenticationKeyWatcher authKeyWatcher;
   private final WalStateManager walMarker;
+  private final ServerContext context;
 
   public static void main(String[] args) throws Exception {
     try (TabletServer tserver = new TabletServer(new ServerOpts(), args)) {
@@ -236,13 +237,13 @@ public class TabletServer extends AbstractServer {
 
   protected TabletServer(ServerOpts opts, String[] args) {
     super("tserver", opts, args);
-    ServerContext context = super.getContext();
+    context = super.getContext();
     context.setupCrypto();
     this.managerLockCache = new ZooCache(context.getZooReaderWriter(), null);
     final AccumuloConfiguration aconf = getConfiguration();
     log.info("Version " + Constants.VERSION);
     log.info("Instance " + getInstanceID());
-    this.sessionManager = new SessionManager(aconf);
+    this.sessionManager = new SessionManager(context);
     this.logSorter = new LogSorter(context, aconf);
     this.replWorker = new ReplicationWorker(context);
     this.statsKeeper = new TabletStatsKeeper();
@@ -257,7 +258,7 @@ public class TabletServer extends AbstractServer {
     // This thread will calculate and log out the busiest tablets based on ingest count and
     // query count every #{logBusiestTabletsDelay}
     if (numBusyTabletsToLog > 0) {
-      ThreadPools.createGeneralScheduledExecutorService(aconf)
+      context.getScheduledExecutor()
           .scheduleWithFixedDelay(Threads.createNamedRunnable("BusyTabletLogger", new Runnable() {
             private BusiestTracker ingestTracker =
                 BusiestTracker.newBusiestIngestTracker(numBusyTabletsToLog);
@@ -284,7 +285,7 @@ public class TabletServer extends AbstractServer {
           }), logBusyTabletsDelay, logBusyTabletsDelay, TimeUnit.MILLISECONDS);
     }
 
-    ThreadPools.createGeneralScheduledExecutorService(aconf)
+    context.getScheduledExecutor()
         .scheduleWithFixedDelay(Threads.createNamedRunnable("TabletRateUpdater", new Runnable() {
           @Override
           public void run() {
@@ -349,8 +350,8 @@ public class TabletServer extends AbstractServer {
     scanMetrics = new TabletServerScanMetrics();
     mincMetrics = new TabletServerMinCMetrics();
     ceMetrics = new CompactionExecutorsMetrics();
-    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
-        TabletLocator::clearLocators, jitter(), jitter(), TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(TabletLocator::clearLocators, jitter(),
+        jitter(), TimeUnit.MILLISECONDS);
     walMarker = new WalStateManager(context);
 
     // Create the secret manager
@@ -425,8 +426,8 @@ public class TabletServer extends AbstractServer {
 
   private class MajorCompactor implements Runnable {
 
-    public MajorCompactor(AccumuloConfiguration config) {
-      CompactionWatcher.startWatching(config);
+    public MajorCompactor(ServerContext context) {
+      CompactionWatcher.startWatching(context);
     }
 
     @Override
@@ -763,8 +764,9 @@ public class TabletServer extends AbstractServer {
     ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) ThreadPools
         .createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS);
 
-    bulkFailedCopyQ = new DistributedWorkQueue(
-        getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, getConfiguration());
+    bulkFailedCopyQ =
+        new DistributedWorkQueue(getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ,
+            getConfiguration(), getContext());
     try {
       bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(getContext()),
           distWorkQThreadPool);
@@ -780,7 +782,7 @@ public class TabletServer extends AbstractServer {
     }
     final AccumuloConfiguration aconf = getConfiguration();
     // if the replication name is ever set, then start replication services
-    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
+    context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
       if (this.replServer == null) {
         if (!getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) {
           log.info(Property.REPLICATION_NAME.getKey() + " was set, starting repl services.");
@@ -790,9 +792,8 @@ public class TabletServer extends AbstractServer {
     }, 0, 5000, TimeUnit.MILLISECONDS);
 
     final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000;
-    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
-        new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS,
-        CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(new BulkImportCacheCleaner(this),
+        CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS);
 
     HostAndPort managerHost;
     while (!serverStopRequested) {
@@ -913,8 +914,8 @@ public class TabletServer extends AbstractServer {
     Runnable replicationWorkThreadPoolResizer = () -> {
       ThreadPools.resizePool(replicationThreadPool, aconf, Property.REPLICATION_WORKER_THREADS);
     };
-    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
-        replicationWorkThreadPoolResizer, 10000, 30000, TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10000,
+        30000, TimeUnit.MILLISECONDS);
   }
 
   static boolean checkTabletMetadata(KeyExtent extent, TServerInstance instance,
@@ -1006,7 +1007,7 @@ public class TabletServer extends AbstractServer {
 
   private void config() {
     log.info("Tablet server starting on {}", getHostname());
-    Threads.createThread("Split/MajC initiator", new MajorCompactor(getConfiguration())).start();
+    Threads.createThread("Split/MajC initiator", new MajorCompactor(context)).start();
 
     clientAddress = HostAndPort.fromParts(getHostname(), 0);
 
@@ -1016,8 +1017,8 @@ public class TabletServer extends AbstractServer {
 
     Runnable gcDebugTask = () -> gcLogger.logGCInfo(getConfiguration());
 
-    ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(gcDebugTask, 0,
-        TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask, 0, TIME_BETWEEN_GC_CHECKS,
+        TimeUnit.MILLISECONDS);
   }
 
   public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index e9c478a..7fa1cfd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -136,10 +136,9 @@ public class TabletServerResourceManager {
    */
   private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String name,
       final ThreadPoolExecutor tp) {
-    ThreadPools.createGeneralScheduledExecutorService(context.getConfiguration())
-        .scheduleWithFixedDelay(() -> {
-          ThreadPools.resizePool(tp, maxThreads, name);
-        }, 1000, 10_000, TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
+      ThreadPools.resizePool(tp, maxThreads, name);
+    }, 1000, 10_000, TimeUnit.MILLISECONDS);
   }
 
   private ExecutorService createPriorityExecutor(ScanExecutorConfig sec,
@@ -378,8 +377,8 @@ public class TabletServerResourceManager {
 
     // We can use the same map for both metadata and normal assignments since the keyspace (extent)
     // is guaranteed to be unique. Schedule the task once, the task will reschedule itself.
-    ThreadPools.createGeneralScheduledExecutorService(context.getConfiguration())
-        .schedule(new AssignmentWatcher(acuConf, activeAssignments), 5000, TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().schedule(
+        new AssignmentWatcher(acuConf, context, activeAssignments), 5000, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -392,17 +391,19 @@ public class TabletServerResourceManager {
 
     private final Map<KeyExtent,RunnableStartedAt> activeAssignments;
     private final AccumuloConfiguration conf;
+    private final ServerContext context;
 
-    public AssignmentWatcher(AccumuloConfiguration conf,
+    public AssignmentWatcher(AccumuloConfiguration conf, ServerContext context,
         Map<KeyExtent,RunnableStartedAt> activeAssignments) {
       this.conf = conf;
+      this.context = context;
       this.activeAssignments = activeAssignments;
     }
 
     @Override
     public void run() {
       final long millisBeforeWarning =
-          conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING);
+          this.conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING);
       try {
         long now = System.currentTimeMillis();
         KeyExtent extent;
@@ -428,8 +429,7 @@ public class TabletServerResourceManager {
         if (log.isTraceEnabled()) {
           log.trace("Rescheduling assignment watcher to run in {}ms", delay);
         }
-        ThreadPools.createGeneralScheduledExecutorService(conf).schedule(this, delay,
-            TimeUnit.MILLISECONDS);
+        context.getScheduledExecutor().schedule(this, delay, TimeUnit.MILLISECONDS);
       }
     }
   }
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index ed8a601..938d0c1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -276,8 +276,8 @@ public class LogSorter {
   public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool)
       throws KeeperException, InterruptedException {
     this.threadPool = distWorkQThreadPool;
-    new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf)
-        .startProcessing(new LogProcessor(), this.threadPool);
+    new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf,
+        context).startProcessing(new LogProcessor(), this.threadPool);
   }
 
   public List<RecoveryStatus> getLogSorts() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index c9d03bb..5d23093 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -61,11 +61,12 @@ public class ReplicationWorker implements Runnable {
         log.debug("Configuration DistributedWorkQueue with delay and period of {} and {}", delay,
             period);
         workQueue = new DistributedWorkQueue(
-            context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf, delay, period);
+            context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf, context, delay,
+            period);
       } else {
         log.debug("Configuring DistributedWorkQueue with default delay and period");
         workQueue = new DistributedWorkQueue(
-            context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf);
+            context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf, context);
       }
 
       workQueue.startProcessing(new ReplicationProcessor(context), executor);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 2a80d8b..a1150fe 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.tabletserver.thrift.ScanState;
 import org.apache.accumulo.core.tabletserver.thrift.ScanType;
 import org.apache.accumulo.core.util.MapCounter;
 import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.tserver.scan.ScanRunState;
 import org.apache.accumulo.tserver.scan.ScanTask;
 import org.apache.accumulo.tserver.session.Session.State;
@@ -65,10 +66,10 @@ public class SessionManager {
   private final Long expiredSessionMarker = (long) -1;
   private final AccumuloConfiguration aconf;
 
-  public SessionManager(AccumuloConfiguration conf) {
-    aconf = conf;
-    maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
-    maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+  public SessionManager(ServerContext context) {
+    this.aconf = context.getConfiguration();
+    maxUpdateIdle = aconf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
+    maxIdle = aconf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
 
     SecureRandom sr;
     try {
@@ -87,8 +88,8 @@ public class SessionManager {
       }
     };
 
-    ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(r, 0,
-        Math.max(maxIdle / 2, 1000), TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(r, 0, Math.max(maxIdle / 2, 1000),
+        TimeUnit.MILLISECONDS);
   }
 
   public long createSession(Session session, boolean reserve) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
index 65ed473..0d593ae 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.server.compaction.CompactionInfo;
 import org.apache.accumulo.server.compaction.FileCompactor;
 import org.slf4j.LoggerFactory;
@@ -106,10 +106,10 @@ public class CompactionWatcher implements Runnable {
     }
   }
 
-  public static synchronized void startWatching(AccumuloConfiguration config) {
+  public static synchronized void startWatching(ServerContext context) {
     if (!watching) {
-      ThreadPools.createGeneralScheduledExecutorService(config).scheduleWithFixedDelay(
-          new CompactionWatcher(config), 10000, 10000, TimeUnit.MILLISECONDS);
+      context.getScheduledExecutor().scheduleWithFixedDelay(
+          new CompactionWatcher(context.getConfiguration()), 10000, 10000, TimeUnit.MILLISECONDS);
       watching = true;
     }
   }
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java
index 677c8fd..bf55357 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java
@@ -20,11 +20,14 @@ package org.apache.accumulo.tserver;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
 
 import org.apache.accumulo.core.conf.AccumuloConfiguration;
 import org.apache.accumulo.core.conf.Property;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
 import org.apache.accumulo.tserver.TabletServerResourceManager.AssignmentWatcher;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -33,28 +36,33 @@ import org.junit.Test;
 public class AssignmentWatcherTest {
 
   private Map<KeyExtent,RunnableStartedAt> assignments;
+  private ServerContext context;
   private AccumuloConfiguration conf;
   private AssignmentWatcher watcher;
 
   @Before
   public void setup() {
     assignments = new HashMap<>();
-    conf = EasyMock.createMock(AccumuloConfiguration.class);
-    watcher = new AssignmentWatcher(conf, assignments);
+    context = EasyMock.createMock(ServerContext.class);
+    conf = EasyMock.createNiceMock(AccumuloConfiguration.class);
+    watcher = new AssignmentWatcher(conf, context, assignments);
   }
 
   @Test
   public void testAssignmentWarning() {
     ActiveAssignmentRunnable task = EasyMock.createMock(ActiveAssignmentRunnable.class);
     RunnableStartedAt run = new RunnableStartedAt(task, System.currentTimeMillis());
-    EasyMock.expect(conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING)).andReturn(0L);
-    EasyMock.expect(conf.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE)).andReturn(1);
+    EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes();
+    EasyMock.expect(conf.getCount(EasyMock.isA(Property.class))).andReturn(1).anyTimes();
+    EasyMock.expect(conf.getTimeInMillis(EasyMock.isA(Property.class))).andReturn(0L).anyTimes();
+    EasyMock.expect(context.getScheduledExecutor())
+        .andReturn((ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1)).anyTimes();
     assignments.put(new KeyExtent(TableId.of("1"), null, null), run);
 
     EasyMock.expect(task.getException()).andReturn(new Exception("Assignment warning happened"));
-    EasyMock.replay(conf, task);
+    EasyMock.replay(context, task);
     watcher.run();
-    EasyMock.verify(conf, task);
+    EasyMock.verify(context, task);
   }
 
 }