You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by dl...@apache.org on 2021/09/23 17:00:58 UTC
[accumulo] branch main updated: Recreated the SimpleTimer
functionality with the new ThreadPools class (#2282)
This is an automated email from the ASF dual-hosted git repository.
dlmarion pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git
The following commit(s) were added to refs/heads/main by this push:
new 455d5ca Recreated the SimpleTimer functionality with the new ThreadPools class (#2282)
455d5ca is described below
commit 455d5cadbab9618a6fe8be2abd7e4465b562bf14
Author: Dave Marion <dl...@apache.org>
AuthorDate: Thu Sep 23 13:00:51 2021 -0400
Recreated the SimpleTimer functionality with the new ThreadPools class (#2282)
Commit e62ace6a9d37572d95999f5412c6148efbba50b9 removed SimpleTimer in favor of
a new ThreadPools class that centralized the creation of thread pools. However, I
missed the fact that SimpleTimer used a shared thread pool. This change reintroduces
a shared generic ScheduledThreadPoolExecutor that can be obtained by calling
ServerContext.getSharedGenericScheduledExecutorService().
Closes #2280
Co-authored-by: Keith Turner <kt...@apache.org>
---
.../accumulo/core/util/threads/ThreadPools.java | 4 ++
.../org/apache/accumulo/server/ServerContext.java | 16 +++++++-
.../org/apache/accumulo/server/fs/FileManager.java | 10 ++---
.../accumulo/server/manager/LiveTServerSet.java | 5 +--
.../server/zookeeper/DistributedWorkQueue.java | 45 ++++++++++++----------
.../java/org/apache/accumulo/manager/Manager.java | 38 +++++++++---------
.../accumulo/manager/recovery/RecoveryManager.java | 4 +-
.../DistributedWorkQueueWorkAssigner.java | 2 +-
.../manager/tableOps/bulkVer1/CopyFailed.java | 2 +-
.../org/apache/accumulo/tracer/TraceServer.java | 6 +--
.../apache/accumulo/tserver/AssignmentHandler.java | 33 ++++++++--------
.../org/apache/accumulo/tserver/TabletServer.java | 39 ++++++++++---------
.../tserver/TabletServerResourceManager.java | 20 +++++-----
.../org/apache/accumulo/tserver/log/LogSorter.java | 4 +-
.../tserver/replication/ReplicationWorker.java | 5 ++-
.../accumulo/tserver/session/SessionManager.java | 13 ++++---
.../accumulo/tserver/tablet/CompactionWatcher.java | 8 ++--
.../accumulo/tserver/AssignmentWatcherTest.java | 20 +++++++---
18 files changed, 149 insertions(+), 125 deletions(-)
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 92cff71..4633903 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -189,6 +189,10 @@ public class ThreadPools {
return result;
}
+ /*
+ * If you need the server-side shared ScheduledThreadPoolExecutor, then use
+ * ServerContext.getScheduledExecutor()
+ */
public static ScheduledThreadPoolExecutor
createGeneralScheduledExecutorService(AccumuloConfiguration conf) {
return (ScheduledThreadPoolExecutor) createExecutorService(conf,
diff --git a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
index 6f1366d..c665db8 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/ServerContext.java
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.Constants;
@@ -89,6 +90,7 @@ public class ServerContext extends ClientContext {
private AccumuloConfiguration systemConfig = null;
private AuthenticationTokenSecretManager secretManager;
private CryptoService cryptoService = null;
+ private ScheduledThreadPoolExecutor sharedScheduledThreadPool = null;
public ServerContext(SiteConfiguration siteConfig) {
this(new ServerInfo(siteConfig));
@@ -410,7 +412,7 @@ public class ServerContext extends ClientContext {
}
private void monitorSwappiness(AccumuloConfiguration config) {
- ThreadPools.createGeneralScheduledExecutorService(config).scheduleWithFixedDelay(() -> {
+ getScheduledExecutor().scheduleWithFixedDelay(() -> {
try {
String procFile = "/proc/sys/vm/swappiness";
File swappiness = new File(procFile);
@@ -433,4 +435,16 @@ public class ServerContext extends ClientContext {
}
}, 1000, 10 * 60 * 1000, TimeUnit.MILLISECONDS);
}
+
+ /**
+ * return a shared scheduled executor
+ */
+ public synchronized ScheduledThreadPoolExecutor getScheduledExecutor() {
+ if (sharedScheduledThreadPool == null) {
+ sharedScheduledThreadPool = (ScheduledThreadPoolExecutor) ThreadPools
+ .createExecutorService(getConfiguration(), Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE);
+ }
+ return sharedScheduledThreadPool;
+ }
+
}
diff --git a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
index 1335e44..4763f59 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/fs/FileManager.java
@@ -50,7 +50,6 @@ import org.apache.accumulo.core.iteratorsImpl.system.TimeSettingIterator;
import org.apache.accumulo.core.metadata.TabletFile;
import org.apache.accumulo.core.metadata.schema.DataFileValue;
import org.apache.accumulo.core.sample.impl.SamplerConfigurationImpl;
-import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.problems.ProblemReport;
import org.apache.accumulo.server.problems.ProblemReportingIterator;
@@ -163,13 +162,12 @@ public class FileManager {
this.openFiles = new HashMap<>();
this.reservedReaders = new HashMap<>();
- this.maxIdleTime = context.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
- ThreadPools.createGeneralScheduledExecutorService(context.getConfiguration())
- .scheduleWithFixedDelay(new IdleFileCloser(), maxIdleTime, maxIdleTime / 2,
- TimeUnit.MILLISECONDS);
+ this.maxIdleTime = this.context.getConfiguration().getTimeInMillis(Property.TSERV_MAX_IDLE);
+ this.context.getScheduledExecutor().scheduleWithFixedDelay(new IdleFileCloser(), maxIdleTime,
+ maxIdleTime / 2, TimeUnit.MILLISECONDS);
this.slowFilePermitMillis =
- context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FILEPERMIT_MILLIS);
+ this.context.getConfiguration().getTimeInMillis(Property.TSERV_SLOW_FILEPERMIT_MILLIS);
}
private static int countReaders(Map<String,List<OpenReader>> files) {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
index 7c64c0c..f2a51ee 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/manager/LiveTServerSet.java
@@ -44,7 +44,6 @@ import org.apache.accumulo.core.util.AddressUtil;
import org.apache.accumulo.core.util.Halt;
import org.apache.accumulo.core.util.HostAndPort;
import org.apache.accumulo.core.util.ServerServices;
-import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.fate.zookeeper.ServiceLock;
import org.apache.accumulo.fate.zookeeper.ZooCache;
import org.apache.accumulo.fate.zookeeper.ZooCache.ZcStat;
@@ -254,8 +253,8 @@ public class LiveTServerSet implements Watcher {
public synchronized void startListeningForTabletServerChanges() {
scanServers();
- ThreadPools.createGeneralScheduledExecutorService(context.getConfiguration())
- .scheduleWithFixedDelay(this::scanServers, 0, 5000, TimeUnit.MILLISECONDS);
+ this.context.getScheduledExecutor().scheduleWithFixedDelay(this::scanServers, 0, 5000,
+ TimeUnit.MILLISECONDS);
}
public synchronized void scanServers() {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
index 38e4df6..55fb6eb 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/zookeeper/DistributedWorkQueue.java
@@ -31,10 +31,10 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
-import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeMissingPolicy;
+import org.apache.accumulo.server.ServerContext;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.WatchedEvent;
@@ -59,6 +59,7 @@ public class DistributedWorkQueue {
private ZooReaderWriter zoo;
private String path;
private AccumuloConfiguration config;
+ private ServerContext context;
private long timerInitialDelay, timerPeriod;
private AtomicInteger numTask = new AtomicInteger(0);
@@ -163,18 +164,23 @@ public class DistributedWorkQueue {
void process(String workID, byte[] data);
}
- public DistributedWorkQueue(String path, AccumuloConfiguration config) {
+ public DistributedWorkQueue(String path, AccumuloConfiguration config, ServerContext context) {
// Preserve the old delay and period
- this(path, config, new SecureRandom().nextInt(60 * 1000), 60 * 1000);
+ this(path, config, context, new SecureRandom().nextInt(60 * 1000), 60 * 1000);
}
- public DistributedWorkQueue(String path, AccumuloConfiguration config, long timerInitialDelay,
- long timerPeriod) {
+ public DistributedWorkQueue(String path, AccumuloConfiguration config, ServerContext context,
+ long timerInitialDelay, long timerPeriod) {
this.path = path;
this.config = config;
+ this.context = context;
this.timerInitialDelay = timerInitialDelay;
this.timerPeriod = timerPeriod;
- zoo = new ZooReaderWriter(config);
+ zoo = new ZooReaderWriter(this.config);
+ }
+
+ public ServerContext getContext() {
+ return context;
}
public ZooReaderWriter getZooReaderWriter() {
@@ -220,20 +226,19 @@ public class DistributedWorkQueue {
lookForWork(processor, children);
// Add a little jitter to avoid all the tservers slamming zookeeper at once
- ThreadPools.createGeneralScheduledExecutorService(config)
- .scheduleWithFixedDelay(new Runnable() {
- @Override
- public void run() {
- log.debug("Looking for work in {}", path);
- try {
- lookForWork(processor, zoo.getChildren(path));
- } catch (KeeperException e) {
- log.error("Failed to look for work", e);
- } catch (InterruptedException e) {
- log.info("Interrupted looking for work", e);
- }
- }
- }, timerInitialDelay, timerPeriod, TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(new Runnable() {
+ @Override
+ public void run() {
+ log.debug("Looking for work in {}", path);
+ try {
+ lookForWork(processor, zoo.getChildren(path));
+ } catch (KeeperException e) {
+ log.error("Failed to look for work", e);
+ } catch (InterruptedException e) {
+ log.info("Interrupted looking for work", e);
+ }
+ }
+ }, timerInitialDelay, timerPeriod, TimeUnit.MILLISECONDS);
}
/**
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
index 0f28945..4ce6f30 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/Manager.java
@@ -253,12 +253,11 @@ public class Manager extends AbstractServer
if (newState == ManagerState.STOP) {
// Give the server a little time before shutdown so the client
// thread requesting the stop can return
- ThreadPools.createGeneralScheduledExecutorService(getConfiguration())
- .scheduleWithFixedDelay(() -> {
- // This frees the main thread and will cause the manager to exit
- clientService.stop();
- Manager.this.nextEvent.event("stopped event loop");
- }, 100L, 1000L, TimeUnit.MILLISECONDS);
+ getContext().getScheduledExecutor().scheduleWithFixedDelay(() -> {
+ // This frees the main thread and will cause the manager to exit
+ clientService.stop();
+ Manager.this.nextEvent.event("stopped event loop");
+ }, 100L, 1000L, TimeUnit.MILLISECONDS);
}
if (oldState != newState && (newState == ManagerState.HAVE_LOCK)) {
@@ -1130,8 +1129,8 @@ public class Manager extends AbstractServer
fate = new Fate<>(this, store, TraceRepo::toLogString);
fate.startTransactionRunners(getConfiguration());
- ThreadPools.createGeneralScheduledExecutorService(getConfiguration())
- .scheduleWithFixedDelay(store::ageOff, 63000, 63000, TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(store::ageOff, 63000, 63000,
+ TimeUnit.MILLISECONDS);
} catch (KeeperException | InterruptedException e) {
throw new IllegalStateException("Exception setting up FaTE cleanup thread", e);
}
@@ -1182,18 +1181,17 @@ public class Manager extends AbstractServer
// if the replication name is ever set, then start replication services
final AtomicReference<TServer> replServer = new AtomicReference<>();
- ThreadPools.createGeneralScheduledExecutorService(getConfiguration())
- .scheduleWithFixedDelay(() -> {
- try {
- if ((replServer.get() == null)
- && !getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) {
- log.info("{} was set, starting repl services.", Property.REPLICATION_NAME.getKey());
- replServer.set(setupReplication());
- }
- } catch (UnknownHostException | KeeperException | InterruptedException e) {
- log.error("Error occurred starting replication services. ", e);
- }
- }, 0, 5000, TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
+ try {
+ if ((replServer.get() == null)
+ && !getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) {
+ log.info("{} was set, starting repl services.", Property.REPLICATION_NAME.getKey());
+ replServer.set(setupReplication());
+ }
+ } catch (UnknownHostException | KeeperException | InterruptedException e) {
+ log.error("Error occurred starting replication services. ", e);
+ }
+ }, 0, 5000, TimeUnit.MILLISECONDS);
// Register metrics modules
int failureCount = new ManagerMetricsFactory(getConfiguration()).register(this);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
index 5d28f10..3b7b5d6 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/recovery/RecoveryManager.java
@@ -77,7 +77,7 @@ public class RecoveryManager {
try {
List<String> workIDs =
new DistributedWorkQueue(manager.getZooKeeperRoot() + Constants.ZRECOVERY,
- manager.getConfiguration()).getWorkQueued();
+ manager.getConfiguration(), manager.getContext()).getWorkQueued();
sortsQueued.addAll(workIDs);
} catch (Exception e) {
log.warn("{}", e.getMessage(), e);
@@ -129,7 +129,7 @@ public class RecoveryManager {
throws KeeperException, InterruptedException {
String work = source + "|" + destination;
new DistributedWorkQueue(manager.getZooKeeperRoot() + Constants.ZRECOVERY,
- manager.getConfiguration()).addWork(sortId, work.getBytes(UTF_8));
+ manager.getConfiguration(), manager.getContext()).addWork(sortId, work.getBytes(UTF_8));
synchronized (this) {
sortsQueued.add(sortId);
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/replication/DistributedWorkQueueWorkAssigner.java b/server/manager/src/main/java/org/apache/accumulo/manager/replication/DistributedWorkQueueWorkAssigner.java
index fe757f6..835cf81 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/replication/DistributedWorkQueueWorkAssigner.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/replication/DistributedWorkQueueWorkAssigner.java
@@ -91,7 +91,7 @@ public abstract class DistributedWorkQueueWorkAssigner implements WorkAssigner {
protected void initializeWorkQueue(AccumuloConfiguration conf) {
workQueue =
new DistributedWorkQueue(ZooUtil.getRoot(client.instanceOperations().getInstanceID())
- + ReplicationConstants.ZOO_WORK_QUEUE, conf);
+ + ReplicationConstants.ZOO_WORK_QUEUE, conf, this.workQueue.getContext());
}
@Override
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CopyFailed.java b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CopyFailed.java
index e0b72cd..6f60f18 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CopyFailed.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/tableOps/bulkVer1/CopyFailed.java
@@ -143,7 +143,7 @@ class CopyFailed extends ManagerRepo {
if (!loadedFailures.isEmpty()) {
DistributedWorkQueue bifCopyQueue = new DistributedWorkQueue(
Constants.ZROOT + "/" + manager.getInstanceID() + Constants.ZBULK_FAILED_COPYQ,
- manager.getConfiguration());
+ manager.getConfiguration(), manager.getContext());
HashSet<String> workIds = new HashSet<>();
diff --git a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
index 7b03a7b..439de34 100644
--- a/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
+++ b/server/tracer/src/main/java/org/apache/accumulo/tracer/TraceServer.java
@@ -53,7 +53,6 @@ import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.Mutation;
import org.apache.accumulo.core.data.Value;
import org.apache.accumulo.core.iterators.user.AgeOffFilter;
-import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.fate.zookeeper.ZooReaderWriter;
import org.apache.accumulo.fate.zookeeper.ZooUtil.NodeExistsPolicy;
import org.apache.accumulo.server.ServerContext;
@@ -301,9 +300,8 @@ public class TraceServer implements Watcher, AutoCloseable {
}
public void run() {
- ThreadPools.createGeneralScheduledExecutorService(context.getConfiguration())
- .scheduleWithFixedDelay(this::flush, SCHEDULE_DELAY, SCHEDULE_PERIOD,
- TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(this::flush, SCHEDULE_DELAY,
+ SCHEDULE_PERIOD, TimeUnit.MILLISECONDS);
server.serve();
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
index 834d769..03d8dfc 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/AssignmentHandler.java
@@ -29,7 +29,6 @@ import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.manager.thrift.TabletLoadState;
import org.apache.accumulo.core.metadata.schema.TabletMetadata;
-import org.apache.accumulo.core.util.threads.ThreadPools;
import org.apache.accumulo.core.util.threads.Threads;
import org.apache.accumulo.server.manager.state.Assignment;
import org.apache.accumulo.server.manager.state.TabletStateStore;
@@ -216,24 +215,22 @@ class AssignmentHandler implements Runnable {
server.enqueueManagerMessage(new TabletStatusMessage(TabletLoadState.LOAD_FAILURE, extent));
long reschedule = Math.min((1L << Math.min(32, retryAttempt)) * 1000, 10 * 60 * 1000L);
log.warn(String.format("rescheduling tablet load in %.2f seconds", reschedule / 1000.));
- ThreadPools.createGeneralScheduledExecutorService(server.getConfiguration())
- .schedule(new Runnable() {
- @Override
- public void run() {
- log.info("adding tablet {} back to the assignment pool (retry {})", extent,
- retryAttempt);
- AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1);
- if (extent.isMeta()) {
- if (extent.isRootTablet()) {
- Threads.createThread("Root tablet assignment retry", handler).start();
- } else {
- server.resourceManager.addMetaDataAssignment(extent, log, handler);
- }
- } else {
- server.resourceManager.addAssignment(extent, log, handler);
- }
+ this.server.getContext().getScheduledExecutor().schedule(new Runnable() {
+ @Override
+ public void run() {
+ log.info("adding tablet {} back to the assignment pool (retry {})", extent, retryAttempt);
+ AssignmentHandler handler = new AssignmentHandler(server, extent, retryAttempt + 1);
+ if (extent.isMeta()) {
+ if (extent.isRootTablet()) {
+ Threads.createThread("Root tablet assignment retry", handler).start();
+ } else {
+ server.resourceManager.addMetaDataAssignment(extent, log, handler);
}
- }, reschedule, TimeUnit.MILLISECONDS);
+ } else {
+ server.resourceManager.addAssignment(extent, log, handler);
+ }
+ }
+ }, reschedule, TimeUnit.MILLISECONDS);
}
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 2160e6b..2dc9aac 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -227,6 +227,7 @@ public class TabletServer extends AbstractServer {
private final ZooAuthenticationKeyWatcher authKeyWatcher;
private final WalStateManager walMarker;
+ private final ServerContext context;
public static void main(String[] args) throws Exception {
try (TabletServer tserver = new TabletServer(new ServerOpts(), args)) {
@@ -236,13 +237,13 @@ public class TabletServer extends AbstractServer {
protected TabletServer(ServerOpts opts, String[] args) {
super("tserver", opts, args);
- ServerContext context = super.getContext();
+ context = super.getContext();
context.setupCrypto();
this.managerLockCache = new ZooCache(context.getZooReaderWriter(), null);
final AccumuloConfiguration aconf = getConfiguration();
log.info("Version " + Constants.VERSION);
log.info("Instance " + getInstanceID());
- this.sessionManager = new SessionManager(aconf);
+ this.sessionManager = new SessionManager(context);
this.logSorter = new LogSorter(context, aconf);
this.replWorker = new ReplicationWorker(context);
this.statsKeeper = new TabletStatsKeeper();
@@ -257,7 +258,7 @@ public class TabletServer extends AbstractServer {
// This thread will calculate and log out the busiest tablets based on ingest count and
// query count every #{logBusiestTabletsDelay}
if (numBusyTabletsToLog > 0) {
- ThreadPools.createGeneralScheduledExecutorService(aconf)
+ context.getScheduledExecutor()
.scheduleWithFixedDelay(Threads.createNamedRunnable("BusyTabletLogger", new Runnable() {
private BusiestTracker ingestTracker =
BusiestTracker.newBusiestIngestTracker(numBusyTabletsToLog);
@@ -284,7 +285,7 @@ public class TabletServer extends AbstractServer {
}), logBusyTabletsDelay, logBusyTabletsDelay, TimeUnit.MILLISECONDS);
}
- ThreadPools.createGeneralScheduledExecutorService(aconf)
+ context.getScheduledExecutor()
.scheduleWithFixedDelay(Threads.createNamedRunnable("TabletRateUpdater", new Runnable() {
@Override
public void run() {
@@ -349,8 +350,8 @@ public class TabletServer extends AbstractServer {
scanMetrics = new TabletServerScanMetrics();
mincMetrics = new TabletServerMinCMetrics();
ceMetrics = new CompactionExecutorsMetrics();
- ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
- TabletLocator::clearLocators, jitter(), jitter(), TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(TabletLocator::clearLocators, jitter(),
+ jitter(), TimeUnit.MILLISECONDS);
walMarker = new WalStateManager(context);
// Create the secret manager
@@ -425,8 +426,8 @@ public class TabletServer extends AbstractServer {
private class MajorCompactor implements Runnable {
- public MajorCompactor(AccumuloConfiguration config) {
- CompactionWatcher.startWatching(config);
+ public MajorCompactor(ServerContext context) {
+ CompactionWatcher.startWatching(context);
}
@Override
@@ -763,8 +764,9 @@ public class TabletServer extends AbstractServer {
ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) ThreadPools
.createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS);
- bulkFailedCopyQ = new DistributedWorkQueue(
- getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ, getConfiguration());
+ bulkFailedCopyQ =
+ new DistributedWorkQueue(getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ,
+ getConfiguration(), getContext());
try {
bulkFailedCopyQ.startProcessing(new BulkFailedCopyProcessor(getContext()),
distWorkQThreadPool);
@@ -780,7 +782,7 @@ public class TabletServer extends AbstractServer {
}
final AccumuloConfiguration aconf = getConfiguration();
// if the replication name is ever set, then start replication services
- ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(() -> {
+ context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
if (this.replServer == null) {
if (!getConfiguration().get(Property.REPLICATION_NAME).isEmpty()) {
log.info(Property.REPLICATION_NAME.getKey() + " was set, starting repl services.");
@@ -790,9 +792,8 @@ public class TabletServer extends AbstractServer {
}, 0, 5000, TimeUnit.MILLISECONDS);
final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000;
- ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
- new BulkImportCacheCleaner(this), CLEANUP_BULK_LOADED_CACHE_MILLIS,
- CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(new BulkImportCacheCleaner(this),
+ CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS);
HostAndPort managerHost;
while (!serverStopRequested) {
@@ -913,8 +914,8 @@ public class TabletServer extends AbstractServer {
Runnable replicationWorkThreadPoolResizer = () -> {
ThreadPools.resizePool(replicationThreadPool, aconf, Property.REPLICATION_WORKER_THREADS);
};
- ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(
- replicationWorkThreadPoolResizer, 10000, 30000, TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10000,
+ 30000, TimeUnit.MILLISECONDS);
}
static boolean checkTabletMetadata(KeyExtent extent, TServerInstance instance,
@@ -1006,7 +1007,7 @@ public class TabletServer extends AbstractServer {
private void config() {
log.info("Tablet server starting on {}", getHostname());
- Threads.createThread("Split/MajC initiator", new MajorCompactor(getConfiguration())).start();
+ Threads.createThread("Split/MajC initiator", new MajorCompactor(context)).start();
clientAddress = HostAndPort.fromParts(getHostname(), 0);
@@ -1016,8 +1017,8 @@ public class TabletServer extends AbstractServer {
Runnable gcDebugTask = () -> gcLogger.logGCInfo(getConfiguration());
- ThreadPools.createGeneralScheduledExecutorService(aconf).scheduleWithFixedDelay(gcDebugTask, 0,
- TIME_BETWEEN_GC_CHECKS, TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(gcDebugTask, 0, TIME_BETWEEN_GC_CHECKS,
+ TimeUnit.MILLISECONDS);
}
public TabletServerStatus getStats(Map<TableId,MapCounter<ScanRunState>> scanCounts) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index e9c478a..7fa1cfd 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -136,10 +136,9 @@ public class TabletServerResourceManager {
*/
private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String name,
final ThreadPoolExecutor tp) {
- ThreadPools.createGeneralScheduledExecutorService(context.getConfiguration())
- .scheduleWithFixedDelay(() -> {
- ThreadPools.resizePool(tp, maxThreads, name);
- }, 1000, 10_000, TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
+ ThreadPools.resizePool(tp, maxThreads, name);
+ }, 1000, 10_000, TimeUnit.MILLISECONDS);
}
private ExecutorService createPriorityExecutor(ScanExecutorConfig sec,
@@ -378,8 +377,8 @@ public class TabletServerResourceManager {
// We can use the same map for both metadata and normal assignments since the keyspace (extent)
// is guaranteed to be unique. Schedule the task once, the task will reschedule itself.
- ThreadPools.createGeneralScheduledExecutorService(context.getConfiguration())
- .schedule(new AssignmentWatcher(acuConf, activeAssignments), 5000, TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().schedule(
+ new AssignmentWatcher(acuConf, context, activeAssignments), 5000, TimeUnit.MILLISECONDS);
}
/**
@@ -392,17 +391,19 @@ public class TabletServerResourceManager {
private final Map<KeyExtent,RunnableStartedAt> activeAssignments;
private final AccumuloConfiguration conf;
+ private final ServerContext context;
- public AssignmentWatcher(AccumuloConfiguration conf,
+ public AssignmentWatcher(AccumuloConfiguration conf, ServerContext context,
Map<KeyExtent,RunnableStartedAt> activeAssignments) {
this.conf = conf;
+ this.context = context;
this.activeAssignments = activeAssignments;
}
@Override
public void run() {
final long millisBeforeWarning =
- conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING);
+ this.conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING);
try {
long now = System.currentTimeMillis();
KeyExtent extent;
@@ -428,8 +429,7 @@ public class TabletServerResourceManager {
if (log.isTraceEnabled()) {
log.trace("Rescheduling assignment watcher to run in {}ms", delay);
}
- ThreadPools.createGeneralScheduledExecutorService(conf).schedule(this, delay,
- TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().schedule(this, delay, TimeUnit.MILLISECONDS);
}
}
}
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
index ed8a601..938d0c1 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/log/LogSorter.java
@@ -276,8 +276,8 @@ public class LogSorter {
public void startWatchingForRecoveryLogs(ThreadPoolExecutor distWorkQThreadPool)
throws KeeperException, InterruptedException {
this.threadPool = distWorkQThreadPool;
- new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf)
- .startProcessing(new LogProcessor(), this.threadPool);
+ new DistributedWorkQueue(context.getZooKeeperRoot() + Constants.ZRECOVERY, sortedLogConf,
+ context).startProcessing(new LogProcessor(), this.threadPool);
}
public List<RecoveryStatus> getLogSorts() {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
index c9d03bb..5d23093 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/replication/ReplicationWorker.java
@@ -61,11 +61,12 @@ public class ReplicationWorker implements Runnable {
log.debug("Configuration DistributedWorkQueue with delay and period of {} and {}", delay,
period);
workQueue = new DistributedWorkQueue(
- context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf, delay, period);
+ context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf, context, delay,
+ period);
} else {
log.debug("Configuring DistributedWorkQueue with default delay and period");
workQueue = new DistributedWorkQueue(
- context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf);
+ context.getZooKeeperRoot() + ReplicationConstants.ZOO_WORK_QUEUE, conf, context);
}
workQueue.startProcessing(new ReplicationProcessor(context), executor);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
index 2a80d8b..a1150fe 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/session/SessionManager.java
@@ -43,6 +43,7 @@ import org.apache.accumulo.core.tabletserver.thrift.ScanState;
import org.apache.accumulo.core.tabletserver.thrift.ScanType;
import org.apache.accumulo.core.util.MapCounter;
import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.tserver.scan.ScanRunState;
import org.apache.accumulo.tserver.scan.ScanTask;
import org.apache.accumulo.tserver.session.Session.State;
@@ -65,10 +66,10 @@ public class SessionManager {
private final Long expiredSessionMarker = (long) -1;
private final AccumuloConfiguration aconf;
- public SessionManager(AccumuloConfiguration conf) {
- aconf = conf;
- maxUpdateIdle = conf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
- maxIdle = conf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
+ public SessionManager(ServerContext context) {
+ this.aconf = context.getConfiguration();
+ maxUpdateIdle = aconf.getTimeInMillis(Property.TSERV_UPDATE_SESSION_MAXIDLE);
+ maxIdle = aconf.getTimeInMillis(Property.TSERV_SESSION_MAXIDLE);
SecureRandom sr;
try {
@@ -87,8 +88,8 @@ public class SessionManager {
}
};
- ThreadPools.createGeneralScheduledExecutorService(conf).scheduleWithFixedDelay(r, 0,
- Math.max(maxIdle / 2, 1000), TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(r, 0, Math.max(maxIdle / 2, 1000),
+ TimeUnit.MILLISECONDS);
}
public long createSession(Session session, boolean reserve) {
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
index 65ed473..0d593ae 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/tablet/CompactionWatcher.java
@@ -28,7 +28,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
-import org.apache.accumulo.core.util.threads.ThreadPools;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.server.compaction.CompactionInfo;
import org.apache.accumulo.server.compaction.FileCompactor;
import org.slf4j.LoggerFactory;
@@ -106,10 +106,10 @@ public class CompactionWatcher implements Runnable {
}
}
- public static synchronized void startWatching(AccumuloConfiguration config) {
+ public static synchronized void startWatching(ServerContext context) {
if (!watching) {
- ThreadPools.createGeneralScheduledExecutorService(config).scheduleWithFixedDelay(
- new CompactionWatcher(config), 10000, 10000, TimeUnit.MILLISECONDS);
+ context.getScheduledExecutor().scheduleWithFixedDelay(
+ new CompactionWatcher(context.getConfiguration()), 10000, 10000, TimeUnit.MILLISECONDS);
watching = true;
}
}
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java
index 677c8fd..bf55357 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/AssignmentWatcherTest.java
@@ -20,11 +20,14 @@ package org.apache.accumulo.tserver;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
+import org.apache.accumulo.server.ServerContext;
import org.apache.accumulo.tserver.TabletServerResourceManager.AssignmentWatcher;
import org.easymock.EasyMock;
import org.junit.Before;
@@ -33,28 +36,33 @@ import org.junit.Test;
public class AssignmentWatcherTest {
private Map<KeyExtent,RunnableStartedAt> assignments;
+ private ServerContext context;
private AccumuloConfiguration conf;
private AssignmentWatcher watcher;
@Before
public void setup() {
assignments = new HashMap<>();
- conf = EasyMock.createMock(AccumuloConfiguration.class);
- watcher = new AssignmentWatcher(conf, assignments);
+ context = EasyMock.createMock(ServerContext.class);
+ conf = EasyMock.createNiceMock(AccumuloConfiguration.class);
+ watcher = new AssignmentWatcher(conf, context, assignments);
}
@Test
public void testAssignmentWarning() {
ActiveAssignmentRunnable task = EasyMock.createMock(ActiveAssignmentRunnable.class);
RunnableStartedAt run = new RunnableStartedAt(task, System.currentTimeMillis());
- EasyMock.expect(conf.getTimeInMillis(Property.TSERV_ASSIGNMENT_DURATION_WARNING)).andReturn(0L);
- EasyMock.expect(conf.getCount(Property.GENERAL_SIMPLETIMER_THREADPOOL_SIZE)).andReturn(1);
+ EasyMock.expect(context.getConfiguration()).andReturn(conf).anyTimes();
+ EasyMock.expect(conf.getCount(EasyMock.isA(Property.class))).andReturn(1).anyTimes();
+ EasyMock.expect(conf.getTimeInMillis(EasyMock.isA(Property.class))).andReturn(0L).anyTimes();
+ EasyMock.expect(context.getScheduledExecutor())
+ .andReturn((ScheduledThreadPoolExecutor) Executors.newScheduledThreadPool(1)).anyTimes();
assignments.put(new KeyExtent(TableId.of("1"), null, null), run);
EasyMock.expect(task.getException()).andReturn(new Exception("Assignment warning happened"));
- EasyMock.replay(conf, task);
+ EasyMock.replay(context, task);
watcher.run();
- EasyMock.verify(conf, task);
+ EasyMock.verify(context, task);
}
}