You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by ct...@apache.org on 2022/01/31 11:01:59 UTC

[accumulo] branch main updated (358fabd -> af4d4d9)

This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a change to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git.


    from 358fabd  Refactor Initialize into multiple classes (#2438)
     new ece553a  Remove unused priority in NamedRunnable
     new 0fdfee5  Minor cleanup of ThreadPools
     new af4d4d9  Clean up uses of Duration/TimeUnit

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../accumulo/core/clientImpl/ScannerIterator.java  |   7 +-
 .../accumulo/core/util/threads/NamedRunnable.java  |  13 +-
 .../accumulo/core/util/threads/ThreadPools.java    | 175 +++++++++++----------
 .../apache/accumulo/core/util/threads/Threads.java |  17 +-
 .../main/java/org/apache/accumulo/fate/Fate.java   |   4 +-
 .../core/file/rfile/MultiThreadedRFileTest.java    |   4 +-
 .../accumulo/server/problems/ProblemReports.java   |   3 +-
 .../coordinator/CompactionCoordinator.java         |   4 +-
 .../manager/upgrade/UpgradeCoordinator.java        |   4 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |  22 +--
 .../tserver/TabletServerResourceManager.java       |  50 +++---
 .../compactions/InternalCompactionExecutor.java    |   3 +-
 .../tablet/CompactableImplFileManagerTest.java     |  11 +-
 13 files changed, 138 insertions(+), 179 deletions(-)

[accumulo] 02/03: Minor cleanup of ThreadPools

Posted by ct...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit 0fdfee5322278d8833aec541d47bf778489394ac
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Mon Jan 31 04:17:09 2022 -0500

    Minor cleanup of ThreadPools
    
    * Avoid casting to ThreadPoolExecutor by using that type directly
      instead of its superclass
    * Avoid passing unnecessary queue and priority parameters by leveraging
      more of the overloaded createThreadPool methods
---
 .../accumulo/core/clientImpl/ScannerIterator.java  |   7 +-
 .../accumulo/core/util/threads/ThreadPools.java    | 175 +++++++++++----------
 .../main/java/org/apache/accumulo/fate/Fate.java   |   4 +-
 .../core/file/rfile/MultiThreadedRFileTest.java    |   4 +-
 .../accumulo/server/problems/ProblemReports.java   |   3 +-
 .../manager/upgrade/UpgradeCoordinator.java        |   4 +-
 .../org/apache/accumulo/tserver/TabletServer.java  |   6 +-
 .../tserver/TabletServerResourceManager.java       |  50 +++---
 .../compactions/InternalCompactionExecutor.java    |   3 +-
 9 files changed, 125 insertions(+), 131 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
index f8d3912..99485c2 100644
--- a/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
+++ b/core/src/main/java/org/apache/accumulo/core/clientImpl/ScannerIterator.java
@@ -23,7 +23,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.OptionalInt;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
@@ -65,9 +64,9 @@ public class ScannerIterator implements Iterator<Entry<Key,Value>> {
 
   private ScannerImpl.Reporter reporter;
 
-  private static ThreadPoolExecutor readaheadPool = ThreadPools.createThreadPool(0,
-      Integer.MAX_VALUE, 3L, TimeUnit.SECONDS, "Accumulo scanner read ahead thread",
-      new SynchronousQueue<>(), OptionalInt.empty(), true);
+  private static ThreadPoolExecutor readaheadPool =
+      ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 3L, TimeUnit.SECONDS,
+          "Accumulo scanner read ahead thread", new SynchronousQueue<>(), true);
 
   private boolean closed = false;
 
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
index 25807f6..ac4ed3d 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/ThreadPools.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.core.util.threads;
 import java.util.OptionalInt;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledFuture;
@@ -110,7 +109,7 @@ public class ThreadPools {
    *           if property is not handled
    */
   @SuppressWarnings("deprecation")
-  public static ExecutorService createExecutorService(final AccumuloConfiguration conf,
+  public static ThreadPoolExecutor createExecutorService(final AccumuloConfiguration conf,
       final Property p, boolean emitThreadPoolMetrics) {
 
     switch (p) {
@@ -129,8 +128,7 @@ public class ThreadPools {
         int threads = conf.getCount(p);
         if (threads == 0) {
           return createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
-              "GatherTableInformation", new SynchronousQueue<Runnable>(), OptionalInt.empty(),
-              emitThreadPoolMetrics);
+              "GatherTableInformation", new SynchronousQueue<>(), emitThreadPoolMetrics);
         } else {
           return createFixedThreadPool(threads, "GatherTableInformation", emitThreadPoolMetrics);
         }
@@ -207,7 +205,7 @@ public class ThreadPools {
   public static ThreadPoolExecutor createFixedThreadPool(int numThreads, final String name,
       BlockingQueue<Runnable> queue, boolean emitThreadPoolMetrics) {
     return createThreadPool(numThreads, numThreads, DEFAULT_TIMEOUT_MILLISECS,
-        TimeUnit.MILLISECONDS, name, queue, OptionalInt.empty(), emitThreadPoolMetrics);
+        TimeUnit.MILLISECONDS, name, queue, emitThreadPoolMetrics);
   }
 
   /**
@@ -232,8 +230,7 @@ public class ThreadPools {
    */
   public static ThreadPoolExecutor createFixedThreadPool(int numThreads, long timeOut,
       TimeUnit units, final String name, boolean emitThreadPoolMetrics) {
-    return createThreadPool(numThreads, numThreads, timeOut, units, name,
-        new LinkedBlockingQueue<Runnable>(), OptionalInt.empty(), emitThreadPoolMetrics);
+    return createThreadPool(numThreads, numThreads, timeOut, units, name, emitThreadPoolMetrics);
   }
 
   /**
@@ -261,7 +258,38 @@ public class ThreadPools {
   public static ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, long timeOut,
       TimeUnit units, final String name, boolean emitThreadPoolMetrics) {
     return createThreadPool(coreThreads, maxThreads, timeOut, units, name,
-        new LinkedBlockingQueue<Runnable>(), OptionalInt.empty(), emitThreadPoolMetrics);
+        new LinkedBlockingQueue<>(), emitThreadPoolMetrics);
+  }
+
+  /**
+   * Create a named thread pool
+   *
+   * @param coreThreads
+   *          number of threads
+   * @param maxThreads
+   *          max number of threads
+   * @param timeOut
+   *          core thread time out
+   * @param units
+   *          core thread time out units
+   * @param name
+   *          thread pool name
+   * @param queue
+   *          queue to use for tasks
+   * @param emitThreadPoolMetrics
+   *          When set to true will emit metrics and register the metrics in a static registry.
+   *          After the thread pool is deleted, there will still be metrics objects related to it in
+   *          the static registry. There is no way to clean these left over objects up therefore its
+   *          recommended that this option only be set true for long lived thread pools. Creating
+   *          lots of short lived thread pools and registering them can lead to out of memory errors
+   *          over long time periods.
+   * @return ThreadPoolExecutor
+   */
+  public static ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, long timeOut,
+      TimeUnit units, final String name, BlockingQueue<Runnable> queue,
+      boolean emitThreadPoolMetrics) {
+    return createThreadPool(coreThreads, maxThreads, timeOut, units, name, queue,
+        OptionalInt.empty(), emitThreadPoolMetrics);
   }
 
   /**
@@ -293,8 +321,8 @@ public class ThreadPools {
   public static ThreadPoolExecutor createThreadPool(int coreThreads, int maxThreads, long timeOut,
       TimeUnit units, final String name, BlockingQueue<Runnable> queue, OptionalInt priority,
       boolean emitThreadPoolMetrics) {
-    ThreadPoolExecutor result = new ThreadPoolExecutor(coreThreads, maxThreads, timeOut, units,
-        queue, new NamedThreadFactory(name, priority)) {
+    var result = new ThreadPoolExecutor(coreThreads, maxThreads, timeOut, units, queue,
+        new NamedThreadFactory(name, priority)) {
 
       @Override
       public void execute(Runnable arg0) {
@@ -358,83 +386,58 @@ public class ThreadPools {
    */
   public static ScheduledThreadPoolExecutor createScheduledExecutorService(int numThreads,
       final String name, boolean emitThreadPoolMetrics) {
-    return createScheduledExecutorService(numThreads, name, OptionalInt.empty(),
-        emitThreadPoolMetrics);
-  }
+    var result = new ScheduledThreadPoolExecutor(numThreads, new NamedThreadFactory(name)) {
 
-  /**
-   * Create a named ScheduledThreadPool
-   *
-   * @param numThreads
-   *          number of threads
-   * @param name
-   *          thread pool name
-   * @param priority
-   *          thread priority
-   * @param emitThreadPoolMetrics
-   *          When set to true will emit metrics and register the metrics in a static registry.
-   *          After the thread pool is deleted, there will still be metrics objects related to it in
-   *          the static registry. There is no way to clean these left over objects up therefore its
-   *          recommended that this option only be set true for long lived thread pools. Creating
-   *          lots of short lived thread pools and registering them can lead to out of memory errors
-   *          over long time periods.
-   * @return ScheduledThreadPoolExecutor
-   */
-  private static ScheduledThreadPoolExecutor createScheduledExecutorService(int numThreads,
-      final String name, OptionalInt priority, boolean emitThreadPoolMetrics) {
-    ScheduledThreadPoolExecutor result =
-        new ScheduledThreadPoolExecutor(numThreads, new NamedThreadFactory(name, priority)) {
-
-          @Override
-          public void execute(Runnable command) {
-            super.execute(Context.current().wrap(command));
-          }
-
-          @Override
-          public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
-            return super.schedule(Context.current().wrap(callable), delay, unit);
-          }
-
-          @Override
-          public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
-            return super.schedule(Context.current().wrap(command), delay, unit);
-          }
-
-          @Override
-          public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
-              long period, TimeUnit unit) {
-            return super.scheduleAtFixedRate(Context.current().wrap(command), initialDelay, period,
-                unit);
-          }
-
-          @Override
-          public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
-              long delay, TimeUnit unit) {
-            return super.scheduleWithFixedDelay(Context.current().wrap(command), initialDelay,
-                delay, unit);
-          }
-
-          @Override
-          public <T> Future<T> submit(Callable<T> task) {
-            return super.submit(Context.current().wrap(task));
-          }
-
-          @Override
-          public <T> Future<T> submit(Runnable task, T result) {
-            return super.submit(Context.current().wrap(task), result);
-          }
-
-          @Override
-          public Future<?> submit(Runnable task) {
-            return super.submit(Context.current().wrap(task));
-          }
-
-          @Override
-          public boolean remove(Runnable task) {
-            return super.remove(Context.current().wrap(task));
-          }
-
-        };
+      @Override
+      public void execute(Runnable command) {
+        super.execute(Context.current().wrap(command));
+      }
+
+      @Override
+      public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
+        return super.schedule(Context.current().wrap(callable), delay, unit);
+      }
+
+      @Override
+      public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+        return super.schedule(Context.current().wrap(command), delay, unit);
+      }
+
+      @Override
+      public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay,
+          long period, TimeUnit unit) {
+        return super.scheduleAtFixedRate(Context.current().wrap(command), initialDelay, period,
+            unit);
+      }
+
+      @Override
+      public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay,
+          long delay, TimeUnit unit) {
+        return super.scheduleWithFixedDelay(Context.current().wrap(command), initialDelay, delay,
+            unit);
+      }
+
+      @Override
+      public <T> Future<T> submit(Callable<T> task) {
+        return super.submit(Context.current().wrap(task));
+      }
+
+      @Override
+      public <T> Future<T> submit(Runnable task, T result) {
+        return super.submit(Context.current().wrap(task), result);
+      }
+
+      @Override
+      public Future<?> submit(Runnable task) {
+        return super.submit(Context.current().wrap(task));
+      }
+
+      @Override
+      public boolean remove(Runnable task) {
+        return super.remove(Context.current().wrap(task));
+      }
+
+    };
     if (emitThreadPoolMetrics) {
       MetricsUtil.addExecutorServiceMetrics(result, name);
     }
diff --git a/core/src/main/java/org/apache/accumulo/fate/Fate.java b/core/src/main/java/org/apache/accumulo/fate/Fate.java
index 6f0fde2..54ef3e4 100644
--- a/core/src/main/java/org/apache/accumulo/fate/Fate.java
+++ b/core/src/main/java/org/apache/accumulo/fate/Fate.java
@@ -229,8 +229,8 @@ public class Fate<T> {
    * Launches the specified number of worker threads.
    */
   public void startTransactionRunners(AccumuloConfiguration conf) {
-    final ThreadPoolExecutor pool = (ThreadPoolExecutor) ThreadPools.createExecutorService(conf,
-        Property.MANAGER_FATE_THREADPOOL_SIZE, true);
+    final ThreadPoolExecutor pool =
+        ThreadPools.createExecutorService(conf, Property.MANAGER_FATE_THREADPOOL_SIZE, true);
     fatePoolWatcher = ThreadPools.createGeneralScheduledExecutorService(conf);
     fatePoolWatcher.schedule(() -> {
       // resize the pool if the property changed
diff --git a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
index 87dcf0e..c91216f 100644
--- a/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
+++ b/core/src/test/java/org/apache/accumulo/core/file/rfile/MultiThreadedRFileTest.java
@@ -34,8 +34,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.OptionalInt;
-import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 
@@ -241,7 +239,7 @@ public class MultiThreadedRFileTest {
       int maxThreads = 10;
       String name = "MultiThreadedRFileTestThread";
       ThreadPoolExecutor pool = ThreadPools.createThreadPool(maxThreads + 1, maxThreads + 1, 5 * 60,
-          TimeUnit.SECONDS, name, new LinkedBlockingQueue<>(), OptionalInt.empty(), false);
+          TimeUnit.SECONDS, name, false);
       try {
         Runnable runnable = () -> {
           try {
diff --git a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
index 906fc00..08dc7ab 100644
--- a/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
+++ b/server/base/src/main/java/org/apache/accumulo/server/problems/ProblemReports.java
@@ -25,7 +25,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NoSuchElementException;
-import java.util.OptionalInt;
 import java.util.TreeMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -68,7 +67,7 @@ public class ProblemReports implements Iterable<ProblemReport> {
    * is reporting lots of problems, but problem reports can not be processed
    */
   private ExecutorService reportExecutor = ThreadPools.createThreadPool(0, 1, 60, TimeUnit.SECONDS,
-      "acu-problem-reporter", new LinkedBlockingQueue<>(500), OptionalInt.empty(), false);
+      "acu-problem-reporter", new LinkedBlockingQueue<>(500), false);
 
   private final ServerContext context;
 
diff --git a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
index d464b1e..9d3b4ad 100644
--- a/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
+++ b/server/manager/src/main/java/org/apache/accumulo/manager/upgrade/UpgradeCoordinator.java
@@ -20,7 +20,6 @@ package org.apache.accumulo.manager.upgrade;
 
 import java.io.IOException;
 import java.util.Map;
-import java.util.OptionalInt;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.concurrent.SynchronousQueue;
@@ -176,8 +175,7 @@ public class UpgradeCoordinator {
 
     if (currentVersion < AccumuloDataVersion.get()) {
       return ThreadPools.createThreadPool(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS,
-          "UpgradeMetadataThreads", new SynchronousQueue<Runnable>(), OptionalInt.empty(), false)
-          .submit(() -> {
+          "UpgradeMetadataThreads", new SynchronousQueue<>(), false).submit(() -> {
             try {
               for (int v = currentVersion; v < AccumuloDataVersion.get(); v++) {
                 log.info("Upgrading Root from data version {}", v);
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index 84d106e..c852c1b 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -767,8 +767,8 @@ public class TabletServer extends AbstractServer {
       throw new RuntimeException(e);
     }
 
-    ThreadPoolExecutor distWorkQThreadPool = (ThreadPoolExecutor) ThreadPools
-        .createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS, true);
+    ThreadPoolExecutor distWorkQThreadPool =
+        ThreadPools.createExecutorService(getConfiguration(), Property.TSERV_WORKQ_THREADS, true);
 
     bulkFailedCopyQ =
         new DistributedWorkQueue(getContext().getZooKeeperRoot() + Constants.ZBULK_FAILED_COPYQ,
@@ -939,7 +939,7 @@ public class TabletServer extends AbstractServer {
     }
 
     // Start the pool to handle outgoing replications
-    final ThreadPoolExecutor replicationThreadPool = (ThreadPoolExecutor) ThreadPools
+    final ThreadPoolExecutor replicationThreadPool = ThreadPools
         .createExecutorService(getConfiguration(), Property.REPLICATION_WORKER_THREADS, false);
     replWorker.setExecutor(replicationThreadPool);
     replWorker.run();
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
index 7f63210..31432e9 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServerResourceManager.java
@@ -93,18 +93,18 @@ public class TabletServerResourceManager {
 
   private static final Logger log = LoggerFactory.getLogger(TabletServerResourceManager.class);
 
-  private final ExecutorService minorCompactionThreadPool;
-  private final ExecutorService splitThreadPool;
-  private final ExecutorService defaultSplitThreadPool;
-  private final ExecutorService defaultMigrationPool;
-  private final ExecutorService migrationPool;
-  private final ExecutorService assignmentPool;
-  private final ExecutorService assignMetaDataPool;
-  private final ExecutorService summaryRetrievalPool;
-  private final ExecutorService summaryPartitionPool;
-  private final ExecutorService summaryRemotePool;
-
-  private final Map<String,ExecutorService> scanExecutors;
+  private final ThreadPoolExecutor minorCompactionThreadPool;
+  private final ThreadPoolExecutor splitThreadPool;
+  private final ThreadPoolExecutor defaultSplitThreadPool;
+  private final ThreadPoolExecutor defaultMigrationPool;
+  private final ThreadPoolExecutor migrationPool;
+  private final ThreadPoolExecutor assignmentPool;
+  private final ThreadPoolExecutor assignMetaDataPool;
+  private final ThreadPoolExecutor summaryRetrievalPool;
+  private final ThreadPoolExecutor summaryPartitionPool;
+  private final ThreadPoolExecutor summaryRemotePool;
+
+  private final Map<String,ThreadPoolExecutor> scanExecutors;
   private final Map<String,ScanExecutor> scanExecutorChoices;
 
   private final ConcurrentHashMap<KeyExtent,RunnableStartedAt> activeAssignments;
@@ -136,12 +136,11 @@ public class TabletServerResourceManager {
    */
   private void modifyThreadPoolSizesAtRuntime(IntSupplier maxThreads, String name,
       final ThreadPoolExecutor tp) {
-    context.getScheduledExecutor().scheduleWithFixedDelay(() -> {
-      ThreadPools.resizePool(tp, maxThreads, name);
-    }, 1000, 10_000, TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(
+        () -> ThreadPools.resizePool(tp, maxThreads, name), 1, 10, TimeUnit.SECONDS);
   }
 
-  private ExecutorService createPriorityExecutor(ScanExecutorConfig sec,
+  private ThreadPoolExecutor createPriorityExecutor(ScanExecutorConfig sec,
       Map<String,Queue<Runnable>> scanExecQueues) {
 
     BlockingQueue<Runnable> queue;
@@ -186,11 +185,10 @@ public class TabletServerResourceManager {
 
     scanExecQueues.put(sec.name, queue);
 
-    ExecutorService es =
+    ThreadPoolExecutor es =
         ThreadPools.createThreadPool(sec.getCurrentMaxThreads(), sec.getCurrentMaxThreads(), 0L,
             TimeUnit.MILLISECONDS, "scan-" + sec.name, queue, sec.priority, true);
-    modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" + sec.name,
-        (ThreadPoolExecutor) es);
+    modifyThreadPoolSizesAtRuntime(sec::getCurrentMaxThreads, "scan-" + sec.name, es);
     return es;
 
   }
@@ -307,7 +305,7 @@ public class TabletServerResourceManager {
         ThreadPools.createExecutorService(acuConf, Property.TSERV_MINC_MAXCONCURRENT, true);
     modifyThreadPoolSizesAtRuntime(
         () -> context.getConfiguration().getCount(Property.TSERV_MINC_MAXCONCURRENT),
-        "minor compactor", (ThreadPoolExecutor) minorCompactionThreadPool);
+        "minor compactor", minorCompactionThreadPool);
 
     splitThreadPool = ThreadPools.createThreadPool(0, 1, 1, TimeUnit.SECONDS, "splitter", true);
 
@@ -321,7 +319,7 @@ public class TabletServerResourceManager {
         ThreadPools.createExecutorService(acuConf, Property.TSERV_MIGRATE_MAXCONCURRENT, true);
     modifyThreadPoolSizesAtRuntime(
         () -> context.getConfiguration().getCount(Property.TSERV_MIGRATE_MAXCONCURRENT),
-        "tablet migration", (ThreadPoolExecutor) migrationPool);
+        "tablet migration", migrationPool);
 
     // not sure if concurrent assignments can run safely... even if they could there is probably no
     // benefit at startup because
@@ -332,7 +330,7 @@ public class TabletServerResourceManager {
         ThreadPools.createExecutorService(acuConf, Property.TSERV_ASSIGNMENT_MAXCONCURRENT, true);
     modifyThreadPoolSizesAtRuntime(
         () -> context.getConfiguration().getCount(Property.TSERV_ASSIGNMENT_MAXCONCURRENT),
-        "tablet assignment", (ThreadPoolExecutor) assignmentPool);
+        "tablet assignment", assignmentPool);
 
     assignMetaDataPool = ThreadPools.createThreadPool(0, 1, 60, TimeUnit.SECONDS,
         "metadata tablet assignment", true);
@@ -343,19 +341,19 @@ public class TabletServerResourceManager {
         ThreadPools.createExecutorService(acuConf, Property.TSERV_SUMMARY_RETRIEVAL_THREADS, true);
     modifyThreadPoolSizesAtRuntime(
         () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_RETRIEVAL_THREADS),
-        "summary file retriever", (ThreadPoolExecutor) summaryRetrievalPool);
+        "summary file retriever", summaryRetrievalPool);
 
     summaryRemotePool =
         ThreadPools.createExecutorService(acuConf, Property.TSERV_SUMMARY_REMOTE_THREADS, true);
     modifyThreadPoolSizesAtRuntime(
         () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_REMOTE_THREADS),
-        "summary remote", (ThreadPoolExecutor) summaryRemotePool);
+        "summary remote", summaryRemotePool);
 
     summaryPartitionPool =
         ThreadPools.createExecutorService(acuConf, Property.TSERV_SUMMARY_PARTITION_THREADS, true);
     modifyThreadPoolSizesAtRuntime(
         () -> context.getConfiguration().getCount(Property.TSERV_SUMMARY_PARTITION_THREADS),
-        "summary partition", (ThreadPoolExecutor) summaryPartitionPool);
+        "summary partition", summaryPartitionPool);
 
     Collection<ScanExecutorConfig> scanExecCfg = acuConf.getScanExecutors();
     Map<String,Queue<Runnable>> scanExecQueues = new HashMap<>();
@@ -805,7 +803,7 @@ public class TabletServerResourceManager {
       ScanDispatch prefs = dispatcher.dispatch(params);
       scanInfo.scanParams.setScanDispatch(prefs);
 
-      ExecutorService executor = scanExecutors.get(prefs.getExecutorName());
+      ThreadPoolExecutor executor = scanExecutors.get(prefs.getExecutorName());
       if (executor == null) {
         log.warn(
             "For table id {}, {} dispatched to non-existent executor {} Using default executor.",
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
index 9e64cff..b964b2a 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/InternalCompactionExecutor.java
@@ -22,7 +22,6 @@ import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
-import java.util.OptionalInt;
 import java.util.Set;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -161,7 +160,7 @@ public class InternalCompactionExecutor implements CompactionExecutor {
     queue = new PriorityBlockingQueue<Runnable>(100, comparator);
 
     threadPool = ThreadPools.createThreadPool(threads, threads, 60, TimeUnit.SECONDS,
-        "compaction." + ceid, queue, OptionalInt.empty(), false);
+        "compaction." + ceid, queue, false);
 
     metricCloser =
         ceMetrics.addExecutor(ceid, () -> threadPool.getActiveCount(), () -> queuedJob.size());

[accumulo] 01/03: Remove unused priority in NamedRunnable

Posted by ct...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit ece553a3ef0f47ef321eb4541ba220d22b5f796a
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Mon Jan 31 03:23:58 2022 -0500

    Remove unused priority in NamedRunnable
    
    Reduce complexity of thread management code by removing an unused
    priority in NamedRunnable
---
 .../accumulo/core/util/threads/NamedRunnable.java       | 13 +------------
 .../org/apache/accumulo/core/util/threads/Threads.java  | 17 +----------------
 2 files changed, 2 insertions(+), 28 deletions(-)

diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/NamedRunnable.java b/core/src/main/java/org/apache/accumulo/core/util/threads/NamedRunnable.java
index 64ee371..64c2b62 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/NamedRunnable.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/NamedRunnable.java
@@ -18,8 +18,6 @@
  */
 package org.apache.accumulo.core.util.threads;
 
-import java.util.OptionalInt;
-
 /**
  * Runnable implementation that has a name and priority. Used by the NamedThreadFactory when
  * creating new Threads
@@ -27,16 +25,10 @@ import java.util.OptionalInt;
 class NamedRunnable implements Runnable {
 
   private final String name;
-  private final OptionalInt priority;
   private final Runnable r;
 
   NamedRunnable(String name, Runnable r) {
-    this(name, OptionalInt.empty(), r);
-  }
-
-  NamedRunnable(String name, OptionalInt priority, Runnable r) {
     this.name = name;
-    this.priority = priority;
     this.r = r;
   }
 
@@ -44,10 +36,7 @@ class NamedRunnable implements Runnable {
     return name;
   }
 
-  public OptionalInt getPriority() {
-    return priority;
-  }
-
+  @Override
   public void run() {
     r.run();
   }
diff --git a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
index 3045176..2a05a4c 100644
--- a/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
+++ b/core/src/main/java/org/apache/accumulo/core/util/threads/Threads.java
@@ -29,10 +29,6 @@ public class Threads {
     return new NamedRunnable(name, r);
   }
 
-  public static Runnable createNamedRunnable(String name, OptionalInt priority, Runnable r) {
-    return new NamedRunnable(name, priority, r);
-  }
-
   private static final UncaughtExceptionHandler UEH = new AccumuloUncaughtExceptionHandler();
 
   public static Thread createThread(String name, Runnable r) {
@@ -41,18 +37,7 @@ public class Threads {
 
   public static Thread createThread(String name, OptionalInt priority, Runnable r) {
     Thread thread = new Thread(Context.current().wrap(r), name);
-    boolean prioritySet = false;
-    if (r instanceof NamedRunnable) {
-      NamedRunnable nr = (NamedRunnable) r;
-      if (nr.getPriority().isPresent()) {
-        thread.setPriority(nr.getPriority().getAsInt());
-        prioritySet = true;
-      }
-    }
-    // Don't override priority set in NamedRunnable, if set
-    if (priority.isPresent() && !prioritySet) {
-      thread.setPriority(priority.getAsInt());
-    }
+    priority.ifPresent(thread::setPriority);
     thread.setDaemon(true);
     thread.setUncaughtExceptionHandler(UEH);
     return thread;

[accumulo] 03/03: Clean up uses of Duration/TimeUnit

Posted by ct...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ctubbsii pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git

commit af4d4d978a4785c1971984513fb51c629ef68100
Author: Christopher Tubbs <ct...@apache.org>
AuthorDate: Mon Jan 31 04:30:08 2022 -0500

    Clean up uses of Duration/TimeUnit
---
 .../accumulo/coordinator/CompactionCoordinator.java      |  4 +---
 .../java/org/apache/accumulo/tserver/TabletServer.java   | 16 ++++++++--------
 .../tserver/tablet/CompactableImplFileManagerTest.java   | 11 ++---------
 3 files changed, 11 insertions(+), 20 deletions(-)

diff --git a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
index fef096f..77c1622 100644
--- a/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
+++ b/server/compaction-coordinator/src/main/java/org/apache/accumulo/coordinator/CompactionCoordinator.java
@@ -21,7 +21,6 @@ package org.apache.accumulo.coordinator;
 import static org.apache.accumulo.fate.util.UtilWaitThread.sleepUninterruptibly;
 
 import java.net.UnknownHostException;
-import java.time.Duration;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -98,8 +97,7 @@ public class CompactionCoordinator extends AbstractServer
 
   private static final Logger LOG = LoggerFactory.getLogger(CompactionCoordinator.class);
   private static final long TIME_BETWEEN_GC_CHECKS = 5000;
-  private static final long FIFTEEN_MINUTES =
-      TimeUnit.MILLISECONDS.convert(Duration.of(15, TimeUnit.MINUTES.toChronoUnit()));
+  private static final long FIFTEEN_MINUTES = TimeUnit.MINUTES.toMillis(15);
 
   protected static final QueueSummaries QUEUE_SUMMARIES = new QueueSummaries();
 
diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
index c852c1b..d335278f 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/TabletServer.java
@@ -165,8 +165,8 @@ public class TabletServer extends AbstractServer {
 
   private static final SecureRandom random = new SecureRandom();
   private static final Logger log = LoggerFactory.getLogger(TabletServer.class);
-  private static final long TIME_BETWEEN_GC_CHECKS = 5000;
-  private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = 60 * 60 * 1000;
+  private static final long TIME_BETWEEN_GC_CHECKS = TimeUnit.SECONDS.toMillis(5);
+  private static final long TIME_BETWEEN_LOCATOR_CACHE_CLEARS = TimeUnit.HOURS.toMillis(1);
 
   final GarbageCollectionLogger gcLogger = new GarbageCollectionLogger();
   final ZooCache managerLockCache;
@@ -299,7 +299,7 @@ public class TabletServer extends AbstractServer {
               }
             }
           }
-        }), 5000, 5000, TimeUnit.MILLISECONDS);
+        }), 5, 5, TimeUnit.SECONDS);
 
     @SuppressWarnings("deprecation")
     final long walMaxSize =
@@ -797,7 +797,7 @@ public class TabletServer extends AbstractServer {
           setupReplication(aconf);
         }
       }
-    }, 0, 5000, TimeUnit.MILLISECONDS);
+    }, 0, 5, TimeUnit.SECONDS);
 
     int tabletCheckFrequency = 30 + random.nextInt(31); // random 30-60 minute delay
     // Periodically check that metadata of tablets matches what is held in memory
@@ -825,7 +825,7 @@ public class TabletServer extends AbstractServer {
       }
     }, tabletCheckFrequency, tabletCheckFrequency, TimeUnit.MINUTES);
 
-    final long CLEANUP_BULK_LOADED_CACHE_MILLIS = 15 * 60 * 1000;
+    final long CLEANUP_BULK_LOADED_CACHE_MILLIS = TimeUnit.MINUTES.toMillis(15);
     context.getScheduledExecutor().scheduleWithFixedDelay(new BulkImportCacheCleaner(this),
         CLEANUP_BULK_LOADED_CACHE_MILLIS, CLEANUP_BULK_LOADED_CACHE_MILLIS, TimeUnit.MILLISECONDS);
 
@@ -840,7 +840,7 @@ public class TabletServer extends AbstractServer {
           // wait until a message is ready to send, or a sever stop
           // was requested
           while (mm == null && !serverStopRequested) {
-            mm = managerMessages.poll(1000, TimeUnit.MILLISECONDS);
+            mm = managerMessages.poll(1, TimeUnit.SECONDS);
           }
 
           // have a message to send to the manager, so grab a
@@ -948,8 +948,8 @@ public class TabletServer extends AbstractServer {
     Runnable replicationWorkThreadPoolResizer = () -> {
       ThreadPools.resizePool(replicationThreadPool, aconf, Property.REPLICATION_WORKER_THREADS);
     };
-    context.getScheduledExecutor().scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10000,
-        30000, TimeUnit.MILLISECONDS);
+    context.getScheduledExecutor().scheduleWithFixedDelay(replicationWorkThreadPoolResizer, 10, 30,
+        TimeUnit.SECONDS);
   }
 
   public String getClientAddressString() {
diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java
index 4cc7cfd..059e828 100644
--- a/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java
+++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/tablet/CompactableImplFileManagerTest.java
@@ -36,7 +36,6 @@ import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.accumulo.core.client.admin.compaction.CompactableFile;
-import org.apache.accumulo.core.conf.AccumuloConfiguration.Deriver;
 import org.apache.accumulo.core.data.TableId;
 import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.metadata.CompactableFileImpl;
@@ -410,19 +409,13 @@ public class CompactableImplFileManagerTest {
 
   static class TestFileManager extends CompactableImpl.FileManager {
 
-    public static final Duration SELECTION_EXPIRATION = Duration.ofSeconds(120);
+    public static final Duration SELECTION_EXPIRATION = Duration.ofMinutes(2);
     private long time = 0;
     public Set<CompactionKind> running = new HashSet<>();
 
     public TestFileManager() {
       super(new KeyExtent(TableId.of("1"), null, null), Set.of(), Optional.empty(),
-          new Deriver<Duration>() {
-
-            @Override
-            public Duration derive() {
-              return SELECTION_EXPIRATION;
-            }
-          });
+          () -> SELECTION_EXPIRATION);
     }
 
     @Override