You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@accumulo.apache.org by kt...@apache.org on 2021/01/12 18:42:48 UTC

[accumulo] branch main updated: Fixes #1665 use old compaction thread pool config if set (#1861)

This is an automated email from the ASF dual-hosted git repository.

kturner pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/accumulo.git


The following commit(s) were added to refs/heads/main by this push:
     new 3e23ad8  Fixes #1665 use old compaction thread pool config if set (#1861)
3e23ad8 is described below

commit 3e23ad8eb0e8309b71811ded490603343f070f79
Author: Keith Turner <kt...@apache.org>
AuthorDate: Tue Jan 12 13:42:39 2021 -0500

    Fixes #1665 use old compaction thread pool config if set (#1861)
---
 .../tserver/compactions/CompactionManager.java     | 74 +++++++++++++++++++++-
 1 file changed, 71 insertions(+), 3 deletions(-)

diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
index af09499..2e3084c 100644
--- a/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
+++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/compactions/CompactionManager.java
@@ -32,6 +32,7 @@ import org.apache.accumulo.core.dataImpl.KeyExtent;
 import org.apache.accumulo.core.spi.compaction.CompactionKind;
 import org.apache.accumulo.core.spi.compaction.CompactionServiceId;
 import org.apache.accumulo.core.spi.compaction.CompactionServices;
+import org.apache.accumulo.core.spi.compaction.DefaultCompactionPlanner;
 import org.apache.accumulo.core.util.NamingThreadFactory;
 import org.apache.accumulo.fate.util.Retry;
 import org.apache.accumulo.server.ServerContext;
@@ -62,14 +63,16 @@ public class CompactionManager {
 
   public static final CompactionServiceId DEFAULT_SERVICE = CompactionServiceId.of("default");
 
-  private static class Config {
+  private String lastDeprecationWarning = "";
+
+  private class Config {
     Map<String,String> planners = new HashMap<>();
     Map<String,Long> rateLimits = new HashMap<>();
     Map<String,Map<String,String>> options = new HashMap<>();
     long defaultRateLimit = 0;
 
     @SuppressWarnings("removal")
-    private static long getDefaultThroughput(AccumuloConfiguration aconf) {
+    private long getDefaultThroughput(AccumuloConfiguration aconf) {
       if (aconf.isPropertySet(Property.TSERV_MAJC_THROUGHPUT, true)) {
         return aconf.getAsBytes(Property.TSERV_MAJC_THROUGHPUT);
       }
@@ -78,10 +81,75 @@ public class CompactionManager {
           .getMemoryAsBytes(Property.TSERV_COMPACTION_SERVICE_DEFAULT_RATE_LIMIT.getDefaultValue());
     }
 
-    Config(AccumuloConfiguration aconf) {
+    @SuppressWarnings("removal")
+    private Map<String,String> getConfiguration(AccumuloConfiguration aconf) {
+
       Map<String,String> configs =
           aconf.getAllPropertiesWithPrefix(Property.TSERV_COMPACTION_SERVICE_PREFIX);
 
+      // check if deprecated properties for compaction executor are set
+      if (aconf.isPropertySet(Property.TSERV_MAJC_MAXCONCURRENT, true)) {
+
+        String defaultServicePrefix =
+            Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey() + DEFAULT_SERVICE.canonical() + ".";
+
+        // check if any properties for the default compaction service are set
+        boolean defaultServicePropsSet = configs.keySet().stream()
+            .filter(key -> key.startsWith(defaultServicePrefix)).map(Property::getPropertyByKey)
+            .anyMatch(prop -> prop == null || aconf.isPropertySet(prop, true));
+
+        if (defaultServicePropsSet) {
+
+          String warning = String.format(
+              "The deprecated property %s was set. Properties with the prefix %s "
+                  + "were also set, which replace the deprecated properties. The deprecated "
+                  + "property was therefore ignored.",
+              Property.TSERV_MAJC_MAXCONCURRENT.getKey(), defaultServicePrefix);
+
+          if (!warning.equals(lastDeprecationWarning)) {
+            log.warn(warning);
+            lastDeprecationWarning = warning;
+          }
+        } else {
+          String numThreads = aconf.get(Property.TSERV_MAJC_MAXCONCURRENT);
+
+          // Its possible a user has configured the others compaction services, but not the default
+          // service. In this case want to produce a config with the default service configs
+          // overridden using deprecated configs.
+
+          HashMap<String,String> configsCopy = new HashMap<>(configs);
+
+          Map<String,String> defaultServiceConfigs =
+              Map.of(defaultServicePrefix + "planner", DefaultCompactionPlanner.class.getName(),
+                  defaultServicePrefix + "planner.opts.executors",
+                  "[{'name':'deprecated','numThreads':" + numThreads + "}]");
+
+          configsCopy.putAll(defaultServiceConfigs);
+
+          String warning = String.format(
+              "The deprecated property %s was set. Properties with the prefix %s "
+                  + "were not set, these should replace the deprecated properties. The old "
+                  + "properties were automatically mapped to the new properties in process "
+                  + "creating : %s.",
+              Property.TSERV_MAJC_MAXCONCURRENT.getKey(), defaultServicePrefix,
+              defaultServiceConfigs);
+
+          if (!warning.equals(lastDeprecationWarning)) {
+            log.warn(warning);
+            lastDeprecationWarning = warning;
+          }
+
+          configs = Map.copyOf(configsCopy);
+        }
+      }
+
+      return configs;
+
+    }
+
+    Config(AccumuloConfiguration aconf) {
+      Map<String,String> configs = getConfiguration(aconf);
+
       configs.forEach((prop, val) -> {
 
         var suffix = prop.substring(Property.TSERV_COMPACTION_SERVICE_PREFIX.getKey().length());